欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

Kafka0.10KafkaConsumer流程簡(jiǎn)述-創(chuàng)新互聯(lián)

ConsumerConfig.scala 儲(chǔ)存Consumer的配置

成都創(chuàng)新互聯(lián)公司主營(yíng)金東網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,成都App定制開(kāi)發(fā),金東h5小程序制作搭建,金東網(wǎng)站營(yíng)銷(xiāo)推廣歡迎金東等地區(qū)企業(yè)咨詢

按照我的理解,0.10的Kafka沒(méi)有專(zhuān)門(mén)的SimpleConsumer,仍然是沿用0.8版本的。

1.從poll開(kāi)始

消費(fèi)的規(guī)則如下:

  • 一個(gè)partition只能被同一個(gè)ConsumersGroup的一個(gè)線程所消費(fèi).

  • 線程數(shù)小于partition數(shù),某些線程會(huì)消費(fèi)多個(gè)partition.

  • 線程數(shù)等于partition數(shù),一個(gè)線程正好消費(fèi)一個(gè)線程.

  • 當(dāng)添加消費(fèi)者線程時(shí),會(huì)觸發(fā)rebalance,partition的分配發(fā)送變化.

  • 同一個(gè)partition的offset保證消費(fèi)有序,不同的partition消費(fèi)不保證順序.

Consumers編程的用法:

private final KafkaConsumer<Long, String> consumer; // 與Kafka進(jìn)行通信的consumer...
consumer = new KafkaConsumer<Long, String>(props);
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Long, String> records = consumer.poll(512);
...

consumer,是一個(gè)純粹的單線程程序,后面所講的所有機(jī)制(包括coordinator,rebalance, heartbeat等),都是在這個(gè)單線程的poll函數(shù)里面完成的。也因此,在consumer的代碼內(nèi)部,沒(méi)有鎖的出現(xiàn)。

1.1包括的組件

從KafkaConsumer的構(gòu)造函數(shù)可以看出,KafkaConsumer有以下幾個(gè)核心部件:

  • Metadata: 存儲(chǔ)Topic/Partion與broker的映射關(guān)系

  • NetworkClient:網(wǎng)絡(luò)層 A network client for asynchronous request/response network i/o.

  • ConsumerNetworkClient: Higher level consumer access to the network layer //對(duì)NetworkClient的封裝,非線程安全

  • ConsumerCoordinator:只是client端的類(lèi),只是和服務(wù)端的GroupCoordinator通信的介質(zhì)。(broker端的Coordinator 負(fù)責(zé)reblance、Offset提交、心跳)

  • SubscriptionState: consumer的Topic、Partition的offset狀態(tài)維護(hù)

  • Fetcher: manage the fetching process with the brokers. //獲取消息

后面會(huì)分組件講解Consumers的工作流程

1.2 Consumer消費(fèi)者的工作過(guò)程:

  1. 在consumer啟動(dòng)時(shí)或者coordinator節(jié)點(diǎn)故障轉(zhuǎn)移時(shí),consumer發(fā)送ConsumerMetadataRequest給任意一個(gè)brokers。在ConsumerMetadataResponse中,它接收對(duì)應(yīng)的Consumer Group所屬的Coordinator的位置信息。

  2. Consumer連接Coordinator節(jié)點(diǎn),并發(fā)送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration錯(cuò)誤碼,說(shuō)明協(xié)調(diào)節(jié)點(diǎn)已經(jīng)在初始化平衡。消費(fèi)者就會(huì)停止抓取數(shù)據(jù),提交offsets,發(fā)送JoinGroupRequest給協(xié)調(diào)節(jié)點(diǎn)。在JoinGroupResponse,它接收消費(fèi)者應(yīng)該擁有的topic-partitions列表以及當(dāng)前Consumer Group的新的generation編號(hào)。這個(gè)時(shí)候Consumer Group管理已經(jīng)完成,Consumer就可以開(kāi)始fetch數(shù)據(jù),并為它擁有的partitions提交offsets。

  3. 如果HeartbeatResponse沒(méi)有錯(cuò)誤返回,Consumer會(huì)從它上次擁有的partitions列表繼續(xù)抓取數(shù)據(jù),這個(gè)過(guò)程是不會(huì)被中斷的。


2 設(shè)計(jì)

2.0 MetaData

見(jiàn)Producer里面的分析。

補(bǔ)充一下,KafkaConsumer、KafkaProducer都是在構(gòu)造函數(shù)中獲取metadata信息,通過(guò)調(diào)用metadata.update方法來(lái)獲取信息。

2.1 coordinator 為什么,做什么

1.去zookeeper依賴 -- 為什么
  • 在0.9以前的client api中,consumer是要依賴Zookeeper的。因?yàn)橥粋€(gè)consumer group中的所有consumer需要進(jìn)行協(xié)同,新航道雅思培訓(xùn)這與后面要講的rebalance有關(guān)。(ConsumerConnector、KafkaStream、ConsumerIterator) -- package kafka.consumer

  • 0.9之后新的consumer不依賴與Zookeeper,一個(gè)consumerGroup內(nèi)的consumer由Coordinator管理.(KafkaConsumer) -- package org.apache.kafka.clients.consumer

為什么?后面講

提問(wèn):為什么在一個(gè)group內(nèi)部,1個(gè)parition只能被1個(gè)consumer擁有?

2.coordinator協(xié)議/partition分配問(wèn)題

給定一個(gè)topic,有4個(gè)partition: p0, p1, p2, p3, 一個(gè)group有3個(gè)consumer: c0, c1, c2。

  • 那么,如果按RangeAssignor策略,分配結(jié)果是:
    c0: p0, c1: p1, c2: p2, p3

  • 如果按RoundRobinAssignor策略:
    c0: p1, p3, c1: p1, c2: p2

  • partition.assignment.strategy=RangeAssignor,默認(rèn)值

(到底是哪種分配狀態(tài)呢)
那這整個(gè)分配過(guò)程是如何進(jìn)行的呢?見(jiàn)下圖所示:
Kafka 0.10 KafkaConsumer流程簡(jiǎn)述

3步分配過(guò)程

1. 步驟1:對(duì)于每1個(gè)consumer group,Kafka集群為其從broker集群中選擇一個(gè)broker作為其coordinator。因此,第1步就是找到這個(gè)coordinator。(1個(gè)consumer group對(duì)應(yīng)一個(gè)coordinattor)

GroupCoordinatorRequest: GCR,由ConsumerNetworkClient發(fā)送請(qǐng)求去尋找coordinator。

2. 步驟2:找到coordinator之后,發(fā)送JoinGroup請(qǐng)求
consumer在這里會(huì)被劃分leader、follower(無(wú)責(zé)任的說(shuō):選擇第一個(gè)consumer)

  • leader作用:perform the leader synchronization and send back the assignment for the group(負(fù)責(zé)發(fā)送partition分配的結(jié)果)

  • follower作用:send follower's sync group with an empty assignment

3. 步驟3:JoinGroup返回之后,發(fā)送SyncGroup,得到自己所分配到的partition
SyncGroupRequest

  • consumer leader發(fā)送 SyncGroupRequest給Coordinator,Coordinator回給它null

  • follower發(fā)送 null的 SyncGroupRequest 給Coordinator,Coordinator回給它partition分配的結(jié)果。

注意,在上面3步中,有一個(gè)關(guān)鍵點(diǎn):

  • partition的分配策略和分配結(jié)果其實(shí)是由client決定的,而不是由coordinator決定的。什么意思呢?在第2步,所有consumer都往coordinator發(fā)送JoinGroup消息之后,coordinator會(huì)指定其中一個(gè)consumer作為leader,其他consumer作為follower。

  • 然后由這個(gè)leader進(jìn)行partition分配。

  • 然后在第3步,leader通過(guò)SyncGroup消息,把分配結(jié)果發(fā)給coordinator,其他consumer也發(fā)送SyncGroup消息,獲得這個(gè)分配結(jié)果。

接下來(lái)就到Fetcher拉取數(shù)據(jù)了

2.2 Fetcher

四個(gè)步驟

  1. 步驟0:獲取consumer的offset

  2. 步驟1:生成FetchRequest,并放入發(fā)送隊(duì)列

  3. 步驟2:網(wǎng)絡(luò)poll

  4. 步驟3:獲取結(jié)果

1.獲取consumer的offset

當(dāng)consumer初次啟動(dòng)的時(shí)候,面臨的一個(gè)首要問(wèn)題就是:從offset為多少的位置開(kāi)始消費(fèi)。

poll之前,給集群發(fā)送請(qǐng)求,讓集群告知客戶端,當(dāng)前該TopicPartition的offset是多少。通過(guò)SubscriptionState來(lái)實(shí)現(xiàn), 通過(guò)ConsumerCoordinator

if (!subscriptions.hasAllFetchPositions())            updateFetchPositions(this.subscriptions.missingFetchPositions());

核心是:向Coordinator發(fā)了一個(gè)OffsetFetchRequest,并且是同步調(diào)用,直到獲取到初始的offset,再開(kāi)始接下來(lái)的poll.(也就是說(shuō)Offset的信息如果存在Kafka里,是存在GroupCoordinator里面)

consumer的每個(gè)TopicPartition都有了初始的offset,接下來(lái)就可以進(jìn)行不斷循環(huán)取消息了,這也就是Fetch的過(guò)程:

2.生成FetchRequest,并放入發(fā)送隊(duì)列 -- fetcher.initFetches(cluster)

核心就是生成FetchRequest: 假設(shè)一個(gè)consumer訂閱了3個(gè)topic: t0, t1, t2,為其分配的partition分別是: t0: p0; t1: p1, p2; t2: p2

即總共4個(gè)TopicPartition,即t0p0, t0p1, t1p1, t2p2。這4個(gè)TopicPartition可能分布在2臺(tái)機(jī)器n0, n1上面: n0: t0p0, t1p1 n1: t0p1, t2p2

則會(huì)分別針對(duì)每臺(tái)機(jī)器生成一個(gè)FetchRequest,即Map<Node, FetchRequest>。所以會(huì)有一個(gè)方法把所有屬于同一個(gè)Node的TopicPartition放在一起,生成一個(gè)FetchRequest。

3.網(wǎng)絡(luò)poll

調(diào)用ConsumerNetworkClient.poll發(fā)送網(wǎng)絡(luò)請(qǐng)求。向服務(wù)器發(fā) 送響應(yīng)請(qǐng)求和獲取服務(wù)器的響應(yīng)。(默認(rèn)值:executeDelayedTasks=true)

4.獲取結(jié)果 -- fetcher.fetchedRecords()

獲取Broker返回的Response,里面包含了List<ConsumerRecord> records

2.3 offset確認(rèn)機(jī)制

  • 是否自動(dòng)消費(fèi)確認(rèn):由參數(shù)auto.xxx.commit=true控制

  • 手動(dòng)消費(fèi):用于自定義Consumers的消費(fèi)控制

下面從自動(dòng)消費(fèi)確認(rèn)來(lái)分析,Offset自動(dòng)確認(rèn)是由ConsumerCoordinatorAutoCommitTask來(lái)實(shí)現(xiàn)的。

其調(diào)用在ConsumerNetworkClient的 DelayedTaskQueue delayedTasks里面,然后被周期性的調(diào)用。 周期性的發(fā)送確認(rèn)消息,類(lèi)似HeartBeat,其實(shí)現(xiàn)機(jī)制也就是前面所講的DelayedQueue + DelayedTask.

確認(rèn)一次:offset的提交

poll函數(shù)中的注釋?zhuān)?br/>// execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records

  • 可以這樣理解:第二次poll調(diào)用的時(shí)候,提交上一次poll的offset和心跳發(fā)送

  • 先提交offset,再去拉取record。那么這次Offset其實(shí)是上一次poll的Record的offset。

  • 因此,當(dāng)你把按照下面的邏輯寫(xiě)程序的時(shí)候,可能會(huì)導(dǎo)致Consumer與Coordinator的心跳超時(shí)。

    while(true) {
    consumer.poll();do process message // 假如這個(gè)耗時(shí)過(guò)長(zhǎng),那么這個(gè)consumer就無(wú)法發(fā)送心跳給coordinator,導(dǎo)致它錯(cuò)誤認(rèn)為這個(gè)consumer失去聯(lián)系了,引起不必要的rebalance。槽糕的情況下,會(huì)丟重復(fù)消費(fèi)數(shù)據(jù)。}

    因此,有必要把offset的提交單獨(dú)拿出來(lái)做一個(gè)線程。

到這里,就把整個(gè)Consumer的流程走完了。

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專(zhuān)為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。

本文名稱(chēng):Kafka0.10KafkaConsumer流程簡(jiǎn)述-創(chuàng)新互聯(lián)
標(biāo)題鏈接:http://chinadenli.net/article48/cdodep.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站移動(dòng)網(wǎng)站建設(shè)手機(jī)網(wǎng)站建設(shè)網(wǎng)站設(shè)計(jì)公司外貿(mào)建站企業(yè)網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁(yè)設(shè)計(jì)