1. create an IntelliJ IDEA project for the AsyncHbaseEventSerializer
Add the dependency to pom.xml:
<dependency>
    <groupId>org.apache.flume.flume-ng-sinks</groupId>
    <artifactId>flume-ng-hbase-sink</artifactId>
    <version>1.6.0</version>
</dependency>
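This dependency should also pull in the asynchbase client transitively, which provides the org.hbase.async.PutRequest and org.hbase.async.AtomicIncrementRequest classes imported below.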
Implement AsyncHbaseEventSerializer according to the business logic:
package ifre.flume.hbase;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by root on 12/5/17.
 */
public class SplittingSerializer implements AsyncHbaseEventSerializer {
    private byte[] table;
    private byte[] colFam;
    private Event currentEvent;
    private byte[] currentRowKey;
    private byte[][] columnNames;
    private final byte[] eventCountCol = "eventCount".getBytes();
    private final List<PutRequest> puts = new ArrayList<PutRequest>();
    private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();

    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.colFam = cf;
        // Cannot get the columns from the context in configure(), so they are hard-coded here.
        columnNames = new byte[3][];
        columnNames[0] = "name".getBytes();
        columnNames[1] = "id".getBytes();
        columnNames[2] = "phone".getBytes();
    }

    public void setEvent(Event event) {
        // Set the event; the row key is generated later in getActions().
        this.currentEvent = event;
        /* // Don't know how to set the key in the event header.
        String rowKeyStr = currentEvent.getHeaders().get("rowKey");
        if (rowKeyStr == null) {
            throw new FlumeException("No row key found in headers!");
        }
        currentRowKey = rowKeyStr.getBytes(); */
    }

    public List<PutRequest> getActions() {
        // Split the event body and get the values for the columns.
        String eventStr = new String(currentEvent.getBody());
        String[] cols = eventStr.split(",");
        long currTime = System.currentTimeMillis();
        long revTs = Long.MAX_VALUE - currTime;
        // Reverse timestamp + first column value, so the newest rows sort first.
        currentRowKey = (Long.toString(revTs) + cols[0]).getBytes();
        puts.clear();
        for (int i = 0; i < cols.length; i++) {
            // Generate a PutRequest for each column.
            PutRequest req = new PutRequest(table, currentRowKey, colFam,
                    columnNames[i], cols[i].getBytes());
            puts.add(req);
        }
        return puts;
    }

    public List<AtomicIncrementRequest> getIncrements() {
        incs.clear();
        // Increment the number of events received.
        incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
        return incs;
    }

    public void cleanUp() {
        table = null;
        colFam = null;
        currentEvent = null;
        columnNames = null;
        currentRowKey = null;
    }

    public void configure(Context context) {
        // Get the column names from the configuration.
        // Did not work: the local array below shadows the columnNames field,
        // so the parsed values are never used.
        String cols = new String(context.getString("columns"));
        String[] names = cols.split(",");
        byte[][] columnNames = new byte[names.length][];
        int i = 0;
        System.out.println("getting columnNames");
        for (String name : names) {
            columnNames[i++] = name.getBytes();
        }
    }

    public void configure(ComponentConfiguration componentConfiguration) {
    }
}
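The row key built in getActions() is a reverse timestamp (Long.MAX_VALUE minus the current time in milliseconds) concatenated with the first column value, so the most recent events sort first in HBase's lexicographic row order. A minimal standalone sketch of the scheme; RowKeyDemo and the hard-coded name are illustrative only, not part of the project:

// Illustrative only: shows the reverse-timestamp row key scheme used above.
public class RowKeyDemo {
    public static void main(String[] args) {
        long revTs = Long.MAX_VALUE - System.currentTimeMillis();
        // cols[0] would be "zhangsan" for the sample event body used later.
        String rowKey = Long.toString(revTs) + "zhangsan";
        System.out.println(rowKey); // e.g. 9223370524386395508zhangsan
    }
}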
build and deploy the jar file
Build --> Build Artifacts
Copy the jar to Flume's lib directory. Here I use scp to upload it to the Flume installation on another host, as sketched below.
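A sketch of the copy step; the jar name and destination path are assumptions for illustration (only the host ifrebigsearch2 and the apache-flume-1.6.0-bin directory appear later in this article):

scp SplittingSerializer.jar root@ifrebigsearch2:/root/apache-flume-1.6.0-bin/lib/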
2. configure flume
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 sink2
a1.sources.r1.selector.type = replicating

#NetCat TCP source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1 c2

#channel
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000

#HBase sink
a1.sinks.sink2.type = org.apache.flume.sink.hbase.AsyncHBaseSink
a1.sinks.sink2.channel = c2
a1.sinks.sink2.table = law
a1.sinks.sink2.columnFamily = lawfile
a1.sinks.sink2.batchSize = 5000

#The serializer to use
a1.sinks.sink2.serializer = ifre.flume.hbase.SplittingSerializer

#List of columns each event writes to.
a1.sinks.sink2.serializer.columns = name,id,phone
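Note that a1.sinks.sink2.serializer.columns is the property that configure(Context) tries to read with context.getString("columns"); since that did not work (see the code comments above), initialize() hard-codes the same three column names. Also, channel c1 and sink k1 are declared but never defined, so Flume should log warnings and drop them at startup; the working path is r1 -> c2 -> sink2.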
3. create hbase table
# hbase shell
hbase(main):001:0> create 'law', 'lawfile'
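You can sanity-check the table before starting the agent; describe is a standard HBase shell command:

hbase(main):002:0> describe 'law'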
4. run flume agent
[root@ifrebigsearch2 apache-flume-1.6.0-bin]# bin/flume-ng agent --conf conf --conf-file conf/crawler-hdfs-conf.properties --name a1 -Dflume.root.logger=INFO,console
5. run nc
[root@ifrebigsearch0 dkh]# nc ifrebigsearch2 6666
zhangsan,10110198806054561,13812345678
OK
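Each line typed into nc becomes one Flume event. The serializer splits the body on commas, so zhangsan, 10110198806054561, and 13812345678 land in the name, id, and phone columns respectively, all under a single generated row key.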
6. result
hbase(main):002:0> scan 'law'
ROW                           COLUMN+CELL
 9223370524386395508zhangsan  column=lawfile:id, timestamp=1512468380362, value=10110198806054561
 9223370524386395508zhangsan  column=lawfile:name, timestamp=1512468380361, value=zhangsan
 9223370524386395508zhangsan  column=lawfile:phone, timestamp=1512468380363, value=13812345678
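The row key checks out: Long.MAX_VALUE (9223372036854775807) minus the prefix 9223370524386395508 gives 1512468380299, a few dozen milliseconds before the cell timestamps 1512468380361-363, with the name zhangsan appended. The event counter maintained by getIncrements() can be checked with: get 'law', 'totalEvents'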