這篇文章將為大家詳細講解有關(guān)Flink中如何進行TableAPI 、SQL 與 Kafka 消息獲取,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
創(chuàng)新互聯(lián)技術(shù)團隊10多年來致力于為客戶提供成都網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、品牌網(wǎng)站設(shè)計、營銷型網(wǎng)站建設(shè)、搜索引擎SEO優(yōu)化等服務(wù)。經(jīng)過多年發(fā)展,公司擁有經(jīng)驗豐富的技術(shù)團隊,先后服務(wù)、推廣了近1000家網(wǎng)站,包括各類中小企業(yè)、企事單位、高校等機構(gòu)單位。
使用Tbale&SQL與Flink Kafka連接器從kafka的消息隊列中獲取數(shù)據(jù)
示例環(huán)境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
示例數(shù)據(jù)源 (項目碼云下載)
Flink 系例 之 搭建開發(fā)環(huán)境與數(shù)據(jù)
示例模塊 (pom.xml)
Flink 系例 之 TableAPI & SQL 與 示例模塊
SelectToKafka.java
package com.flink.examples.kafka; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; /** * @Description 使用Tbale&SQL與Flink Kafka連接器從kafka的消息隊列中獲取數(shù)據(jù) */ public class SelectToKafka { /** 官方參考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html 開始偏移位置 config選項scan.startup.mode指定Kafka使用者的啟動模式。有效的枚舉是: group-offsets:從特定消費者組的ZK / Kafka經(jīng)紀(jì)人中的承諾抵消開始。 earliest-offset:從最早的偏移量開始。 latest-offset:從最新的偏移量開始。 timestamp:從每個分區(qū)的用戶提供的時間戳開始。 specific-offsets:從每個分區(qū)的用戶提供的特定偏移量開始。 默認選項值group-offsets表示從ZK / Kafka經(jīng)紀(jì)人中最后提交的偏移量消費 一致性保證 sink.semantic選項來選擇三種不同的操作模式: NONE:Flink不能保證任何事情。產(chǎn)生的記錄可能會丟失或可以重復(fù)。 AT_LEAST_ONCE (默認設(shè)置):這樣可以確保不會丟失任何記錄(盡管它們可以重復(fù))。 EXACTLY_ONCE:Kafka事務(wù)將用于提供一次精確的語義。每當(dāng)您使用事務(wù)寫入Kafka時,請不要忘記為使用Kafka記錄的任何應(yīng)用程序設(shè)置所需的設(shè)置isolation.level(read_committed 或read_uncommitted-后者是默認值)。 */ static String table_sql = "CREATE TABLE KafkaTable (\n" + " `user_id` BIGINT,\n" + " `item_id` BIGINT,\n" + " `behavior` STRING,\n" + " `ts` TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'user_behavior',\n" + " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'earliest-offset',\n" + " 'format' = 'json'\n" + ")"; public static void main(String[] args) throws Exception { //構(gòu)建StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //默認流時間方式 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //構(gòu)建EnvironmentSettings 并指定Blink Planner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //構(gòu)建StreamTableEnvironment StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); //注冊kafka數(shù)據(jù)維表 tEnv.executeSql(table_sql); String sql = "select user_id,item_id,behavior,ts from KafkaTable"; Table table = tEnv.sqlQuery(sql); //打印字段結(jié)構(gòu) table.printSchema(); //table 轉(zhuǎn)成 dataStream 流 DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class); behaviorStream.print(); env.execute(); } }
打印結(jié)果
root |-- user_id: BIGINT |-- item_id: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3) 3> 1,1,normal,2021-01-26T10:25:44
關(guān)于Flink中如何進行TableAPI 、SQL 與 Kafka 消息獲取就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
文章名稱:Flink中如何進行TableAPI、SQL與Kafka消息獲取
當(dāng)前網(wǎng)址:http://chinadenli.net/article28/jiisjp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供營銷型網(wǎng)站建設(shè)、商城網(wǎng)站、軟件開發(fā)、域名注冊、搜索引擎優(yōu)化、Google
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)