欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

Flink中Transform怎么用

小編給大家分享一下Flink中Transform怎么用,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

創(chuàng)新互聯(lián)是一家專注于網(wǎng)站制作、成都網(wǎng)站制作與策劃設計,山陽網(wǎng)站建設哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設十多年,網(wǎng)設計領域的專業(yè)建站公司;建站業(yè)務涵蓋:山陽等地區(qū)。山陽做網(wǎng)站價格咨詢:13518219792

分組聚合
  String path = "E:\\GIT\\flink-learn\\flink-learn\\telemetering.txt";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TupleTypeInfo<Tuple3<String, Double, Long>> typeInfo = new TupleTypeInfo<>(Types.STRING, Types.DOUBLE, Types.LONG);

        TupleCsvInputFormat<Tuple3<String, Double, Long>> tupleCsvInputFormat =
                new TupleCsvInputFormat<>(new Path(path), typeInfo);

        DataStreamSource<Tuple3<String, Double, Long>> dataStreamSource = env.createInput(tupleCsvInputFormat, typeInfo);
        //或   DataStreamSource<Tuple2<String, Double>> dataStreamSource = env.readFile(tupleCsvInputFormat, path);

        SingleOutputStreamOperator<Tuple3<String, Double, Long>> operator = dataStreamSource
                .filter(Objects::nonNull)
//                .map()
//                .flatMap()
//                .keyBy(0)
                .keyBy(tuple -> tuple.f0)
                .minBy(1);
//                .min()
//                .max(1);
//                .maxBy(1, false);
//                .sum(1);
//                .reduce();
//                .process();
        operator.print().setParallelism(1);
        env.execute();
分流/合流
String path = "E:\\GIT\\flink-learn\\flink-learn\\telemetering.txt";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        PojoTypeInfo<TelemeterDTO> typeInfo = (PojoTypeInfo<TelemeterDTO>) Types.POJO(TelemeterDTO.class);
        PojoCsvInputFormat<TelemeterDTO> inputFormat = new PojoCsvInputFormat<>(new Path(path), typeInfo, new String[]{"code", "value", "timestamp"});
        DataStreamSource<TelemeterDTO> dataStreamSource = env.createInput(inputFormat, typeInfo);

        //分流
        SplitStream<TelemeterDTO> splitStream = dataStreamSource
                .split(item -> {
                    if (item.getValue() > 100) {
                        return Collections.singletonList("high");
                    }
                    return Collections.singletonList("low");
                });

        DataStream<TelemeterDTO> highStream = splitStream.select("high");
        DataStream<TelemeterDTO> lowStream = splitStream.select("low");

        //合流
        ConnectedStreams<TelemeterDTO, TelemeterDTO> connectedStreams = lowStream.connect(highStream);
//        DataStream<TelemeterDTO> unionDataStream = lowStream.union(highStream); //需要類型一致

        SingleOutputStreamOperator<Tuple3<String, Double, Long>> operator = connectedStreams
                .map(new CoMapFunction<TelemeterDTO, TelemeterDTO, Tuple3<String, Double, Long>>() {
                    @Override
                    public Tuple3<String, Double, Long> map1(TelemeterDTO value) {
                        return Tuple3.of(value.getCode(), value.getValue(), value.getTimestamp());
                    }

                    @Override
                    public Tuple3<String, Double, Long> map2(TelemeterDTO value) {
                        return Tuple3.of(value.getCode(), value.getValue(), value.getTimestamp());
                    }
                });

        operator.print();
        env.execute();
UDF函數(shù),提供底層支持
  • MapFunction

  • FilterFunction

  • ReduceFunction

  • ProcessFunction

  • SourceFunction

  • SinkFunction

富函數(shù)

富函數(shù) 包含了生命周期,及上下文相關信息,如

  • open() 可以在算子創(chuàng)建之初建立數(shù)據(jù)庫連接

  • close() 在在算子生命結束之前關閉資源

以上是“Flink中Transform怎么用”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

文章名稱:Flink中Transform怎么用
標題來源:http://chinadenli.net/article6/ieihog.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供定制網(wǎng)站、網(wǎng)站導航、企業(yè)建站、面包屑導航、云服務器、網(wǎng)站設計公司

廣告

聲明:本網(wǎng)站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

小程序開發(fā)
欧美区一区二区在线观看| 日本欧美在线一区二区三区| 白白操白白在线免费观看| 欧美人妻盗摄日韩偷拍| 亚洲精品福利入口在线| 人妻久久一区二区三区精品99| 欧美成人免费一级特黄| 中文文精品字幕一区二区| 日本av一区二区不卡| 日韩欧美中文字幕人妻| 日韩一区二区免费在线观看| 黄色av尤物白丝在线播放网址| 欧美日韩国产欧美日韩| 日本丰满大奶熟女一区二区| 日韩无套内射免费精品| 在线免费视频你懂的观看| 男生和女生哪个更好色| 国产一区二区三区成人精品| 夜色福利久久精品福利| 中日韩美女黄色一级片| 亚洲精品日韩欧美精品| 亚洲第一区欧美日韩在线| 91蜜臀精品一区二区三区| 日本黄色录像韩国黄色录像| 国产又长又粗又爽免费视频| 91亚洲精品综合久久| 97精品人妻一区二区三区麻豆| 国产综合欧美日韩在线精品| 日韩精品第一区二区三区| 亚洲精品中文字幕一二三| 东京热一二三区在线免| 丰满人妻一二区二区三区av| 国产欧美日产中文一区| 午夜资源在线观看免费高清| 亚洲欧美日韩网友自拍| 国产一区二区三区四区中文| 成人欧美精品一区二区三区 | 国产av天堂一区二区三区粉嫩| 欧美丝袜诱惑一区二区| 国产熟女一区二区三区四区| 久久精品a毛片看国产成人|