Big Data Learning Path: SparkSubmit in the Master's jps output
SparkSubmit is the service process that appears after the submission class starts; it is used to submit applications.
Whichever node launches the submission is the node where the submit process, i.e. the Driver side, runs.
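For example, on the node where the application was submitted, jps typically shows something like the following (the process ids are illustrative):

$ jps
2333 Master
2456 SparkSubmit
2789 Jps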
Task submission flow
1. The Driver side submits the application to the Master (this starts the SparkSubmit process).
2. The Master generates the task information and puts it into a queue.
3. The Master tells the Workers to start Executors (the Master filters out the alive Workers and assigns the task to the Workers with the most free resources).
4. The Workers' Executors register with the Driver (only Executors actually take part in the computation) -> the Workers fetch task information from the Driver.
5. The Driver splits the job into stages and then into small tasks, and broadcasts them to the corresponding Workers for execution.
6. The Workers send the results of the finished tasks back to the Driver.
A Range behaves like a collection subclass:

scala> 1.to(10)
res0: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> 1 to 10
res1: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
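For reference, Range also supports an exclusive upper bound and a custom step; a quick REPL sketch (the res numbering will differ in your session):

scala> 1 until 10
res2: scala.collection.immutable.Range = Range(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> 1 to 10 by 2
res3: scala.collection.immutable.Range = Range(1, 3, 5, 7, 9)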
The entry class for submitting tasks to the cluster:
When spark-shell starts, it prints
Spark context available as sc
SQL context available as sqlContext
so sc and sqlContext can be called directly, for example to run a WordCount in the shell.
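A minimal word-count sketch typed straight into spark-shell (assuming the same hdfs://node01:9000/words.txt input used in the spark-submit example below):

scala> sc.textFile("hdfs://node01:9000/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2, false).collect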
Building the boilerplate code:
SparkConf: the configuration class; settings made here take precedence over the cluster configuration files.
setAppName: sets the application name; if omitted, a UUID-like name is generated automatically.
setMaster: sets the run mode: local - simulate the cluster with 1 thread;
local[2] - simulate the cluster with 2 threads; local[*] - run the task with as many threads as are currently free (see the sketch below).
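A minimal sketch of the three local modes (in real code you would keep only one setMaster call; "demo" is a placeholder name):

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("demo")
conf.setMaster("local")    // 1 thread
conf.setMaster("local[2]") // 2 threads
conf.setMaster("local[*]") // one thread per free core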
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Word count implemented with Spark
  */
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    /**
      * Boilerplate setup
      */
    val conf: SparkConf = new SparkConf()
      .setAppName("SparkWordCount")
//      .setMaster("local[2]")

    // Create the entry class (context object) for submitting tasks to the cluster
    val sc: SparkContext = new SparkContext(conf)

    // Read the data from HDFS
    val lines: RDD[String] = sc.textFile(args(0))

    // Split the data into individual words
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // Turn each word into a tuple
    val tuples: RDD[(String, Int)] = words.map((_, 1))

    // Aggregate
//    tuples.reduceByKey((x, y) => x + y)
    val sumed: RDD[(String, Int)] = tuples.reduceByKey(_ + _)

    // Sort by word count in descending order
    val sorted: RDD[(String, Int)] = sumed.sortBy(_._2, false)

    // Print to the console
//    println(sorted.collect.toBuffer)
//    sorted.foreach(x => println(x))
//    sorted.foreach(println)

    // Save the result to HDFS
    sorted.saveAsTextFile(args(1))

    // Release resources
    sc.stop()
  }
}
Package the jar and upload it to Linux.
1. First start ZooKeeper, HDFS and the Spark cluster.
Start HDFS:
/usr/local/hadoop-2.6.1/sbin/start-dfs.sh
Start Spark:
/usr/local/spark-1.6.1-bin-hadoop2.6/sbin/start-all.sh
2. Submit the Spark application with the spark-submit command (mind the order of the arguments):
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
--class com.qf.spark.WordCount \
--master spark://node01:7077 \
--executor-memory 2G \
--total-executor-cores 4 \
/root/spark-mvn-1.0-SNAPSHOT.jar \
hdfs://node01:9000/words.txt \
hdfs://node01:9000/out
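Everything after the jar path is passed to the application's main method, so here args(0) is hdfs://node01:9000/words.txt (the input) and args(1) is hdfs://node01:9000/out (the output directory).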
3. Check the program's output:
hdfs dfs -cat hdfs://node01:9000/out/part-00000
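Since the job saves an RDD[(String, Int)], each output line is a (word,count) tuple in descending count order; with a hypothetical input the lines would look like (hello,3) and (spark,2).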
The same word count in Java (JavaSparkWC):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class JavaSparkWC {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaSparkWC").setMaster("local[1]");
        // Entry class for submitting tasks
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Load the data
        JavaRDD<String> lines = jsc.textFile("hdfs://hadoop01:9000/wordcount/input/a.txt");
        // Split the data
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                List<String> splited = Arrays.asList(s.split(" ")); // build a list
                return splited;
            }
        });

        // Build tuples: one per word (input word -> (word, 1))
        JavaPairRDD<String, Integer> tuples = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Aggregate: combine the values of two entries with the same key
        JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // The key is a String, so we cannot sort by count directly.
        // The Java API has no sortBy operator, so swap key and value,
        // sort, and then swap back afterwards.
        final JavaPairRDD<Integer, String> swaped = sumed.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception {
//                return new Tuple2<Integer, String>(tup._2, tup._1);
                return tup.swap(); // swap() exchanges the two elements
            }
        });

        // Sort in descending order
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
        // Swap back
        JavaPairRDD<String, Integer> res = sorted.mapToPair(
            new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {
                    return tup.swap();
                }
        });

        System.out.println(res.collect());

        jsc.stop(); // release resources
    }
}