今天就跟大家聊聊有關(guān)spark與kafaka整合workcount示例分析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

站在用戶的角度思考問題,與客戶深入溝通,找到振安網(wǎng)站設(shè)計與振安網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設(shè)計與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:成都網(wǎng)站設(shè)計、網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、域名注冊、虛擬空間、企業(yè)郵箱。業(yè)務(wù)覆蓋振安地區(qū)。
package hgs.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel
import kafka.serializer.StringDecoder
import org.apache.kafka.common.serialization.StringDeserializer
import kafka.serializer.DefaultDecoder
import org.apache.spark.HashPartitioner
/*
* pom.xml添加
* <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
</dependency>
* */
object SparkStreamingKafkaReciverWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(4))
ssc.checkpoint("d:\\checkpoint")
val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
//iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一
//iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二
iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三
}
//注意下面的map一定要加上泛型,否則createStream會報錯
//kafaka的一些參數(shù)
val props = Map[String,String](
"bootstrap.servers"->"bigdata01:9092,bigdata02:9092,bigdata03:9092",
"group.id"->"group_test",
"enable.auto.commit"->"true",
"auto.commit.intervals.ms"->"2000",
"auto.offset.reset"->"smallest",
"zookeeper.connect"->"bigdata01:2181,bigdata02:2181,bigdata03:2181")
//topics
val topics = Map[String,Int]("test"->1)
val rds = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc, props, topics, StorageLevel.MEMORY_AND_DISK)
val words = rds.flatMap(x=>x._2.split(" "))
val wordscount = words.map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true)
wordscount.print()
//啟動
ssc.start()
ssc.awaitTermination()
}
}看完上述內(nèi)容,你們對spark與kafaka整合workcount示例分析有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。
網(wǎng)站題目:spark與kafaka整合workcount示例分析
URL地址:http://chinadenli.net/article38/gejopp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作、電子商務(wù)、、外貿(mào)網(wǎng)站建設(shè)、Google、虛擬主機(jī)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)