This article shares an analysis of how to do Spark WC (WordCount) development and application deployment. The editor finds it quite practical and shares it here for everyone to learn from; hopefully you will come away with something after reading it. Without further ado, let's get started.
創(chuàng)新互聯(lián)建站專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務,包含不限于網(wǎng)站建設、成都網(wǎng)站設計、武江網(wǎng)絡推廣、成都小程序開發(fā)、武江網(wǎng)絡營銷、武江企業(yè)策劃、武江品牌公關、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運營等,從售前售中售后,我們都將竭誠為您服務,您的肯定,是我們最大的嘉獎;創(chuàng)新互聯(lián)建站為所有大學生創(chuàng)業(yè)者提供武江建站搭建服務,24小時服務熱線:028-86922220,官方網(wǎng)址:chinadenli.net
Spark WordCount Development
創(chuàng)建的是maven工程,使用的依賴如下:
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
package cn.xpleaf.bigdata.spark.java.core.p1;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Spark Core development
 *
 * Java version of the canonical WordCount example.
 *
 * Entry point of a Spark program:
 *     SparkContext
 *     Java:  JavaSparkContext
 *     Scala: SparkContext
 *
 * Input file: D:/data/spark/hello.txt
 *
 * Spark RDD operations come in two kinds: Transformations and Actions
 * (often called transformation operators and action operators).
 * Common Transformations: map, flatMap, reduceByKey, groupByKey, filter, ...
 * Common Actions: foreach, collect, count, save, and so on.
 *
 * Transformations are lazy; they only execute when triggered by an Action.
 * (See the code below: as long as foreach is not executed, even an exception thrown
 * inside an intermediate RDD operation will not surface, because the operation is only
 * recorded in the DAG and not actually executed.)
 */
public class _01SparkWordCountOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName(_01SparkWordCountOps.class.getSimpleName());
        /**
         * Master values that can be set in SparkConf:
         * local
         *     The Spark job runs locally with one worker thread.
         * local[N]
         *     The Spark job runs locally with N worker threads.
         * local[*]
         *     The Spark job runs locally; Spark chooses a suitable number of worker
         *     threads based on the machine's hardware resources (typically the number
         *     of CPU cores).
         * local[N, M]
         *     Same as local[N], except that if the job fails to start or submit it gets
         *     up to M retries; the other local modes do not retry.
         * standalone mode:
         *     The address of the master of the Spark cluster, e.g. spark://uplooking01:7077
         * yarn
         *     yarn-cluster
         *         Cluster mode on YARN: both the construction of the SparkContext and
         *         the execution of the job happen inside the YARN cluster.
         *     yarn-client
         *         Client mode on YARN: the SparkContext is constructed locally, while
         *         the job runs in the cluster.
         * mesos
         *     mesos-cluster
         *     mesos-client
         */
        String master = "local[*]";
        conf.setMaster(master);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Integer defaultParallelism = jsc.defaultParallelism();
        System.out.println("defaultParallelism=" + defaultParallelism);
        /**
         * The operations below are, in effect, the DAG of RDDs in Spark.
         */
        JavaRDD<String> linesRDD = jsc.textFile("D:/data/spark/hello.txt");
        System.out.println("linesRDD's partition size is: " + linesRDD.partitions().size());
        JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                // int i = 1 / 0; // uncomment to verify that Transformations are lazily evaluated
                return Arrays.asList(line.split(" "));
            }
        });
        JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        JavaPairRDD<String, Integer> retRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println("retRDD's partition size is: " + retRDD.partitions().size());
        retRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println(tuple._1 + "---" + tuple._2);
            }
        });
        jsc.close();
    }
}

Run locally; the output is as follows:
defaultParallelism=20
......
linesRDD's partition size is: 2
retRDD's partition size is: 2
......
hello---3
you---1
me---1
he---1
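To make the lazy evaluation described in the comments concrete, here is a minimal Scala sketch (the object name is mine; only the sample input path comes from the original project): the deliberate division by zero inside flatMap does not fail while the DAG is being built, and only surfaces if an Action such as collect is uncommented.

import org.apache.spark.{SparkConf, SparkContext}

object LazyEvalDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LazyEvalDemo").setMaster("local"))
    val lines = sc.textFile("D:/data/spark/hello.txt")
    // Transformations are only recorded in the DAG, so this line does not fail yet.
    val words = lines.flatMap { line => val i = 1 / 0; line.split(" ") }
    println("DAG built, nothing has been executed so far")
    // Uncommenting the Action below triggers actual execution -- and the ArithmeticException.
    // words.collect()
    sc.stop()
  }
}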
package cn.xpleaf.bigdata.spark.java.core.p1;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Spark Core development
 *
 * Java version of the canonical WordCount example.
 *
 * Entry point of a Spark program:
 *     SparkContext
 *     Java:  JavaSparkContext
 *     Scala: SparkContext
 *
 * Input file: D:/data/spark/hello.txt
 *
 * Lambda-expression version.
 */
public class _02SparkWordCountOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName(_02SparkWordCountOps.class.getSimpleName());
        String master = "local";
        conf.setMaster(master);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        /**
         * The operations below are, in effect, the DAG of RDDs in Spark.
         * Lambda expressions make the code shorter and clearer.
         */
        JavaRDD<String> linesRDD = jsc.textFile("D:/data/spark/hello.txt");
        JavaRDD<String> wordsRDD = linesRDD.flatMap(line -> Arrays.asList(line.split(" ")));
        JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(word -> new Tuple2<String, Integer>(word, 1));
        JavaPairRDD<String, Integer> retRDD = pairRDD.reduceByKey((v1, v2) -> v1 + v2);
        retRDD.foreach(tuple -> System.out.println(tuple._1 + "---" + tuple._2));
        jsc.close();
    }
}

Run locally; the output is as follows:
you---1
he---1
hello---3
me---1
package cn.xpleaf.bigdata.spark.scala.core.p1

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * WordCount in Scala.
 *
 * java.net.UnknownHostException: ns1
 *
 * If Spark does not recognize ns1, add the following to Spark's configuration file
 * spark-defaults.conf:
 * spark.files /home/uplooking/app/hadoop/etc/hadoop/hdfs-site.xml,/home/uplooking/app/hadoop/etc/hadoop/core-site.xml
 */
object _01SparkWordCountOps {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(s"${_01SparkWordCountOps.getClass().getSimpleName}")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val linesRDD: RDD[String] = sc.textFile("D:/data/spark/hello.txt")
    /*val wordsRDD: RDD[String] = linesRDD.flatMap(line => line.split(" "))
    val parsRDD: RDD[(String, Int)] = wordsRDD.map(word => new Tuple2[String, Int](word, 1))
    val retRDD: RDD[(String, Int)] = parsRDD.reduceByKey((v1, v2) => v1 + v2)
    retRDD.collect().foreach(t => println(t._1 + "..." + t._2))*/
    // A more concise version
    linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(t => println(t._1 + "..." + t._2))
    sc.stop()
  }
}

Run locally; the output is as follows:
you...1
he...1
hello...3
me...1
All of the runs above execute locally. We can also deploy the application to a Spark cluster or a YARN cluster; the earlier code comments already touched on this, namely the question of where a Spark job runs:
/**
 * Master values that can be set in SparkConf:
 * local
 *     The Spark job runs locally with one worker thread.
 * local[N]
 *     The Spark job runs locally with N worker threads.
 * local[*]
 *     The Spark job runs locally; Spark chooses a suitable number of worker threads
 *     based on the machine's hardware resources (typically the number of CPU cores).
 * local[N, M]
 *     Same as local[N], except that if the job fails to start or submit it gets up to
 *     M retries; the other local modes do not retry.
 * standalone mode:
 *     The address of the master of the Spark cluster, e.g. spark://uplooking01:7077
 * yarn
 *     yarn-cluster
 *         Cluster mode on YARN: both the construction of the SparkContext and the
 *         execution of the job happen inside the YARN cluster.
 *     yarn-client
 *         Client mode on YARN: the SparkContext is constructed locally, while the job
 *         runs in the cluster.
 * mesos
 *     mesos-cluster
 *     mesos-client
 */
You can try out the various local modes yourself; a small sketch for doing so follows.
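Here is a minimal, purely illustrative Scala sketch (the object name and defaults are my own, not from the original code) for comparing the local variants: pass "local", "local[2]", "local[*]" or "local[2,3]" as the first argument and compare the parallelism Spark reports.

import org.apache.spark.{SparkConf, SparkContext}

object LocalMasterCheck {
  def main(args: Array[String]): Unit = {
    // Pass e.g. "local", "local[2]", "local[*]" or "local[2,3]" as the first argument.
    val master = if (args.nonEmpty) args(0) else "local[2]"
    val conf = new SparkConf().setAppName("LocalMasterCheck").setMaster(master)
    val sc = new SparkContext(conf)
    // defaultParallelism reflects how many worker threads the local mode was given.
    println(s"master=${sc.master}, defaultParallelism=${sc.defaultParallelism}")
    sc.stop()
  }
}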
Here only the standalone and yarn-cluster deployment modes are tested. yarn-client was tried as well, but it threw an exception and I did not dig into it. Note that the Scala code is used for deployment.
Obviously, what is being exercised here is Spark's offline (batch) computation capability, i.e. Spark Core.
Modify the earlier Scala code as follows:
package cn.xpleaf.bigdata.spark.scala.core.p1

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * WordCount in Scala.
 *
 * java.net.UnknownHostException: ns1
 *
 * If Spark does not recognize ns1, add the following to Spark's configuration file
 * spark-defaults.conf:
 * spark.files /home/uplooking/app/hadoop/etc/hadoop/hdfs-site.xml,/home/uplooking/app/hadoop/etc/hadoop/core-site.xml
 */
object _01SparkWordCountOps {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(s"${_01SparkWordCountOps.getClass().getSimpleName}")
      //.setMaster("local")
    val sc = new SparkContext(conf)
    val linesRDD: RDD[String] = sc.textFile("hdfs://ns1/hello")
    /*val wordsRDD: RDD[String] = linesRDD.flatMap(line => line.split(" "))
    val parsRDD: RDD[(String, Int)] = wordsRDD.map(word => new Tuple2[String, Int](word, 1))
    val retRDD: RDD[(String, Int)] = parsRDD.reduceByKey((v1, v2) => v1 + v2)
    retRDD.collect().foreach(t => println(t._1 + "..." + t._2))*/
    // A more concise version
    linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(t => println(t._1 + "..." + t._2))
    // collect is not strictly required, but without it you will not see the output on the
    // console in standalone mode; in yarn mode the console output is not visible at all.
    sc.stop()
  }
}

Two main changes were made: first, setMaster("local") is commented out, because the job no longer runs locally; second, the data source is now a file on HDFS.
Note that for the Spark cluster to recognize ns1 (my Hadoop cluster is deployed in HA mode), there are two options. One is to set the environment variable HADOOP_CONF_DIR, but in my test it did not take effect and ns1 still could not be resolved. The other is to add spark.files /home/uplooking/app/hadoop/etc/hadoop/hdfs-site.xml,/home/uplooking/app/hadoop/etc/hadoop/core-site.xml to Spark's configuration file spark-defaults.conf, i.e. point Spark at the Hadoop configuration files, which is where the Hadoop HA settings live. The second approach worked for me.
With the preparation done, the program can be packaged. Either a plain jar build or a Maven build will do; note that the dependencies do not need to be bundled, because they already exist in the Spark cluster environment.
As for deploying the application (strictly speaking, submitting it), the official documentation explains this in detail: http://spark.apache.org/docs/latest/submitting-applications.html
For standalone mode, first write the following submit script:
[uplooking@uplooking01 spark]$ cat spark-submit-standalone.sh
#export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop
/home/uplooking/app/spark/bin/spark-submit \
--class $2 \
--master spark://uplooking01:7077 \
--executor-memory 1G \
--num-executors 1 \
$1 \
然后執(zhí)行下面的命令:
[uplooking@uplooking01 spark]$ ./spark-submit-standalone.sh spark-wc.jar cn.xpleaf.bigdata.spark.scala.core.p1._01SparkWordCountOps
Because the collect Action has already been added to the code, the result is printed straight to the console once the job completes:
hello...3
me...1
you...1
he...1
The submitted job and its result can also be seen in the web UI provided by Spark:

For YARN, similarly write a submit script first:
[uplooking@uplooking01 spark]$ cat spark-submit-yarn.sh
#export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop
/home/uplooking/app/spark/bin/spark-submit \
--class $2 \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
$1 \
執(zhí)行如下命令:
[uplooking@uplooking01 spark]$ ./spark-submit-yarn.sh spark-wc.jar cn.xpleaf.bigdata.spark.scala.core.p1._01SparkWordCountOps
18/04/25 17:47:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/04/25 17:47:39 INFO yarn.Client: Requesting a new application from cluster with 2 NodeManagers
18/04/25 17:47:39 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
18/04/25 17:47:39 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
18/04/25 17:47:39 INFO yarn.Client: Setting up container launch context for our AM
18/04/25 17:47:39 INFO yarn.Client: Setting up the launch environment for our AM container
18/04/25 17:47:39 INFO yarn.Client: Preparing resources for our AM container
18/04/25 17:47:40 INFO yarn.Client: Uploading resource file:/home/uplooking/app/spark/lib/spark-assembly-1.6.2-hadoop2.6.0.jar -> hdfs://ns1/user/uplooking/.sparkStaging/application_1524552224611_0005/spark-assembly-1.6.2-hadoop2.6.0.jar
18/04/25 17:47:42 INFO yarn.Client: Uploading resource file:/home/uplooking/jars/spark/spark-wc.jar -> hdfs://ns1/user/uplooking/.sparkStaging/application_1524552224611_0005/spark-wc.jar
18/04/25 17:47:42 INFO yarn.Client: Uploading resource file:/tmp/spark-ae34fa23-5166-4fd3-a4ec-8e5115691801/__spark_conf__6834084285342234312.zip -> hdfs://ns1/user/uplooking/.sparkStaging/application_1524552224611_0005/__spark_conf__6834084285342234312.zip
18/04/25 17:47:43 INFO spark.SecurityManager: Changing view acls to: uplooking
18/04/25 17:47:43 INFO spark.SecurityManager: Changing modify acls to: uplooking
18/04/25 17:47:43 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(uplooking); users with modify permissions: Set(uplooking)
18/04/25 17:47:43 INFO yarn.Client: Submitting application 5 to ResourceManager
18/04/25 17:47:43 INFO impl.YarnClientImpl: Submitted application application_1524552224611_0005
18/04/25 17:47:44 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:44 INFO yarn.Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1524649663869
     final status: UNDEFINED
     tracking URL: http://uplooking02:8088/proxy/application_1524552224611_0005/
     user: uplooking
18/04/25 17:47:45 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:46 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:47 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:48 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:49 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:50 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:51 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:51 INFO yarn.Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: 192.168.43.103
     ApplicationMaster RPC port: 0
     queue: default
     start time: 1524649663869
     final status: UNDEFINED
     tracking URL: http://uplooking02:8088/proxy/application_1524552224611_0005/
     user: uplooking
18/04/25 17:47:52 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:53 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:54 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:55 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:56 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:57 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:58 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:59 INFO yarn.Client: Application report for application_1524552224611_0005 (state: FINISHED)
18/04/25 17:47:59 INFO yarn.Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: 192.168.43.103
     ApplicationMaster RPC port: 0
     queue: default
     start time: 1524649663869
     final status: SUCCEEDED
     tracking URL: http://uplooking02:8088/proxy/application_1524552224611_0005/
     user: uplooking
18/04/25 17:47:59 INFO util.ShutdownHookManager: Shutdown hook called
18/04/25 17:47:59 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-ae34fa23-5166-4fd3-a4ec-8e5115691801
The submitted job can also be viewed through the web UI provided by YARN:

However, the printed statistics could not be found in the logs either, so in this case the result should not merely be printed; some other form of persistent storage should be used instead (a sketch follows below).
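One simple option is to write the result back to HDFS with the standard RDD.saveAsTextFile action instead of collecting and printing it, so the output of a yarn-cluster run can be read from HDFS afterwards. A minimal sketch (the output directory is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

object WordCountToHdfs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WordCountToHdfs"))
    sc.textFile("hdfs://ns1/hello")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      // saveAsTextFile is an Action; the target directory must not already exist.
      .saveAsTextFile("hdfs://ns1/output/wc") // hypothetical output directory
    sc.stop()
  }
}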
Overall, compared with MapReduce, and looking at Spark Core alone, the speed improvement is really substantial.
Refer to the diagram below:

Finally, here is a WordCount job I ran: looking at its details in the Spark history server makes concepts such as stage division and wide versus narrow dependencies much easier to understand, and should give you a reasonably clear picture.
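As a rough, purely illustrative sketch (not the original job), printing the RDD lineage with toDebugString also shows where the stage boundary falls: flatMap and map are narrow dependencies and stay within one stage, while reduceByKey introduces a wide dependency, i.e. a shuffle and a new stage.

import org.apache.spark.{SparkConf, SparkContext}

object LineageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LineageDemo").setMaster("local"))
    val ret = sc.textFile("D:/data/spark/hello.txt")
      .flatMap(_.split(" "))   // narrow dependency
      .map((_, 1))             // narrow dependency
      .reduceByKey(_ + _)      // wide dependency -> shuffle -> new stage
    // The indentation in toDebugString marks the ShuffledRDD, i.e. the stage boundary.
    println(ret.toDebugString)
    sc.stop()
  }
}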
That concludes the analysis of how to do Spark WC development and application deployment. Some of these points may well come up in everyday work; hopefully this article has taught you something new.