How do you report Storm metrics to Librato? Many newcomers are unsure how to do this, so this article walks through a complete implementation. We hope it is useful to anyone who needs it.

Below is an implementation of a Librato metrics consumer for Storm.
package com.digitalpebble.storm.crawler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;

import com.librato.metrics.HttpPoster;
import com.librato.metrics.HttpPoster.Response;
import com.librato.metrics.LibratoBatch;
import com.librato.metrics.NingHttpPoster;
import com.librato.metrics.Sanitizer;
import com.librato.metrics.Versions;

/** Sends the metrics to Librato **/
public class LibratoMetricsConsumer implements IMetricsConsumer {

    public static final int DEFAULT_BATCH_SIZE = 500;

    private static final Logger LOG = LoggerFactory
            .getLogger(LibratoMetricsConsumer.class);

    private static final String LIB_VERSION = Versions.getVersion(
            "META-INF/maven/com.librato.metrics/librato-java/pom.properties",
            LibratoBatch.class);

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final Sanitizer sanitizer = new Sanitizer() {
        public String apply(String name) {
            return Sanitizer.LAST_PASS.apply(name);
        }
    };

    private int postBatchSize = DEFAULT_BATCH_SIZE;
    private long timeout = 30;
    private final TimeUnit timeoutUnit = TimeUnit.SECONDS;
    private String userAgent = null;
    private HttpPoster httpPoster;
    private Set<String> metricsToKeep = new HashSet<String>();

    public void prepare(Map stormConf, Object registrationArgument,
            TopologyContext context, IErrorReporter errorReporter) {
        // TODO configure timeouts
        // this.timeout = timeout;
        // this.timeoutUnit = timeoutUnit;
        // this.postBatchSize = postBatchSize;

        String agentIdentifier = (String) stormConf.get("librato.agent");
        if (agentIdentifier == null)
            agentIdentifier = "storm";

        String token = (String) stormConf.get("librato.token");
        String username = (String) stormConf.get("librato.username");
        String apiUrl = (String) stormConf.get("librato.api.url");
        if (apiUrl == null)
            apiUrl = "https://metrics-api.librato.com/v1/metrics";

        // the token and the username are mandatory
        if (StringUtils.isBlank(token))
            throw new RuntimeException("librato.token not set");
        if (StringUtils.isBlank(username))
            throw new RuntimeException("librato.username not set");

        this.userAgent = String.format("%s librato-java/%s", agentIdentifier,
                LIB_VERSION);
        this.httpPoster = NingHttpPoster.newPoster(username, token, apiUrl);

        // get the list of metrics names to keep if any
        String metrics2keep = (String) stormConf.get("librato.metrics.to.keep");
        if (metrics2keep != null) {
            String[] mets = metrics2keep.split(",");
            for (String m : mets)
                metricsToKeep.add(m.trim().toLowerCase());
        }
    }

    // Converts the Storm data points into Librato gauges and counters
    // and posts them in batches of at most postBatchSize measurements
    public void handleDataPoints(TaskInfo taskInfo,
            Collection<DataPoint> dataPoints) {
        final Map<String, Object> payloadMap = new HashMap<String, Object>();
        payloadMap.put("source", taskInfo.srcComponentId + "_"
                + taskInfo.srcWorkerHost + "_" + taskInfo.srcTaskId);
        payloadMap.put("measure_time", taskInfo.timestamp);

        final List<Map<String, Object>> gaugeData = new ArrayList<Map<String, Object>>();
        final List<Map<String, Object>> counterData = new ArrayList<Map<String, Object>>();

        int counter = 0;
        for (DataPoint dataPoint : dataPoints) {
            // ignore datapoint with a value which is not a map
            if (!(dataPoint.value instanceof Map))
                continue;

            // convention: a name containing '_counter' is reported as a
            // counter, anything else as a gauge
            boolean isCounter = false;
            if (dataPoint.name.contains("_counter")) {
                isCounter = true;
                dataPoint.name = dataPoint.name.replaceFirst("_counter", "");
            }

            // apply the optional whitelist of metric names
            if (!metricsToKeep.isEmpty()
                    && !metricsToKeep.contains(dataPoint.name.toLowerCase())) {
                continue;
            }

            try {
                Map<String, Number> metric = (Map<String, Number>) dataPoint.value;
                for (Map.Entry<String, Number> entry : metric.entrySet()) {
                    String metricId = entry.getKey();
                    Number val = entry.getValue();
                    final Map<String, Object> data = new HashMap<String, Object>();
                    data.put("name",
                            sanitizer.apply(dataPoint.name + "_" + metricId));
                    data.put("value", val);
                    if (isCounter)
                        counterData.add(data);
                    else
                        // use as gauge
                        gaugeData.add(data);
                    counter++;
                    // post as soon as a full batch has been collected
                    if (counter % postBatchSize == 0) {
                        flush(payloadMap, counterData, gaugeData);
                    }
                }
            } catch (RuntimeException e) {
                LOG.error("Unable to process data point " + dataPoint.name, e);
            }
        }

        // post the remainder once, after all data points have been read;
        // checking inside the loop would post after every entry of the last
        // data point and would lose the remainder if that one is skipped
        if (!counterData.isEmpty() || !gaugeData.isEmpty()) {
            flush(payloadMap, counterData, gaugeData);
        }
        LOG.debug("Posted {} measurements", counter);
    }

    // Posts the pending measurements and empties both lists
    private void flush(Map<String, Object> payloadMap,
            List<Map<String, Object>> counterData,
            List<Map<String, Object>> gaugeData) {
        payloadMap.put("counters", counterData);
        payloadMap.put("gauges", gaugeData);
        postPortion(payloadMap);
        payloadMap.remove("gauges");
        payloadMap.remove("counters");
        gaugeData.clear();
        counterData.clear();
    }

    public void cleanup() {
    }

    // Serializes one batch to JSON and posts it, waiting up to the timeout
    private void postPortion(Map<String, Object> chunk) {
        try {
            final String payload = OBJECT_MAPPER.writeValueAsString(chunk);
            final Future<Response> future = httpPoster.post(userAgent, payload);
            final Response response = future.get(timeout, timeoutUnit);
            final int statusCode = response.getStatusCode();
            if (statusCode < 200 || statusCode >= 300) {
                LOG.error(
                        "Received an error from Librato API. Code : {}, Message: {}",
                        statusCode, response.getBody());
            }
        } catch (Exception e) {
            LOG.error("Unable to post to Librato API", e);
        }
    }
}
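
The consumer above reads its settings from the Storm configuration map and applies two conventions to metric names: a name containing "_counter" is sent as a counter (with that marker removed), and an optional comma-separated list under "librato.metrics.to.keep" restricts which metrics are sent. The following is a minimal standalone sketch of those two rules, with no Storm or Librato dependency. The class LibratoConventionSketch and its methods are hypothetical names introduced for illustration, the credential values are placeholders, and, unlike the original, the sketch skips blank entries in the keep list and lower-cases with Locale.ROOT.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not part of the original class: mirrors its naming and filtering rules. */
final class LibratoConventionSketch {

    /** Marker used by the consumer: names containing it are treated as counters. */
    static final String COUNTER_MARKER = "_counter";

    /** Parses the comma-separated "librato.metrics.to.keep" value into lower-cased names. */
    static Set<String> parseKeepList(Object raw) {
        Set<String> keep = new HashSet<String>();
        if (raw == null) {
            return keep; // no filter configured: every metric is kept
        }
        for (String name : raw.toString().split(",")) {
            String trimmed = name.trim().toLowerCase(Locale.ROOT);
            if (!trimmed.isEmpty()) { // assumption: blank entries are ignored
                keep.add(trimmed);
            }
        }
        return keep;
    }

    /** True if the data point name follows the counter convention. */
    static boolean isCounter(String name) {
        return name.contains(COUNTER_MARKER);
    }

    /** Name with the first counter marker removed, as it would be sent to Librato. */
    static String baseName(String name) {
        return name.replaceFirst(COUNTER_MARKER, "");
    }

    /** An empty keep list keeps everything; otherwise the base name must be listed. */
    static boolean shouldKeep(Set<String> keep, String name) {
        return keep.isEmpty()
                || keep.contains(baseName(name).toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        // Keys are the ones read in prepare(); the values are placeholders.
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put("librato.username", "user@example.com");
        conf.put("librato.token", "changeme");
        conf.put("librato.metrics.to.keep", "fetcher, Parser");

        Set<String> keep = parseKeepList(conf.get("librato.metrics.to.keep"));
        for (String name : new String[] { "fetcher_counter", "parser", "indexer" }) {
            String outcome = !shouldKeep(keep, name) ? "dropped"
                    : (isCounter(name) ? "counter " : "gauge ") + baseName(name);
            System.out.println(name + " -> " + outcome);
        }
    }
}
```

In a real topology the same keys would be placed in the Storm configuration, since prepare() reads them from stormConf. Note that the keep list is compared against the name after the counter marker has been removed, so a metric registered as "fetcher_counter" must be listed as "fetcher".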