欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

如何解析Flume與Kafka整合-創(chuàng)新互聯(lián)

這篇文章給大家介紹如何解析Flume與Kafka整合,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

創(chuàng)新互聯(lián)公司主營(yíng)東陽(yáng)網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,成都APP應(yīng)用開(kāi)發(fā),東陽(yáng)h5成都微信小程序搭建,東陽(yáng)網(wǎng)站營(yíng)銷推廣歡迎東陽(yáng)等地區(qū)企業(yè)咨詢

Flume與Kafka整合


一、概念
1、Flume:Cloudera 開(kāi)發(fā)的分布式日志收集系統(tǒng),是一種分布式,可靠且可用的服務(wù),用于高效地收集,匯總和移動(dòng)大量日志數(shù)據(jù)。 它具有基于流式數(shù)據(jù)流的簡(jiǎn)單而靈活的架構(gòu)。它具有可靠的可靠性機(jī)制和許多故障轉(zhuǎn)移和恢復(fù)機(jī)制,具有強(qiáng)大的容錯(cuò)性和容錯(cuò)能力。它使用一個(gè)簡(jiǎn)單的可擴(kuò)展數(shù)據(jù)模型,允許在線分析應(yīng)用程序。Flume分為OG、NG版本,其中Flume OG 的最后一個(gè)發(fā)行版本 0.94.0,之后為NG版本。
 
2、Kafka:作為一個(gè)集群運(yùn)行在一臺(tái)或多臺(tái)可以跨越多個(gè)數(shù)據(jù)中心的服務(wù)器上。在Kafka中,客戶端和服務(wù)器之間的通信是通過(guò)一種簡(jiǎn)單的,高性能的,語(yǔ)言不可知的TCP協(xié)議完成的。協(xié)議是版本控制的,并保持與舊版本的向后兼容性。Kafka提供Java客戶端,但客戶端可以使用多種語(yǔ)言。

3、Kafka通常用于兩大類應(yīng)用,如下:
   A、構(gòu)建可在系統(tǒng)或應(yīng)用程序之間可靠獲取數(shù)據(jù)的實(shí)時(shí)流數(shù)據(jù)管道
   B、構(gòu)建實(shí)時(shí)流應(yīng)用程序,用于轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流
   C、Kafka每個(gè)記錄由一個(gè)鍵,一個(gè)值和一個(gè)時(shí)間戳組成。

二、產(chǎn)述背景
    基于大數(shù)據(jù)領(lǐng)域?qū)崿F(xiàn)日志數(shù)據(jù)時(shí)時(shí)采集及數(shù)據(jù)傳遞等需要,據(jù)此需求下試著完成flume+kafka扇入、扇出功能整合,其中扇出包括:復(fù)制流、復(fù)用流等功能性測(cè)試。后續(xù)根據(jù)實(shí)際需要,將完善kafka與spark streaming進(jìn)行整合整理工作。
    注:此文檔僅限于功能性測(cè)試,性能優(yōu)化方面請(qǐng)大家根據(jù)實(shí)際情況增加。

三、部署安裝
1、測(cè)試環(huán)境說(shuō)明:
   操作系統(tǒng):CentOS 7
   Flume版本:flume-ng-1.6.0-cdh6.7.0
   Kafka版本:kafka_2.11-0.10.0.1
   JDK版本:JDK1.8.0
   Scala版本:2.11.8
2、測(cè)試步驟:
2.1、flume部署
2.1.1、下載安裝介質(zhì),并解壓:

此處)折疊或打開(kāi)

此處)折疊或打開(kāi)

此處)折疊或打開(kāi)

  1. cd /app/apache-flume-1.6.0-cdh6.7.0-bin

  2. vi netcatOrKafka-memory-logger.conf

  3.     netcatagent.sources = netcat_sources

  4.     netcatagent.channels = c1 c2

  5.     netcatagent.sinks = logger_sinks kafka_sinks

  6.     

  7.     netcatagent.sources.netcat_sources.type = netcat

  8.     netcatagent.sources.netcat_sources.bind = 0.0.0.0

  9.     netcatagent.sources.netcat_sources.port = 44444

  10.     

  11.     netcatagent.channels.c1.type = memory

  12.     netcatagent.channels.c1.capacity = 1000

  13.     netcatagent.channels.c1.transactionCapacity = 100

  14.     

  15.     netcatagent.channels.c2.type = memory

  16.     netcatagent.channels.c2.capacity = 1000

  17.     netcatagent.channels.c2.transactionCapacity = 100

  18.     

  19.     netcatagent.sinks.logger_sinks.type = logger

  20.     

  21.     netcatagent.sinks.kafka_sinks.type = org.apache.flume.sink.kafka.KafkaSink

  22.     netcatagent.sinks.kafka_sinks.topic = test

  23.     netcatagent.sinks.kafka_sinks.brokerList = 192.168.137.132:9082,192.168.137.133:9082,192.168.137.134:9082

  24.     netcatagent.sinks.kafka_sinks.requiredAcks = 0

  25.     ##netcatagent.sinks.kafka_sinks.batchSize = 20

  26.     netcatagent.sinks.kafka_sinks.producer.type=sync

  27.     netcatagent.sinks.kafka_sinks.custom.encoding=UTF-8

  28.     netcatagent.sinks.kafka_sinks.partition.key=0

  29.     netcatagent.sinks.kafka_sinks.serializer.class=kafka.serializer.StringEncoder

  30.     netcatagent.sinks.kafka_sinks.partitioner.class=org.apache.flume.plugins.SinglePartition

  31.     netcatagent.sinks.kafka_sinks.max.message.size=1000000

  32.     

  33.     netcatagent.sources.netcat_sources.selector.type = replicating

  34.     

  35.     netcatagent.sources.netcat_sources.channels = c1 c2

  36.     netcatagent.sinks.logger_sinks.channel = c1

  37.     netcatagent.sinks.kafka_sinks.channel = c2

2.4.2、啟動(dòng)各測(cè)試命令:
   A、啟動(dòng)flume的agent(于192.168.137.130):
      flume-ng agent --name netcatagent \
       --conf $FLUME_HOME/conf \
       --conf-file $FLUME_HOME/conf/netcat-memory-loggerToKafka.conf \
       -Dflume.root.logger=INFO,console
    B、啟動(dòng)kafka消費(fèi)者(于192.168.137.132):
        kafka-console-consumer.sh \
         --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
          --from-beginning --topic test
     C、測(cè)試發(fā)送(于192.168.137.130與于192.168.137.132)
telnet發(fā)送結(jié)果

如何解析Flume與Kafka整合

kafka消費(fèi)結(jié)果

如何解析Flume與Kafka整合

最終logger接收結(jié)果

如何解析Flume與Kafka整合

         
   至此flume+kafka扇出--復(fù)制流測(cè)試(扇入源為:netcat;輸出為:kafka+Flume的Logger)測(cè)試與驗(yàn)證完成。
   
2.5、flume+kafka扇出--復(fù)用流測(cè)試(扇入源為:netcat;輸出為:kafka+Flume的Logger)

   暫無(wú),后續(xù)補(bǔ)充



四、部署安裝及驗(yàn)證過(guò)程中出現(xiàn)的問(wèn)題

    1、做flume+kafka扇入測(cè)試(扇入源為:netcat+kafka;輸出以Flume的Logger類型輸出)時(shí),一直未收到kafka數(shù)據(jù)
        主要原因是在做kafka的配置時(shí)在配置文件(server.properties)中寫成內(nèi)容:
       zookeeper.connect=192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181
       但在創(chuàng)建topics時(shí),使用的是:
       kafka-topics.sh --create \
   --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--replication-factor 3 --partitions 3 --topic test
 
其中在kafka的配置文件中zookeeper配置未加/kakfa,但在創(chuàng)建topics的時(shí)增加了/kafka
最終使用:
kafka-console-producer.sh \
--broker-list 192.168.137.132:9092,192.168.137.133:9092,192.168.137.134:9092 \
--topic test
命令檢查沒(méi)有topics信息才發(fā)現(xiàn)此問(wèn)題
   
   解決辦法:將兩個(gè)信息同步即可
   
2、做flume+kafka扇入測(cè)試(扇入源為:netcat+kafka;輸出以Flume的Logger類型輸出)時(shí),啟動(dòng)flume的agent時(shí)報(bào)錯(cuò)。
2018-03-31 10:43:31,241 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.kafka,KafkaSource, class: org.apache.flume.source.kafka,KafkaSource
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
        at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.kafka,KafkaSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
        ... 11 more


   解決辦法:官網(wǎng)資料存在問(wèn)題,org.apache.flume.source.kafka,KafkaSource其中不應(yīng)該包括逗號(hào),改為:org.apache.flume.source.kafka.KafkaSource即可。詳細(xì)官網(wǎng)
如何解析Flume與Kafka整合

關(guān)于如何解析Flume與Kafka整合就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

名稱欄目:如何解析Flume與Kafka整合-創(chuàng)新互聯(lián)
文章鏈接:http://chinadenli.net/article46/cdodeg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供關(guān)鍵詞優(yōu)化定制網(wǎng)站微信公眾號(hào)企業(yè)建站網(wǎng)站策劃網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都seo排名網(wǎng)站優(yōu)化