本篇內(nèi)容主要講解“flinksql怎么將數(shù)據(jù)寫入到文件中”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“flinksql怎么將數(shù)據(jù)寫入到文件中”吧!

讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務項目有:域名與空間、虛擬主機、營銷軟件、網(wǎng)站建設、長嶺網(wǎng)站維護、網(wǎng)站推廣。
package com.jd.dataoutput;
import com.jd.data.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
public class FlinkSqlOutputFile {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> stream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");
// DataStreamSource<String> stream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<SensorReading> map = stream.map(new MapFunction<String, SensorReading>() {
public SensorReading map(String s) throws Exception {
String[] split = s.split(",");
return new SensorReading(split[0], split[1], split[2]);
}
});
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 使用 table api
Table table = tableEnv.fromDataStream(map);
// table.printSchema();
Table select = table.select("a,b");
// select.printSchema();
// 使用 sql api
// tableEnv.createTemporaryView("test", map);
// Table select = tableEnv.sqlQuery(" select a, b from test");
// select.printSchema();
// DataStream<SensorReading2> sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class);
// sensorReading2DataStream.map(new MapFunction<SensorReading2, Object>() {
// @Override
// public Object map(SensorReading2 value) throws Exception {
// System.out.println(value.a+" "+ value.b);
// return null;
// }
// });
// tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt"))
// .withFormat(new Csv())
// .withSchema(
// new Schema()
// .field("a", DataTypes.STRING())
// .field("b", DataTypes.STRING()))
// .inAppendMode()
// .createTemporaryTable("outputTable");
// select.insertInto("outputTable");
tableEnv.connect(new FileSystem().path("/Users/liuhaijing/IdeaProjects/haijing3/spark/flinksqldemo/output/out.txt"))
.withFormat(new OldCsv())
.withSchema(new Schema()
.field("a", DataTypes.STRING())
).inAppendMode()
.createTemporaryTable("outputTable");
select.insertInto("outputTable");
env.execute();
}
}到此,相信大家對“flinksql怎么將數(shù)據(jù)寫入到文件中”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學習!
名稱欄目:flinksql怎么將數(shù)據(jù)寫入到文件中
分享地址:http://chinadenli.net/article40/jggheo.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版、全網(wǎng)營銷推廣、虛擬主機、云服務器、定制網(wǎng)站、網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)