欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

如何進行PulsarConnector機制的剖析

本篇文章給大家分享的是有關如何進行Pulsar Connector機制的剖析,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

創(chuàng)新互聯(lián)公司主營灌云網(wǎng)站建設的網(wǎng)絡公司,主營網(wǎng)站建設方案,重慶APP開發(fā),灌云h5微信平臺小程序開發(fā)搭建,灌云網(wǎng)站營銷推廣歡迎灌云等地區(qū)企業(yè)咨詢

Apache Pulsar 是 Yahoo 開源的下一代分布式消息系統(tǒng),在2018年9月從 Apache  軟件基金會畢業(yè)成為頂級項目。Pulsar 特有的分層分片的架構,在保證大數(shù)據(jù)消息流系統(tǒng)的性能和吞吐量的同時,也提供了高可用性、高可擴展性和易維護性。

分片架構將消息流數(shù)據(jù)的存儲粒度從分區(qū)拉低到了分片,以及相應的層級化存儲,使 Pulsar 成為 unbounded streaming data storage 的不二之選。這使得 Pulsar 可以更完美地匹配和適配 Flink 的批流一體的計算模式。

1. Pulsar 簡介


1.1 特點
 
隨著開源后,各行業(yè)企業(yè)可以根據(jù)不同需求,為 Pulsar 賦予更豐富的功能,所以目前它也不再只是中間件的功能,而是慢慢發(fā)展成為一個 Event Streaming Platform(事件流處理平臺),具有 Connect(連接)、Store(存儲)和 Process(處理)功能。
 
■ Connect
 
在連接方面,Pulsar 具有自己單獨的 Pub/Sub 模型,可以同時滿足 Kafka 和 RocketMQ 的應用場景。同時 Pulsar IO 的功能,其實就是 Connector,可以非常方便地將數(shù)據(jù)源導入到 Pulsar 或從 Pulsar 導出等。

另外,在Pulsar 2.5.0 中,我們新增了一個重要機制:Protocol handler。這個機制支持在 broker 自定義添加額外的協(xié)議支持,可以保證在不更改原數(shù)據(jù)庫的基礎上,也能享用 Pulsar 的一些高級功能。所以 Pulsar 也延展出比如:KoP、ActiveMQ、Rest 等。
 
■   Store
 
Pulsar 提供了可以讓用戶導入的途徑后就必然需要考慮在 Pulsar 上進行存儲。Pulsar 采用的是分布式存儲,最開始是在 Apache BookKeeper 上進行。后來添加了更多的層級存儲,通過 JCloud 和 HDFS 等多種模式進行存儲的選擇。當然,層級存儲也受限于存儲容量。
 
■ Process
 
Pulsar 提供了一個無限存儲的抽象,方便第三方平臺進行更好的批流融合的計算。即 Pulsar 的數(shù)據(jù)處理能力。Pulsar 的數(shù)據(jù)處理能力實際上是按照你數(shù)據(jù)計算的難易程度、實效性等進行了切分。
 
目前 Pulsar 包含以下幾類集成融合處理方式:
 
  • Pulsar Function:Pulsar 自帶的函數(shù)處理,通過不同系統(tǒng)端的函數(shù)編寫,即可完成計算并運用到 Pulsar 中。

  • Pulsar-Flink connector 和 Pulsar-Spark connector:作為批流融合計算引擎,F(xiàn)link 和 Spark 都提供流計算的機制。如果你已經(jīng)在使用他們了,那恭喜你。因為 Pulsar 也全部支持這兩種計算,無需你再進行多余的操作了。

  • Presto (Pulsar SQL):有的朋友會在應用場景中更多的使用 SQL,進行交互式查詢等。Pulsar 與 Presto 有很好的集成處理,可以用 SQL 在 Pulsar 進行處理。

 

如何進行Pulsar Connector機制的剖析

 
1.2 訂閱模型
 
從使用來看,Pulsar 的用法與傳統(tǒng)的消息系統(tǒng)類似,是基于發(fā)布-訂閱模型的。使用者被分為生產(chǎn)者(Producer)和消費者(Consumer)兩個角色,對于更具體的需求,還可以以 Reader 的角色來消費數(shù)據(jù)。用戶可以以生產(chǎn)者的身份將數(shù)據(jù)發(fā)布在特定的主題之下,也可以以消費者的身份訂閱(Subscription)特定的主題,從而獲取數(shù)據(jù)。在這個過程中,Pulsar 實現(xiàn)了數(shù)據(jù)的持久化與數(shù)據(jù)分發(fā),Pulsar 還提供了Schema 功能,能夠?qū)?shù)據(jù)進行驗證。

如下圖所示,Pulsar 里面有幾種訂閱模式:

  1. 獨占訂閱(Exclusive) 

  2. 故障轉(zhuǎn)移訂閱(Failover) 

  3. 共享訂閱(Shared) 

  4. Key保序共享訂閱(Key_shared)

   

如何進行Pulsar Connector機制的剖析

如何進行Pulsar Connector機制的剖析

    
Pulsar 里的主題分成兩類,一類是分區(qū)主題(Partitioned Topic),一類是非分區(qū)主題(Not Partitioned Topic)。

分區(qū)主題實際上是由多個非分區(qū)主題組成的。主題和分區(qū)都是邏輯上的概念,我們可以把主題看作是一個大的無限的事件流,被分區(qū)切分成幾條小的無限事件流。

而對應的,在物理上,Pulsar 采用分層結(jié)構。每一條事件流存儲在一個 Segment 中,每個Segment 包括了許多個Entry,Entry 里面存放的才是用戶發(fā)送過來的一條或多條消息實體。
 
Message 是 Entry 中存放的數(shù)據(jù),也是 Pulsar 中消費者消費一次獲得的數(shù)據(jù)。Message 中除了包括字節(jié)流數(shù)據(jù),還有 Key 屬性,兩種時間屬性和 MessageId 以及其他信息。MessageId 是消息的唯一標識,包括了ledger-id、entry-id、 batch-index、 partition-index 的信息,如下圖,分別記錄了消息在Pulsar 中的Segment、Entry、Message、Partition 存儲位置, 因此也可以據(jù)此從物理上找到Message的信息內(nèi)容。

如何進行Pulsar Connector機制的剖析

   

2. Pulsar 架構

   
 
一個 Pulsar 集群由 Brokers 集群和 Bookies 集群組成。Brokers 之間是相互獨立的,負責向生產(chǎn)者和消費者提供關于某個主題的服務。Bookies 之間也是相互獨立的,負責存儲 Segment 的數(shù)據(jù),是消息持久化的地方。為了管理配置信息和代理信息,Pulsar 還借助了 Zookeeper 這個組件,Brokers 和 Bookies 都會在 zookeeper 上注冊,下面從消息的具體讀寫路徑(見下圖)來介紹 Pulsar 的結(jié)構。
 

如何進行Pulsar Connector機制的剖析

如何進行Pulsar Connector機制的剖析

 
在寫路徑中,生產(chǎn)者創(chuàng)建并發(fā)送一條消息到主題中,該消息可能會以某種算法(比如Round robin)被路由到一個具體的分區(qū)上,Pulsar 會選擇一個Broker 為這個分區(qū)服務,該分區(qū)的消息實際會被發(fā)送到這個 Broker上。當Broker 拿到一條消息,它會以 Write Quorum (Qw)的方式將消息寫入到 Bookies 中。當成功寫入到 Bookies 的數(shù)量達到設定時,Broker 會收到完成通知,并且 Broker 也會返回通知生產(chǎn)者寫入成功。

在讀路徑中,消費者首先要發(fā)起一次訂閱,之后才能與主題對應的 Broker 進行連接,Broker 從 Bookies 請求數(shù)據(jù)并發(fā)送給消費者。當數(shù)據(jù)接受成功,消費者可以選擇向 Broker 發(fā)送確認信息,使得 Broker 能夠更新消費者的訪問位置信息。前面也提到,對于剛寫入的數(shù)據(jù),Pulsar 會存儲在緩存中,那么就可以直接從 Brokers 的緩存中讀取了,縮短了讀取路徑。
 
Pulsar 將存儲與服務相分離,實現(xiàn)了很好的可拓展性,在平臺層面,能夠通過調(diào)整Bookies 的數(shù)量來滿足不同的需求。在用戶層面,只需要跟 Brokers 通信,而Brokers 本身被設計成沒有狀態(tài)的,當某個 Broker 因故障無法使用時,可以動態(tài)的生成一個新的 Broker 來替換。
 

3. Pulsar Connector 內(nèi)部機制


 
首先,Pulsar Connector 在使用上是比較簡單的,由一個 Source 和一個 Sink 組成,source 的功能就是將一個或多個主題下的消息傳入到 Flink 的Source中,Sink的功能就是從 Flink 的 Sink 中獲取數(shù)據(jù)并放入到某些主題下,在使用方式上,如下所示,與 Kafa Connector 很相似,使用時需要設置一些參數(shù)。
 

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("topic", "test-source-topic") FlinkPulsarSource<String> source = new FlinkPulsarSource<>(              serviceUrl,              adminUrl,              new SimpleStringSchema(),              props); DataStream<String> stream = see.addSource(source);
FlinkPulsarSink<Person> sink =      new FlinkPulsarSink(              serviceUrl,              adminUrl,              Optional.of(topic), // mandatory target topic              props,              TopicKeyExtractor.NULL, // replace this to extract key or topic for each record              Person.class); stream.addSink(sink);

 
現(xiàn)在介紹 Kulsar Connector 一些特性的實現(xiàn)機制。
 
3.1 精確一次
 
因為 Pulsar 中的 MessageId 是全局唯一且有序的,與消息在 Pulsar 中的物理存儲也對應,因此為了實現(xiàn) Exactly Once,Pulsar Connector 借助 Flink 的 Checkpoint 機制,將 MessageId 存儲到 Checkpoint。

對于連接器的 Source 任務,在每次觸發(fā) Checkpoint 的時候,會將各個分區(qū)當前處理的 MessageId 保存到狀態(tài)存儲里面,這樣在任務重啟的時候,每個分區(qū)都可以通過 Pulsar 提供的 Reader seek 接口找到 MessageId 對應的消息位置,然后從這個位置之后讀取消息數(shù)據(jù)。

通過 Checkpoint 機制,還能夠向存儲數(shù)據(jù)的節(jié)點發(fā)送數(shù)據(jù)使用完畢的通知,從而能準確刪除過期的數(shù)據(jù),做到存儲的合理利用。
 
3.2 動態(tài)發(fā)現(xiàn)
 
考慮到Flink中的任務都是長時間運行的,在運行任務的過程中,用戶也許會需要動態(tài)的增加部分主題或者分區(qū),Pulsar Connector 提供了自動發(fā)現(xiàn)的解決方案。

Pulsar 的策略是另外啟動一個線程,定期的去查詢設定的主題是否改變,分區(qū)有沒有增刪,如果發(fā)生了新增分區(qū)的情況,那么就額外創(chuàng)建新的Reader 任務去完成主題下的數(shù)據(jù)的反序列化,當然如果是刪除分區(qū),也會相應的減少讀取任務。
 
3.3 結(jié)構化數(shù)據(jù)
 
在讀取主題下的數(shù)據(jù)的過程中,我們可以將數(shù)據(jù)轉(zhuǎn)化成一條條結(jié)構化的記錄來處理。Pulsar 支持 Avro schema and avro/json/protobuf Message 格式類型的數(shù)據(jù)轉(zhuǎn)化成 Flink 中的 Row格式數(shù)據(jù)。對于用戶關心的元數(shù)據(jù),Pulsar 也在 Row 中提供了對應的元數(shù)據(jù)域。

另外,Pulsar 基于 Flink 1.9 版本進行了新的開發(fā),支持 Table API 和 Catalog,Pulsar 做了一個簡單的映射,如下圖所示,將 Pulsar 的租戶/命名空間對應到 Catalog 的數(shù)據(jù)庫,將主題對應為庫中的具體表。
 

如何進行Pulsar Connector機制的剖析

 

 之前提到 Pulsar 將數(shù)據(jù)存儲在 Bookeeper 中,還可以導入到 Hdfs 或者 S3 這樣的文件系統(tǒng)中,但對于分析型應用來說,我們往往只關心所有數(shù)據(jù)中每條數(shù)據(jù)的部分屬性,因此采用列存儲的方式對 IO 和網(wǎng)絡都會有性能提升,Pulsar 也在嘗試在Segment 中以列的方式存儲。
在原來的讀路徑中,不管是 Reader 還是Comsumer,都需要通過 Brokers 來傳遞數(shù)據(jù)。如果采用新的 Bypass Broker方式,通過查詢元數(shù)據(jù),就能直接找到每條 Message 存儲的 Bookie 位置,這樣可以直接從 Bookie 讀取數(shù)據(jù),縮短讀取路徑,從而提升效率。
Pulsar 相對 Kafka 來說,由于數(shù)據(jù)在物理上是存放在一個個 Segment 中的,那么在讀取的過程中,通過提高并行化的方式,建立多線程同時讀取多個 Segment,就能夠提升整個作業(yè)的完成效率,不過這也需要你的任務自身對每個Topic 分區(qū)的訪問順序沒有嚴格要求,并且對于新產(chǎn)生的數(shù)據(jù),是不保存在 Segement 的,還是需要做緩存的訪問來獲取數(shù)據(jù),因此,并行讀取將成為一個可選項,為用戶提供更多的選擇方案。

以上就是如何進行Pulsar Connector機制的剖析,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降摹OM隳芡ㄟ^這篇文章學到更多知識。更多詳情敬請關注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

網(wǎng)站標題:如何進行PulsarConnector機制的剖析
網(wǎng)站路徑:http://chinadenli.net/article8/gojsop.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷網(wǎng)站排名App設計標簽優(yōu)化小程序開發(fā)網(wǎng)站收錄

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

綿陽服務器托管