
Flume 1.6.0 Study Notes (6): Kafka Source


Lu Chunli's work notes: the palest ink is better than the best memory.


Flume 1.6.0 added full support for Kafka:

Flume Sink and Source for Apache Kafka
A new channel that uses Kafka

Kafka Source(http://flume.apache.org/FlumeUserGuide.html#kafka-source)

    Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic.

    If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topic.

File Channel(http://flume.apache.org/FlumeUserGuide.html#file-channel)

    A durable channel that persists events to local disk. (The example below uses a memory channel instead.)

HBase Sink(http://flume.apache.org/FlumeUserGuide.html#hbasesink)

    The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.

Create the Kafka topic myhbase:

[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --create --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --replication-factor 1 --partitions 1 --topic myhbase
Created topic "myhbase".
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --list --zookeeper nnode:2181,dnode1:2181,dnode2:2181
myhbase
mykafka
mytopic - marked for deletion
test - marked for deletion
[hadoop@nnode kafka0.8.2.1]$
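Before wiring up the Java producer below, the topic can be smoke-tested with the console tools that ship with Kafka 0.8 (host names taken from the cluster above):

```
bin/kafka-console-producer.sh --broker-list nnode:9092 --topic myhbase
bin/kafka-console-consumer.sh --zookeeper nnode:2181 --topic myhbase --from-beginning
```

Anything typed into the producer terminal should appear in the consumer terminal, confirming the topic is writable before Flume is attached.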

HBase table schema

[hadoop@nnode kafka0.8.2.1]$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.0.1, r66a93c09df3b12ff7b86c39bc8475c60e15af82d, Fri Apr 17 22:14:06 PDT 2015

Table name: t_inter_log
Column family: cf
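Assuming the table does not exist yet, it can be created from the HBase shell with the name and column family listed above:

```
create 't_inter_log', 'cf'
describe 't_inter_log'
```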

Flume configuration file

vim conf/kafka-hbase.conf

# read from kafka and write to hbase

agent.sources = kafka-source
agent.channels = mem-channel
agent.sinks = hbase-sink

# source
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.zookeeperConnect = nnode:2181,dnode1:2181,dnode2:2181
agent.sources.kafka-source.groupId = flume
agent.sources.kafka-source.topic = myhbase
agent.sources.kafka-source.kafka.consumer.timeout.ms = 100

# channel
agent.channels.mem-channel.type = memory

# sink
agent.sinks.hbase-sink.type = hbase
agent.sinks.hbase-sink.table = t_inter_log
agent.sinks.hbase-sink.columnFamily  = cf
# agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# assemble
agent.sources.kafka-source.channels = mem-channel
agent.sinks.hbase-sink.channel = mem-channel
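The commented-out serializer line above points at RegexHbaseEventSerializer. A hedged sketch of how it could be configured for the `rowkey, cf:count, value` payload the producer below emits (`serializer.regex` and `serializer.colNames` are the parameter names documented in the Flume user guide, and `ROW_KEY` is its reserved column name for the row key; the regex itself is an untested assumption):

```
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# First capture group becomes the row key (ROW_KEY is special-cased),
# second becomes the value of the cf:count cell.
agent.sinks.hbase-sink.serializer.regex = ^([^,]+),\\s*[^,]+,\\s*(.*)$
agent.sinks.hbase-sink.serializer.colNames = ROW_KEY,count
```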

Start Kafka

[hadoop@nnode kafka0.8.2.1]$ bin/kafka-server-start.sh config/server.properties

啟動(dòng)flume-ng

[hadoop@nnode flume1.6.0]$ bin/flume-ng agent --conf conf --name agent --conf-file conf/kafka-hbase.conf -Dflume.root.logger=INFO,console

Implement a producer via the Java API

package com.lucl.kafka.simple;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.log4j.Logger;

/**
 * <p> Copyright: Copyright (c) 2015 </p>
 * 
 * <p> Date : 2015-11-17 21:42:50 </p>
 * 
 * <p> Description : JavaApi for kafka producer </p>
 *
 * @author luchunli
 * 
 * @version 1.0
 *
 */
public class SimpleKafkaProducer {
    private static final Logger logger = Logger.getLogger(SimpleKafkaProducer.class);
    /**
     * Build the producer config and send ten sample messages to the topic.
     */
    private void execMsgSend() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.137.117:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "0");
        
        ProducerConfig config = new ProducerConfig(props); 
        
        logger.info("set config info(" + config + ") ok.");
        
        Producer<String, String> producer = new Producer<String, String>(config);
        
        String topic = "myhbase";
        
        String columnFamily = "cf";
        String column = "count";
        for (int i = 1; i <= 10; i++) {
            String rowkey = "www.value_" + i + ".com";
            String value = "value_" + i;
            
            // Event payload format: "rowkey, cf:column, value"
            String event = rowkey + ", " + columnFamily + ":" + column + ", " + value;
            logger.info(event);
            KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, event);
            producer.send(msg);
        }
        logger.info("send message over.");
            
        producer.close();
    }
    
    /**
     * @param args
     */
    public static void main(String[] args) {
        SimpleKafkaProducer simpleProducer = new SimpleKafkaProducer();
        simpleProducer.execMsgSend();
    }

}
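The producer emits payloads of the form `rowkey, cf:column, value`, which whatever serializer runs on the Flume side must split back apart. A minimal, self-contained sketch of that parsing (plain Java with no Flume or HBase dependencies; the class name is hypothetical):

```java
public class EventParser {
    /** Split "rowkey, cf:column, value" (the producer's format above) into three fields. */
    public static String[] parse(String event) {
        // Limit of 3 keeps any commas inside the value from being split further.
        String[] parts = event.split(",\\s*", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("unexpected event format: " + event);
        }
        return parts; // [rowkey, "cf:column", value]
    }

    public static void main(String[] args) {
        String[] fields = parse("www.value_1.com, cf:count, value_1");
        System.out.println(fields[0] + " | " + fields[1] + " | " + fields[2]);
    }
}
```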

Observe the flume-ng console output

2015-11-21 23:09:47,466 (flume_nnode-1448118584558-54f0a1ba-leader-finder-thread) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] [ConsumerFetcherManager-1448118585060] Added fetcher for partitions ArrayBuffer([[myhbase,0], initOffset 70 to broker id:117,host:nnode,port:9092] )
2015-11-21 23:09:59,147 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: hbase-sink: Successfully registered new MBean.
2015-11-21 23:09:59,147 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: hbase-sink started
2015-11-21 23:15:30,702 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:351)] Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:377)
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:372)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:372)
        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:342)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
2015-11-21 23:15:30,716 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:354)] Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:377)
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:372)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:372)
        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:342)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:377)
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:372)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:372)
        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:342)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
^X^C2015-11-21 23:15:38,090 (agent-shutdown-hook) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:79)] Stopping lifecycle supervisor 10
2015-11-21 23:15:38,103 (PollableSourceRunner-KafkaSource-kafka-source) [INFO - org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:149)] Source runner interrupted. Exiting

    The write failed: Flume 1.6.0's HBaseSink calls Put.setWriteToWAL(boolean), which was deprecated in HBase 0.94 and removed in HBase 1.0 (replaced by Put.setDurability), so the bundled sink is incompatible with the HBase 1.0.1 installation shown above.

Check the HBase table

hbase(main):004:0> scan 't_inter_log'
ROW                                                COLUMN+CELL                                                                                                                                      
0 row(s) in 0.0140 seconds

hbase(main):005:0>

當(dāng)前文章:Flume-1.6.0學(xué)習(xí)筆記(六)kafkasource
文章出自:http://chinadenli.net/article18/jgjdgp.html