欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

FlinkReduce怎么用

這篇文章主要講解了“Flink Reduce怎么用”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Flink Reduce怎么用”吧!

創(chuàng)新互聯(lián)建站是專業(yè)的市中網(wǎng)站建設(shè)公司,市中接單;提供成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)、外貿(mào)網(wǎng)站建設(shè),網(wǎng)頁(yè)設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行市中網(wǎng)站開發(fā)網(wǎng)頁(yè)制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!

Reduce算子:對(duì)數(shù)據(jù)流進(jìn)行滾動(dòng)聚合計(jì)算,并返回每次滾動(dòng)聚合計(jì)算合并后的結(jié)果

示例環(huán)境

java.version: 1.8.x
flink.version: 1.11.1

示例數(shù)據(jù)源 (項(xiàng)目碼云下載)

Flink 系例 之 搭建開發(fā)環(huán)境與數(shù)據(jù)

Reduce.java

import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;

/**
 * @Description Reduce算子:對(duì)數(shù)據(jù)流進(jìn)行滾動(dòng)聚合計(jì)算,并返回每次滾動(dòng)聚合計(jì)算合并后的結(jié)果
 */
public class Reduce {

    /**
     * 遍歷集合,分區(qū)打印每一次滾動(dòng)聚合的結(jié)果
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        List<Tuple3<String,String,Integer>> tuple3List = DataSource.getTuple3ToList();
        //注意:使用Integer進(jìn)行分區(qū)時(shí),會(huì)導(dǎo)致分區(qū)結(jié)果不對(duì),轉(zhuǎn)換成String類型輸出key即可正確輸出
        KeyedStream<Tuple3<String,String,Integer>, String> keyedStream = env.fromCollection(tuple3List).keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> tuple3) throws Exception {
                //f1為性別字段,以相同f1值(性別)進(jìn)行分區(qū)
                return String.valueOf(tuple3.f1);
            }
        });
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> result =  keyedStream.reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> t0, Tuple3<String, String, Integer> t1) throws Exception {
                int totalAge = t0.f2 + t1.f2;
                return new Tuple3<>("", t0.f1, totalAge);
            }
        });
        result.print();
        env.execute("flink Reduce job");
    }
}

打印結(jié)果

## 說明:為什么每一個(gè)分區(qū)的第一個(gè)數(shù)據(jù)對(duì)象每一個(gè)參數(shù)有值,是因?yàn)闈L動(dòng)聚合返回的是從第二數(shù)據(jù)對(duì)象向前疊加第一個(gè)數(shù)據(jù)對(duì)象,開始計(jì)算,所以第一個(gè)數(shù)據(jù)對(duì)象根本就不進(jìn)入reduce方法;
2> (張三,man,20)
2> (,man,49)
2> (,man,79)
4> (李四,girl,24)
4> (,girl,56)
4> (,girl,74)

感謝各位的閱讀,以上就是“Flink Reduce怎么用”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)Flink Reduce怎么用這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

本文標(biāo)題:FlinkReduce怎么用
標(biāo)題來源:http://chinadenli.net/article46/gigohg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站制作建站公司、電子商務(wù)、云服務(wù)器、靜態(tài)網(wǎng)站、全網(wǎng)營(yíng)銷推廣

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站托管運(yùn)營(yíng)