欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

Kafka冪等性原理及實現(xiàn)剖析

1.概述

最近和一些同學交流的時候反饋說,在面試Kafka時,被問到Kafka組件組成部分、API使用、Consumer和Producer原理及作用等問題都能詳細作答。但是,問到一個平時不注意的問題,就是Kafka的冪等性,被卡主了。那么,今天筆者就為大家來剖析一下Kafka的冪等性原理及實現(xiàn)。

創(chuàng)新互聯(lián)"三網(wǎng)合一"的企業(yè)建站思路。企業(yè)可建設擁有電腦版、微信版、手機版的企業(yè)網(wǎng)站。實現(xiàn)跨屏營銷,產(chǎn)品發(fā)布一步更新,電腦網(wǎng)絡+移動網(wǎng)絡一網(wǎng)打盡,滿足企業(yè)的營銷需求!創(chuàng)新互聯(lián)具備承接各種類型的成都網(wǎng)站設計、成都網(wǎng)站建設項目的能力。經(jīng)過10年的努力的開拓,為不同行業(yè)的企事業(yè)單位提供了優(yōu)質(zhì)的服務,并獲得了客戶的一致好評。

2.內(nèi)容

2.1 Kafka為啥需要冪等性?

Producer在生產(chǎn)發(fā)送消息時,難免會重復發(fā)送消息。Producer進行retry時會產(chǎn)生重試機制,發(fā)生消息重復發(fā)送。而引入冪等性后,重復發(fā)送只會生成一條有效的消息。Kafka作為分布式消息系統(tǒng),它的使用場景常見與分布式系統(tǒng)中,比如消息推送系統(tǒng)、業(yè)務平臺系統(tǒng)(如物流平臺、銀行結(jié)算平臺等)。以銀行結(jié)算平臺來說,業(yè)務方作為上游把數(shù)據(jù)上報到銀行結(jié)算平臺,如果一份數(shù)據(jù)被計算、處理多次,那么產(chǎn)生的影響會很嚴重。

2.2 影響Kafka冪等性的因素有哪些?

在使用Kafka時,需要確保Exactly-Once語義。分布式系統(tǒng)中,一些不可控因素有很多,比如網(wǎng)絡、OOM、FullGC等。在Kafka Broker確認Ack時,出現(xiàn)網(wǎng)絡異常、FullGC、OOM等問題時導致Ack超時,Producer會進行重復發(fā)送??赡艹霈F(xiàn)的情況如下:

Kafka冪等性原理及實現(xiàn)剖析

2.3 Kafka的冪等性是如何實現(xiàn)的?

Kafka為了實現(xiàn)冪等性,它在底層設計架構(gòu)中引入了ProducerID和SequenceNumber。那這兩個概念的用途是什么呢?

  • ProducerID:在每個新的Producer初始化時,會被分配一個唯一的ProducerID,這個ProducerID對客戶端使用者是不可見的。
  • SequenceNumber:對于每個ProducerID,Producer發(fā)送數(shù)據(jù)的每個Topic和Partition都對應一個從0開始單調(diào)遞增的SequenceNumber值。

2.3.1 冪等性引入之前的問題?

Kafka在引入冪等性之前,Producer向Broker發(fā)送消息,然后Broker將消息追加到消息流中后給Producer返回Ack信號值。實現(xiàn)流程如下:

Kafka冪等性原理及實現(xiàn)剖析

上圖的實現(xiàn)流程是一種理想狀態(tài)下的消息發(fā)送情況,但是實際情況中,會出現(xiàn)各種不確定的因素,比如在Producer在發(fā)送給Broker的時候出現(xiàn)網(wǎng)絡異常。比如以下這種異常情況的出現(xiàn):

Kafka冪等性原理及實現(xiàn)剖析

上圖這種情況,當Producer第一次發(fā)送消息給Broker時,Broker將消息(x2,y2)追加到了消息流中,但是在返回Ack信號給Producer時失敗了(比如網(wǎng)絡異常) 。此時,Producer端觸發(fā)重試機制,將消息(x2,y2)重新發(fā)送給Broker,Broker接收到消息后,再次將該消息追加到消息流中,然后成功返回Ack信號給Producer。這樣下來,消息流中就被重復追加了兩條相同的(x2,y2)的消息。

2.3.2 冪等性引入之后解決了什么問題?

面對這樣的問題,Kafka引入了冪等性。那么冪等性是如何解決這類重復發(fā)送消息的問題的呢?下面我們可以先來看看流程圖:

Kafka冪等性原理及實現(xiàn)剖析

?同樣,這是一種理想狀態(tài)下的發(fā)送流程。實際情況下,會有很多不確定的因素,比如Broker在發(fā)送Ack信號給Producer時出現(xiàn)網(wǎng)絡異常,導致發(fā)送失敗。異常情況如下圖所示:

Kafka冪等性原理及實現(xiàn)剖析

?當Producer發(fā)送消息(x2,y2)給Broker時,Broker接收到消息并將其追加到消息流中。此時,Broker返回Ack信號給Producer時,發(fā)生異常導致Producer接收Ack信號失敗。對于Producer來說,會觸發(fā)重試機制,將消息(x2,y2)再次發(fā)送,但是,由于引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發(fā)送給Broker,而之前Broker緩存過之前發(fā)送的相同的消息,那么在消息流中的消息就只有一條(x2,y2),不會出現(xiàn)重復發(fā)送的情況。

2.3.3 ProducerID是如何生成的?

客戶端在生成Producer時,會實例化如下代碼:

// 實例化一個Producer對象
Producer<String, String> producer = new KafkaProducer<>(props);

在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個maybeWaitForPid()方法,用來生成一個ProducerID,實現(xiàn)代碼如下:

private void maybeWaitForPid() {
        if (transactionState == null)
            return;

        while (!transactionState.hasPid()) {
            try {
                Node node = awaitLeastLoadedNodeReady(requestTimeout);
                if (node != null) {
                    ClientResponse response = sendAndAwaitInitPidRequest(node);
                    if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                        InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                        transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                    } else {
                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                                "We will back off and try again.", node);
                    }
                } else {
                    log.debug("Could not find an available broker to send InitPidRequest to. " +
                            "We will back off and try again.");
                }
            } catch (Exception e) {
                log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
            }
            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
            time.sleep(retryBackoffMs);
            metadata.requestUpdate();
        }
    }

3.事務

與冪等性有關(guān)的另外一個特性就是事務。Kafka中的事務與數(shù)據(jù)庫的事務類似,Kafka中的事務屬性是指一系列的Producer生產(chǎn)消息和消費消息提交Offsets的操作在一個事務中,即原子性操作。對應的結(jié)果是同時成功或者同時失敗。

這里需要與數(shù)據(jù)庫中事務進行區(qū)別,操作數(shù)據(jù)庫中的事務指一系列的增刪查改,對Kafka來說,操作事務是指一系列的生產(chǎn)和消費等原子性操作。

3.1 Kafka引入事務的用途?

在事務屬性引入之前,先引入Producer的冪等性,它的作用為:

  • Producer多次發(fā)送消息可以封裝成一個原子性操作,即同時成功,或者同時失敗;
  • 消費者&生產(chǎn)者模式下,因為Consumer在Commit Offsets出現(xiàn)問題時,導致重復消費消息時,Producer重復生產(chǎn)消息。需要將這個模式下Consumer的Commit Offsets操作和Producer一系列生產(chǎn)消息的操作封裝成一個原子性操作。

產(chǎn)生的場景有:

比如,在Consumer中Commit Offsets時,當Consumer在消費完成時Commit的Offsets為100(假設最近一次Commit的Offsets為50),那么執(zhí)行觸發(fā)Balance時,其他Consumer就會重復消費消息(消費的Offsets介于50~100之間的消息)。

3.2 事務提供了哪些可使用的API?

Producer提供了五種事務方法,它們分別是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代碼定義在org.apache.kafka.clients.producer.Producer<K,V>接口中,具體定義接口如下:

// 初始化事務,需要注意確保transation.id屬性被分配
void initTransactions();

// 開啟事務
void beginTransaction() throws ProducerFencedException;

// 為Consumer提供的在事務內(nèi)Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 提交事務
void commitTransaction() throws ProducerFencedException;

// 放棄事務,類似于回滾事務的操作
void abortTransaction() throws ProducerFencedException;

3.3 事務的實際應用場景有哪些?

在Kafka事務中,一個原子性操作,根據(jù)操作類型可以分為3種情況。情況如下:

  • 只有Producer生產(chǎn)消息,這種場景需要事務的介入;
  • 消費消息和生產(chǎn)消息并存,比如Consumer&Producer模式,這種場景是一般Kafka項目中比較常見的模式,需要事務介入;
  • 只有Consumer消費消息,這種操作在實際項目中意義不大,和手動Commit Offsets的結(jié)果一樣,而且這種場景不是事務的引入目的。

4.總結(jié)

Kafka的冪等性和事務是比較重要的特性,特別是在數(shù)據(jù)丟失和數(shù)據(jù)重復的問題上非常重要。Kafka引入冪等性,設計的原理也比較好理解。而事務與數(shù)據(jù)庫的事務特性類似,有數(shù)據(jù)庫使用的經(jīng)驗對理解Kafka的事務也比較容易接受。

本文名稱:Kafka冪等性原理及實現(xiàn)剖析
URL標題:http://chinadenli.net/article10/joiego.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設計公司、全網(wǎng)營銷推廣建站公司、App開發(fā)網(wǎng)站內(nèi)鏈、手機網(wǎng)站建設

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁設計公司
美国女大兵激情豪放视频播放| 国产美女精品人人做人人爽| 国产亚洲视频香蕉一区| 亚洲一区二区精品福利| 国产在线一区中文字幕| 大伊香蕉一区二区三区| 麻豆果冻传媒一二三区| 日韩欧美黄色一级视频| 国产精品欧美在线观看| 午夜资源在线观看免费高清| 日本高清视频在线观看不卡 | 日木乱偷人妻中文字幕在线| 色老汉在线视频免费亚欧| 2019年国产最新视频| 亚洲一区二区三区av高清| 日韩18一区二区三区| 日韩精品综合免费视频| 亚洲精品高清国产一线久久| 亚洲欧美黑人一区二区| 国产大屁股喷水在线观看视频 | 激情五月综五月综合网| 国产精品丝袜美腿一区二区| 国产又粗又猛又黄又爽视频免费| 日韩18一区二区三区| 夫妻性生活动态图视频| 午夜精品久久久免费视频| 欧美精品二区中文乱码字幕高清| 欧美字幕一区二区三区| 亚洲中文字幕免费人妻| 亚洲精品中文字幕在线视频| 东京不热免费观看日本| 千仞雪下面好爽好紧好湿全文| 日本丰满大奶熟女一区二区| 91久久精品中文内射| 欧美亚洲美女资源国产| 日本少妇中文字幕不卡视频| 夜色福利久久精品福利| 91久久精品国产一区蜜臀| 不卡一区二区高清视频| 亚洲午夜精品视频观看| 99在线视频精品免费播放|