欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

Hadoop系列(三)——分布式計(jì)算框架MapReduce

一、MapReduce概述

Hadoop MapReduce 是一個(gè)分布式計(jì)算框架,用于編寫(xiě)批處理應(yīng)用程序。編寫(xiě)好的程序可以提交到 Hadoop 集群上用于并行處理大規(guī)模的數(shù)據(jù)集。

創(chuàng)新互聯(lián)建站網(wǎng)站建設(shè)提供從項(xiàng)目策劃、軟件開(kāi)發(fā),軟件安全維護(hù)、網(wǎng)站優(yōu)化(SEO)、網(wǎng)站分析、效果評(píng)估等整套的建站服務(wù),主營(yíng)業(yè)務(wù)為網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì),app軟件定制開(kāi)發(fā)以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。創(chuàng)新互聯(lián)建站深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!

MapReduce 作業(yè)通過(guò)將輸入的數(shù)據(jù)集拆分為獨(dú)立的塊,這些塊由 map 以并行的方式處理,框架對(duì) map 的輸出進(jìn)行排序,然后輸入到 reduce 中。MapReduce 框架專門(mén)用于 <key,value> 鍵值對(duì)處理,它將作業(yè)的輸入視為一組 <key,value> 對(duì),并生成一組 <key,value> 對(duì)作為輸出。輸出和輸出的 keyvalue 都必須實(shí)現(xiàn)Writable 接口。

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

二、MapReduce編程模型簡(jiǎn)述

這里以詞頻統(tǒng)計(jì)為例進(jìn)行說(shuō)明,MapReduce 處理的流程如下:

Hadoop 系列(三)—— 分布式計(jì)算框架 MapReduce

  1. input: 讀取文本文件;

  2. splitting: 將文件按照行進(jìn)行拆分,此時(shí)得到的 K1 行數(shù),V1 表示對(duì)應(yīng)行的文本內(nèi)容;

  3. mapping: 并行將每一行按照空格進(jìn)行拆分,拆分得到的 List(K2,V2),其中 K2 代表每一個(gè)單詞,由于是做詞頻統(tǒng)計(jì),所以 V2 的值為 1,代表出現(xiàn) 1 次;
  4. shuffling:由于 Mapping 操作可能是在不同的機(jī)器上并行處理的,所以需要通過(guò) shuffling 將相同 key 值的數(shù)據(jù)分發(fā)到同一個(gè)節(jié)點(diǎn)上去合并,這樣才能統(tǒng)計(jì)出最終的結(jié)果,此時(shí)得到 K2 為每一個(gè)單詞,List(V2) 為可迭代集合,V2 就是 Mapping 中的 V2;
  5. Reducing: 這里的案例是統(tǒng)計(jì)單詞出現(xiàn)的總次數(shù),所以 Reducing 對(duì) List(V2) 進(jìn)行歸約求和操作,最終輸出。

MapReduce 編程模型中 splittingshuffing 操作都是由框架實(shí)現(xiàn)的,需要我們自己編程實(shí)現(xiàn)的只有 mappingreducing,這也就是 MapReduce 這個(gè)稱呼的來(lái)源。

三、combiner & partitioner

Hadoop 系列(三)—— 分布式計(jì)算框架 MapReduce

3.1 InputFormat & RecordReaders

InputFormat 將輸出文件拆分為多個(gè) InputSplit,并由 RecordReadersInputSplit 轉(zhuǎn)換為標(biāo)準(zhǔn)的<key,value>鍵值對(duì),作為 map 的輸出。這一步的意義在于只有先進(jìn)行邏輯拆分并轉(zhuǎn)為標(biāo)準(zhǔn)的鍵值對(duì)格式后,才能為多個(gè) map 提供輸入,以便進(jìn)行并行處理。

3.2 Combiner

combinermap 運(yùn)算后的可選操作,它實(shí)際上是一個(gè)本地化的 reduce 操作,它主要是在 map 計(jì)算出中間文件后做一個(gè)簡(jiǎn)單的合并重復(fù) key 值的操作。這里以詞頻統(tǒng)計(jì)為例:

map 在遇到一個(gè) hadoop 的單詞時(shí)就會(huì)記錄為 1,但是這篇文章里 hadoop 可能會(huì)出現(xiàn) n 多次,那么 map 輸出文件冗余就會(huì)很多,因此在 reduce 計(jì)算前對(duì)相同的 key 做一個(gè)合并操作,那么需要傳輸?shù)臄?shù)據(jù)量就會(huì)減少,傳輸效率就可以得到提升。

但并非所有場(chǎng)景都適合使用 combiner,使用它的原則是 combiner 的輸出不會(huì)影響到 reduce 計(jì)算的最終輸入,例如:求總數(shù),最大值,最小值時(shí)都可以使用 combiner,但是做平均值計(jì)算則不能使用 combiner

不使用 combiner 的情況:

<div align="center"> <img width="600px" src="https://raw.githubusercontent.com/heibaiying/BigData-Notes/master/pictures/mapreduce-without-combiners.png"/> </div>
使用 combiner 的情況:

<div align="center"> <img width="600px" src="https://raw.githubusercontent.com/heibaiying/BigData-Notes/master/pictures/mapreduce-with-combiners.png"/> </div>

可以看到使用 combiner 的時(shí)候,需要傳輸?shù)?reducer 中的數(shù)據(jù)由 12keys,降低到 10keys。降低的幅度取決于你 keys 的重復(fù)率,下文詞頻統(tǒng)計(jì)案例會(huì)演示用 combiner 降低數(shù)百倍的傳輸量。

3.3 Partitioner

partitioner 可以理解成分類器,將 map 的輸出按照 key 值的不同分別分給對(duì)應(yīng)的 reducer,支持自定義實(shí)現(xiàn),下文案例會(huì)給出演示。

四、MapReduce詞頻統(tǒng)計(jì)案例

4.1 項(xiàng)目簡(jiǎn)介

這里給出一個(gè)經(jīng)典的詞頻統(tǒng)計(jì)的案例:統(tǒng)計(jì)如下樣本數(shù)據(jù)中每個(gè)單詞出現(xiàn)的次數(shù)。

Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive

為方便大家開(kāi)發(fā),我在項(xiàng)目源碼中放置了一個(gè)工具類 WordCountDataUtils,用于模擬產(chǎn)生詞頻統(tǒng)計(jì)的樣本,生成的文件支持輸出到本地或者直接寫(xiě)到 HDFS 上。

項(xiàng)目代碼下載地址:hadoop-word-count

4.2 項(xiàng)目依賴

想要進(jìn)行 MapReduce 編程,需要導(dǎo)入 hadoop-client 依賴:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>

4.3 WordCountMapper

將每行數(shù)據(jù)按照指定分隔符進(jìn)行拆分。這里需要注意在 MapReduce 中必須使用 Hadoop 定義的類型,因?yàn)?Hadoop 預(yù)定義的類型都是可序列化,可比較的,所有類型均實(shí)現(xiàn)了 WritableComparable 接口。

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, 
                                                                      InterruptedException {
        String[] words = value.toString().split("\t");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }

}

WordCountMapper 對(duì)應(yīng)下圖的 Mapping 操作:

Hadoop 系列(三)—— 分布式計(jì)算框架 MapReduce

WordCountMapper 繼承自 Mappe 類,這是一個(gè)泛型類,定義如下:

WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
   ......
}
  • KEYIN: mapping 輸入 key 的類型,即每行的偏移量 (每行第一個(gè)字符在整個(gè)文本中的位置),Long 類型,對(duì)應(yīng) Hadoop 中的 LongWritable 類型;
  • VALUEIN: mapping 輸入 value 的類型,即每行數(shù)據(jù);String 類型,對(duì)應(yīng) Hadoop 中 Text 類型;
  • KEYOUTmapping 輸出的 key 的類型,即每個(gè)單詞;String 類型,對(duì)應(yīng) Hadoop 中 Text 類型;
  • VALUEOUTmapping 輸出 value 的類型,即每個(gè)單詞出現(xiàn)的次數(shù);這里用 int 類型,對(duì)應(yīng) IntWritable 類型。

4.4 WordCountReducer

在 Reduce 中進(jìn)行單詞出現(xiàn)次數(shù)的統(tǒng)計(jì):

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, 
                                                                                  InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

如下圖,shuffling 的輸出是 reduce 的輸入。這里的 key 是每個(gè)單詞,values 是一個(gè)可迭代的數(shù)據(jù)類型,類似 (1,1,1,...)

Hadoop 系列(三)—— 分布式計(jì)算框架 MapReduce

4.4 WordCountApp

組裝 MapReduce 作業(yè),并提交到服務(wù)器運(yùn)行,代碼如下:

/**
 * 組裝作業(yè) 并提交到集群運(yùn)行
 */
public class WordCountApp {

    // 這里為了直觀顯示參數(shù) 使用了硬編碼,實(shí)際開(kāi)發(fā)中可以通過(guò)外部傳參
    private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
    private static final String HADOOP_USER_NAME = "root";

    public static void main(String[] args) throws Exception {

        //  文件輸入路徑和輸出路徑由外部傳參指定
        if (args.length < 2) {
            System.out.println("Input and output paths are necessary!");
            return;
        }

        // 需要指明 hadoop 用戶名,否則在 HDFS 上創(chuàng)建目錄時(shí)可能會(huì)拋出權(quán)限不足的異常
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

        Configuration configuration = new Configuration();
        // 指明 HDFS 的地址
        configuration.set("fs.defaultFS", HDFS_URL);

        // 創(chuàng)建一個(gè) Job
        Job job = Job.getInstance(configuration);

        // 設(shè)置運(yùn)行的主類
        job.setJarByClass(WordCountApp.class);

        // 設(shè)置 Mapper 和 Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 設(shè)置 Mapper 輸出 key 和 value 的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 設(shè)置 Reducer 輸出 key 和 value 的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 如果輸出目錄已經(jīng)存在,則必須先刪除,否則重復(fù)運(yùn)行程序時(shí)會(huì)拋出異常
        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
        Path outputPath = new Path(args[1]);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // 設(shè)置作業(yè)輸入文件和輸出文件的路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // 將作業(yè)提交到群集并等待它完成,參數(shù)設(shè)置為 true 代表打印顯示對(duì)應(yīng)的進(jìn)度
        boolean result = job.waitForCompletion(true);

        // 關(guān)閉之前創(chuàng)建的 fileSystem
        fileSystem.close();

        // 根據(jù)作業(yè)結(jié)果,終止當(dāng)前運(yùn)行的 Java 虛擬機(jī),退出程序
        System.exit(result ? 0 : -1);

    }
}

需要注意的是:如果不設(shè)置 Mapper 操作的輸出類型,則程序默認(rèn)它和 Reducer 操作輸出的類型相同。

4.5 提交到服務(wù)器運(yùn)行

在實(shí)際開(kāi)發(fā)中,可以在本機(jī)配置 hadoop 開(kāi)發(fā)環(huán)境,直接在 IDE 中啟動(dòng)進(jìn)行測(cè)試。這里主要介紹一下打包提交到服務(wù)器運(yùn)行。由于本項(xiàng)目沒(méi)有使用除 Hadoop 外的第三方依賴,直接打包即可:

# mvn clean package

使用以下命令提交作業(yè):

hadoop jar /usr/appjar/hadoop-word-count-1.0.jar \
com.heibaiying.WordCountApp \
/wordcount/input.txt /wordcount/output/WordCountApp

作業(yè)完成后查看 HDFS 上生成目錄:

# 查看目錄
hadoop fs -ls /wordcount/output/WordCountApp

# 查看統(tǒng)計(jì)結(jié)果
hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000

Hadoop 系列(三)—— 分布式計(jì)算框架 MapReduce

五、詞頻統(tǒng)計(jì)案例進(jìn)階之Combiner

5.1 代碼實(shí)現(xiàn)

想要使用 combiner 功能只要在組裝作業(yè)時(shí),添加下面一行代碼即可:

// 設(shè)置 Combiner
job.setCombinerClass(WordCountReducer.class);

5.2 執(zhí)行結(jié)果

加入 combiner 后統(tǒng)計(jì)結(jié)果是不會(huì)有變化的,但是可以從打印的日志看出 combiner 的效果:

沒(méi)有加入 combiner 的打印日志:

Hadoop 系列(三)—— 分布式計(jì)算框架 MapReduce

加入 combiner 后的打印日志如下:

Hadoop 系列(三)—— 分布式計(jì)算框架 MapReduce

這里我們只有一個(gè)輸入文件并且小于 128M,所以只有一個(gè) Map 進(jìn)行處理。可以看到經(jīng)過(guò) combiner 后,records 由 3519 降低為 6(樣本中單詞種類就只有 6 種),在這個(gè)用例中 combiner 就能極大地降低需要傳輸?shù)臄?shù)據(jù)量。

六、詞頻統(tǒng)計(jì)案例進(jìn)階之Partitioner

6.1 默認(rèn)的Partitioner

這里假設(shè)有個(gè)需求:將不同單詞的統(tǒng)計(jì)結(jié)果輸出到不同文件。這種需求實(shí)際上比較常見(jiàn),比如統(tǒng)計(jì)產(chǎn)品的銷量時(shí),需要將結(jié)果按照產(chǎn)品種類進(jìn)行拆分。要實(shí)現(xiàn)這個(gè)功能,就需要用到自定義 Partitioner

這里先介紹下 MapReduce 默認(rèn)的分類規(guī)則:在構(gòu)建 job 時(shí)候,如果不指定,默認(rèn)的使用的是 HashPartitioner:對(duì) key 值進(jìn)行哈希散列并對(duì) numReduceTasks 取余。其實(shí)現(xiàn)如下:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

6.2 自定義Partitioner

這里我們繼承 Partitioner 自定義分類規(guī)則,這里按照單詞進(jìn)行分類:

public class CustomPartitioner extends Partitioner<Text, IntWritable> {

    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        return WordCountDataUtils.WORD_LIST.indexOf(text.toString());
    }
}

在構(gòu)建 job 時(shí)候指定使用我們自己的分類規(guī)則,并設(shè)置 reduce 的個(gè)數(shù):

// 設(shè)置自定義分區(qū)規(guī)則
job.setPartitionerClass(CustomPartitioner.class);
// 設(shè)置 reduce 個(gè)數(shù)
job.setNumReduceTasks(WordCountDataUtils.WORD_LIST.size());

6.3 執(zhí)行結(jié)果

執(zhí)行結(jié)果如下,分別生成 6 個(gè)文件,每個(gè)文件中為對(duì)應(yīng)單詞的統(tǒng)計(jì)結(jié)果:

Hadoop 系列(三)—— 分布式計(jì)算框架 MapReduce

參考資料

  1. 分布式計(jì)算框架 MapReduce
  2. Apache Hadoop 2.9.2 > MapReduce Tutorial
  3. MapReduce - Combiners

更多大數(shù)據(jù)系列文章可以參見(jiàn) GitHub 開(kāi)源項(xiàng)目大數(shù)據(jù)入門(mén)指南

標(biāo)題名稱:Hadoop系列(三)——分布式計(jì)算框架MapReduce
瀏覽地址:http://chinadenli.net/article44/gpcdhe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App開(kāi)發(fā)外貿(mào)建站、微信公眾號(hào)、ChatGPT商城網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化
国产高清在线不卡一区| 久久福利视频在线观看| 久久综合日韩精品免费观看| 人体偷拍一区二区三区| 99国产精品国产精品九九| 中文字幕久热精品视频在线| 亚洲熟妇熟女久久精品 | 国产成人人人97超碰熟女| 亚洲免费观看一区二区三区| 国产精品丝袜一二三区| 亚洲天堂有码中文字幕视频| 熟女少妇久久一区二区三区| 中国一区二区三区不卡| 日韩一区二区三区免费av| 在线观看免费午夜福利| 亚洲综合香蕉在线视频| 日本成人三级在线播放| 国产精品成人一区二区在线| 亚洲熟妇av一区二区三区色堂| 狠狠亚洲丁香综合久久| 欧美一区二区三区视频区| 国产欧美日韩精品自拍| 国产色偷丝袜麻豆亚洲| 日本黄色美女日本黄色| 国产传媒免费观看视频| 内用黄老外示儒术出处| 日韩日韩日韩日韩在线| 久久永久免费一区二区| 高清在线精品一区二区| 色综合伊人天天综合网中文| 精品久久av一二三区| 亚洲欧美国产中文色妇| 精品国产亚洲av久一区二区三区| 91香蕉国产观看免费人人| 麻豆tv传媒在线观看| 免费在线播放一区二区| 九九热国产这里只有精品| 国产自拍欧美日韩在线观看| 太香蕉久久国产精品视频| 国产欧美日韩精品一区二| 中文字幕区自拍偷拍区|