本篇文章給大家分享的是有關kafka流量監(jiān)控的原理及實現(xiàn)方法是什么,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
10多年專注成都網站制作,企業(yè)網站制作,個人網站制作服務,為大家分享網站制作知識、方案,網站設計流程、步驟,成功服務上千家企業(yè)。為您提供網站建設,網站制作,網頁設計及定制高端網站建設服務,專注于企業(yè)網站制作,高端網頁制作,對紗窗等多個領域,擁有豐富的網站設計經驗。
工程能力
作為一個優(yōu)秀的開發(fā)人員,項目開發(fā)的過程中監(jiān)控告警系統(tǒng)的可靠性是可以體現(xiàn)出一個人的工程管理能力的。優(yōu)秀的監(jiān)控告警系統(tǒng)可以免去很多精力消耗,比如維護,故障預判,故障及時準確通知,故障定位排查等。
可以想像項目上線后,假如沒有監(jiān)控告警系統(tǒng),這么一個暗箱是多么可怕。
對于大數(shù)據(jù)項目,數(shù)據(jù)一般需要先入消息隊列,如kafka,然后分離線和實時將數(shù)據(jù)進行解耦分流,用于實時處理和離線處理。消息隊列存在的好處:
消息隊列的訂閱者可以根據(jù)需要隨時擴展,可以很好的擴展數(shù)據(jù)的使用者。
消息隊列的橫向擴展,增加吞吐量,做起來還是很簡單的。這個用傳統(tǒng)數(shù)據(jù)庫,分庫分表還是很麻煩的。
由于消息隊列的存在,也可以幫助我們抗高峰,避免高峰時期后端處理壓力過大導致整個業(yè)務處理宕機。
kafka在大數(shù)據(jù)項目中作用至關重要,那么對其的監(jiān)控告警就至關重要了,我們這里主要是講針對kafka流量的監(jiān)控告警,其目的也是很明顯的便于我們了解數(shù)據(jù)的整體情況及波動情況,以調整處理后端,如spark streaming,flume等。
kafka 監(jiān)控工具很多,常見的有kafka manager,KafkaOffsetMonitor,kafka eagle,kafka tools等,浪尖最經常使用的是kafka manager,也建議大家使用該工具,其不僅有監(jiān)控功能還有管理功能。具體使用方法可以參看:
kafka管理神器-kafkamanager
監(jiān)控指標
kafka的指標服務器和客戶端都有的。具體指標內容,可以參看kafka官網:
http://kafka.apache.org/0102/documentation.html#monitoring
查看可用指標的最簡單方法是啟動jconsole并將其指向正在運行的kafka客戶端或服務器; 這將允許使用JMX瀏覽所有指標。
對于熟悉kafka manager的朋友都應該看過broker相關信息,比如每秒鐘的流入的消息條數(shù),每秒鐘的流入的消息大小,流出的消息大小等。
使用kafka manager可以很方便的查看。但是,這其實不能讓我們及時的發(fā)現(xiàn)數(shù)據(jù)流量波動,或者說我們想畫個曲線的詳細對比歷史流量,它是做不到的。所以,我們要想辦法去獲取出來這些指標,然后做我們自己的展示。還有一點就是,流量波動告警。
浪尖這里只做了圖中幾個指標的接口:
def getBytesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesInPerSec", topicOption)
}
def getBytesOutPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesOutPerSec", topicOption)
}
def getBytesRejectedPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesRejectedPerSec", topicOption)
}
def getFailedFetchRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedFetchRequestsPerSec", topicOption)
}
def getFailedProduceRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedProduceRequestsPerSec", topicOption)
}
def getMessagesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = {
getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "MessagesInPerSec", topicOption)
}
jmx客戶端
連接jmx的server是可以使用jconsole,但是滿足不了我們的需求。所以,我們使用JMXConnectorFactory 方式連接jmx。使用JMXConnectorFactory 鏈接jmx時,JMXServiceURL 的參數(shù) url 必須使用 service:jmx 方式進行連接,具體鏈接創(chuàng)建方式很簡單,幾行代碼而已,如下:
val jmxHost = "hostname"
val jmxPort = 9999
val urlString = s"service:jmx:rmi:///jndi/rmi://$jmxHost:$jmxPort/jmxrmi"
val url = new JMXServiceURL(urlString)
val jmxc = JMXConnectorFactory.connect(url )
val mbsc = jmxc.getMBeanServerConnection;
println(KafkaMetrics.getMessagesInPerSec(Kafka_0_10_2_1,mbsc,Some("test")).fifteenMinuteRate)
jmxc.close()
開啟kafka的jmx端口
kafka的jmx服務默認時關閉的,開啟的話很簡單,只需要在kafka server的啟動腳本kafka-server-start.sh里增加一行代碼即可,內容export JMX_PORT="9999",增加位置如下:
if [ "x$KAFKA_HEAP_OPTS" = "x"]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
fi
測試
我這里測試就比較簡單了,主要是將消息條數(shù)打出來,大家可以根據(jù)需要自行調整,比如均值大于閾值發(fā)短信告警等。
一套完整的kafka監(jiān)控,包括:
消費者監(jiān)控,主要是存活告警,消費滯后告警。
生產者監(jiān)控,主要是存活告警,生產者消費上游數(shù)據(jù)能力告警。
broker監(jiān)控,主要是存活告警,流量告警,isr列表,topic異常告警,control變換告警。
以上就是kafka流量監(jiān)控的原理及實現(xiàn)方法是什么,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
當前名稱:kafka流量監(jiān)控的原理及實現(xiàn)方法是什么
分享地址:http://chinadenli.net/article26/jgjdcg.html
成都網站建設公司_創(chuàng)新互聯(lián),為您提供網站策劃、品牌網站制作、做網站、微信公眾號、商城網站、小程序開發(fā)
聲明:本網站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)