欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

使用flink將mysql數(shù)據(jù)入湖delta-創(chuàng)新互聯(lián)

使用flink將mysql數(shù)據(jù)入湖delta 1.簡(jiǎn)介

Delta數(shù)據(jù)湖原來(lái)是強(qiáng)綁定于Spark引擎,而近期社區(qū)實(shí)現(xiàn)了使用Flink引擎將數(shù)據(jù)入湖,簡(jiǎn)單寫個(gè)demo使用以下。

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來(lái)自于我們對(duì)這個(gè)行業(yè)的熱愛。我們立志把好的技術(shù)通過(guò)有效、簡(jiǎn)單的方式提供給客戶,將通過(guò)不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:主機(jī)域名雅安服務(wù)器托管、營(yíng)銷軟件、網(wǎng)站建設(shè)、連云網(wǎng)站維護(hù)、網(wǎng)站推廣。
  • Flink 1.13.0
  • delta 1.0.0
  • flink-mysql-cdc 2.1.0
2.Mysql入湖代碼 2.1 Flink運(yùn)行環(huán)境

設(shè)置下checkpoint的時(shí)間大小

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
2.2 構(gòu)建MysqlSouce

使用flink-cdc-mysql依賴中的方法,輸入ip,表名等直接構(gòu)建

MySqlSourcesource = MySqlSource
    .builder()
    .hostname("ip")
    .port(3306)
    .databaseList("database")
    .tableList("database.table")
    .username("username")
    .password("password")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
2.3 Mysql表的Schema轉(zhuǎn)變成Flink-RowType

使用flink將數(shù)據(jù)入湖時(shí),需要將source的Schema轉(zhuǎn)換成Flink的RowType

通過(guò)RowType.RowField實(shí)現(xiàn),這里定義三個(gè)字段的RowType

public static RowType getMysqlRowType(){return new RowType(Arrays.asList(
        new RowType.RowField("id", new BigIntType()),
        new RowType.RowField("name", new VarCharType(VarCharType.MAX_LENGTH)),
        new RowType.RowField("dept_id",new IntType())
    ));
}
2.4 構(gòu)建Sink

使用delta-flink依賴中的DeltaSink
.forRowData()方法,指定lakePath,hadoop-conf,rowType,生成Sink

public static org.apache.hadoop.conf.Configuration getHadoopConf() {org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    conf.set("parquet.compression", "SNAPPY");
    return conf;
}

public static DeltaSinkcreateDeltaSink(String deltaTablePath, RowType rowType) {return DeltaSink
        .forRowData(
        new Path(deltaTablePath),
        getHadoopConf(),
        rowType).build();
}
2.5 String轉(zhuǎn)為RowData

Source端使用String類型,Sink端使用RowData類型,所以需要使用Map函數(shù)進(jìn)行一次轉(zhuǎn)換。

使用fastJson獲取每個(gè)字段的值,然后變成Flink row類型,最后使用convertor轉(zhuǎn)換為RowData

//存在于flink-table-runtime-blink_2.12依賴中 
public static final DataFormatConverters.DataFormatConverterMYSQL_CONVERTER =
            DataFormatConverters.getConverterForDataType(
                    TypeConversions.fromLogicalToDataType(getMysqlRowType())
            );

public static RowData mysqlJsonToRowData(String line){String body = JSON.parseObject(line).getString("after");
    Long id = JSON.parseObject(body).getLong("id");
    String name = JSON.parseObject(body).getString("name");
    Integer deptId = JSON.parseObject(body).getInteger("dept_id");
    Row row = Row.of(id,name,deptId);
    return MYSQL_CONVERTER.toInternal(row);
}
2.6 執(zhí)行

依次將source,sink放入env中執(zhí)行即可

env.fromSource(source, WatermarkStrategy.noWatermarks(),"demo-mysql-cdc")
    .setParallelism(2)
    //將json數(shù)據(jù)轉(zhuǎn)為FlinkRowData
    .map(FlinkDeltaUtil::mysqlJsonToRowData)
    .sinkTo(deltaSink)
    .setParallelism(1);

env.execute("flink-cdc-to-delta");
3. 源碼

倉(cāng)庫(kù)地址 (https://gitee.com/zhiling-chen/demo-mysql-flink-delta)

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧

文章題目:使用flink將mysql數(shù)據(jù)入湖delta-創(chuàng)新互聯(lián)
轉(zhuǎn)載來(lái)于:http://chinadenli.net/article46/ddieeg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供移動(dòng)網(wǎng)站建設(shè)品牌網(wǎng)站建設(shè)、微信小程序網(wǎng)站內(nèi)鏈、小程序開發(fā)企業(yè)建站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司