[TOC]
創(chuàng)新互聯(lián)建站主要從事成都做網(wǎng)站、網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)未央,10余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專(zhuān)業(yè),歡迎來(lái)電咨詢(xún)建站服務(wù):028-86922220
Flume是一個(gè)分布式、可靠、和高可用的海量日志采集、聚合和傳輸?shù)南到y(tǒng)。
支持在日志系統(tǒng)中定制各類(lèi)數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);
同時(shí),F(xiàn)lume提供對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理,并寫(xiě)到各種數(shù)據(jù)接受方(比如文本、HDFS、Hbase等)的能力。
名詞介紹:
Flume OG:Flume original generation,即Flume0.9x版本
Flume NG:Flume next generation,即Flume1.x版本
官網(wǎng):http://flume.apache.org
1、Flume有一個(gè)簡(jiǎn)單、靈活的基于流的數(shù)據(jù)流結(jié)構(gòu)
2、Flume具有故障轉(zhuǎn)移機(jī)制和負(fù)載均衡機(jī)制
3、Flume使用一個(gè)簡(jiǎn)單可擴(kuò)展的數(shù)據(jù)模型(source、channel、sink)
目前,flume-ng處理數(shù)據(jù)有兩種方式:avro-client、agent
avro-client:一次性將數(shù)據(jù)傳輸?shù)街付ǖ腶vro服務(wù)的客戶(hù)端
agent:一個(gè)持續(xù)傳輸數(shù)據(jù)的服務(wù)
Agent主要的組件包括:Source、Channel、Sink
Source:完成對(duì)日志數(shù)據(jù)的手機(jī),分成transtion和event打入到channel之中。
Channel:主要提供一個(gè)隊(duì)列的功能,對(duì)source提供的數(shù)據(jù)進(jìn)行簡(jiǎn)單的緩存。
Sink:取出Channel中的數(shù)據(jù),進(jìn)行相應(yīng)的存儲(chǔ)文件系統(tǒng),數(shù)據(jù)庫(kù)或是提交到遠(yuǎn)程服務(wù)器。
數(shù)據(jù)在組件傳輸?shù)膯挝皇荅vent。
source意為來(lái)源、源頭。
主要作用:從外界采集各種類(lèi)型的數(shù)據(jù),將數(shù)據(jù)傳遞給Channel。
比如:監(jiān)控某個(gè)文件只要增加數(shù)據(jù)就立即采集新增的數(shù)據(jù)、監(jiān)控某個(gè)目錄一旦有新文件產(chǎn)生就采集新文件的內(nèi)容、監(jiān)控某個(gè)端口等等。
常見(jiàn)采集的數(shù)據(jù)類(lèi)型:
Exec Source、Avro Source、NetCat Source、Spooling Directory Source等
詳細(xì)查看:
http://flume.apache.org/FlumeUserGuide.html#flume-sources
或者自帶的文檔查看。
Source具體作用:
AvroSource:監(jiān)聽(tīng)一個(gè)avro服務(wù)端口,采集Avro數(shù)據(jù)序列化后的數(shù)據(jù);
Thrift Source:監(jiān)聽(tīng)一個(gè)Thrift 服務(wù)端口,采集Thrift數(shù)據(jù)序列化后的數(shù)據(jù);
Exec Source:基于Unix的command在標(biāo)準(zhǔn)輸出上采集數(shù)據(jù);
tail -F 和tail -f 區(qū)別?;趌og4j切割文件時(shí)的能否讀取問(wèn)題。
JMS Source:Java消息服務(wù)數(shù)據(jù)源,Java消息服務(wù)是一個(gè)與具體平臺(tái)無(wú)關(guān)的API,這是支持jms規(guī)范的數(shù)據(jù)源采集;
Spooling Directory Source:通過(guò)文件夾里的新增的文件作為數(shù)據(jù)源的采集;
Kafka Source:從kafka服務(wù)中采集數(shù)據(jù)。
NetCat Source: 綁定的端口(tcp、udp),將流經(jīng)端口的每一個(gè)文本行數(shù)據(jù)作為Event輸入
HTTP Source:監(jiān)聽(tīng)HTTP POST和 GET產(chǎn)生的數(shù)據(jù)的采集
Channel
一個(gè)數(shù)據(jù)的存儲(chǔ)池,中間通道。
主要作用
接受source傳出的數(shù)據(jù),向sink指定的目的地傳輸。Channel中的數(shù)據(jù)直到進(jìn)入到下一個(gè)channel中或者進(jìn)入終端才會(huì)被刪除。當(dāng)sink寫(xiě)入失敗后,可以自動(dòng)重寫(xiě),不會(huì)造成數(shù)據(jù)丟失,因此很可靠。
channel的類(lèi)型很多比如:內(nèi)存中、jdbc數(shù)據(jù)源中、文件形式存儲(chǔ)等。
常見(jiàn)采集的數(shù)據(jù)類(lèi)型:
Memory Channel
File Channel
Spillable Memory Channel等
詳細(xì)查看:
http://flume.apache.org/FlumeUserGuide.html#flume-channels
Channel具體作用:
Memory Channel:使用內(nèi)存作為數(shù)據(jù)的存儲(chǔ)。速度快
File Channel:使用文件來(lái)作為數(shù)據(jù)的存儲(chǔ)。安全可靠
Spillable Memory Channel:使用內(nèi)存和文件作為數(shù)據(jù)的存儲(chǔ),即:先存在內(nèi)存中,如果內(nèi)存中數(shù)據(jù)達(dá)到閥值則flush到文件中。
JDBC Channel:使用jdbc數(shù)據(jù)源來(lái)作為數(shù)據(jù)的存儲(chǔ)。
Kafka Channel:使用kafka服務(wù)來(lái)作為數(shù)據(jù)的存儲(chǔ)。
Sink:數(shù)據(jù)的最終的目的地。
主要作用:接受channel寫(xiě)入的數(shù)據(jù)以指定的形式表現(xiàn)出來(lái)(或存儲(chǔ)或展示)。
sink的表現(xiàn)形式很多比如:打印到控制臺(tái)、hdfs上、avro服務(wù)中、文件中等。
常見(jiàn)采集的數(shù)據(jù)類(lèi)型:
HDFS Sink
Hive Sink
Logger Sink
Avro Sink
Thrift Sink
File Roll Sink
HBaseSink
Kafka Sink等
詳細(xì)查看:
http://flume.apache.org/FlumeUserGuide.html#flume-sinks
HDFSSink需要有hdfs的配置文件和類(lèi)庫(kù)。一般采取多個(gè)sink匯聚到一臺(tái)采集機(jī)器負(fù)責(zé)推送到hdfs。
Sink具體作用:
Logger Sink:將數(shù)據(jù)作為日志處理(根據(jù)flume中的設(shè)置的日志的級(jí)別顯示)。
HDFS Sink:將數(shù)據(jù)傳輸?shù)絟dfs集群中。
Avro Sink:數(shù)據(jù)被轉(zhuǎn)換成Avro Event,然后發(fā)送到指定的服務(wù)端口上。
Thrift Sink:數(shù)據(jù)被轉(zhuǎn)換成Thrift Event,然后發(fā)送到指定的的服務(wù)端口上。
File Roll Sink:數(shù)據(jù)傳輸?shù)奖镜匚募小?Hive Sink:將數(shù)據(jù)傳輸?shù)絟ive的表中。
IRC Sink:數(shù)據(jù)向指定的IRC服務(wù)和端口中發(fā)送。
Null Sink:取消數(shù)據(jù)的傳輸,即不發(fā)送到任何目的地。
HBaseSink:將數(shù)據(jù)發(fā)往hbase數(shù)據(jù)庫(kù)中。
MorphlineSolrSink:數(shù)據(jù)發(fā)送到Solr搜索服務(wù)器(集群)。
ElasticSearchSink:數(shù)據(jù)發(fā)送到Elastic Search搜索服務(wù)器(集群)。
Kafka Sink:將數(shù)據(jù)發(fā)送到kafka服務(wù)中。(注意依賴(lài)類(lèi)庫(kù))
event是Flume NG傳輸?shù)臄?shù)據(jù)的基本單位,也是事務(wù)的基本單位。
在文本文件,通常是一行記錄就是一個(gè)event。
網(wǎng)絡(luò)消息傳輸系統(tǒng)中,一條消息就是一個(gè)event。
event里有header、body
Event里面的header類(lèi)型:Map<String, String>
我們可以在source中自定義header的key:value,在某些channel和sink中使用header。
練習(xí)1:
一個(gè)需求:怎么實(shí)時(shí)監(jiān)聽(tīng)一個(gè)文件的數(shù)據(jù)增加呢?打印到控制臺(tái)上。
如果這個(gè)文件增加的量特別大呢?
avro客戶(hù)端:
往指定接收方相應(yīng)的主機(jī)名:端口 發(fā)送本機(jī)要監(jiān)聽(tīng)發(fā)送的源文件或者文件夾。
bin/flume-ng avro-client --conf conf/ -H master -p 41414 -F /opt/logs/access.log
需要提供 avro-source
注意:--headerFile選項(xiàng):追加header信息,文件以空格隔開(kāi)。
bin/flume-ng avro-client --conf conf/ --host slave01 --port 41414 --filename /opt/logs/access.log --headerFile /opt/logs/kv.log
如果指定了--dirname。則傳輸后此文件夾里的文件會(huì)加上fileSuffix后綴。
練習(xí)02:
監(jiān)控文件的新增內(nèi)容向另一臺(tái)機(jī)器的source發(fā)送數(shù)據(jù)。怎么處理?
系統(tǒng)要求:
1、JRE:JDK1.6+(推薦使用1.7)
2、內(nèi)存:沒(méi)有上限和下限,能夠配置滿(mǎn)足source、channel以及sink即可
3、磁盤(pán)空間:同2
4、目錄權(quán)限:一般的agent操作的目錄必須要有讀寫(xiě)權(quán)限
這里采用的Flume版本為1.8.0,也是目前最新的版本,下載地址為:
http://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
安裝步驟:
解壓縮:[uplooking@uplooking01 ~]$ tar -zxvf soft/apache-flume-1.8.0-bin.tar.gz -C app/
重命名:[uplooking@uplooking01 ~]$ mv app/apache-flume-1.8.0-bin/ app/flume
添加到環(huán)境變量中
vim ~/.bash_profile
export FLUME_HOME=/home/uplooking/app/flume
export PATH=$PATH:$FLUME_HOME/bin
修改配置文件
conf]# cp flume-env.sh.template flume-env.sh
添加JAVA_HOME
export JAVA_HOME=/opt/jdk
定義flume agent配置文件:
#####################################################################
## this's flume log purpose is listenning a socket port which product
## data of stream
## this agent is consists of source which is r1 , sinks which is k1,
## channel which is c1
##
## 這里面的a1 是flume一個(gè)實(shí)例agent的名字
#####################################################################
#定義了當(dāng)前agent的名字叫做a1
a1.sources = r1 ##定了該agent中的sources組件叫做r1
a1.sinks = k1 ##定了該agent中的sinks組件叫做k1
a1.channels = c1 ##定了該agent中的channels組件叫做c1
# 監(jiān)聽(tīng)數(shù)據(jù)源的方式,這里采用監(jiān)聽(tīng)網(wǎng)絡(luò)端口
a1.sources.r1.type = netcat #source的類(lèi)型為網(wǎng)絡(luò)字節(jié)流
a1.sources.r1.bind = uplooking01 #source監(jiān)聽(tīng)的網(wǎng)絡(luò)的hostname
a1.sources.r1.port = 52019 #source監(jiān)聽(tīng)的網(wǎng)絡(luò)的port
# 采集的數(shù)據(jù)的下沉(落地)方式 通過(guò)日志
a1.sinks.k1.type = logger #sink的類(lèi)型為logger日志方式,log4j的級(jí)別有INFO、Console、file。。。
# 描述channel的部分,使用內(nèi)存做數(shù)據(jù)的臨時(shí)存儲(chǔ)
a1.channels.c1.type = memory #channel的類(lèi)型使用內(nèi)存進(jìn)行數(shù)據(jù)緩存,這是最常見(jiàn)的一種channel
a1.channels.c1.capacity = 1000 #定義了channel對(duì)的容量
a1.channels.c1.transactionCapacity = 100 #定義channel的最大的事務(wù)容量
# 使用channel將source和sink連接起來(lái)
# 需要將source和sink使用channel連接起來(lái),組成一個(gè)類(lèi)似流水管道
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動(dòng)flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-nc.conf -Dflume.root.logger=INFO,console
-c conf:使用配置文件的方式
-n a1:指定agent的名稱(chēng)為a1
-f:指定配置文件
因?yàn)閿?shù)據(jù)落地是通過(guò)日志,所以后面需要指定日志的相關(guān)配置選項(xiàng)。
通過(guò)telnet或者nc向端口發(fā)送數(shù)據(jù)
安裝telnet或nc:
yum isntall -y telent
yum install -y nc
向端口發(fā)送數(shù)據(jù):
# 使用telnet
[uplooking@uplooking01 ~]$ telnet uplooking01 52019
Trying 192.168.43.101...
Connected to uplooking01.
Escape character is '^]'.
wo ai ni
OK
sai bei de xue
OK
# 使用nc
[uplooking@uplooking01 ~]$ nc uplooking01 52019
heihei
OK
xpleaf
OK
此時(shí)可以查看flume agent啟動(dòng)終端的輸出:
2018-03-24 20:09:34,390 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.43.101:52019]
2018-03-24 20:10:13,022 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 6F 20 61 69 20 6E 69 0D wo ai ni. }
2018-03-24 20:10:24,139 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 73 61 69 20 62 65 69 20 64 65 20 78 75 65 0D sai bei de xue. }
2018-03-24 20:13:26,190 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 69 68 65 69 heihei }
2018-03-24 20:13:26,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 78 70 6C 65 61 66 xpleaf }
2018-03-24 20:17:01,694 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F hello }
配置文件如下:
#####################################################################
## 監(jiān)聽(tīng)目錄中的新增文件
## this agent is consists of source which is r1 , sinks which is k1,
## channel which is c1
##
## 這里面的a1 是flume一個(gè)實(shí)例agent的名字
#####################################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 監(jiān)聽(tīng)數(shù)據(jù)源的方式,這里采用監(jiān)聽(tīng)目錄中的新增文件
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/uplooking/data/flume
a1.sources.r1.fileSuffix = .ok
# a1.sources.r1.deletePolicy = immediate
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileHeader = true
# 采集的數(shù)據(jù)的下沉(落地)方式 通過(guò)日志
a1.sinks.k1.type = logger
# 描述channel的部分,使用內(nèi)存做數(shù)據(jù)的臨時(shí)存儲(chǔ)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 使用channel將source和sink連接起來(lái)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動(dòng)flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-dir.conf -Dflume.root.logger=INFO,console
在監(jiān)聽(tīng)目錄下新增文件,內(nèi)容如下:
hello you
hello he
hello me
可以看到flume agent終端輸出:
2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 79 6F 75 hello you }
2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 68 65 hello he }
2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 6D 65 hello me }
2018-03-24 21:23:59,184 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2018-03-24 21:23:59,184 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /home/uplooking/data/flume/hello.txt to /home/uplooking/data/flume/hello.txt.ok
可以看到提示說(shuō),原來(lái)的文本文件已經(jīng)被重命名為.ok,查看數(shù)據(jù)目錄中的文件:
[uplooking@uplooking01 flume]$ ls
hello.txt.ok
tail -f與tail -F的說(shuō)明:
在生產(chǎn)環(huán)境中,為了防止日志文件過(guò)大,通常會(huì)每天生成一個(gè)新的日志文件,
這是通過(guò)重命名原來(lái)的日志文件,然后touch一個(gè)原來(lái)的日志文件的方式來(lái)實(shí)現(xiàn)的。
http-xxx.log
http-xxx.log.2017-03-15
http-xxx.log.2017-03-16
-f不會(huì)監(jiān)聽(tīng)分割之后的文件,而-F則會(huì)繼續(xù)監(jiān)聽(tīng)。
配置文件:
#####################################################################
## 監(jiān)聽(tīng)文件中的新增數(shù)據(jù)
##
## this agent is consists of source which is r1 , sinks which is k1,
## channel which is c1
##
## 這里面的a1 是flume一個(gè)實(shí)例agent的名字
#####################################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 監(jiān)聽(tīng)數(shù)據(jù)源的方式,這里監(jiān)聽(tīng)文件中的新增數(shù)據(jù)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/uplooking/data/flume/http-flume.log
# 采集的數(shù)據(jù)的下沉(落地)方式 通過(guò)日志
a1.sinks.k1.type = logger
# 描述channel的部分,使用內(nèi)存做數(shù)據(jù)的臨時(shí)存儲(chǔ)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000000
a1.channels.c1.transactionCapacity = 1000000
# 使用channel將source和sink連接起來(lái)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動(dòng)flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-data.conf -Dflume.root.logger=INFO,console
向監(jiān)聽(tīng)文件中添加數(shù)據(jù):
cat hello.txt.ok > http-flume.log
查看flume agent終端的輸出:
2018-03-25 01:28:39,359 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 79 6F 75 hello you }
2018-03-25 01:28:40,465 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 68 65 hello he }
2018-03-25 01:28:40,465 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 6D 65 hello me }
使用jps -v命令可以查看啟動(dòng)flume時(shí),分配的內(nèi)存大小:
20837 Application -Xmx20m -Dflume.root.logger=INFO,console -Djava.library.path=:/home/uplooking/app/hadoop/lib/native:/home/uplooking/app/hadoop/lib/native
可以看到分配的最大內(nèi)存為20M,因?yàn)槲覀兪褂玫氖菍hannel中的數(shù)據(jù)保存到內(nèi)存中,所以一旦監(jiān)聽(tīng)的文本數(shù)據(jù)過(guò)大,就會(huì)造成內(nèi)存溢出,先使用下面的腳本生成一個(gè)比較大的文本數(shù)據(jù):
for i in `seq 1 10000000`
do
echo "${i}.I like bigdata, I would like to do something with bigdata." >> /home/uplooking/data/mr/bigData.log
done
然后向監(jiān)聽(tīng)的日志中打數(shù)據(jù):
cat bigData.log > ../flume/http-flume.log
這時(shí)可以在flume agent終端中看到異常:
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.<init>(String.java:207)
at java.lang.StringBuilder.toString(StringBuilder.java:407)
at sun.net.www.protocol.jar.Handler.parseContextSpec(Handler.java:207)
at sun.net.www.protocol.jar.Handler.parseURL(Handler.java:153)
at java.net.URL.<init>(URL.java:622)
at java.net.URL.<init>(URL.java:490)
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "SinkRunner-PollingRunner-DefaultSinkProcessor"
解決方案:
通過(guò)調(diào)整
# 描述channel的部分,使用內(nèi)存做數(shù)據(jù)的臨時(shí)存儲(chǔ)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000000
a1.channels.c1.transactionCapacity = 1000000
執(zhí)行案例監(jiān)聽(tīng)日志文件中的新增記錄,操作一下異常
java.lang.OutOfMemoryError: GC overhead limit exceeded,簡(jiǎn)稱(chēng)OOM/OOME
兩種方案解決:
第一種方案:給該flume程序加大內(nèi)存存儲(chǔ)容量
默認(rèn)值為-Xmx20m(最大堆內(nèi)存大小),--->-Xmx 2000m
-Xms10m(初始堆內(nèi)存大小)
flume-ng agent -Xms1000m -Xmx1000m -c conf -n a1 -f conf/flume-data.conf -Dflume.root.logger=INFO,console
第二種方案:第一種搞不定的時(shí)候,比如機(jī)器可用內(nèi)存不夠的話(huà)的,使用其它c(diǎn)hannel解決
比如磁盤(pán)文件,比如jdbc
如果文本數(shù)據(jù)不是特別大,那么用第一種方案也是可以解決的,但是一旦文本數(shù)據(jù)過(guò)大,第一種方案需要分配很大的內(nèi)存空間,所以下面演示使用第二種方案。
配置文件如下:
#####################################################################
## 監(jiān)聽(tīng)文件中的新增數(shù)據(jù)
## 使用文件做為channel
##
## this agent is consists of source which is r1 , sinks which is k1,
## channel which is c1
##
## 這里面的a1 是flume一個(gè)實(shí)例agent的名字
#####################################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 監(jiān)聽(tīng)數(shù)據(jù)源的方式,這里監(jiān)聽(tīng)文件中的新增數(shù)據(jù)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/uplooking/data/flume/http-flume.log
# 采集的數(shù)據(jù)的下沉(落地)方式 通過(guò)日志
a1.sinks.k1.type = logger
# 描述channel的部分,使用內(nèi)存做數(shù)據(jù)的臨時(shí)存儲(chǔ)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.transactionCapacity = 1000000
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
# 使用channel將source和sink連接起來(lái)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注意需要?jiǎng)?chuàng)建下面兩個(gè)目錄:
/home/uplooking/data/flume/checkpoint # 存放檢查點(diǎn)數(shù)據(jù)
/home/uplooking/data/flume/data # 存放channel的數(shù)據(jù)
這樣再向監(jiān)聽(tīng)文件中打數(shù)據(jù),會(huì)在終端中看到不停地刷數(shù)據(jù)。
可以將channel中的數(shù)據(jù)最終保存到hdfs中,配置文件如下:
#####################################################################
## 監(jiān)聽(tīng)文件中的新增數(shù)據(jù)
## 使用文件做為channel
## this agent is consists of source which is r1 , sinks which is k1,
## channel which is c1
##
## 這里面的a1 是flume一個(gè)實(shí)例agent的名字
#####################################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 監(jiān)聽(tīng)數(shù)據(jù)源的方式,這里采用監(jiān)聽(tīng)網(wǎng)絡(luò)端口
a1.sources.r1.type = netcat
a1.sources.r1.bind = uplooking01
a1.sources.r1.port = 52019
# 采集的數(shù)據(jù)的下沉(落地)方式 存儲(chǔ)到hdfs的某一路徑
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/input/flume/%Y/%m/%d
# 文件生成后的前綴
a1.sinks.k1.hdfs.filePrefix = http
# 文件生成后的后綴,如http.1521927418991.log
a1.sinks.k1.hdfs.fileSuffix = .log
# 文件使用時(shí)的前綴
a1.sinks.k1.hdfs.inUsePrefix = xttzm.
# 文件使用時(shí)的后綴,如xttzm.http.1521927418992.log.zdhm
a1.sinks.k1.hdfs.inUseSuffix = .zdhm
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 默認(rèn)為SequenceFile,查看hdfs上的文件時(shí)為序列化的
a1.sinks.k1.hdfs.fileType = DataStream
# 上面的要配置,這個(gè)也要配置,寫(xiě)入的數(shù)據(jù)格式為文本內(nèi)容
a1.sinks.k1.hdfs.writeFormat = Text
# 下面這個(gè)配置選項(xiàng)不加,那么rollInterval rollSize rollCount是不會(huì)生效的
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 描述channel的部分,使用文件做數(shù)據(jù)的臨時(shí)存儲(chǔ)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.transactionCapacity = 1000000
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
# 使用channel將source和sink連接起來(lái)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動(dòng)flume agent:
flume-ng agent -c conf -n a1 -f conf/flume-hdfs-sink.conf -Dflume.root.logger=INFO,console
通過(guò)nc發(fā)送數(shù)據(jù):
$ nc uplooking01 52019
1
OK
2
OK
3
OK
......
12
OK
13
OK
14
OK
15
OK
16
OK
這樣,在hdfs目錄下會(huì)生成三個(gè)正式文件,同時(shí)還應(yīng)該有一個(gè)臨時(shí)文件:
$ hdfs dfs -ls /input/flume/2018/03/25/
Found 4 items
-rw-r--r-- 3 uplooking supergroup 10 2018-03-25 06:00 /input/flume/2018/03/25/http.1521928799720.log
-rw-r--r-- 3 uplooking supergroup 11 2018-03-25 06:00 /input/flume/2018/03/25/http.1521928799721.log
-rw-r--r-- 3 uplooking supergroup 15 2018-03-25 06:00 /input/flume/2018/03/25/http.1521928799722.log
-rw-r--r-- 3 uplooking supergroup 3 2018-03-25 06:00 /input/flume/2018/03/25/xttzm.http.1521928799723.log.zdhm
網(wǎng)頁(yè)名稱(chēng):Flume筆記整理
網(wǎng)站地址:http://chinadenli.net/article44/gphhee.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作、定制開(kāi)發(fā)、面包屑導(dǎo)航、手機(jī)網(wǎng)站建設(shè)、域名注冊(cè)、云服務(wù)器
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)