storm是grovvy寫的

kafka是scala寫的
storm-kafka storm連接kafka consumer的插件
下載地址:
https://github.com/wurstmeister/storm-kafka-0.8-plus
除了需要storm和kafka相關(guān)jar包還需要google-collections-1.0.jar
以及zookeeper相關(guān)包 curator-framework-1.3.3.jar和curator-client-1.3.3.jar
以前由com.netflix.curator組織開發(fā)現(xiàn)在歸到org.apache.curator下面
1.Kafka Consumer即Storm Spout代碼
package demo;
import java.util.ArrayList;
import java.util.List;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
public class MyKafkaSpout {
public static void main(String[] args) {
String topic ="track";
ZkHosts zkhosts = new ZkHosts("192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181");
SpoutConfig spoutConfig = new SpoutConfig(zkhosts, topic,
"/MyKafka", //偏移量offset的根目錄
"MyTrack");//子目錄對應(yīng)一個應(yīng)用
List<String> zkServers=new ArrayList<String>();
//zkServers.add("192.168.1.107");
//zkServers.add("192.168.1.108");
for(String host:zkhosts.brokerZkStr.split(","))
{
zkServers.add(host.split(":")[0]);
}
spoutConfig.zkServers=zkServers;
spoutConfig.zkPort=2181;
spoutConfig.forceFromStart=true;//從頭開始消費(fèi),實(shí)際上是要改成false的
spoutConfig.socketTimeoutMs=60;
spoutConfig.scheme=new SchemeAsMultiScheme(new StringScheme());//定義輸出為string類型
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout("spout", new KafkaSpout(spoutConfig),1);//引用spout,并發(fā)度設(shè)為1
builder.setBolt("bolt1", new MyKafkaBolt(),1).shuffleGrouping("spout");
Config config =new Config();
config.setDebug(true);//上線之前都要改成false否則日志會非常多
if(args.length>0){
try {
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} catch (AlreadyAliveException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvalidTopologyException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else{
LocalCluster localCluster=new LocalCluster();
localCluster.submitTopology("mytopology", config, builder.createTopology());
//本地模式在一個進(jìn)程里面模擬一個storm集群的所有功能
}
}
}2.Bolt代碼只是簡單打印輸出,覆寫execute方法即可
package demo;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class MyKafkaBolt implements IBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
@Override
public void execute(Tuple input, BasicOutputCollector arg1) {
String kafkaMsg =input.getString(0);
System.err.println("bolt"+kafkaMsg);
}
@Override
public void prepare(Map arg0, TopologyContext arg1) {
// TODO Auto-generated method stub
}
}另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。
網(wǎng)站題目:storm-kafka(stormspout作為kafka的消費(fèi)端)-創(chuàng)新互聯(lián)
文章地址:http://chinadenli.net/article46/edehg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供域名注冊、移動網(wǎng)站建設(shè)、做網(wǎng)站、網(wǎng)站維護(hù)、網(wǎng)站營銷、關(guān)鍵詞優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容