欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

SparkDataSources怎么使用

本篇內(nèi)容主要講解“Spark Data Sources怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Spark Data Sources怎么使用”吧!

創(chuàng)新互聯(lián)專業(yè)為企業(yè)提供鐵門關(guān)網(wǎng)站建設(shè)、鐵門關(guān)做網(wǎng)站、鐵門關(guān)網(wǎng)站設(shè)計(jì)、鐵門關(guān)網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)與制作、鐵門關(guān)企業(yè)網(wǎng)站模板建站服務(wù),十載鐵門關(guān)做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。

一:Data Sources(數(shù)據(jù)源):

1.1    了解數(shù)據(jù)源。

        Spark SQL 支持對(duì)各種數(shù)據(jù)源通過DataFrame接口操作。DataFrame 可以作為正常 的RDDs進(jìn)行操作,也可以注冊(cè)為一個(gè)臨時(shí)表。
    注冊(cè)DataFrame為一個(gè)表允許您在其數(shù)據(jù)運(yùn)行 SQL 查詢。本節(jié)介紹用于加載和保存使用 Spark數(shù)據(jù)源的數(shù)據(jù)的一般方法,然后再進(jìn)入到可用的內(nèi)置數(shù)據(jù)源的特定選項(xiàng)。

1.2    Generic Load/Save Functions(通用加載/保存功能)。

        最簡(jiǎn)單的形式,默認(rèn)的數(shù)據(jù)源 (parquet除非否則配置由 spark.sql.sources.default) 將用于所有操作。

        eg:第一種讀取方式:通過 parquetFile("xxx") 來讀取

       首先把spark-1.6.1-bin-hadoop2.6\examples\src\main\resources下的users.parquet上傳到HDFS上。

public class SparkSqlDemo4 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//創(chuàng)建了 sqlContext的上下文,注意,它是DataFrame的起點(diǎn)。
		SQLContext sqlContext = new SQLContext(sc);

		DataFrame df = sqlContext.read().load("hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet");
		
		df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

		//指定保存模式  
		//df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");  
		//第一種讀取方式  
		DataFrame parquetFile = sqlContext.parquetFile("namesAndFavColors.parquet");

		parquetFile.registerTempTable("users");  

		DataFrame df1 = sqlContext.sql("SELECT name,favorite_color FROM users ");  
		
		df1.show();
		
		List<String> listString = df1.javaRDD().map(new Function<Row, String>() {  

			private static final long serialVersionUID = 1L;
			public String call(Row row) {  
				return "Name: " + row.getString(0) + " ,FavoriteColor: " + row.getString(1);  
			}  
		}).collect();  

		for (String string : listString) {
			System.out.println(  string );
		}
	}
}

輸出結(jié)果如下:

+------+--------------+
|  name|favorite_color|
+------+--------------+
|Alyssa|          null|
|   Ben|           red|
+------+--------------+

Name: Alyssa ,FavoriteColor: null
Name: Ben ,FavoriteColor: red

1.3    Manually Specifying Options(手動(dòng)指定選項(xiàng)):

         你可以也手動(dòng)指定的數(shù)據(jù)源,將與您想要將傳遞給數(shù)據(jù)源的任何額外選項(xiàng)一起使用。

         數(shù)據(jù)源由其完全限定名稱 (即 org.apache.spark.sql.parquet),

        但對(duì)于內(nèi)置來源您還可以使用 他們短名稱(json, parquet, jdbc)。

        DataFrames 任何類型可以轉(zhuǎn)換為其他類型,使用此語法。

     eg:第二種讀取方式:通過 parquet("xxx") 來讀取

public class SparkSqlDemo5 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//創(chuàng)建了 sqlContext的上下文,注意,它是DataFrame的起點(diǎn)。
		SQLContext sqlContext = new SQLContext(sc);


		DataFrame df = sqlContext.read().format("json").load("hdfs://192.168.226.129:9000/txt/sparkshell/people.json");
		df.select("id", "name","sex","age").write().format("parquet").save("people.parquet");


		DataFrame parquetFile = sqlContext.read().parquet("people.parquet");  
		
		parquetFile.registerTempTable("people");  

		DataFrame df1 = sqlContext.sql("SELECT id,name,sex,age FROM people WHERE age >= 21 AND age <= 23");  

		df1.show();
		
		df1.printSchema();

		List<String> listString = df1.javaRDD().map(new Function<Row, String>() {  

			private static final long serialVersionUID = 1L;
			public String call(Row row) {  
				return "Id: " + row.getString(0) + ", Name: "+row.getString(1) + ",Sex" +row.getString(2)+ ", Age: " + row.getLong(3);
			}
		}).collect();  

		for (String string : listString) {
			System.out.println(  string );
		}
	}
}

1.4    Run SQL on files directly(直接在文件上運(yùn)行SQL)

       您也可以查詢?cè)撐募苯邮褂?SQL,并對(duì)其進(jìn)行查詢,而不是使用 API 讀取文件加載到DataFrame。

public class SparkSqlDemo6 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//創(chuàng)建了 sqlContext的上下文,注意,它是DataFrame的起點(diǎn)。
		SQLContext sqlContext = new SQLContext(sc);

		// 注意 sql 語句 parquet 后面目錄的符號(hào) 。 
		DataFrame df = sqlContext.sql("SELECT * FROM parquet.`hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet`");
		
		df.show();
	}
}

    注意:sql語句中紅色部分標(biāo)記的符號(hào)。

"SELECT * FROM parquet.`hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet`"

二:Save Modes(保存模式):

        Save操作可以可選擇性地接收一個(gè)SaveModel,如果數(shù)據(jù)已經(jīng)存在了,指定如何處理已經(jīng)存在的數(shù)據(jù)。意識(shí)到這些保存模式?jīng)]有利用任何鎖,也不是原子的,這很重要。因此,如果有多個(gè)寫入者試圖往同一個(gè)地方寫入,這是不安全的。此外,當(dāng)執(zhí)行一個(gè)Overwrite,在寫入新的數(shù)據(jù)之前會(huì)將原來的數(shù)據(jù)進(jìn)行刪除。

eg:指定保存模式:
       df.select("name","favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");  

Scala/JavaAny LanguageMeaning
SaveMode.ErrorIfExists(default)"error"(default)當(dāng)保存DataFrame到一個(gè)數(shù)據(jù)源,如果數(shù)據(jù)已經(jīng)存在,將會(huì)引發(fā)異常。
SaveMode.Append"append"


當(dāng)保存DataFrame到一個(gè)數(shù)據(jù)源,如果數(shù)據(jù)/表已經(jīng)存在,DataFrame的內(nèi)容預(yù)計(jì)將追加到現(xiàn)有數(shù)據(jù)后面。

SaveMode.Overwrite"overwrite"覆蓋模式意味著當(dāng)保存DataFrame到一個(gè)數(shù)據(jù)源,如果數(shù)據(jù)/表已存在,現(xiàn)有數(shù)據(jù)預(yù)計(jì)將覆蓋原有的DataFrame內(nèi)容。
SaveMode.Ignore"ignore"Ignore模式意味著當(dāng)向數(shù)據(jù)源中保存一個(gè)DataFrame時(shí),如果數(shù)據(jù)已經(jīng)存在,save操作不會(huì)將DataFrame的內(nèi)容進(jìn)行保存,也不會(huì)修改已經(jīng)存在的數(shù)據(jù)。這與SQL中的`CREATE TABLE IF NOT EXISTS`相似。

三:Parquet 文件

    Parquet是一種列式存儲(chǔ)格式的文件,被許多其他數(shù)據(jù)處理系統(tǒng)所支持。Spark SQL支持度對(duì)Parquet文件的讀和寫,自動(dòng)保存原有數(shù)據(jù)的模式。

到此,相信大家對(duì)“Spark Data Sources怎么使用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

網(wǎng)頁標(biāo)題:SparkDataSources怎么使用
標(biāo)題URL:http://chinadenli.net/article32/pigcpc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)頁設(shè)計(jì)公司網(wǎng)站維護(hù)靜態(tài)網(wǎng)站定制網(wǎng)站響應(yīng)式網(wǎng)站網(wǎng)站內(nèi)鏈

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站優(yōu)化排名