欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

【SparkStreaming-創(chuàng)新互聯(lián)

文章目錄
  • 1.轉(zhuǎn)換算子:
    • 案例需求:
  • sparkstreaming + kafka 整合 :
    • 版本選擇:
    • 2.spark整合kafka api:
    • 查看kafka topic命令:
    • sparkstreaming里面: 開(kāi)發(fā)模式:***
    • 3.提交offset信息
    • kafka消費(fèi)語(yǔ)義:
    • 存儲(chǔ)offset:

創(chuàng)新互聯(lián)一直在為企業(yè)提供服務(wù),多年的磨煉,使我們?cè)趧?chuàng)意設(shè)計(jì),成都全網(wǎng)營(yíng)銷(xiāo)到技術(shù)研發(fā)擁有了開(kāi)發(fā)經(jīng)驗(yàn)。我們擅長(zhǎng)傾聽(tīng)企業(yè)需求,挖掘用戶(hù)對(duì)產(chǎn)品需求服務(wù)價(jià)值,為企業(yè)制作有用的創(chuàng)意設(shè)計(jì)體驗(yàn)。核心團(tuán)隊(duì)擁有超過(guò)十年以上行業(yè)經(jīng)驗(yàn),涵蓋創(chuàng)意,策化,開(kāi)發(fā)等專(zhuān)業(yè)領(lǐng)域,公司涉及領(lǐng)域有基礎(chǔ)互聯(lián)網(wǎng)服務(wù)托管服務(wù)器成都app開(kāi)發(fā)、手機(jī)移動(dòng)建站、網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)絡(luò)整合營(yíng)銷(xiāo)。1.轉(zhuǎn)換算子:

transform

DStream 和 rdd之間數(shù)據(jù)進(jìn)行交互的算子

流處理 數(shù)據(jù)源:
	一個(gè)數(shù)據(jù)來(lái)自于 mysql數(shù)據(jù)/hdfs上文本數(shù)據(jù)  【量小】  從表/維表 
	一個(gè)數(shù)據(jù) 來(lái)自于 kafka sss 讀取形成 DStream數(shù)據(jù) 【量大】 主業(yè)務(wù)  =》 主表
案例需求:
彈幕 過(guò)濾的功能 /黑名單的功能 
離線(xiàn):

彈幕: 主表
	不好看
	垃圾
	男主真帥
	女主真好看
	666
過(guò)濾的彈幕:維表 
	熱巴真丑
	雞兒真美
	王鶴棣退出娛樂(lè)圈

實(shí)時(shí):

sparkstreaming + kafka 整合 :

kafka =》 sparkstreaming

版本選擇:

spark 2.x : kafka版本: 0.8 0.10.0 or higher ok
spark 3.x =>kafka : 1.kafka版本: 0.10.0 or higher ok

spark 去kafka讀取數(shù)據(jù)的方式:
1.kafka 0.8 reciver方式讀取kafka數(shù)據(jù) 【效率低 、代碼開(kāi)發(fā)復(fù)雜】
2.kafka 0.10.0版本之后 direct stream的方式加載kafka數(shù)據(jù) 【效率高、代碼開(kāi)發(fā)簡(jiǎn)單】
kafka:
版本也有要求: 0.11.0 版本之后

交付語(yǔ)義: consumer producer
producer 默認(rèn)就是精準(zhǔn)一次
consumer 交付語(yǔ)義取決于 consumer 框架本身

交付語(yǔ)義: consumer

? 至多一次 數(shù)據(jù)丟失問(wèn)題
? 至少一次 數(shù)據(jù)不會(huì)丟失,數(shù)據(jù)重復(fù)消費(fèi)
? 精準(zhǔn)一次 數(shù)據(jù)不會(huì)丟失 數(shù)據(jù)也不會(huì)重復(fù)消費(fèi)

spark 整合kafka 版本 0.10.0版本之后:
1.kafka 0.11.0之后 2.2.1 =>direct stream
2.sparkstreaming 默認(rèn)消費(fèi)kafka數(shù)據(jù) 交付語(yǔ)義:
至少一次

  1. spark消費(fèi)kafka, DStream 【rdd 分區(qū)數(shù)】 =》 kafka topic 分區(qū)數(shù) 是一一對(duì)應(yīng)的
    1:1 correspondence between Kafka partitions and Spark partitions,
2.spark整合kafka api:

? 1.simple API =》 過(guò)時(shí)不用了

  1. new Kafka consumer API 整合 kafka 主流
    3.引入依賴(lài): org.apache.spark spark-streaming-kafka-0-10_2.12 3.2.1

!!!不需要引入 kafka-clients 依賴(lài)

查看kafka topic命令:

kafka-topics.sh --list
–zookeeper bigdata32:2181,bigdata33:2181,bigdata34:2181/kafka

kafka-topics.sh --create
–zookeeper bigdata32:2181,bigdata33:2181,bigdata34:2181/kafka
–topic spark-kafka01 --partitions 3 --replication-factor 1

producer:
kafka-console-producer.sh
–broker-list bigdata33:9092,bigdata34:9092
–topic spark-kafka01

consumer:
kafka-console-consumer.sh
–bootstrap-server bigdata33:9092,bigdata34:9092
–topic spark-kafka
–from-beginning

val kafkaParams = Map[String, Object](
“bootstrap.servers” ->“bigdata33:9092,bigdata34:9092”,
“key.deserializer” ->classOf[StringDeserializer],
“value.deserializer” ->classOf[StringDeserializer],
“group.id” ->“dl2262_01”,
“auto.offset.reset” ->“l(fā)atest”,
“enable.auto.commit” ->(false: java.lang.Boolean)
)

需求:
消費(fèi)kafka數(shù)據(jù) wc 將 結(jié)果寫(xiě)到 mysql里面

input
todo
output

kafka =>spark =>mysql 鏈路打通了

模擬:spark作業(yè)掛掉 =》 重啟

“消費(fèi)完kafka的數(shù)據(jù) 程序重啟之后接著從上次消費(fèi)的位置接著消費(fèi) ”

目前: code不能滿(mǎn)足
1.目前代碼 這兩個(gè)參數(shù) 不能動(dòng)
“auto.offset.reset” ->“earliest”
“enable.auto.commit” ->(false: java.lang.Boolean)

2.主要原因 : spark作業(yè) 消費(fèi)kafka數(shù)據(jù):
1.獲取kafka offset =》 處理kafka數(shù)據(jù) =》 “提交offset的操作” 沒(méi)有
解決:
1.獲取kafka offset // todo
2. 處理kafka數(shù)據(jù)
3.提交offset的操作 // todo

1.獲取kafka offset // todo
1. kafka offset 信息
2.spark rdd分區(qū)數(shù) 和 kafka topic 的分區(qū)數(shù) 是不是 一對(duì)一

報(bào)錯(cuò):
org.apache.spark.rdd.ShuffledRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges

ShuffledRDD =》 HasOffsetRanges 說(shuō)明 代碼有問(wèn)題

sparkstreaming里面: 開(kāi)發(fā)模式:***

? 1.獲取kafka 流數(shù)據(jù)
? 2. 流 Dstream =》 調(diào)用foreachRDD算子 進(jìn)行輸出:
? 0.獲取offset 信息
? 1.做業(yè)務(wù)邏輯
? 2.結(jié)果數(shù)據(jù)輸出
? 3.提交offset信息

offset解釋?zhuān)?/p>

01 batch:

rdd的分區(qū)數(shù):3
topic partition fromOffset untilOffset
spark-kafka01 0 0 1
spark-kafka01 1 0 1
spark-kafka01 2 0 0

02 batch:
rdd的分區(qū)數(shù):3
topic partition fromOffset untilOffset
spark-kafka01 0 1 1
spark-kafka01 1 1 1
spark-kafka01 2 0 0

此時(shí) kafka 里面數(shù)據(jù)已經(jīng)消費(fèi)完了 fromOffset=untilOffset

3.提交offset信息

2.存儲(chǔ)offset信息
spark流式處理 默認(rèn)消費(fèi)語(yǔ)義 : 至少一次
精準(zhǔn)一次:
1.output + offset 同時(shí)完成
1.生產(chǎn)上Checkpoints不能用
2.Kafka itself =》至少一次
推薦使用 =》 簡(jiǎn)單 高效
90% 都可以解決 10% 精準(zhǔn)一次
3.Your own data store: =》 開(kāi)發(fā)大量代碼 =》
mysql、redis、hbase、
至少一次
精準(zhǔn)一次
mysql:
獲取offset
todo
output
提交offset

spark作業(yè)掛了 =》 啟動(dòng)spark作業(yè) :
1.從mysql里面獲取offset
todo
output
提交offset

kafka消費(fèi)語(yǔ)義:

? 1.至多一次 【丟數(shù)據(jù)】
? 2.至少一次 【不會(huì)丟數(shù)據(jù) 可能會(huì)重復(fù)消費(fèi)數(shù)據(jù)】
? 3.精準(zhǔn)一次 【不丟、不重復(fù)消費(fèi)】

offset信息提交 :
1.spark todo :
至少一次:
1 2 3 4
offset get
業(yè)務(wù)邏輯 output db
提交offset

精準(zhǔn)一次:output + 提交offset 一起發(fā)生 =》 事務(wù)來(lái)實(shí)現(xiàn)
事務(wù): 一次操作要么成功 要么失敗

topic partition fromOffset untilOffset
spark-kafka01 0 3 3
spark-kafka01 2 2 2
spark-kafka01 1 2 2

存儲(chǔ)offset:

? kafka 本身:
? offset 信息存儲(chǔ)在哪?

kafka 某個(gè)topic下:
__consumer_offsets =》 spark作業(yè) 消費(fèi)kafka的offset信息

topic offset 信息存儲(chǔ)的地方

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧

網(wǎng)站標(biāo)題:【SparkStreaming-創(chuàng)新互聯(lián)
網(wǎng)頁(yè)地址:http://chinadenli.net/article32/dpgepc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供服務(wù)器托管App開(kāi)發(fā)品牌網(wǎng)站設(shè)計(jì)自適應(yīng)網(wǎng)站網(wǎng)站設(shè)計(jì)公司微信小程序

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作