欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

Apache的Kafka介紹

今天小編就為大家?guī)硪黄嘘P(guān)Apache的Kafka的文章。小編覺得挺實(shí)用的,為此分享給大家做個(gè)參考。一起跟隨小編過來看看吧。

創(chuàng)新互聯(lián)建站專注于拜城網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠為您提供拜城營銷型網(wǎng)站建設(shè),拜城網(wǎng)站制作、拜城網(wǎng)頁設(shè)計(jì)、拜城網(wǎng)站官網(wǎng)定制、小程序設(shè)計(jì)服務(wù),打造拜城網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供拜城網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。

1. kafka概述

1.1 kafka簡介

Apache Kafka 是一個(gè)快速、可擴(kuò)展的、高吞吐的、可容錯的分布式“發(fā)布-訂閱”消息系統(tǒng), 使用 Scala 與 Java 語言編寫,能夠?qū)⑾囊粋€(gè)端點(diǎn)傳遞到另一個(gè)端點(diǎn),較之傳統(tǒng)的消息中 間件(例如 ActiveMQ、RabbitMQ),Kafka 具有高吞吐量、內(nèi)置分區(qū)、支持消息副本和高容 錯的特性,非常適合大規(guī)模消息處理應(yīng)用程序。

Kafka 官網(wǎng): http://kafka.apache.org/

Kafka主要設(shè)計(jì)目標(biāo)如下:

  • 以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問性能。
  • 高吞吐率。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100K條消息的傳輸。
  • 支持Kafka Server間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè)partition內(nèi)的消息順序傳輸。
  • 同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理。
  • 支持在線水平擴(kuò)展

Kafka通常用于兩大類應(yīng)用程序:

  • 建立實(shí)時(shí)流數(shù)據(jù)管道,以可靠地在系統(tǒng)或應(yīng)用程序之間獲取數(shù)據(jù)
  • 構(gòu)建實(shí)時(shí)流應(yīng)用程序,以轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流

要了解Kafka如何執(zhí)行這些操作,讓我們從頭開始深入研究Kafka的功能。

首先幾個(gè)概念:

  • Kafka在一個(gè)或多個(gè)可以跨越多個(gè)數(shù)據(jù)中心的服務(wù)器上作為集群運(yùn)行。
  • Kafka集群將記錄流存儲在稱為主題的類別中。
  • 每個(gè)記錄由一個(gè)鍵,一個(gè)值和一個(gè)時(shí)間戳組成

1.2 kafka架構(gòu)體系

Apache的Kafka介紹

1.3 kafka的應(yīng)用場景

kafka的應(yīng)用場景非常多, 下面我們就來舉幾個(gè)我們最常見的場景

1.3.1 用戶的活動跟蹤

用戶在網(wǎng)站的不同活動消息發(fā)布到不同的主題中心,然后可以對這些消息進(jìn)行實(shí)時(shí)監(jiān)測、實(shí)時(shí)處理。當(dāng)然,也可以加載到Hadoop或離線處理數(shù)據(jù)倉庫,對用戶進(jìn)行畫像。像淘寶、天貓、京東這些大型電商平臺,用戶的所有活動都要進(jìn)行追蹤的。

1.3.2 日志收集

Apache的Kafka介紹

1.3.3 限流削峰

Apache的Kafka介紹

1.3.4 高吞吐率實(shí)現(xiàn)

Kafka與其他MQ相比,最大的特點(diǎn)就是高吞吐率。為了增加存儲能力,Kafka將所有的消息都寫入到了低速大容量的硬盤。按理說,這將導(dǎo)致性能損失,但實(shí)際上,Kafka仍然可以保持超高的吞吐率,并且其性能并未受到影響。其主要采用如下方式實(shí)現(xiàn)了高吞吐率。

  1. 順序讀寫:Kafka將消息寫入到了分區(qū)partition中,而分區(qū)中的消息又是順序讀寫的。順序讀寫要快于隨機(jī)讀寫。
  2. 零拷貝:生產(chǎn)者、消費(fèi)者對于Kafka中的消息是采用零拷貝實(shí)現(xiàn)的。
  3. 批量發(fā)送:Kafka允許批量發(fā)送模式。
  4. 消息壓縮:Kafka允許對消息集合進(jìn)行壓縮。

1.4 kafka的優(yōu)點(diǎn)

1. 解耦:

在項(xiàng)目啟動之初來預(yù)測將來項(xiàng)目會碰到什么需求,是極其困難的。消息系統(tǒng)在處理過程中間插入了一個(gè)隱含的、基于數(shù)據(jù)的接口層,兩邊的處理過程都要實(shí)現(xiàn)這一接口。這允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

2 冗余:(副本)

有些情況下,處理數(shù)據(jù)的過程會失敗。除非數(shù)據(jù)被持久化,否則將造成丟失。消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。許多消息隊(duì)列所采用的"插入-獲取-刪除"范式中,在把一個(gè)消息從隊(duì)列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。

3 擴(kuò)展性

因?yàn)橄㈥?duì)列解耦了你的處理過程,所以增大消息入隊(duì)和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調(diào)節(jié)參數(shù)。擴(kuò)展就像調(diào)大電力按鈕一樣簡單。

4 靈活性&峰值處理能力

在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見;如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時(shí)待命無疑是巨大的浪費(fèi)。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會因?yàn)橥话l(fā)的超負(fù)荷的請求而完全崩潰。

5. 可恢復(fù)性

系統(tǒng)的一部分組件失效時(shí),不會影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。

6. 順序保證

在大多使用場景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊(duì)列本來就是排序的,并且能保證數(shù)據(jù)會按照特定的順序來處理。Kafka保證一個(gè)Partition內(nèi)的消息的有序性。

7. 緩沖

在任何重要的系統(tǒng)中,都會有需要不同的處理時(shí)間的元素。例如,加載一張圖片比應(yīng)用過濾器花費(fèi)更少的時(shí)間。消息隊(duì)列通過一個(gè)緩沖層來幫助任務(wù)最高效率的執(zhí)行———寫入隊(duì)列的處理會盡可能的快速。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度。

8. 異步通信

很多時(shí)候,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時(shí)候再去處理它們。

1.5 kafka于其他MQ對比

1. RabbitMQ

RabbitMQ是使用Erlang編寫的一個(gè)開源的消息隊(duì)列,本身支持很多的協(xié)議:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量級,更適合于企業(yè)級的開發(fā)。同時(shí)實(shí)現(xiàn)了Broker構(gòu)架,這意味著消息在發(fā)送給客戶端時(shí)先在中心隊(duì)列排隊(duì)。對路由,負(fù)載均衡或者數(shù)據(jù)持久化都有很好的支持。

2. redis

Redis是一個(gè)基于Key-Value對的NoSql數(shù)據(jù)庫,開發(fā)維護(hù)很活躍。雖然它是一個(gè)Key-Value數(shù)據(jù)庫存儲系統(tǒng),但它本身支持MQ功能,所以完全可以當(dāng)做一個(gè)輕量級的隊(duì)列服務(wù)來使用。對于RabbitMQ和Redis的入隊(duì)和出隊(duì)操作,各執(zhí)行100萬次,每10萬次記錄一次執(zhí)行時(shí)間。測試數(shù)據(jù)分為128Bytes、512Bytes、1K和10K四個(gè)不同大小的數(shù)據(jù)。實(shí)驗(yàn)表明:入隊(duì)時(shí),當(dāng)數(shù)據(jù)比較小時(shí)Redis的性能要高于RabbitMQ,而如果數(shù)據(jù)大小超過了10K,Redis則慢的無法忍受;出隊(duì)時(shí),無論數(shù)據(jù)大小,Redis都表現(xiàn)出非常好的性能,而RabbitMQ的出隊(duì)性能則遠(yuǎn)低于Redis。

3. ZeroMQ

ZeroMQ號稱最快的消息隊(duì)列系統(tǒng),尤其針對大吞吐量的需求場景。ZeroMQ能夠?qū)崿F(xiàn)RabbitMQ不擅長的高級/復(fù)雜的隊(duì)列,但是開發(fā)人員需要自己組合多種技術(shù)框架,技術(shù)上的復(fù)雜度是對這MQ能夠應(yīng)用成功的挑戰(zhàn)。ZeroMQ具有一個(gè)獨(dú)特的非中間件的模式,你不需要安裝和運(yùn)行一個(gè)消息服務(wù)器或中間件,因?yàn)槟愕膽?yīng)用程序?qū)缪葸@個(gè)服務(wù)器角色。你只需要簡單的引用ZeroMQ程序庫,可以使用NuGet安裝,然后你就可以愉快的在應(yīng)用程序之間發(fā)送消息了。但是ZeroMQ僅提供非持久性的隊(duì)列,也就是說如果宕機(jī),數(shù)據(jù)將會丟失。其中,Twitter的Storm 0.9.0以前的版本中默認(rèn)使用ZeroMQ作為數(shù)據(jù)流的傳輸(Storm從0.9版本開始同時(shí)支持ZeroMQ和Netty作為傳輸模塊)。

4. ActiveMQ

ActiveMQ是Apache下的一個(gè)子項(xiàng)目。 類似于ZeroMQ,它能夠以代理人和點(diǎn)對點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列。同時(shí)類似于RabbitMQ,它少量代碼就可以高效地實(shí)現(xiàn)高級應(yīng)用場景。

5. Kafka/Jafka

Kafka是Apache下的一個(gè)子項(xiàng)目,是一個(gè)高性能跨語言分布式發(fā)布/訂閱消息隊(duì)列系統(tǒng),而Jafka是在Kafka之上孵化而來的,即Kafka的一個(gè)升級版。具有以下特性:快速持久化,可以在O(1)的系統(tǒng)開銷下進(jìn)行消息持久化;高吞吐,在一臺普通的服務(wù)器上既可以達(dá)到10W/s的吞吐速率;完全的分布式系統(tǒng),Broker、Producer、Consumer都原生自動支持分布式,自動實(shí)現(xiàn)負(fù)載均衡;支持Hadoop數(shù)據(jù)并行加載,對于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng),但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案。Kafka通過Hadoop的并行加載機(jī)制統(tǒng)一了在線和離線的消息處理。Apache Kafka相對于ActiveMQ是一個(gè)非常輕量級的消息系統(tǒng),除了性能非常好之外,還是一個(gè)工作良好的分布式系統(tǒng)。

1.6 kafka的幾種重要角色

1.6.1 kafka作為存儲系統(tǒng)

任何允許發(fā)布與使用無關(guān)的消息發(fā)布的消息隊(duì)列都有效地充當(dāng)了運(yùn)行中消息的存儲系統(tǒng)。Kafka的不同之處在于它是一個(gè)非常好的存儲系統(tǒng)。

寫入Kafka的數(shù)據(jù)將寫入磁盤并進(jìn)行復(fù)制以實(shí)現(xiàn)容錯功能。Kafka允許生產(chǎn)者等待確認(rèn),以便直到完全復(fù)制并確保即使寫入服務(wù)器失敗的情況下寫入也不會完成。

Kafka的磁盤結(jié)構(gòu)可以很好地?cái)U(kuò)展使用-無論服務(wù)器上有50 KB還是50 TB的持久數(shù)據(jù),Kafka都將執(zhí)行相同的操作。

由于認(rèn)真對待存儲并允許客戶端控制其讀取位置,因此您可以將Kafka視為一種專用于高性能,低延遲提交日志存儲,復(fù)制和傳播的專用分布式文件系統(tǒng)。

1.6.2 kafka作為消息傳遞系統(tǒng)

Kafka的流概念與傳統(tǒng)的企業(yè)消息傳遞系統(tǒng)相比如何?

傳統(tǒng)上,消息傳遞具有兩種模型:排隊(duì)和發(fā)布-訂閱。在隊(duì)列中,一組使用者可以從服務(wù)器中讀取內(nèi)容,并且每條記錄都將轉(zhuǎn)到其中一個(gè)。在發(fā)布-訂閱記錄中廣播給所有消費(fèi)者。這兩個(gè)模型中的每一個(gè)都有優(yōu)點(diǎn)和缺點(diǎn)。排隊(duì)的優(yōu)勢在于,它允許您將數(shù)據(jù)處理劃分到多個(gè)使用者實(shí)例上,從而擴(kuò)展處理量。不幸的是,隊(duì)列不是多用戶的—一次進(jìn)程讀取了丟失的數(shù)據(jù)。發(fā)布-訂閱允許您將數(shù)據(jù)廣播到多個(gè)進(jìn)程,但是由于每條消息都傳遞給每個(gè)訂閱者,因此無法擴(kuò)展處理。

Kfka的消費(fèi)者群體概念概括了這兩個(gè)概念。與隊(duì)列一樣,使用者組允許您將處理劃分為一組進(jìn)程(使用者組的成員)。與發(fā)布訂閱一樣,Kafka允許您將消息廣播到多個(gè)消費(fèi)者組。

Kafka模型的優(yōu)點(diǎn)在于,每個(gè)主題都具有這些屬性-可以擴(kuò)展處理范圍,并且是多訂閱者-無需選擇其中一個(gè)。

與傳統(tǒng)的消息傳遞系統(tǒng)相比,Kafka還具有更強(qiáng)的訂購保證。

傳統(tǒng)隊(duì)列將記錄按順序保留在服務(wù)器上,如果多個(gè)使用者從隊(duì)列中消費(fèi),則服務(wù)器將按記錄的存儲順序分發(fā)記錄。但是,盡管服務(wù)器按順序分發(fā)記錄,但是這些記錄是異步傳遞給使用者的,因此它們可能在不同的使用者上亂序到達(dá)。這實(shí)際上意味著在并行使用的情況下會丟失記錄的順序。消息傳遞系統(tǒng)通常通過“專有使用者”的概念來解決此問題,該概念僅允許一個(gè)進(jìn)程從隊(duì)列中使用,但是,這當(dāng)然意味著在處理中沒有并行性。

Kafka做得更好。通過在主題內(nèi)具有并行性(即分區(qū))的概念,Kafka能夠在用戶進(jìn)程池中提供排序保證和負(fù)載均衡。這是通過將主題中的分區(qū)分配給消費(fèi)者組中的消費(fèi)者來實(shí)現(xiàn)的,以便每個(gè)分區(qū)都由組中的一個(gè)消費(fèi)者完全消費(fèi)。通過這樣做,我們確保使用者是該分區(qū)的唯一讀取器,并按順序使用數(shù)據(jù)。由于存在許多分區(qū),因此仍然可以平衡許多使用者實(shí)例上的負(fù)載。但是請注意,使用者組中的使用者實(shí)例不能超過分區(qū)。

1.6.3 kafka用作流處理

僅讀取,寫入和存儲數(shù)據(jù)流是不夠的,目的是實(shí)現(xiàn)對流的實(shí)時(shí)處理。

在Kafka中,流處理器是指從輸入主題中獲取連續(xù)數(shù)據(jù)流,對該輸入進(jìn)行一些處理并生成連續(xù)數(shù)據(jù)流以輸出主題的任何東西。

例如,零售應(yīng)用程序可以接受銷售和裝運(yùn)的輸入流,并輸出根據(jù)此數(shù)據(jù)計(jì)算出的重新訂購和價(jià)格調(diào)整流。

可以直接使用生產(chǎn)者和消費(fèi)者API進(jìn)行簡單處理。但是,對于更復(fù)雜的轉(zhuǎn)換,Kafka提供了完全集成的Streams API。這允許構(gòu)建執(zhí)行非重要處理的應(yīng)用程序,這些應(yīng)用程序計(jì)算流的聚合或?qū)⒘鬟B接在一起。

該功能有助于解決此類應(yīng)用程序所面臨的難題:處理無序數(shù)據(jù),在代碼更改時(shí)重新處理輸入,執(zhí)行狀態(tài)計(jì)算等。

流API建立在Kafka提供的核心原語之上:它使用生產(chǎn)者和使用者API進(jìn)行輸入,使用Kafka進(jìn)行狀態(tài)存儲,并使用相同的組機(jī)制來實(shí)現(xiàn)流處理器實(shí)例之間的容錯。

2. kafka中的關(guān)鍵術(shù)語解釋

2.1 Topic

主題。在 Kafka 中,使用一個(gè)類別屬性來劃分消息的所屬類,劃分消息的這個(gè)類稱為 topic。 topic 相當(dāng)于消息的分類標(biāo)簽,是一個(gè)邏輯概念

物理上不同Topic的消息分開存儲,邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處

##2.2 Partition

分區(qū)。topic 中的消息被分割為一個(gè)或多個(gè) partition,其是一個(gè)物理概念,對應(yīng)到系統(tǒng)上 就是一個(gè)或若干個(gè)目錄。partition 內(nèi)部的消息是有序的,但 partition 間的消息是無序的。

2.3 Segment

段。將 partition 進(jìn)一步細(xì)分為了若干的 segment,每個(gè) segment 文件的大小相等。

2.4 Broker

Kafka 集群包含一個(gè)或多個(gè)服務(wù)器,每個(gè)服務(wù)器節(jié)點(diǎn)稱為一個(gè) broker。

broker存儲topic的數(shù)據(jù)。如果某topic有N個(gè)partition,集群有N個(gè)broker,那么每個(gè)broker存儲該topic的一個(gè)partition。

如果某topic有N個(gè)partition,集群有(N+M)個(gè)broker,那么其中有N個(gè)broker存儲該topic的一個(gè)partition,剩下的M個(gè)broker不存儲該topic的partition數(shù)據(jù)。

如果某topic有N個(gè)partition,集群中broker數(shù)目少于N個(gè),那么一個(gè)broker存儲該topic的一個(gè)或多個(gè)partition。在實(shí)際生產(chǎn)環(huán)境中,盡量避免這種情況的發(fā)生,這種情況容易導(dǎo)致Kafka集群數(shù)據(jù)不均衡。

2.5 Producer

生產(chǎn)者, 即消息的發(fā)布者. 生產(chǎn)者將數(shù)據(jù)發(fā)布到他們選擇的主題。生產(chǎn)者負(fù)責(zé)選擇將哪個(gè)記錄分配給主題中的哪個(gè)分區(qū)。即: 生產(chǎn)者生產(chǎn)的一條消息,會被寫入到某一個(gè) partition。

##2.6 Consumer

消費(fèi)者??梢詮?broker 中讀取消息。

一個(gè)消費(fèi)者可以消費(fèi)多個(gè) topic 的消息

一個(gè)消費(fèi)者可以消費(fèi)同一個(gè) topic 中的多個(gè) partition 中的消息

一個(gè) partiton 允許多個(gè) consumer 同時(shí)消費(fèi)

2.7 Consumer Group

consumer group 是 kafka 提供的可擴(kuò)展且具有容錯性的消費(fèi)者機(jī)制。組內(nèi)可以有多個(gè)消 費(fèi)者,它們共享一個(gè)公共的 ID,即 group ID。組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)訂閱主題 的所有分區(qū)。

Kafka 保證同一個(gè) consumer group 中只有一個(gè) consumer 會消費(fèi)某條消息,實(shí)際上,Kafka 保證的是穩(wěn)定狀態(tài)下每一個(gè) consumer 實(shí)例只會消費(fèi)某一個(gè)或多個(gè)特定的 partition,而某個(gè) partition 的數(shù)據(jù)只會被某一個(gè)特定的 consumer 實(shí)例所消費(fèi)。

下面我們用官網(wǎng)的一張圖, 來標(biāo)識consumer數(shù)量和partition數(shù)量的對應(yīng)關(guān)系

由兩臺服務(wù)器組成的Kafka群集,其中包含四個(gè)帶有兩個(gè)使用者組的分區(qū)(P0-P3)。消費(fèi)者組A有兩個(gè)消費(fèi)者實(shí)例,組B有四個(gè)。
Apache的Kafka介紹

其實(shí)對于這個(gè)消費(fèi)組, 以前一直搞不明白, 我自己的總結(jié)是:

topic中的partitoin到group是發(fā)布訂閱的通信方式,即一條topic的partition的消息會被所有的group消費(fèi),屬于一對多模式;group到consumer是點(diǎn)對點(diǎn)通信方式,屬于一對一模式。

舉個(gè)例子: 不使用group的話,啟動10個(gè)consumer消費(fèi)一個(gè)topic,這10個(gè)consumer都能得到topic的所有數(shù)據(jù),相當(dāng)于這個(gè)topic中的任一條消息被消費(fèi)10次。

使用group的話,連接時(shí)帶上groupid,topic的消息會分發(fā)到10個(gè)consumer上,每條消息只被消費(fèi)1次

2.8 Replizcas of partition

分區(qū)副本。副本是一個(gè)分區(qū)的備份,是為了防止消息丟失而創(chuàng)建的分區(qū)的備份。

2.9 Partition Leader

每個(gè) partition 有多個(gè)副本,其中有且僅有一個(gè)作為 Leader,Leader 是當(dāng)前負(fù)責(zé)消息讀寫 的 partition。即所有讀寫操作只能發(fā)生于 Leader 分區(qū)上。

2.10 Partition Follower

所有Follower都需要從Leader同步消息,F(xiàn)ollower與Leader始終保持消息同步。Leader 與 Follower 的關(guān)系是主備關(guān)系,而非主從關(guān)系。

2.11 ISR

  • ISR,In-Sync Replicas,是指副本同步列表。 ISR列表是由Leader負(fù)責(zé)維護(hù)。

  • AR,Assigned Replicas,指某個(gè) partition 的所有副本, 即已分配的副本列表。
  • OSR,Outof-Sync Replicas, 即非同步的副本列表。
  • AR = ISR + OSR

2. 12 offset

偏移量。每條消息都有一個(gè)當(dāng)前Partition下唯一的64字節(jié)的offset,它是相當(dāng)于當(dāng)前分區(qū)第一條消息的偏移量。

2.13 Broker Controller

Kafka集群的多個(gè)broker中,有一個(gè)會被選舉controller,負(fù)責(zé)管理整個(gè)集群中partition和replicas的狀態(tài)。

只有 Broker Controller 會向 zookeeper 中注冊 Watcher,其他 broker 及分區(qū)無需注冊。即 zookeeper 僅需監(jiān)聽 Broker Controller 的狀態(tài)變化即可。

2.14 HW與LEO

  • HW,HighWatermark,高水位,表示 Consumer 可以消費(fèi)到的最高 partition 偏移量。HW 保證了 Kafka 集群中消息的一致性。確切地說,是保證了 partition 的 Follower 與 Leader 間數(shù) 據(jù)的一致性。

  • LEO,Log End Offset,日志最后消息的偏移量。消息是被寫入到 Kafka 的日志文件中的, 這是當(dāng)前最后一個(gè)寫入的消息在 Partition 中的偏移量。

  • 對于 leader 新寫入的消息,consumer 是不能立刻消費(fèi)的。leader 會等待該消息被所有 ISR 中的 partition follower 同步后才會更新 HW,此時(shí)消息才能被 consumer 消費(fèi)。

我相信你看完上面的概念還是懵逼的, 好吧, 下面我們就用圖來形象話的表示兩者的關(guān)系吧
Apache的Kafka介紹

2.15 zookeeper

Zookeeper 負(fù)責(zé)維護(hù)和協(xié)調(diào) broker,負(fù)責(zé) Broker Controller 的選舉。

在 kafka0.9 之前版本,offset 是由 zk 負(fù)責(zé)管理的。

總結(jié):zk 負(fù)責(zé) Controller 的選舉,Controller 負(fù)責(zé) leader 的選舉。

2.16 Coordinator

Coordinator一般指的是運(yùn)行在每個(gè)broker上的group Coordinator進(jìn)程,用于管理Consumer Group中的各個(gè)成員,主要用于offset位移管理和Rebalance。一個(gè)Coordinator可以同時(shí)管理多個(gè)消費(fèi)者組。

2. 17 Rebalance

當(dāng)消費(fèi)者組中的數(shù)量發(fā)生變化,或者topic中的partition數(shù)量發(fā)生了變化時(shí),partition的所有權(quán)會在消費(fèi)者間轉(zhuǎn)移,即partition會重新分配,這個(gè)過程稱為再均衡Rebalance。

再均衡能夠給消費(fèi)者組及broker帶來高性能、高可用性和伸縮,但在再均衡期間消費(fèi)者是無法讀取消息的,即整個(gè)broker集群有小一段時(shí)間是不可用的。因此要避免不必要的再均衡。

##2.18 offset commit

Consumer從broker中取一批消息寫入buffer進(jìn)行消費(fèi),在規(guī)定的時(shí)間內(nèi)消費(fèi)完消息后,會自動將其消費(fèi)消息的offset提交給broker,以記錄下哪些消息是消費(fèi)過的。當(dāng)然,若在時(shí)限內(nèi)沒有消費(fèi)完畢,其是不會提交offset的。

3. kafka的工作原理和過程

3.1 消息寫入算法

消息發(fā)送者將消息發(fā)送給broker, 并形成最終的可供消費(fèi)者消費(fèi)的log, 是已給比較復(fù)雜的過程

  1. producer先從zookeeper中找到該partition的leader
  2. producer將消息發(fā)送給該leader
  3. leader將消息接入本地的log, 并通知ISR的followers
  4. ISR中的followers從leader中pull消息, 寫入本地log后向leader發(fā)送ack
  5. leader收到所有ISR中的followers的ack后, 增加HW并向producer發(fā)送ack, 表示消息寫入成功

3.2 消息路由策略

在通過 API 方式發(fā)布消息時(shí),生產(chǎn)者是以 Record 為消息進(jìn)行發(fā)布的。Record 中包含 key 與 value,value 才是我們真正的消息本身,而 key 用于路由消息所要存放的 Partition。消息 要寫入到哪個(gè) Partition 并不是隨機(jī)的,而是有路由策略的。

  1. 若指定了 partition,則直接寫入到指定的 partition;

  2. 若未指定 partition 但指定了 key,則通過對 key 的 hash 值與 partition 數(shù)量取模,該取模

    結(jié)果就是要選出的 partition 索引;

  3. 若 partition 和 key 都未指定,則使用輪詢算法選出一個(gè) partition。

3.3 HW截?cái)鄼C(jī)制

如果 partition leader 接收到了新的消息, ISR 中其它 Follower 正在同步過程中,還未同 步完畢時(shí) leader 宕機(jī)。此時(shí)就需要選舉出新的 leader。若沒有 HW 截?cái)鄼C(jī)制,將會導(dǎo)致 partition 中 leader 與 follower 數(shù)據(jù)的不一致。

當(dāng)原 Leader 宕機(jī)后又恢復(fù)時(shí),將其 LEO 回退到其宕機(jī)時(shí)的 HW,然后再與新的 Leader進(jìn)行數(shù)據(jù)同步,這樣就可以保證老 Leader 與新 Leader 中數(shù)據(jù)一致了,這種機(jī)制稱為 HW 截?cái)鄼C(jī)制。

3.4 消息發(fā)送的可靠性

生產(chǎn)者向 kafka 發(fā)送消息時(shí),可以選擇需要的可靠性級別。通過 request.required.acks參數(shù)的值進(jìn)行設(shè)置。

  1. 0值

異步發(fā)送。生產(chǎn)者向 kafka 發(fā)送消息而不需要 kafka 反饋成功 ack。該方式效率最高,但可靠性最低。其可能會存在消息丟失的情況。

  • 在傳輸過程中會出現(xiàn)消息丟失。
  • 在broker內(nèi)部會出現(xiàn)消息丟失。
  • 會出現(xiàn)寫入到kafka中的消息的順序與生產(chǎn)順序不一致的情況。
  1. 1值

同步發(fā)送。生產(chǎn)者發(fā)送消息給 kafka,broker 的 partition leader 在收到消息后馬上發(fā)送 成功 ack(無需等等 ISR 中的 Follower 同步),生產(chǎn)者收到后知道消息發(fā)送成功,然后會再發(fā)送消息。如果一直未收到 kafka 的 ack,則生產(chǎn)者會認(rèn)為消息發(fā)送失敗,會重發(fā)消息。

該方式對于 Producer 來說,若沒有收到 ACK,一定可以確認(rèn)消息發(fā)送失敗了,然后可以 重發(fā);但是,即使收到了 ACK,也不能保證消息一定就發(fā)送成功了。故,這種情況,也可能 會發(fā)生消息丟失的情況。

  1. -1值

同步發(fā)送。生產(chǎn)者發(fā)送消息給 kafka,kafka 收到消息后要等到 ISR 列表中的所有副本都 同步消息完成后,才向生產(chǎn)者發(fā)送成功 ack。如果一直未收到 kafka 的 ack,則認(rèn)為消息發(fā)送 失敗,會自動重發(fā)消息。該方式會出現(xiàn)消息重復(fù)接收的情況。

3.5 消費(fèi)者消費(fèi)過程解析

生產(chǎn)者將消息發(fā)送到topitc中, 消費(fèi)者即可對其進(jìn)行消費(fèi), 其消費(fèi)過程如下:

  1. consumer向broker提交連接請求,其所連接上的broker都會向其發(fā)送broker controller的通信URL,即配置文件中的listeners地址;
  2. 當(dāng)consumer指定了要消費(fèi)的topic后,會向broker controller發(fā)送消費(fèi)請求;
  3. broker controller會為consumer分配一個(gè)或幾個(gè)partition leader,并將該partition的當(dāng)前offset發(fā)送給consumer;
  4. consumer會按照broker controller分配的partition對其中的消息進(jìn)行消費(fèi);
  5. 當(dāng)consumer消費(fèi)完該條消息后,consumer會向broker發(fā)送一個(gè)消息已經(jīng)被消費(fèi)反饋,即該消息的offset;
  6. 在broker接收到consumer的offset后,會更新相應(yīng)的__consumer_offset中;
  7. 以上過程會一直重復(fù),知道消費(fèi)者停止請求消費(fèi);
  8. Consumer可以重置offset,從而可以靈活消費(fèi)存儲在broker上的消息。

3.6 Partition Leader選舉范圍

當(dāng)leader宕機(jī)后,broker controller會從ISR中挑選一個(gè)follower成為新的leader。如果ISR中沒有其他副本怎么辦?可以通過unclean.leader.election.enable的值來設(shè)置leader選舉范圍。

  1. false

必須等到ISR列表中所有的副本都活過來才進(jìn)行新的選舉。該策略可靠性有保證,但可用性低。

  1. true

    在ISR列表中沒有副本的情況下,可以選擇任意一個(gè)沒有宕機(jī)的主機(jī)作為新的leader,該策略可用性高,但可靠性沒有保證。

3.7 重復(fù)消費(fèi)問題的解決方案

  1. 同一個(gè)consumer重復(fù)消費(fèi)

當(dāng)Consumer由于消費(fèi)能力低而引發(fā)了消費(fèi)超時(shí),則可能會形成重復(fù)消費(fèi)。

在某數(shù)據(jù)剛好消費(fèi)完畢,但是正準(zhǔn)備提交offset時(shí)候,消費(fèi)時(shí)間超時(shí),則broker認(rèn)為這條消息未消費(fèi)成功。這時(shí)就會產(chǎn)生重復(fù)消費(fèi)問題。

其解決方案:延長offset提交時(shí)間。

  1. 不同的consumer重復(fù)消費(fèi)

當(dāng)Consumer消費(fèi)了消息,但還沒有提交offset時(shí)宕機(jī),則這些已經(jīng)被消費(fèi)過的消息會被重復(fù)消費(fèi)。

其解決方案:將自動提交改為手動提交。

3.8 從架構(gòu)設(shè)計(jì)上解決kafka重復(fù)消費(fèi)的問題

其實(shí)在開發(fā)的時(shí)候, 我們在設(shè)計(jì)程序的時(shí)候, 比如考慮到網(wǎng)絡(luò)故障等一些異常的情況, 我們都會設(shè)置消息的重試次數(shù),

可能還有其他可能出現(xiàn)消息重復(fù), 那我們應(yīng)該如何解決呢?

下面提供三個(gè)方案:

3.8.1 方案一: 保存并查詢

給每個(gè)消息都設(shè)置一個(gè)獨(dú)一無二的uuid, 所有的消息, 我們都要存一個(gè)uuid, 我們在消費(fèi)消息的時(shí)候, 首先去持久化系統(tǒng)中查詢一下, 看這個(gè)看是否以前消費(fèi)過, 如沒有消費(fèi)過, 在進(jìn)行消費(fèi), 如果已經(jīng)消費(fèi)過, 丟棄就好了, 下圖, 表明了這種方案

Apache的Kafka介紹

3.8.2 方案二: 利用冪等

冪等(Idempotence)在數(shù)學(xué)上是這樣定義的,如果一個(gè)函數(shù) f(x) 滿足:f(f(x)) = f(x),則函數(shù) f(x) 滿足冪等性。

這個(gè)概念被拓展到計(jì)算機(jī)領(lǐng)域,被用來描述一個(gè)操作、方法或者服務(wù)。一個(gè)冪等操作的特點(diǎn)是,其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。一個(gè)冪等的方法,使用同樣的參數(shù),對它進(jìn)行多次調(diào)用和一次調(diào)用,對系統(tǒng)產(chǎn)生的影響是一樣的。所以,對于冪等的方法,不用擔(dān)心重復(fù)執(zhí)行會對系統(tǒng)造成任何改變。

我們舉個(gè)例子來說明一下。在不考慮并發(fā)的情況下,“將 X 老師的賬戶余額設(shè)置為 100 萬元”,執(zhí)行一次后對系統(tǒng)的影響是,X 老師的賬戶余額變成了 100 萬元。只要提供的參數(shù) 100萬元不變,那即使再執(zhí)行多少次,X 老師的賬戶余額始終都是 100萬元,不會變化,這個(gè)操作就是一個(gè)冪等的操作。
再舉一個(gè)例子,“將 X 老師的余額加 100 萬元”,這個(gè)操作它就不是冪等的,每執(zhí)行一次,賬戶余額就會增加 100 萬元,執(zhí)行多次和執(zhí)行一次對系統(tǒng)的影響(也就是賬戶的余額)是不一樣的。

所以,通過這兩個(gè)例子,我們可以想到如果系統(tǒng)消費(fèi)消息的業(yè)務(wù)邏輯具備冪等性,那就不用擔(dān)心消息重復(fù)的問題了,因?yàn)橥粭l消息,消費(fèi)一次和消費(fèi)多次對系統(tǒng)的影響是完全一樣的。也就可以認(rèn)為,消費(fèi)多次等于消費(fèi)一次。

那么,如何實(shí)現(xiàn)冪等操作呢?最好的方式就是,從業(yè)務(wù)邏輯設(shè)計(jì)上入手,將消費(fèi)的業(yè)務(wù)邏輯設(shè)計(jì)成具備冪等性的操作。但是,不是所有的業(yè)務(wù)都能設(shè)計(jì)成天然冪等的,這里就需要一些方法和技巧來實(shí)現(xiàn)冪等。

下面我們介紹一種常用的方法:利用數(shù)據(jù)庫的唯一約束實(shí)現(xiàn)冪等。

例如,我們剛剛提到的那個(gè)不具備冪等特性的轉(zhuǎn)賬的例子:將 X 老師的賬戶余額加 100 萬元。在這個(gè)例子中,我們可以通過改造業(yè)務(wù)邏輯,讓它具備冪等性。

首先,我們可以限定,對于每個(gè)轉(zhuǎn)賬單每個(gè)賬戶只可以執(zhí)行一次變更操作,在分布式系統(tǒng)中,這個(gè)限制實(shí)現(xiàn)的方法非常多,最簡單的是我們在數(shù)據(jù)庫中建一張轉(zhuǎn)賬流水表,這個(gè)表有三個(gè)字段:轉(zhuǎn)賬單 ID、賬戶 ID 和變更金額,然后給轉(zhuǎn)賬單 ID 和賬戶 ID 這兩個(gè)字段聯(lián)合起來創(chuàng)建一個(gè)唯一約束,這樣對于相同的轉(zhuǎn)賬單 ID 和賬戶 ID,表里至多只能存在一條記錄。

這樣,我們消費(fèi)消息的邏輯可以變?yōu)椋骸霸谵D(zhuǎn)賬流水表中增加一條轉(zhuǎn)賬記錄,然后再根據(jù)轉(zhuǎn)賬記錄,異步操作更新用戶余額即可。”在轉(zhuǎn)賬流水表增加一條轉(zhuǎn)賬記錄這個(gè)操作中,由于我們在這個(gè)表中預(yù)先定義了“賬戶 ID 轉(zhuǎn)賬單 ID”的唯一約束,對于同一個(gè)轉(zhuǎn)賬單同一個(gè)賬戶只能插入一條記錄,后續(xù)重復(fù)的插入操作都會失敗,這樣就實(shí)現(xiàn)了一個(gè)冪等的操作。
Apache的Kafka介紹

3.8.3 方案三: 設(shè)置前提條件

為更新的數(shù)據(jù)設(shè)置前置條件另外一種實(shí)現(xiàn)冪等的思路是,給數(shù)據(jù)變更設(shè)置一個(gè)前置條件,如果滿足條件就更新數(shù)據(jù),否則拒絕更新數(shù)據(jù),在更新數(shù)據(jù)的時(shí)候,同時(shí)變更前置條件中需要判斷的數(shù)據(jù)。

這樣,重復(fù)執(zhí)行這個(gè)操作時(shí),由于第一次更新數(shù)據(jù)的時(shí)候已經(jīng)變更了前置條件中需要判斷的數(shù)據(jù),不滿足前置條件,則不會重復(fù)執(zhí)行更新數(shù)據(jù)操作。

比如,剛剛我們說過,“將 X 老師的賬戶的余額增加 100 萬元”這個(gè)操作并不滿足冪等性,我們可以把這個(gè)操作加上一個(gè)前置條件,變?yōu)椋骸叭绻鸛老師的賬戶當(dāng)前的余額為 500萬元,將余額加 100萬元”,這個(gè)操作就具備了冪等性。

對應(yīng)到消息隊(duì)列中的使用時(shí),可以在發(fā)消息時(shí)在消息體中帶上當(dāng)前的余額,在消費(fèi)的時(shí)候進(jìn)行判斷數(shù)據(jù)庫中,當(dāng)前余額是否與消息中的余額相等,只有相等才執(zhí)行變更操作。

但是,如果我們要更新的數(shù)據(jù)不是數(shù)值,或者我們要做一個(gè)比較復(fù)雜的更新操作怎么辦?用什么作為前置判斷條件呢?更加通用的方法是,給你的數(shù)據(jù)增加一個(gè)版本號屬性,每次更數(shù)據(jù)前,比較當(dāng)前數(shù)據(jù)的版本號是否和消息中的版本號一致,如果不一致就拒絕更新數(shù)據(jù),更新數(shù)據(jù)的同時(shí)將版本號 +1,一樣可以實(shí)現(xiàn)冪等。
Apache的Kafka介紹

4 . kafka集群搭建

我們在工作中, 為了保證環(huán)境的高可用, 防止單點(diǎn), kafka都是以集群的方式出現(xiàn)的, 下面就帶領(lǐng)大家一起搭建一套kafka集群環(huán)境

我們在官網(wǎng)下載kafka, 下載地址為: http://kafka.apache.org/downloads, 下載我們需要的版本, 推薦使用穩(wěn)定的版本

4.1 搭建集群

1.下載并解壓

cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz
mkdir /data/servers
tar xzvf kafka_2.11-2.4.0.tgz  -C /data/servers/
cd /data/servers/kafka_2.11-2.4.0

2.修改配置文件

kafka的配置文件$KAFKA_HOME/config/server.properties, 主要修改一下下面幾項(xiàng)

確保每個(gè)機(jī)器上的id不一樣
 broker.id=0
  配置服務(wù)端的監(jiān)控地址
 listeners=PLAINTEXT://192.168.51.128:9092
  kafka 日志目錄
 log.dirs=/data/servers/kafka_2.11-2.4.0/logs
 #kafka設(shè)置的partitons的個(gè)數(shù)
 num.partitions=1

  zookeeper的連接地址, 如果有自己的zookeeper集群, 請直接使用自己搭建的zookeeper集群
 zookeeper.connect=192.168.51.128:2181

因?yàn)槲易约菏潜緳C(jī)做實(shí)驗(yàn), 所有使用的是一個(gè)主機(jī)的不同端口, 在線上, 就是不同的機(jī)器,大家參考即可

我們這里使用kafka的zookeeper, 只啟動一個(gè)節(jié)點(diǎn), 但是正真的生產(chǎn)過程中, 是需要zookeeper集群, 自己搭建就好, 后期我們也會出zookeeper的教程, 大家請關(guān)注就好了.

3.拷貝3份配置文件
#創(chuàng)建對應(yīng)的日志目錄
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9092
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9093
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9094

#拷貝三份配置文件
cp server.properties server_9092.properties 
cp server.properties server_9093.properties 
cp server.properties server_9094.properties 
修改不同端口對應(yīng)的文件
#9092的id為0, 9093的id為1, 9094的id為2
 broker.id=0
 # 配置服務(wù)端的監(jiān)控地址, 分別在不通的配置文件中寫入不同的端口
 listeners=PLAINTEXT://192.168.51.128:9092
 # kafka 日志目錄, 目錄也是對應(yīng)不同的端口
 log.dirs=/data/servers/kafka_2.11-2.4.0/logs/9092
 # kafka設(shè)置的partitons的個(gè)數(shù)
 num.partitions=1
 # zookeeper的連接地址, 如果有自己的zookeeper集群, 請直接使用自己搭建的zookeeper集群
 zookeeper.connect=192.168.51.128:2181
修改zookeeper的配置文件
dataDir=/data/servers/zookeeper
server.1=192.168.51.128:2888:3888
然后創(chuàng)建zookeeper的myid文件
echo "1"> /data/servers/zookeeper/myid
5.啟動zookeeper

使用kafka內(nèi)置的zookeeper

cd /data/servers/kafka_2.11-2.4.0/bin
zookeeper-server-start.sh -daemon ../config/zookeeper.properties 
netstat -anp |grep 2181
啟動kafka
./kafka-server-start.sh -daemon ../config/server_9092.properties   
./kafka-server-start.sh -daemon ../config/server_9093.properties   
./kafka-server-start.sh -daemon ../config/server_9094.properties   

4.2 kafka的操作

  1. topic

我們先來看一下創(chuàng)建topic常用的參數(shù)吧

--create 創(chuàng)建topic

--delete 刪除topic

--alter 修改topic的名字或者partition個(gè)數(shù)

--list 查看topic

--describe 查看topic的詳細(xì)信息

--topic <String: topic> 指定topic的名字

--zookeeper <String: hosts> 指定zookeeper的連接地址,

參數(shù)提示并不贊成這樣使用

DEPRECATED, The connection string for
the zookeeper connection in the form  host:port. Multiple hosts can be
given to allow fail-over.

--bootstrap-server <String: server to connect to>: 指定kafka的連接地址, 推薦使用這個(gè),

參數(shù)的提示信息顯示

REQUIRED: The Kafka server to connect

to. In case of providing this, a     
direct Zookeeper connection won't be 
required.

--replication-factor <Integer: replication factor> : 對于每個(gè)partiton的備份個(gè)數(shù)

The replication factor for each
partition in the topic being
created. If not supplied, defaults
to the cluster default.

--partitions <Integer: # of partitions>: 指定該topic的分區(qū)的個(gè)數(shù)

示例:

cd /data/servers/kafka_2.11-2.4.0/bin
# 創(chuàng)建topic  test1
kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test1
# 創(chuàng)建topic test2
kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test2
# 查看topic
kafka-topics.sh --list --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 
2. 自動創(chuàng)建topic

我們在工作中, 如果我們不想去管理topic, 可以通過kafka的配置文件來管理, 我們可以讓kafka自動創(chuàng)建topic, 需要在我們的kafka配置文件中加入如下配置文件

auto.create.topics.enable=true

如果刪除topic想達(dá)到物理刪除的目的, 也是需要配置的

delete.topic.enable=true
4. 發(fā)送消息

他們可以通過客戶端的命令生產(chǎn)消息

先來看看kafka-console-producer.sh常用的幾個(gè)參數(shù)吧

--topic <String: topic> 指定topic

--timeout <Integer: timeout_ms> 超時(shí)時(shí)間

--sync 異步發(fā)送消息

--broker-list <String: broker-list> 官網(wǎng)提示: REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2. 這個(gè)參數(shù)是必須的

kafka-console-producer.sh --broker-list 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1
  1. 消費(fèi)消息

我們也還是先來看看kafka-console-consumer.sh的參數(shù)吧

--topic <String: topic> 指定topic

--group <String: consumer group id> 指定消費(fèi)者組

--from-beginning : 指定從開始進(jìn)行消費(fèi), 如果不指定, 就從當(dāng)前進(jìn)行消費(fèi)

--bootstrap-server : kafka的連接地址

kafka-console-consumer.sh --bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ---beginning

Apache的Kafka介紹

4.3 kafka的日志

kafka的日志分兩種:

第一種日志: 是我們的kafka的啟動日志, 就是我們排查問題, 查看報(bào)錯信息的日志,

第二種日志:就是我們的數(shù)據(jù)日志, kafka是我們的數(shù)據(jù)是以日志的形式存在存盤中的, 我們第二種所說的日志就是我們的partiton與segment

那我們就來說說備份和分區(qū)吧

我們創(chuàng)建一個(gè)分區(qū), 一個(gè)備份, 那么test就應(yīng)該在三臺機(jī)器上或者三個(gè)數(shù)據(jù)目錄只有一個(gè)test-0, (分區(qū)的下標(biāo)是從0開始的)

如果我們創(chuàng)建N個(gè)分區(qū), 我們就會在三個(gè)服務(wù)器上發(fā)現(xiàn), test_0-n

如果我們創(chuàng)建M個(gè)備份, 我們就會在發(fā)現(xiàn), test_0 到test_n 每一個(gè)都是M個(gè)

5. kafaka API

5.1 使用kafaka原生的api

1.消費(fèi)者自動提交:

定義自己的生產(chǎn)者

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * @ClassName MyKafkaProducer
 * @Description TODO
 * @Author lingxiangxiang
 * @Date 3:37 PM
 * @Version 1.0
 **/
public class MyKafkaProducer {
    private org.apache.kafka.clients.producer.KafkaProducer<Integer, String> producer;

    public MyKafkaProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 設(shè)置批量發(fā)送
        properties.put("batch.size", 16384);
        // 批量發(fā)送的等待時(shí)間50ms, 超過50ms, 不足批量大小也發(fā)送
        properties.put("linger.ms", 50);
        this.producer = new org.apache.kafka.clients.producer.KafkaProducer<Integer, String>(properties);
    }

    public boolean sendMsg() {
        boolean result = true;
        try {
            // 正常發(fā)送, test2是topic, 0代表的是分區(qū), 1代表的是key, hello world是發(fā)送的消息內(nèi)容
            final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test2", 0, 1, "hello world");
            producer.send(record);
            // 有回調(diào)函數(shù)的調(diào)用
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println(recordMetadata.topic());
                    System.out.println(recordMetadata.partition());
                    System.out.println(recordMetadata.offset());
                }
            });
          // 自己定義一個(gè)類
            producer.send(record, new MyCallback(record));
        } catch (Exception e) {
            result = false;
        }
        return result;
    }
}
定義生產(chǎn)者發(fā)送成功的回調(diào)函數(shù)
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @ClassName MyCallback
 * @Description TODO
 * @Author lingxiangxiang
 * @Date 3:51 PM
 * @Version 1.0
 **/
public class MyCallback implements Callback {
    private Object msg;

    public MyCallback(Object msg) {
        this.msg = msg;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        System.out.println("topic = " + metadata.topic());
        System.out.println("partiton = " + metadata.partition());
        System.out.println("offset = " + metadata.offset());
        System.out.println(msg);
    }
}
生產(chǎn)者測試類:

在生產(chǎn)者測試類中,自己遇到一個(gè)坑, 就是最后自己沒有加sleep, 就是怎么檢查自己的代碼都沒有問題, 但是最后就是沒法發(fā)送成功消息, 最后加了一個(gè)sleep就可以了, 因?yàn)橹骱瘮?shù)main已經(jīng)執(zhí)行完退出, 但是消息并沒有發(fā)送完成, 需要進(jìn)行等待一下.當(dāng)然, 你在生產(chǎn)環(huán)境中可能不會遇到這樣問題, 呵呵, 代碼如下

import static java.lang.Thread.sleep;

/**
 * @ClassName MyKafkaProducerTest
 * @Description TODO
 * @Author lingxiangxiang
 * @Date 3:46 PM
 * @Version 1.0
 **/
public class MyKafkaProducerTest {
    public static void main(String[] args) throws InterruptedException {
        MyKafkaProducer producer = new MyKafkaProducer();
        boolean result = producer.sendMsg();
        System.out.println("send msg " + result);
        sleep(1000);
    }
}
消費(fèi)者類:
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

/**
 * @ClassName MyKafkaConsumer
 * @Description TODO
 * @Author lingxiangxiang
 * @Date 4:12 PM
 * @Version 1.0
 **/
public class MyKafkaConsumer extends ShutdownableThread {

    private KafkaConsumer<Integer, String> consumer;

    public MyKafkaConsumer() {
        super("KafkaConsumerTest", false);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("group.id", "mygroup");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("heartbeat.interval.ms", "10000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<Integer, String>(properties);
    }

    @Override
    public void doWork() {
        consumer.subscribe(Arrays.asList("test2"));
        ConsumerRecords<Integer, String>records = consumer.poll(1000);
        for (ConsumerRecord record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
        }
    }
}
消費(fèi)者的測試類:
/**
 * @ClassName MyConsumerTest
 * @Description TODO
 * @Author lingxiangxiang
 * @Date 4:23 PM
 * @Version 1.0
 **/
public class MyConsumerTest {
    public static void main(String[] args) {
        MyKafkaConsumer consumer = new MyKafkaConsumer();
        consumer.start();
        System.out.println("==================");
    }
}

Apache的Kafka介紹

2. 消費(fèi)者同步手動提交

前面的消費(fèi)者都是以自動提交 offset 的方式對 broker 中的消息進(jìn)行消費(fèi)的,但自動提交 可能會出現(xiàn)消息重復(fù)消費(fèi)的情況。所以在生產(chǎn)環(huán)境下,很多時(shí)候需要對 offset 進(jìn)行手動提交, 以解決重復(fù)消費(fèi)的問題。

手動提交又可以劃分為同步提交、異步提交,同異步聯(lián)合提交。這些提交方式僅僅是 doWork()方法不相同,其構(gòu)造器是相同的。所以下面首先在前面消費(fèi)者類的基礎(chǔ)上進(jìn)行構(gòu)造 器的修改,然后再分別實(shí)現(xiàn)三種不同的提交方式。

  • 同步提交方式是,消費(fèi)者向 broker 提交 offset 后等待 broker 成功響應(yīng)。若沒有收到響 應(yīng),則會重新提交,直到獲取到響應(yīng)。而在這個(gè)等待過程中,消費(fèi)者是阻塞的。其嚴(yán)重影響 了消費(fèi)者的吞吐量。
    修改前面的MyKafkaConsumer.java, 主要修改下面的配置
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

/**
 * @ClassName MyKafkaConsumer
 * @Description TODO
 * @Author lingxiangxiang
 * @Date 4:12 PM
 * @Version 1.0
 **/
public class MyKafkaConsumer extends ShutdownableThread {

    private KafkaConsumer<Integer, String> consumer;

    public MyKafkaConsumer() {
        super("KafkaConsumerTest", false);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("group.id", "mygroup");
      // 這里要修改成手動提交
        properties.put("enable.auto.commit", "false");
        // properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("heartbeat.interval.ms", "10000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<Integer, String>(properties);
    }
    @Override
    public void doWork() {
        consumer.subscribe(Arrays.asList("test2"));
        ConsumerRecords<Integer, String>records = consumer.poll(1000);
        for (ConsumerRecord record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());

          //手動同步提交
          consumer.commitSync();
        }

    }
}

3. 消費(fèi)者異步手工提交

手動同步提交方式需要等待 broker 的成功響應(yīng),效率太低,影響消費(fèi)者的吞吐量。異步提交方式是,消費(fèi)者向 broker 提交 offset 后不用等待成功響應(yīng),所以其增加了消費(fèi)者的吞吐量。

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

/**
 * @ClassName MyKafkaConsumer
 * @Description TODO
 * @Author lingxiangxiang
 * @Date 4:12 PM
 * @Version 1.0
 **/
public class MyKafkaConsumer extends ShutdownableThread {

    private KafkaConsumer<Integer, String> consumer;

    public MyKafkaConsumer() {
        super("KafkaConsumerTest", false);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("group.id", "mygroup");
      // 這里要修改成手動提交
        properties.put("enable.auto.commit", "false");
        // properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("heartbeat.interval.ms", "10000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<Integer, String>(properties);
    }

    @Override
    public void doWork() {
        consumer.subscribe(Arrays.asList("test2"));
        ConsumerRecords<Integer, String>records = consumer.poll(1000);
        for (ConsumerRecord record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());

          //手動同步提交
          // consumer.commitSync();
          //手動異步提交
          // consumer.commitAsync();
          // 帶回調(diào)公共的手動異步提交
          consumer.commitAsync((offsets, e) -> {
            if(e != null) {
              System.out.println("提交次數(shù), offsets = " + offsets);
              System.out.println("exception = " + e);
            }
          });
        }
    }
}
5.2 springboot使用kafka

現(xiàn)在大家的開發(fā)過程中, 很多都用的是springboot的項(xiàng)目, 直接啟動了, 如果還是用原生的API, 就是有點(diǎn)low了啊, 那kafka是如何和springboot進(jìn)行聯(lián)合的呢?

  1. maven配置
 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.1.1</version>
    </dependency>
  1. 添加配置文件

在application.properties中加入如下配置信息:

kafka  連接地址

spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094

生產(chǎn)者
spring.kafka.producer.acks = 0
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.retries = 3
spring.kafka.producer.batch-size = 4096
spring.kafka.producer.buffer-memory = 33554432
spring.kafka.producer.compression-type = gzip
消費(fèi)者
spring.kafka.consumer.group-id = mygroup
spring.kafka.consumer.auto-commit-interval = 5000
spring.kafka.consumer.heartbeat-interval = 3000
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.enable-auto-commit = true
# listenner, 標(biāo)識消費(fèi)者監(jiān)聽的個(gè)數(shù)
spring.kafka.listener.concurrency = 8
# topic的名字
kafka.topic1 = topic1
生產(chǎn)者
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;

@Service
@Slf4j
public class MyKafkaProducerServiceImpl implements MyKafkaProducerService {
        @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
        // 讀取配置文件
    @Value("${kafka.topic1}")
    private String topic;

    @Override
    public void sendKafka() {
      kafkaTemplate.send(topic, "hell world");
    }
}
消費(fèi)者

@Component

@Slf4j

public class MyKafkaConsumer {  @KafkaListener(topics = "${kafka.topic1}")    public void listen(ConsumerRecord<?, ?> record) {        Optional<?> kafkaMessage = Optional.ofNullable(record.value());        if (kafkaMessage.isPresent()) {            log.info("----------------- record =" + record);            log.info("------------------ message =" + kafkaMessage.get()); }

看完上述內(nèi)容,你們對Apache的Kafka大概了解了嗎?如果想了解更多相關(guān)文章內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!

分享文章:Apache的Kafka介紹
標(biāo)題URL:http://chinadenli.net/article34/giggse.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供移動網(wǎng)站建設(shè)、域名注冊服務(wù)器托管、網(wǎng)站營銷、靜態(tài)網(wǎng)站、小程序開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都seo排名網(wǎng)站優(yōu)化