欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

Flink入門wordCount

Flink的編程模型
1、獲取Flink上下文;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2、加載、創(chuàng)建數(shù)據(jù);
DataSet
3、數(shù)據(jù)轉(zhuǎn)換;
Transformation
4、數(shù)據(jù)結(jié)果存放;
5、觸發(fā)執(zhí)行。
env.execution

創(chuàng)新互聯(lián)公司專注于企業(yè)營銷型網(wǎng)站建設(shè)、網(wǎng)站重做改版、鄂爾多斯網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5網(wǎng)站設(shè)計(jì)、購物商城網(wǎng)站建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)營銷網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為鄂爾多斯等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。

下面為flink輸出wordcount數(shù)據(jù):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkMain {

@SuppressWarnings("serial")
public static class LineSplit implements FlatMapFunction<String,Tuple2<String, Integer>>{

    @SuppressWarnings("rawtypes")
    @Override
    /**
     * @param value 原數(shù)據(jù)
     * @param out 輸出的數(shù)據(jù)
     */
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.split(" ");
        for (String token : tokens) {
            if(token!=null && token.length()>0){
                Tuple2 t = new Tuple2<String, Integer>(token,1);
                out.collect(t);
            }
        }
    }

}

public static void main(String[] args) throws Exception {
    //創(chuàng)建flink上下文
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //創(chuàng)建數(shù)據(jù)集
    DataSet<String> text = env.fromElements("to be","or no to be","is question");
    //對數(shù)據(jù)集轉(zhuǎn)換
    DataSet<Tuple2<String, Integer>> count = text.flatMap(new LineSplit());
    //輸出轉(zhuǎn)換后的數(shù)據(jù)集(print中包含了env.execute執(zhí)行)
    count.print();
    System.out.println("-----------------------");
    //對數(shù)據(jù)集分組統(tǒng)計(jì)轉(zhuǎn)換,0,1是下標(biāo),對應(yīng)Tuple2類中的參數(shù)
    count = count.groupBy(0).sum(1);
    //控制臺輸出數(shù)據(jù)集
    count.print();
    System.out.println("-----------------------");
}

}

Flink使用sql方式轉(zhuǎn)換數(shù)據(jù)
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class FlinkMain2 {

@SuppressWarnings({ "unchecked", "rawtypes" })
public static void main(String[] args) throws Exception {

    //創(chuàng)建flink上下文
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    List<WordCount> list = new ArrayList();
    String workStr = "to be or no to be is question";
    String[] tokens = workStr.split(" ");
    for (String token : tokens) {
        if(token!=null && token.length()>0){
            list.add( new WordCount(token,1));
        }
    }
    //創(chuàng)建數(shù)據(jù)集
    DataSet<WordCount> input = env.fromCollection(list);
    //注冊為數(shù)據(jù)表wordCount為數(shù)據(jù)庫表,word,frequency為wordCount表字段
    tEnv.registerDataSet("wordCount", input, "word, frequency");

    Table table = tEnv.sqlQuery(" SELECT word, SUM(frequency) as frequency FROM wordCount GROUP BY word" );

    DataSet<WordCount> res = tEnv.toDataSet(table, WordCount.class);
    //控制臺輸出
    res.print();

}

public static class WordCount    {
    public String word;
    public long frequency;
    public WordCount(){}

    public WordCount(String word, long frequency) {
        this.word = word;
        this.frequency = frequency;
    }

    @Override
    public String toString() {
        return "詞語:" + word + ",詞頻:" + frequency;
    }
}

}

本文標(biāo)題:Flink入門wordCount
本文鏈接:http://chinadenli.net/article8/jiicop.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營銷推廣、網(wǎng)頁設(shè)計(jì)公司、網(wǎng)站建設(shè)、動態(tài)網(wǎng)站、營銷型網(wǎng)站建設(shè)、云服務(wù)器

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)