欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

日志服務(wù)Python消費(fèi)組如何進(jìn)行實(shí)時(shí)分發(fā)數(shù)據(jù)

這篇文章給大家介紹日志服務(wù)Python消費(fèi)組如何進(jìn)行實(shí)時(shí)分發(fā)數(shù)據(jù),內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

創(chuàng)新互聯(lián)于2013年創(chuàng)立,公司自成立以來(lái)始終致力于為企業(yè)提供官網(wǎng)建設(shè)、移動(dòng)互聯(lián)網(wǎng)業(yè)務(wù)開(kāi)發(fā)(小程序定制開(kāi)發(fā)、手機(jī)網(wǎng)站建設(shè)、重慶APP開(kāi)發(fā)公司等),并且包含互聯(lián)網(wǎng)基礎(chǔ)服務(wù)(域名、主機(jī)服務(wù)、企業(yè)郵箱、網(wǎng)絡(luò)營(yíng)銷等)應(yīng)用服務(wù);以先進(jìn)完善的建站體系及不斷開(kāi)拓創(chuàng)新的精神理念,幫助企業(yè)客戶實(shí)現(xiàn)互聯(lián)網(wǎng)業(yè)務(wù),嚴(yán)格把控項(xiàng)目進(jìn)度與質(zhì)量監(jiān)控加上過(guò)硬的技術(shù)實(shí)力獲得客戶的一致贊譽(yù)。

場(chǎng)景目標(biāo)

使用日志服務(wù)的Web-tracking、logtail(文件極簡(jiǎn))、syslog等收集上來(lái)的日志經(jīng)常存在各種各樣的格式,我們需要針對(duì)特定的日志(例如topic)進(jìn)行一定的分發(fā)到特定的logstore中處理和索引,本文主要介紹如何使用消費(fèi)組實(shí)時(shí)分發(fā)日志到不通的目標(biāo)日志庫(kù)中。并且利用消費(fèi)組的特定,達(dá)到自動(dòng)平衡、負(fù)載均衡和高可用性。

日志服務(wù)Python消費(fèi)組如何進(jìn)行實(shí)時(shí)分發(fā)數(shù)據(jù)cdn.com/cdd946a301ff55389b6f03559de059b6dc3b4b81.png">

基本概念

協(xié)同消費(fèi)庫(kù)(Consumer Library)是對(duì)日志服務(wù)中日志進(jìn)行消費(fèi)的高級(jí)模式,提供了消費(fèi)組(ConsumerGroup)的概念對(duì)消費(fèi)端進(jìn)行抽象和管理,和直接使用SDK進(jìn)行數(shù)據(jù)讀取的區(qū)別在于,用戶無(wú)需關(guān)心日志服務(wù)的實(shí)現(xiàn)細(xì)節(jié),只需要專注于業(yè)務(wù)邏輯,另外,消費(fèi)者之間的負(fù)載均衡、failover等用戶也都無(wú)需關(guān)心。

消費(fèi)組(Consumer Group)- 一個(gè)消費(fèi)組由多個(gè)消費(fèi)者構(gòu)成,同一個(gè)消費(fèi)組下面的消費(fèi)者共同消費(fèi)一個(gè)logstore中的數(shù)據(jù),消費(fèi)者之間不會(huì)重復(fù)消費(fèi)數(shù)據(jù)。
消費(fèi)者(Consumer)- 消費(fèi)組的構(gòu)成單元,實(shí)際承擔(dān)消費(fèi)任務(wù),同一個(gè)消費(fèi)組下面的消費(fèi)者名稱必須不同。

在日志服務(wù)中,一個(gè)logstore下面會(huì)有多個(gè)shard,協(xié)同消費(fèi)庫(kù)的功能就是將shard分配給一個(gè)消費(fèi)組下面的消費(fèi)者,分配方式遵循以下原則:

  • 每個(gè)shard只會(huì)分配到一個(gè)消費(fèi)者。

  • 一個(gè)消費(fèi)者可以同時(shí)擁有多個(gè)shard。
    新的消費(fèi)者加入一個(gè)消費(fèi)組,這個(gè)消費(fèi)組下面的shard從屬關(guān)系會(huì)調(diào)整,以達(dá)到消費(fèi)負(fù)載均衡的目的,但是上面的分配原則不會(huì)變,分配過(guò)程對(duì)用戶透明。

協(xié)同消費(fèi)庫(kù)的另一個(gè)功能是保存checkpoint,方便程序故障恢復(fù)時(shí)能接著從斷點(diǎn)繼續(xù)消費(fèi),從而保證數(shù)據(jù)不會(huì)被重復(fù)消費(fèi)。

使用消費(fèi)組進(jìn)行實(shí)時(shí)分發(fā)

這里我們描述用Python使用消費(fèi)組進(jìn)行編程,實(shí)時(shí)根據(jù)數(shù)據(jù)的topic進(jìn)行分發(fā)。
注意:本篇文章的相關(guān)代碼可能會(huì)更新,最新版本在這里可以找到:Github樣例.

image

安裝

環(huán)境

  1. 建議程序運(yùn)行在源日志庫(kù)同Region下的ECS上,并使用局域網(wǎng)服務(wù)入口,這樣好處是網(wǎng)絡(luò)速度最快,其次是讀取沒(méi)有外網(wǎng)費(fèi)用產(chǎn)生。

  2. 強(qiáng)烈推薦PyPy3來(lái)運(yùn)行本程序,而不是使用標(biāo)準(zhǔn)CPython解釋器。

  3. 日志服務(wù)的Python SDK可以如下安裝:

pypy3 -m pip install aliyun-log-python-sdk -U

更多SLS Python SDK的使用手冊(cè),可以參考這里

程序配置

如下展示如何配置程序:

  1. 配置程序日志文件,以便后續(xù)測(cè)試或者診斷可能的問(wèn)題(跳過(guò),具體參考樣例)。

  2. 基本的日志服務(wù)連接與消費(fèi)組的配置選項(xiàng)。

  3. 目標(biāo)Logstore的一些連接信息

請(qǐng)仔細(xì)閱讀代碼中相關(guān)注釋并根據(jù)需要調(diào)整選項(xiàng):

#encoding: utf8
def get_option():
    ##########################
    # 基本選項(xiàng)
    ##########################

    # 從環(huán)境變量中加載SLS參數(shù)與選項(xiàng),根據(jù)需要可以配置多個(gè)目標(biāo)
    accessKeyId = os.environ.get('SLS_AK_ID', '')
    accessKey = os.environ.get('SLS_AK_KEY', '')
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    to_endpoint = os.environ.get('SLS_ENDPOINT_TO', endpoint)
    to_project = os.environ.get('SLS_PROJECT_TO', project)
    to_logstore1 = os.environ.get('SLS_LOGSTORE_TO1', '')
    to_logstore2 = os.environ.get('SLS_LOGSTORE_TO2', '')
    to_logstore3 = os.environ.get('SLS_LOGSTORE_TO3', '')
    consumer_group = os.environ.get('SLS_CG', '')

    # 消費(fèi)的起點(diǎn)。這個(gè)參數(shù)在第一次跑程序的時(shí)候有效,后續(xù)再次運(yùn)行將從上一次消費(fèi)的保存點(diǎn)繼續(xù)。
    # 可以使”begin“,”end“,或者特定的ISO時(shí)間格式。
    cursor_start_time = "2018-12-26 0:0:0"

    # 一般不要修改消費(fèi)者名,尤其是需要并發(fā)跑時(shí)
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # 構(gòu)建一個(gè)消費(fèi)組和消費(fèi)者
    option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time)

    # bind put_log_raw which is faster
    to_client = LogClient(to_endpoint, accessKeyId, accessKey)
    put_method1 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore1)
    put_method2 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore2)
    put_method3 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore3)

    return option, {u'ngnix': put_method1, u'sql_audit': put_method2, u'click': put_method3}

注意,這里使用了functools.partial對(duì)put_log_raw進(jìn)行綁定,以便后續(xù)調(diào)用方便。

數(shù)據(jù)消費(fèi)與分發(fā)

如下代碼展示如何從SLS拿到數(shù)據(jù)后根據(jù)topic進(jìn)行轉(zhuǎn)發(fā)。

if __name__ == '__main__':
    option, put_methods = get_copy_option()

    def copy_data(shard_id, log_groups):
        for log_group in log_groups.LogGroups:
            # update topic
            if log_group.Topic in put_methods:
                put_methods[log_group.Topic](log_group=log_group)

    logger.info("*** start to consume data...")
    worker = ConsumerWorker(ConsumerProcessorAdaptor, option, args=(copy_data, ))
    worker.start(join=True)

啟動(dòng)

假設(shè)程序命名為"dispatch_data.py",可以如下啟動(dòng):

export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_LOGSTORE_TO1=<SLS To Logstore1 Name>
export SLS_LOGSTORE_TO1=<SLS To Logstore2 Name>
export SLS_LOGSTORE_TO1=<SLS To Logstore3 Name>
export SLS_CG=<消費(fèi)組名,可以簡(jiǎn)單命名為"dispatch_data">

pypy3 dispatch_data.py

性能考慮

啟動(dòng)多個(gè)消費(fèi)者

基于消費(fèi)組的程序可以直接啟動(dòng)多次以便達(dá)到并發(fā)作用:

nohup pypy3 dispatch_data.py &
nohup pypy3 dispatch_data.py &
nohup pypy3 dispatch_data.py &
...

注意:
所有消費(fèi)者使用了同一個(gè)消費(fèi)組的名字和不同的消費(fèi)者名字(因?yàn)橄M(fèi)者名以進(jìn)程ID為后綴)。
因?yàn)橐粋€(gè)分區(qū)(Shard)只能被一個(gè)消費(fèi)者消費(fèi),假設(shè)一個(gè)日志庫(kù)有10個(gè)分區(qū),那么最多有10個(gè)消費(fèi)者同時(shí)消費(fèi)。

性能吞吐

基于測(cè)試,在沒(méi)有帶寬限制、接收端速率限制(如Splunk端)的情況下,以推進(jìn)硬件用pypy3運(yùn)行上述樣例,單個(gè)消費(fèi)者占用大約10%的單核CPU下可以消費(fèi)達(dá)到5 MB/s原始日志的速率。因此,理論上可以達(dá)到50 MB/s原始日志每個(gè)CPU核,也就是每個(gè)CPU核每天可以消費(fèi)4TB原始日志

注意:這個(gè)數(shù)據(jù)依賴帶寬、硬件參數(shù)和目標(biāo)Logstore是否能夠較快接收數(shù)據(jù)。

高可用性

消費(fèi)組會(huì)將檢測(cè)點(diǎn)(check-point)保存在服務(wù)器端,當(dāng)一個(gè)消費(fèi)者停止,另外一個(gè)消費(fèi)者將自動(dòng)接管并從斷點(diǎn)繼續(xù)消費(fèi)。

可以在不同機(jī)器上啟動(dòng)消費(fèi)者,這樣當(dāng)一臺(tái)機(jī)器停止或者損壞的清下,其他機(jī)器上的消費(fèi)者可以自動(dòng)接管并從斷點(diǎn)進(jìn)行消費(fèi)。

理論上,為了備用,也可以啟動(dòng)大于shard數(shù)量的消費(fèi)者。

其他

限制與約束

每一個(gè)日志庫(kù)(logstore)最多可以配置10個(gè)消費(fèi)組,如果遇到錯(cuò)誤ConsumerGroupQuotaExceed則表示遇到限制,建議在控制臺(tái)端刪除一些不用的消費(fèi)組。

監(jiān)測(cè)

  • 在控制臺(tái)查看消費(fèi)組狀態(tài)

  • 通過(guò)云監(jiān)控查看消費(fèi)組延遲,并配置報(bào)警

Https

如果服務(wù)入口(endpoint)配置為https://前綴,如https://cn-beijing.log.aliyuncs.com,程序與SLS的連接將自動(dòng)使用HTTPS加密。

服務(wù)器證書(shū)*.aliyuncs.com是GlobalSign簽發(fā),默認(rèn)大多數(shù)Linux/Windows的機(jī)器會(huì)自動(dòng)信任此證書(shū)。如果某些特殊情況,機(jī)器不信任此證書(shū),可以參考這里下載并安裝此證書(shū)。

關(guān)于日志服務(wù)Python消費(fèi)組如何進(jìn)行實(shí)時(shí)分發(fā)數(shù)據(jù)就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

分享標(biāo)題:日志服務(wù)Python消費(fèi)組如何進(jìn)行實(shí)時(shí)分發(fā)數(shù)據(jù)
本文網(wǎng)址:http://chinadenli.net/article24/gdopje.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供電子商務(wù)、ChatGPT、商城網(wǎng)站網(wǎng)站設(shè)計(jì)公司、虛擬主機(jī)、網(wǎng)站改版

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

小程序開(kāi)發(fā)