The Flink programming model
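1. Obtain the Flink execution context:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2. Load or create the data as a DataSet, for example:
DataSet<String> text = env.fromElements("to be", "or no to be", "is question");
3. Transform the data by applying transformations to the DataSet (for example flatMap, groupBy, sum).
4. Store the results in a sink (for example, print them with print()).
5. Trigger execution:
env.execute();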
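Steps 1 to 4 only describe the job: a Flink DataSet program is built lazily, and nothing is computed until execution is triggered in step 5 (or by a sink such as print(), which triggers execution itself). The sketch below is plain Java, not Flink code. It mimics that lazy behaviour so the reason for the explicit trigger is visible. The class LazyPipeline and its methods transform, execute, results and splitWords are hypothetical names, and the sketch assumes a single in-memory list of lines as the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, Flink-free illustration: the pipeline only records its steps
// and runs them when execute() is called, similar in spirit to env.execute().
public class LazyPipeline {
    private final List<String> source;                                                  // step 2: input data
    private final List<Function<List<String>, List<String>>> steps = new ArrayList<>(); // step 3: transformations
    private final List<String> sink = new ArrayList<>();                                // step 4: collected results
    private boolean executed = false;

    public LazyPipeline(List<String> source) {
        this.source = source;
    }

    // Records a transformation; no data is processed yet.
    public LazyPipeline transform(Function<List<String>, List<String>> step) {
        steps.add(step);
        return this;
    }

    // Step 5: run every recorded transformation in order and fill the sink.
    public void execute() {
        List<String> data = source;
        for (Function<List<String>, List<String>> step : steps) {
            data = step.apply(data);
        }
        sink.addAll(data);
        executed = true;
    }

    public List<String> results() {
        return sink;
    }

    public boolean isExecuted() {
        return executed;
    }

    // Example transformation: split lines into non-empty words.
    public static List<String> splitWords(List<String> lines) {
        List<String> words = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                if (!word.isEmpty()) {
                    words.add(word);
                }
            }
        }
        return words;
    }

    public static void main(String[] args) {
        LazyPipeline pipeline = new LazyPipeline(Arrays.asList("to be", "or no to be"));
        pipeline.transform(LazyPipeline::splitWords);
        System.out.println(pipeline.results()); // prints [] because nothing has run yet
        pipeline.execute();
        System.out.println(pipeline.results()); // prints [to, be, or, no, to, be]
    }
}
```

In real Flink code the recorded plan is shipped to the runtime rather than evaluated in a loop, but the consequence is the same: without a trigger, the transformations are never run.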
The following Flink program computes and prints a word count:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkMain {

    /** Splits each line into words and emits a (word, 1) pair per word. */
    @SuppressWarnings("serial")
    public static class LineSplit implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * @param value the input line
         * @param out   collector for the emitted (word, 1) pairs
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.split(" ");
            for (String token : tokens) {
                if (token != null && token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Create the Flink execution context
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Create the data set
        DataSet<String> text = env.fromElements("to be", "or no to be", "is question");
        // Transform the data set into (word, 1) pairs
        DataSet<Tuple2<String, Integer>> count = text.flatMap(new LineSplit());
        // Print the transformed data set (print() triggers env.execute() internally)
        count.print();
        System.out.println("-----------------------");
        // Group by field 0 (the word) and sum field 1 (the count); 0 and 1 are Tuple2 field indexes
        count = count.groupBy(0).sum(1);
        // Print the result to the console
        count.print();
        System.out.println("-----------------------");
    }
}
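To check what the two print() calls should show, here is a Flink-free sketch that reproduces LineSplit and groupBy(0).sum(1) with standard Java collections. WordCountCheck, split and groupAndSum are hypothetical names; a Map.Entry stands in for Tuple2, and a TreeMap is used only to make the printed order deterministic. Flink's own output formatting and group order may differ.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountCheck {
    // Mirrors LineSplit.flatMap: one (word, 1) pair per non-empty token.
    public static List<Map.Entry<String, Integer>> split(String... lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            for (String token : line.split(" ")) {
                if (!token.isEmpty()) {
                    pairs.add(new SimpleEntry<>(token, 1));
                }
            }
        }
        return pairs;
    }

    // Mirrors groupBy(0).sum(1): group on the key (field 0), add up the value (field 1).
    public static Map<String, Integer> groupAndSum(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new TreeMap<>(); // sorted only for stable printing
        for (Map.Entry<String, Integer> pair : pairs) {
            totals.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = split("to be", "or no to be", "is question");
        System.out.println(pairs);              // [to=1, be=1, or=1, no=1, to=1, be=1, is=1, question=1]
        System.out.println(groupAndSum(pairs)); // {be=2, is=1, no=1, or=1, question=1, to=2}
    }
}
```

The first print corresponds to the output of flatMap (eight pairs, each with count 1); the second corresponds to the grouped sums, where only "to" and "be" occur twice.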
Transforming data with Flink SQL
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class FlinkMain2 {

    public static void main(String[] args) throws Exception {
        // Create the Flink execution context and a batch table environment on top of it
        // (TableEnvironment.getTableEnvironment(env) is the factory method of older Flink releases)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Split the sentence into words, each with an initial frequency of 1
        List<WordCount> list = new ArrayList<>();
        String wordStr = "to be or no to be is question";
        String[] tokens = wordStr.split(" ");
        for (String token : tokens) {
            if (token != null && token.length() > 0) {
                list.add(new WordCount(token, 1));
            }
        }

        // Create the data set
        DataSet<WordCount> input = env.fromCollection(list);
        // Register it as the table "wordCount" with the columns word and frequency
        tEnv.registerDataSet("wordCount", input, "word, frequency");
        Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) AS frequency FROM wordCount GROUP BY word");
        DataSet<WordCount> res = tEnv.toDataSet(table, WordCount.class);
        // Print to the console
        res.print();
    }

    /** POJO: public class, public fields and a public no-argument constructor, as Flink requires. */
    public static class WordCount {
        public String word;
        public long frequency;

        public WordCount() {}

        public WordCount(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "word: " + word + ", frequency: " + frequency;
        }
    }
}
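The SQL query SELECT word, SUM(frequency) ... GROUP BY word is the relational form of groupBy(0).sum(1) from the first program. As a rough cross-check, the Flink-free sketch below computes the same aggregation with Java streams. SqlGroupBySketch and groupBySum are hypothetical names, WordCount is a standalone copy of the POJO above, and the TreeMap ordering is an assumption made only so the output is deterministic (SQL GROUP BY guarantees no order).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class SqlGroupBySketch {
    // Standalone copy of the WordCount POJO used in FlinkMain2.
    public static class WordCount {
        public String word;
        public long frequency;

        public WordCount() {}

        public WordCount(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "word: " + word + ", frequency: " + frequency;
        }
    }

    // In-memory equivalent of:
    // SELECT word, SUM(frequency) AS frequency FROM wordCount GROUP BY word
    public static List<WordCount> groupBySum(List<WordCount> rows) {
        Map<String, Long> sums = rows.stream().collect(Collectors.groupingBy(
                (WordCount r) -> r.word,                                  // GROUP BY word
                TreeMap::new,                                             // sorted for stable output
                Collectors.summingLong((WordCount r) -> r.frequency)));   // SUM(frequency)
        List<WordCount> result = new ArrayList<>();
        sums.forEach((word, total) -> result.add(new WordCount(word, total)));
        return result;
    }

    public static void main(String[] args) {
        List<WordCount> rows = new ArrayList<>();
        for (String token : "to be or no to be is question".split(" ")) {
            if (!token.isEmpty()) {
                rows.add(new WordCount(token, 1));
            }
        }
        groupBySum(rows).forEach(System.out::println); // first line: word: be, frequency: 2
    }
}
```

Because SUM(frequency) is aliased back to frequency, each result row has the same shape as the input POJO, which is why the Flink program can convert the table straight back with toDataSet(table, WordCount.class).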
Article title: Getting Started with Flink: wordCount
Article link: http://chinadenli.net/article8/jiicop.html