Delta數(shù)據(jù)湖原來(lái)是強(qiáng)綁定于Spark引擎,而近期社區(qū)實(shí)現(xiàn)了使用Flink引擎將數(shù)據(jù)入湖,簡(jiǎn)單寫個(gè)demo使用以下。
設(shè)置下checkpoint的時(shí)間大小
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
2.2 構(gòu)建MysqlSouce使用flink-cdc-mysql依賴中的方法,輸入ip,表名等直接構(gòu)建
MySqlSourcesource = MySqlSource
.builder()
.hostname("ip")
.port(3306)
.databaseList("database")
.tableList("database.table")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
2.3 Mysql表的Schema轉(zhuǎn)變成Flink-RowType使用flink將數(shù)據(jù)入湖時(shí),需要將source的Schema轉(zhuǎn)換成Flink的RowType
通過(guò)RowType.RowField實(shí)現(xiàn),這里定義三個(gè)字段的RowType
public static RowType getMysqlRowType(){return new RowType(Arrays.asList(
new RowType.RowField("id", new BigIntType()),
new RowType.RowField("name", new VarCharType(VarCharType.MAX_LENGTH)),
new RowType.RowField("dept_id",new IntType())
));
}
2.4 構(gòu)建Sink使用delta-flink依賴中的DeltaSink
.forRowData()方法,指定lakePath,hadoop-conf,rowType,生成Sink
public static org.apache.hadoop.conf.Configuration getHadoopConf() {org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("parquet.compression", "SNAPPY");
return conf;
}
public static DeltaSinkcreateDeltaSink(String deltaTablePath, RowType rowType) {return DeltaSink
.forRowData(
new Path(deltaTablePath),
getHadoopConf(),
rowType).build();
}
2.5 String轉(zhuǎn)為RowDataSource端使用String類型,Sink端使用RowData類型,所以需要使用Map函數(shù)進(jìn)行一次轉(zhuǎn)換。
使用fastJson獲取每個(gè)字段的值,然后變成Flink row類型,最后使用convertor轉(zhuǎn)換為RowData
//存在于flink-table-runtime-blink_2.12依賴中
public static final DataFormatConverters.DataFormatConverterMYSQL_CONVERTER =
DataFormatConverters.getConverterForDataType(
TypeConversions.fromLogicalToDataType(getMysqlRowType())
);
public static RowData mysqlJsonToRowData(String line){String body = JSON.parseObject(line).getString("after");
Long id = JSON.parseObject(body).getLong("id");
String name = JSON.parseObject(body).getString("name");
Integer deptId = JSON.parseObject(body).getInteger("dept_id");
Row row = Row.of(id,name,deptId);
return MYSQL_CONVERTER.toInternal(row);
}
2.6 執(zhí)行依次將source,sink放入env中執(zhí)行即可
env.fromSource(source, WatermarkStrategy.noWatermarks(),"demo-mysql-cdc")
.setParallelism(2)
//將json數(shù)據(jù)轉(zhuǎn)為FlinkRowData
.map(FlinkDeltaUtil::mysqlJsonToRowData)
.sinkTo(deltaSink)
.setParallelism(1);
env.execute("flink-cdc-to-delta");
3. 源碼倉(cāng)庫(kù)地址 (https://gitee.com/zhiling-chen/demo-mysql-flink-delta)
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧
文章題目:使用flink將mysql數(shù)據(jù)入湖delta-創(chuàng)新互聯(lián)
轉(zhuǎn)載來(lái)于:http://chinadenli.net/article46/ddieeg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供移動(dòng)網(wǎng)站建設(shè)、品牌網(wǎng)站建設(shè)、微信小程序、網(wǎng)站內(nèi)鏈、小程序開發(fā)、企業(yè)建站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容