這篇文章主要講解了“Storm排序怎么實(shí)現(xiàn)”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Storm排序怎么實(shí)現(xiàn)”吧!

創(chuàng)新互聯(lián)公司2013年開創(chuàng)至今,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目成都網(wǎng)站制作、網(wǎng)站設(shè)計(jì)網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢(mèng)想脫穎而出為使命,1280元潤(rùn)州做網(wǎng)站,已為上家服務(wù),為潤(rùn)州各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:18980820575
閱讀背景:
1 : 您需要對(duì)滑動(dòng)窗口要初步了解
2 : 您需要了解滑動(dòng)窗口在滑動(dòng)的過程之中,滑動(dòng)chunk的計(jì)算過程,尤其是每發(fā)射一次,就需要清空一次。
package com.cc.storm.bolt;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
/**
* 1 在這里我們需要去實(shí)現(xiàn)一個(gè)滑動(dòng)窗口,請(qǐng)注意,在我們實(shí)現(xiàn)滑動(dòng)窗口的過程之中清空的是當(dāng)前滑動(dòng)窗口的下一個(gè)
*
*
*
* @author Yin Shuai
*
*/
public class RollingCountBolt implements IRichBolt {
private static final long serialVersionUID = 1765379339552134320L;
private HashMap<Object, long[]> _objectCounts = new HashMap<Object, long[]>();
private int _numBuckets;
private transient Thread cleaner;
private OutputCollector _collector;
/**
* _trackMinute
* 是我們整個(gè)滑動(dòng)窗口的大小,滑動(dòng)窗口的大小,本質(zhì)上決定了我們的時(shí)間區(qū)間,也就是說,假設(shè)我們目前滑動(dòng)窗口的總體大小為15分鐘。
* 那我們的商品點(diǎn)擊的實(shí)時(shí)排序的指標(biāo)值,好比商品瀏覽量的計(jì)算值,也就是15分鐘
*
* 而單個(gè)窗口的大小也就是我,我們這個(gè)三十分鐘在隨著時(shí)間不斷的在推移
*
* 舉例說明:在最初的構(gòu)造過程之中,如果我們的桶的數(shù)目為10,那么單個(gè)窗口的時(shí)間長(zhǎng)度為3.
*
* [0,30],[3,33],[6,36],[9,39],[12,42] 統(tǒng)計(jì)的數(shù)值處在不斷的變化之中
*
*/
private int _trackMinutes;
public RollingCountBolt(int numBuckets, int trackMinutes) {
this._numBuckets = numBuckets;
this._trackMinutes = trackMinutes;
}
public long totalObjects(Object obj) {
long[] curr = _objectCounts.get(obj);
long total = 0;
for (long l : curr) {
total += l;
}
return total;
}
public int currentBucket(int buckets) {
return currentSecond() / secondsPerBucket(buckets) % buckets;
}
public int currentSecond() {
return (int) (System.currentTimeMillis() / 1000);
}
/**
*
* @param buckets
* 你設(shè)定的桶的數(shù)量
* @return 依據(jù)我們默認(rèn)的_trackMinutes / buckets 得到每一個(gè)桶的數(shù)量
*/
public int secondsPerBucket(int buckets) {
return _trackMinutes * 60 / buckets;
}
public long millisPerBucket(int buckets) {
return (long) 1000 * secondsPerBucket(buckets);
}
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
// TODO Auto-generated method stub
_collector = collector;
cleaner = new Thread(new Runnable() {
@SuppressWarnings("unchecked")
@Override
public void run() {
// TODO Auto-generated method stub
int lastBucket = currentBucket(_numBuckets);
while (true) {
int currBucket = currentBucket(_numBuckets);
p("線程while循環(huán): 當(dāng)前的桶為:" + currBucket);
if (currBucket != lastBucket) {
p("線程while循環(huán):之前的桶數(shù)為:" + lastBucket);
int bucketToWipe = (currBucket + 1) % _numBuckets;
p("線程while循環(huán):要擦除掉的桶為:" + bucketToWipe);
synchronized (_objectCounts) {
Set objs = new HashSet(_objectCounts.keySet());
for (Object obj : objs) {
long[] counts = _objectCounts.get(obj);
long currBucketVal = counts[bucketToWipe];
p("線程while循環(huán):擦除掉的值為:" + currBucketVal);
counts[bucketToWipe] = 0;
long total = totalObjects(obj);
if (currBucketVal != 0) {
p("線程while循環(huán):擦除掉的值為不為0:那就發(fā)射數(shù)據(jù):obj total"
+ obj + ":" + total);
_collector.emit(new Values(obj, total));
}
if (total == 0) {
p("線程while循環(huán): 總數(shù)為0以后,將obj對(duì)象刪除");
_objectCounts.remove(obj);
}
}
}
lastBucket = currBucket;
}
long delta = millisPerBucket(_numBuckets)
- (System.currentTimeMillis() % millisPerBucket(_numBuckets));
Utils.sleep(delta);
p("\n");
}
}
});
cleaner.start();
}
@Override
public void execute(Tuple input) {
Object obj1 = input.getValue(0);
Object obj = input.getValue(1);
int currentBucket = currentBucket(_numBuckets);
p("execute方法:當(dāng)前桶:bucket: " + currentBucket);
synchronized (_objectCounts) {
long[] curr = _objectCounts.get(obj);
if (curr == null) {
curr = new long[_numBuckets];
_objectCounts.put(obj, curr);
}
curr[currentBucket]++;
System.err
.print(("execute方法:接受到的merchandiseIDS:" + obj.toString() + ",long數(shù)組:"));
for (long number : curr) {
System.err.print(number + ":");
}
p("execute方法:發(fā)射的數(shù)據(jù): " + obj + ":" + totalObjects(obj));
/**
* 我們不斷的發(fā)射的也就是我們某一個(gè)商品id,在當(dāng)前滑動(dòng)窗口,也就是我們的時(shí)間周期內(nèi)的指標(biāo)計(jì)算值
* 要注意,在排序的過程之中,我們只針對(duì)key, 也就是我們的商品id,由此發(fā)射給后續(xù)的排序bolt依據(jù)包含了時(shí)間區(qū)間的信息
*/
// 每來一條數(shù)據(jù),就會(huì)發(fā)射一次
_collector.emit(new Values(obj, totalObjects(obj)));
_collector.ack(input);
}
p("\n");
}
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields("merchandiseID", "count"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
public void p(Object o) {
System.err.println(o.toString());
}
}在這里,最需要我們關(guān)注的地方是,滑動(dòng)窗口每滑動(dòng)一次,將情況一組數(shù)據(jù)。 而發(fā)射數(shù)據(jù)的過程之中將統(tǒng)計(jì)這一組數(shù)
據(jù)。
感謝各位的閱讀,以上就是“Storm排序怎么實(shí)現(xiàn)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)Storm排序怎么實(shí)現(xiàn)這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
網(wǎng)頁標(biāo)題:Storm排序怎么實(shí)現(xiàn)
本文來源:http://chinadenli.net/article42/gesshc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站、定制網(wǎng)站、自適應(yīng)網(wǎng)站、Google、電子商務(wù)、響應(yīng)式網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)