欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

KafkaProducer攔截器

Kafka中的攔截器(Interceptor)是0.10.x.x版本引入的一個(gè)功能,一共有兩種:Kafka Producer端的攔截器和Kafka Consumer端的攔截器。本篇主要講述的是Kafka Producer端的攔截器,它主要用來(lái)對(duì)消息進(jìn)行攔截或者修改,也可以用于Producer的Callback回調(diào)之前進(jìn)行相應(yīng)的預(yù)處理。

網(wǎng)站的建設(shè)成都創(chuàng)新互聯(lián)專(zhuān)注網(wǎng)站定制,經(jīng)驗(yàn)豐富,不做模板,主營(yíng)網(wǎng)站定制開(kāi)發(fā).小程序定制開(kāi)發(fā),H5頁(yè)面制作!給你煥然一新的設(shè)計(jì)體驗(yàn)!已為混凝土攪拌罐等企業(yè)提供專(zhuān)業(yè)服務(wù)。

使用Kafka Producer端的攔截器非常簡(jiǎn)單,主要是實(shí)現(xiàn)ProducerInterceptor接口,此接口包含4個(gè)方法:

    1. ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):Producer在將消息序列化和分配分區(qū)之前會(huì)調(diào)用攔截器的這個(gè)方法來(lái)對(duì)消息進(jìn)行相應(yīng)的操作。一般來(lái)說(shuō)最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需確保對(duì)其有準(zhǔn)確的判斷,否則會(huì)與預(yù)想的效果出現(xiàn)偏差。比如修改key不僅會(huì)影響分區(qū)的計(jì)算,同樣也會(huì)影響B(tài)roker端日志壓縮(Log Compaction)的功能。
    1. void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被應(yīng)答(Acknowledgement)之前或者消息發(fā)送失敗時(shí)調(diào)用,優(yōu)先于用戶(hù)設(shè)定的Callback之前執(zhí)行。這個(gè)方法運(yùn)行在Producer的IO線(xiàn)程中,所以這個(gè)方法里實(shí)現(xiàn)的代碼邏輯越簡(jiǎn)單越好,否則會(huì)影響消息的發(fā)送速率。
    1. void close():關(guān)閉當(dāng)前的攔截器,此方法主要用于執(zhí)行一些資源的清理工作。
    1. configure(Map<String, ?> configs):用來(lái)初始化此類(lèi)的方法,這個(gè)是ProducerInterceptor接口的父接口Configurable中的方法。

一般情況下只需要關(guān)注并實(shí)現(xiàn)onSend或者onAcknowledgement方法即可。下面我們來(lái)舉個(gè)案例,通過(guò)onSend方法來(lái)過(guò)濾消息體為空的消息以及通過(guò)onAcknowledgement方法來(lái)計(jì)算發(fā)送消息的成功率。

public class ProducerInterceptorDemo implements ProducerInterceptor<String,String> {
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        if(record.value().length()<=0)
            return null;
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure ++;
        }
    }

    @Override
    public void close() {
        double succe***atio = (double)sendSuccess / (sendFailure + sendSuccess);
        System.out.println("[INFO] 發(fā)送成功率="+String.format("%f", succe***atio * 100)+"%");
    }

    @Override
    public void configure(Map<String, ?> configs) {}
}

自定義的ProducerInterceptorDemo類(lèi)實(shí)現(xiàn)之后就可以在Kafka Producer的主程序中指定,示例代碼如下:

public class ProducerMain {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        for(int i=0;i<100;i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "msg-" + i);
            producer.send(producerRecord).get();
        }
        producer.close();
    }
}

Kafka Producer不僅可以指定一個(gè)攔截器,還可以指定多個(gè)攔截器以形成攔截鏈,這個(gè)攔截鏈會(huì)按照其中的攔截器的加入順序一一執(zhí)行。比如上面的程序多添加一個(gè)攔截器,示例如下:

properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");1

這樣Kafka Producer會(huì)先執(zhí)行攔截器ProducerInterceptorDemo,之后再執(zhí)行ProducerInterceptorDemoPlus。

有關(guān)interceptor.classes參數(shù),在kafka 1.0.0版本中的定義如下:

NAMEDESCRIPTIONTYPEDEFAULTVALID VALUESIMPORTANCE
interceptor.calssses A list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors. list null low

本文的重點(diǎn)是你有沒(méi)有收獲與成長(zhǎng),其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過(guò)多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,redis,ActiveMQ、、Mycat、Netty、Kafka、MySQL、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識(shí)點(diǎn)高級(jí)進(jìn)階干貨,希望對(duì)想成為架構(gòu)師的朋友有一定的參考和幫助

需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費(fèi)獲取

Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器
Kafka Producer 攔截器

當(dāng)前題目:KafkaProducer攔截器
標(biāo)題鏈接:http://chinadenli.net/article0/ihjeoo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站軟件開(kāi)發(fā)、網(wǎng)站策劃、虛擬主機(jī)網(wǎng)站收錄、電子商務(wù)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁(yè)設(shè)計(jì)公司
字幕日本欧美一区二区| 色哟哟精品一区二区三区| 精品视频一区二区不卡| 欧美一级片日韩一级片| 亚洲精品熟女国产多毛| 九九热九九热九九热九九热| 日韩一级毛一欧美一级乱| 欧美成人一区二区三区在线 | 国产精品午夜福利免费在线| 日韩高清毛片免费观看| 欧美一区二区三区喷汁尤物| 99亚洲综合精品成人网色播| 一区二区不卡免费观看免费| 中文字幕在线五月婷婷| 亚洲欧美中文字幕精品| 99热在线精品视频观看| 欧洲日本亚洲一区二区| 亚洲中文字幕在线观看四区 | 中文字幕日韩欧美理伦片| 久热久热精品视频在线观看| 日韩成人高清免费在线| 美女露小粉嫩91精品久久久| 少妇淫真视频一区二区| 亚洲成人久久精品国产| 91精品蜜臀一区二区三区| 国产老女人性生活视频| 日本久久精品在线观看| 欧美日韩国产综合在线| 在线欧美精品二区三区| 暴力三级a特黄在线观看| 中文字幕免费观看亚洲视频| 久久国产亚洲精品成人| 国产欧美一区二区三区精品视 | 欧美亚洲91在线视频| 加勒比系列一区二区在线观看| 一区二区三区亚洲天堂| 日本成人三级在线播放| 中日韩免费一区二区三区| 福利在线午夜绝顶三级| 午夜福利视频六七十路熟女| 中文字幕日韩精品人一妻|