欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

Flume自定義Source

模擬編寫(xiě)了一個(gè)Flume 1.7中TAILDIR的功能實(shí)現(xiàn),通過(guò)手動(dòng)控制文件的讀取位置來(lái)達(dá)到對(duì)文件的讀寫(xiě),防止flume掛了之后重復(fù)消費(fèi)的情況。
以下是代碼實(shí)現(xiàn),僅做參考,生產(chǎn)上直接用TAILDIR讀取文件內(nèi)容即可,若要讀取一個(gè)目錄下的子目錄,可使用github上以實(shí)現(xiàn)的這個(gè)項(xiàng)目包:https://github.com/qwurey/flume-source-taildir-recursive

創(chuàng)新互聯(lián)長(zhǎng)期為1000+客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開(kāi)放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為滿城企業(yè)提供專業(yè)的成都做網(wǎng)站、網(wǎng)站建設(shè),滿城網(wǎng)站改版等技術(shù)服務(wù)。擁有10多年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開(kāi)發(fā)。

package com.fwmagic.flume.source;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Description:自定義Source 1、讀取指定目錄下的文件,如nginx的access.log
 * 2、讀取文件前先判斷offset文件是否存在,不存在則創(chuàng)建它
 * 3、每次讀取完都寫(xiě)一個(gè)offset文件記錄讀取到文件的什么位置,防止重啟flume時(shí)發(fā)生重復(fù)消費(fèi)的情況
 * 4、如何自定義?參考ExecSource
 * <p>
 * (1):獲取自定義配置文件屬性
 * (2):創(chuàng)建線程池,用channelProcessor發(fā)送數(shù)據(jù)給channel
 * (3):線程池提交(啟動(dòng)任務(wù))
 * 任務(wù)內(nèi)容:
 * (1):讀取偏移量文件,沒(méi)有則創(chuàng)建,有則獲取偏移量,將讀取的指針重置到指定偏移量
 * (2):讀取指定的日志文件,將讀取的一行內(nèi)容打包成Event,用Channel發(fā)送Event
 * (3):獲取讀取內(nèi)容后的偏移量,重置偏移量
 * (4):stop方法調(diào)用,關(guān)閉線程池,調(diào)用super.stop方法。
 * @Date:Create in 2018/8/19
 */
public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable {

    /*監(jiān)聽(tīng)的文件*/
    private String filePath;

    /*記錄讀取偏移量的文件*/
    private String posiFile;

    /*若讀取文件暫無(wú)內(nèi)容,則等待數(shù)秒*/
    private Long interval;

    /*讀寫(xiě)文件的字符集*/
    private String charset;

    /*讀取文件內(nèi)容的線程*/
    private FileRunner fileRunner;

    /*線程池*/
    private ExecutorService executor;

    private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);

    /**
     * 初始化配置文件內(nèi)容
     *
     * @param context
     */
    @Override
    public void configure(Context context) {
        filePath = context.getString("filePath");
        posiFile = context.getString("posiFile");
        interval = context.getLong("interval", 2000L);
        charset = context.getString("charset", "UTF-8");
    }

    @Override
    public synchronized void start() {
        //啟動(dòng)一個(gè)線程,用于監(jiān)聽(tīng)對(duì)應(yīng)的日志文件
        //創(chuàng)建一個(gè)線程池
        executor = Executors.newSingleThreadExecutor();
        //用channelProcessor發(fā)送數(shù)據(jù)給channel
        ChannelProcessor channelProcessor = super.getChannelProcessor();
        fileRunner = new FileRunner(filePath, posiFile, interval, charset, channelProcessor);
        executor.submit(fileRunner);
        super.start();
    }

    @Override
    public synchronized void stop() {
        fileRunner.setFlag(Boolean.FALSE);
        while (!executor.isTerminated()) {
            logger.debug("waiting for exec executor service to stop");
            try {
                executor.awaitTermination(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                logger.debug("Interrupted while waiting for executor service to stop,Just exiting.");
                Thread.currentThread().interrupt();
            }
        }
        super.stop();
    }

    public static class FileRunner implements Runnable {
        private Long interval;

        private String charset;

        private Long offset = 0L;

        private File pFile;

        private RandomAccessFile raf;

        private ChannelProcessor channelProcessor;

        private Boolean flag = Boolean.TRUE;

        public void setFlag(Boolean flag) {
            this.flag = flag;
        }

        public FileRunner(String filePath, String posiFile, Long interval, String charset, ChannelProcessor channelProcessor) {
            this.interval = interval;
            this.charset = charset;
            this.channelProcessor = channelProcessor;

            //1、判斷是否有偏移量文件,有則讀取偏移量,沒(méi)有則創(chuàng)建
            pFile = new File(posiFile);
            if (!pFile.exists()) {
                try {
                    pFile.createNewFile();
                } catch (IOException e) {
                    e.printStackTrace();
                    logger.error("create position file error!", e);
                }
            }
            //2、判斷偏移量中的文件內(nèi)容是否大于0
            try {
                String offsetStr = FileUtils.readFileToString(pFile, this.charset);
//          3、如果偏移量文件中有記錄,則將內(nèi)容轉(zhuǎn)換為L(zhǎng)ong
                if (StringUtils.isNotBlank(offsetStr)) {
                    offset = Long.parseLong(offsetStr);
                }
//           4、如果有偏移量,則直接跳到文件的偏移量位置
                raf = new RandomAccessFile(filePath, "r");
//              跳到指定的位置
                raf.seek(offset);
            } catch (IOException e) {
                e.printStackTrace();
                logger.error("read position file error!", e);
            }
        }

        @Override
        public void run() {
            //監(jiān)聽(tīng)文件
            while (flag) {
//            讀取文件中的內(nèi)容
                String line = null;
                try {
                    line = raf.readLine();
                    if (StringUtils.isNotBlank(line)) {
//                      把數(shù)據(jù)打包成Event,發(fā)送到Channel
                        line = new String(line.getBytes("ISO-8859-1"), "UTF-8");
                        Event event = EventBuilder.withBody(line.getBytes());
                        channelProcessor.processEvent(event);
                        //更新偏移量文件,把偏移量寫(xiě)入文件
                        offset = raf.getFilePointer();
                        FileUtils.writeStringToFile(pFile, offset.toString());
                    } else {
                        try {
                            Thread.sleep(interval);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            logger.error("thread sleep error", e);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

本文名稱:Flume自定義Source
瀏覽地址:http://chinadenli.net/article28/gicijp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供手機(jī)網(wǎng)站建設(shè)、網(wǎng)站導(dǎo)航、微信小程序、網(wǎng)站策劃、外貿(mào)網(wǎng)站建設(shè)、品牌網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁(yè)設(shè)計(jì)