在學習Spark前,建議先正確理解spark,可以參考:正確理解spark
創(chuàng)新互聯(lián)建站專注于企業(yè)營銷型網(wǎng)站、網(wǎng)站重做改版、容城網(wǎng)站定制設計、自適應品牌網(wǎng)站建設、H5高端網(wǎng)站建設、商城網(wǎng)站建設、集團公司官網(wǎng)建設、外貿(mào)網(wǎng)站建設、高端網(wǎng)站制作、響應式網(wǎng)頁設計等建站業(yè)務,價格優(yōu)惠性價比高,為容城等各大城市提供網(wǎng)站開發(fā)制作服務。
本篇對JavaRDD基本的action api進行了詳細的描述
先定義兩個Comparator實現(xiàn),一個是實現(xiàn)升序,一個是實現(xiàn)降序
//升序排序比較器
private static class AscComparator implements Comparator<Integer>, Serializable {
@Override
public int compare(java.lang.Integer o1, java.lang.Integer o2) {
return o1 - o2;
}
}
//降序排序比較器
private static class DescComparator implements Comparator<Integer>, Serializable {
@Override
public int compare(java.lang.Integer o1, java.lang.Integer o2) {
return o2 - o1;
}
}再定義一個RDD:
JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 3, 6), 2);
一、collect、take、top、first
//結(jié)果: [1, 2, 4, 3, 3, 6] 將RDD的所有數(shù)據(jù)收集到driver端來,用于小數(shù)據(jù)或者實驗,
// 對大數(shù)據(jù)量的RDD進行collect會出現(xiàn)driver端內(nèi)存溢出
System.out.println("collect = " + listRDD.collect());
//結(jié)果:[1, 2] 將RDD前面兩個元素收集到java端
//take的原理大致為:先看看RDD第一個分區(qū)的元素夠不夠我們想take的數(shù)量
//不夠的話再根據(jù)剩余的需要take的數(shù)據(jù)量來估算需要掃描多少個分區(qū)的數(shù)據(jù),直到take到了我們想要的數(shù)據(jù)個數(shù)為止
System.out.println("take(2) = " + listRDD.take(2));
//結(jié)果:[6, 4] 取RDD升序的最大的兩個元素
System.out.println("top(2) = " + listRDD.top(2));
//結(jié)果:[1, 2] 取RDD降序的最大的兩個元素(即取RDD最小的兩個元素)
System.out.println("DescComparator top(2) = " + listRDD.top(2, new DescComparator()));
//結(jié)果:1 其底層實現(xiàn)就是take(1)
System.out.println("first = " + listRDD.first());二、min、max
//結(jié)果:1。 按照升序取最小值,就是RDD的最小值
System.out.println("min = " + listRDD.min(new AscComparator()));
//結(jié)果:6 按照降序取最小值,就是RDD的最大值
System.out.println("min = " + listRDD.min(new DescComparator()));
//結(jié)果:6 按照升序取最大值,就是RDD的最大值
System.out.println("max = " + listRDD.max(new AscComparator()));
//結(jié)果:1 按照降序取最大值,就是RDD的最小值
System.out.println("max = " + listRDD.max(new DescComparator()));min和max的底層是用reduce api來實現(xiàn)的,下面是偽代碼
min() == reduce((x, y) => if (x <= y) x else y) max() == redcue((x, y) => if (x >= y) x else y)
對于reduce api我們見下面的講解
三、takeOrdered
//結(jié)果:[1, 2] 返回該RDD最小的兩個元素
System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2));
//結(jié)果:[1, 2] 返回RDD按照升序的前面兩個元素,即返回該RDD最小的兩個元素
System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2, new AscComparator()));
//結(jié)果:[6, 4] 返回RDD按照降序的前面兩個元素,即返回該RDD最大的兩個元素
System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2, new DescComparator()));四、foreach和foreachPartition
foreach是對RDD每一個元素應用自定義的函數(shù),而foreachPartition是對RDD的每一個partition應用自定義的函數(shù),使用時需要注意下面的建議
先定義一個比較耗時的操作:
public static Integer getInitNumber(String source) {
System.out.println("get init number from " + source + ", may be take much time........");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}listRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer element) throws Exception {
//這個性能太差,遍歷每一個元素的時候都需要調(diào)用比較耗時的getInitNumber
//建議采用foreachPartition來代替foreach操作
Integer initNumber = getInitNumber("foreach");
System.out.println((element + initNumber) + "=========");
}
});
listRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
@Override
public void call(Iterator<Integer> integerIterator) throws Exception {
//和foreach api的功能是一樣,只不過一個是將函數(shù)應用到每一條記錄,這個是將函數(shù)應用到每一個partition
//如果有一個比較耗時的操作,只需要每一分區(qū)執(zhí)行一次這個操作就行,則用這個函數(shù)
//這個耗時的操作可以是連接數(shù)據(jù)庫等操作,不需要計算每一條時候去連接數(shù)據(jù)庫,一個分區(qū)只需連接一次就行
Integer initNumber = getInitNumber("foreach");
while (integerIterator.hasNext()) {
System.out.println((integerIterator.next() + initNumber) + "=========");
}
}
});五、reduce 和 treeReduce
Integer reduceResult = listRDD.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer ele1, Integer ele2) throws Exception {
return ele1 + ele2;
}
});
//結(jié)果:19
System.out.println("reduceResult = " + reduceResult);
Integer treeReduceResult = listRDD.treeReduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}, 3); //這個3表示做3次聚合才計算出結(jié)果
//結(jié)果:19
System.out.println("treeReduceResult = " + treeReduceResult);它們倆的結(jié)果是一樣的,但是執(zhí)行流程不一樣,如下流程:

如果分區(qū)數(shù)太多的話,使用treeReduce做多次聚合,可以提高性能,如下:

六、fold
fold其實和reduce的功能類似,只不過fold多了一個初始值而已
//和reduce的功能類似,只不過是在計算每一個分區(qū)的時候需要加上初始值0,最后再將每一個分區(qū)計算出來的值相加再加上這個初始值
Integer foldResult = listRDD.fold(0, new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
//結(jié)果:19
System.out.println("foldResult = " + foldResult);七、aggregate 和 treeAggregate
//先初始化一個我們想要的返回的數(shù)據(jù)類型的初始值
//然后在每一個分區(qū)對每一個元素應用函數(shù)一(acc, value) => (acc._1 + value, acc._2 + 1)進行聚合
//最后將每一個分區(qū)生成的數(shù)據(jù)應用函數(shù)(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)進行聚合
Tuple2 aggregateResult = listRDD.aggregate(new Tuple2<Integer, Integer>(0, 0),
new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer integer) throws Exception {
return new Tuple2<>(acc._1 + integer, acc._2 + 1);
}
}, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
}
});
//結(jié)果:(19,6)
System.out.println("aggregateResult = " + aggregateResult);
Tuple2 treeAggregateResult = listRDD.treeAggregate(new Tuple2<Integer, Integer>(0, 0),
new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer integer) throws Exception {
return new Tuple2<>(acc._1 + integer, acc._2 + 1);
}
}, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
}
}, 2);
//結(jié)果:(19,6)
System.out.println("treeAggregateResult = " + treeAggregateResult);兩者的結(jié)果是一致的,只不過執(zhí)行流程不一樣,如下是aggregate的執(zhí)行流程:

如果RDD的分區(qū)數(shù)非常多的話,建議使用treeAggregate,如下是treeAggregate的執(zhí)行流程:

aggregate和treeAggregate的比較:
1: aggregate在combine上的操作,時間復雜度為O(n). treeAggregate的時間復雜度為O(lgn)。
n表示分區(qū)數(shù)
2: aggregate把數(shù)據(jù)全部拿到driver端,存在內(nèi)存溢出的風險。treeAggregate則不會。
3:aggregate 比 treeAggregate在最后結(jié)果的reduce操作時,多使用了一次初始值
對于以上api的原理層面的講解,可以參考spark core RDD api原理詳解,因為用文字講清楚原理性的東西是一件比較困難的事情,看了后記得也不深入
網(wǎng)頁題目:spark2.x由淺入深深到底系列六之RDDjavaapi詳解二
分享路徑:http://chinadenli.net/article44/pgjhee.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站、動態(tài)網(wǎng)站、品牌網(wǎng)站建設、關(guān)鍵詞優(yōu)化、云服務器、網(wǎng)頁設計公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)