本篇內(nèi)容介紹了“Storm MongoDB接口怎么使用”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

成都創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價(jià)比廣德網(wǎng)站開(kāi)發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫(kù),直接使用。一站式廣德網(wǎng)站制作公司更省心,省錢(qián),快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋廣德地區(qū)。費(fèi)用合理售后完善,10多年實(shí)體公司更值得信賴。
整體的Storn接口分為以下的幾個(gè)class
1:MongoBolt.java
2 : MongoSpout.java
3 : MongoTailableCursorTopology.java
4 : SimpleMongoBolt.java
看代碼說(shuō)話:
1
package storm.mongo;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.mongodb.DB;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
/**
*
* 注意在這里,沒(méi)有實(shí)現(xiàn)批處理的調(diào)用,并且只是一個(gè)抽象類,對(duì)于Mongo的Storm交互做了一次封裝
*
* @author Adrian Petrescu <apetresc@gmail.com>
*
*/
public abstract class MongoBolt extends BaseRichBolt {
private OutputCollector collector;
// MOngDB的DB對(duì)象
private DB mongoDB;
//記錄我們的主機(jī),端口,和MongoDB的數(shù)據(jù)DB民粹
private final String mongoHost;
private final int mongoPort;
private final String mongoDbName;
/**
* @param mongoHost The host on which Mongo is running.
* @param mongoPort The port on which Mongo is running.
* @param mongoDbName The Mongo database containing all collections being
* written to.
*/
protected MongoBolt(String mongoHost, int mongoPort, String mongoDbName) {
this.mongoHost = mongoHost;
this.mongoPort = mongoPort;
this.mongoDbName = mongoDbName;
}
@Override
public void prepare(
@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
try {
//prepare方法目前在初始化的過(guò)程之中得到了一個(gè)Mongo的連接
this.mongoDB = new MongoClient(mongoHost, mongoPort).getDB(mongoDbName);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void execute(Tuple input) {
//注意我們?cè)谶@里還有一個(gè)判斷,判斷當(dāng)前是否該發(fā)射
if (shouldActOnInput(input)) {
String collectionName = getMongoCollectionForInput(input);
DBObject dbObject = getDBObjectForInput(input);
if (dbObject != null) {
try {
mongoDB.getCollection(collectionName).save(dbObject, new WriteConcern(1));
collector.ack(input);
} catch (MongoException me) {
collector.fail(input);
}
}
} else {
collector.ack(input);
}
}
/**
* Decide whether or not this input tuple should trigger a Mongo write.
*
* @param input the input tuple under consideration
* @return {@code true} iff this input tuple should trigger a Mongo write
*/
public abstract boolean shouldActOnInput(Tuple input);
/**
* Returns the Mongo collection which the input tuple should be written to.
*
* @param input the input tuple under consideration
* @return the Mongo collection which the input tuple should be written to
*/
public abstract String getMongoCollectionForInput(Tuple input);
/**
* Returns the DBObject to store in Mongo for the specified input tuple.
*
拿到DBObject的一個(gè)抽象類
* @param input the input tuple under consideration
* @return the DBObject to be written to Mongo
*/
public abstract DBObject getDBObjectForInput(Tuple input);
//注意這里隨著計(jì)算的終結(jié)被關(guān)閉了。
@Override
public void cleanup() {
this.mongoDB.getMongo().close();
}
}2 :
package storm.mongo;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.utils.Utils;
import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;
/**
* A Spout which consumes documents from a Mongodb tailable cursor.
*
* Subclasses should simply override two methods:
* <ul>
* <li>{@link #declareOutputFields(OutputFieldsDeclarer) declareOutputFields}
* <li>{@link #dbObjectToStormTuple(DBObject) dbObjectToStormTuple}, which turns
* a Mongo document into a Storm tuple matching the declared output fields.
* </ul>
*
** <p>
* <b>WARNING:</b> You can only use tailable cursors on capped collections.
*
* @author Dan Beaulieu <danjacob.beaulieu@gmail.com>
*
*/
// 在這里,抽象的過(guò)程中,依舊保持了第一層的Spout為一個(gè)抽象類,MongoSpout為abstract的一個(gè)抽象類,子類在繼承這// 個(gè)類的過(guò)程之中實(shí)現(xiàn)特定的方法即可
// 這里還有一個(gè)類似Cursor的操作。
public abstract class MongoSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private LinkedBlockingQueue<DBObject> queue;
private final AtomicBoolean opened = new AtomicBoolean(false);
private DB mongoDB;
private final DBObject query;
private final String mongoHost;
private final int mongoPort;
private final String mongoDbName;
private final String mongoCollectionName;
public MongoSpout(String mongoHost, int mongoPort, String mongoDbName, String mongoCollectionName, DBObject query) {
this.mongoHost = mongoHost;
this.mongoPort = mongoPort;
this.mongoDbName = mongoDbName;
this.mongoCollectionName = mongoCollectionName;
this.query = query;
}
class TailableCursorThread extends Thread {
// 內(nèi)部類 TailableCursorThread線程
//注意在其中我們使用了LinkedBlockingQueue的對(duì)象,有關(guān)java高并發(fā)的集合類,請(qǐng)參考本ID的【Java集合類型的博文】博文。
LinkedBlockingQueue<DBObject> queue;
String mongoCollectionName;
DB mongoDB;
DBObject query;
public TailableCursorThread(LinkedBlockingQueue<DBObject> queue, DB mongoDB, String mongoCollectionName, DBObject query) {
this.queue = queue;
this.mongoDB = mongoDB;
this.mongoCollectionName = mongoCollectionName;
this.query = query;
}
public void run() {
while(opened.get()) {
try {
// create the cursor
mongoDB.requestStart();
final DBCursor cursor = mongoDB.getCollection(mongoCollectionName)
.find(query)
.sort(new BasicDBObject("$natural", 1))
.addOption(Bytes.QUERYOPTION_TAILABLE)
.addOption(Bytes.QUERYOPTION_AWAITDATA);
try {
while (opened.get() && cursor.hasNext()) {
final DBObject doc = cursor.next();
if (doc == null) break;
queue.put(doc);
}
} finally {
try {
if (cursor != null) cursor.close();
} catch (final Throwable t) { }
try {
mongoDB.requestDone();
} catch (final Throwable t) { }
}
Utils.sleep(500);
} catch (final MongoException.CursorNotFound cnf) {
// rethrow only if something went wrong while we expect the cursor to be open.
if (opened.get()) {
throw cnf;
}
} catch (InterruptedException e) { break; }
}
};
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.queue = new LinkedBlockingQueue<DBObject>(1000);
try {
this.mongoDB = new MongoClient(this.mongoHost, this.mongoPort).getDB(this.mongoDbName);
} catch (Exception e) {
throw new RuntimeException(e);
}
TailableCursorThread listener = new TailableCursorThread(this.queue, this.mongoDB, this.mongoCollectionName, this.query);
this.opened.set(true);
listener.start();
}
@Override
public void close() {
this.opened.set(false);
}
@Override
public void nextTuple() {
DBObject dbo = this.queue.poll();
if(dbo == null) {
Utils.sleep(50);
} else {
this.collector.emit(dbObjectToStormTuple(dbo));
}
}
@Override
public void ack(Object msgId) {
// TODO Auto-generated method stub
}
@Override
public void fail(Object msgId) {
// TODO Auto-generated method stub
}
public abstract List<Object> dbObjectToStormTuple(DBObject message);
}“Storm MongoDB接口怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
分享題目:StormMongoDB接口怎么使用
網(wǎng)站地址:http://chinadenli.net/article26/gphccg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App開(kāi)發(fā)、App設(shè)計(jì)、靜態(tài)網(wǎng)站、關(guān)鍵詞優(yōu)化、服務(wù)器托管、網(wǎng)站策劃
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)