欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

Flink中如何進行TableAPI、SQL與Kafka消息獲取

這篇文章將為大家詳細講解有關(guān)Flink中如何進行TableAPI 、SQL 與 Kafka 消息獲取,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

創(chuàng)新互聯(lián)技術(shù)團隊10多年來致力于為客戶提供成都網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、品牌網(wǎng)站設(shè)計、營銷型網(wǎng)站建設(shè)、搜索引擎SEO優(yōu)化等服務(wù)。經(jīng)過多年發(fā)展,公司擁有經(jīng)驗豐富的技術(shù)團隊,先后服務(wù)、推廣了近1000家網(wǎng)站,包括各類中小企業(yè)、企事單位、高校等機構(gòu)單位。

使用Tbale&SQL與Flink Kafka連接器從kafka的消息隊列中獲取數(shù)據(jù)

示例環(huán)境

java.version: 1.8.xflink.version: 1.11.1kafka:2.11

示例數(shù)據(jù)源 (項目碼云下載)

Flink 系例 之 搭建開發(fā)環(huán)境與數(shù)據(jù)

示例模塊 (pom.xml)

Flink 系例 之 TableAPI & SQL 與 示例模塊

SelectToKafka.java

package com.flink.examples.kafka;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @Description 使用Tbale&SQL與Flink Kafka連接器從kafka的消息隊列中獲取數(shù)據(jù)
 */
public class SelectToKafka {
    /**
     官方參考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
     開始偏移位置
     config選項scan.startup.mode指定Kafka使用者的啟動模式。有效的枚舉是:
         group-offsets:從特定消費者組的ZK / Kafka經(jīng)紀(jì)人中的承諾抵消開始。
         earliest-offset:從最早的偏移量開始。
         latest-offset:從最新的偏移量開始。
         timestamp:從每個分區(qū)的用戶提供的時間戳開始。
         specific-offsets:從每個分區(qū)的用戶提供的特定偏移量開始。
     默認選項值group-offsets表示從ZK / Kafka經(jīng)紀(jì)人中最后提交的偏移量消費

     一致性保證
     sink.semantic選項來選擇三種不同的操作模式:
         NONE:Flink不能保證任何事情。產(chǎn)生的記錄可能會丟失或可以重復(fù)。
         AT_LEAST_ONCE (默認設(shè)置):這樣可以確保不會丟失任何記錄(盡管它們可以重復(fù))。
         EXACTLY_ONCE:Kafka事務(wù)將用于提供一次精確的語義。每當(dāng)您使用事務(wù)寫入Kafka時,請不要忘記為使用Kafka記錄的任何應(yīng)用程序設(shè)置所需的設(shè)置isolation.level(read_committed 或read_uncommitted-后者是默認值)。
     */
    static String table_sql = "CREATE TABLE KafkaTable (\n" +
            "  `user_id` BIGINT,\n" +
            "  `item_id` BIGINT,\n" +
            "  `behavior` STRING,\n" +
            "  `ts` TIMESTAMP(3)\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'user_behavior',\n" +
            "  'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
            "  'properties.group.id' = 'testGroup',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'format' = 'json'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        //構(gòu)建StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //默認流時間方式
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //構(gòu)建EnvironmentSettings 并指定Blink Planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //構(gòu)建StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        //注冊kafka數(shù)據(jù)維表
        tEnv.executeSql(table_sql);

        String sql = "select user_id,item_id,behavior,ts from KafkaTable";
        Table table = tEnv.sqlQuery(sql);
        //打印字段結(jié)構(gòu)
        table.printSchema();

        //table 轉(zhuǎn)成 dataStream 流
        DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
        behaviorStream.print();

        env.execute();
    }
}

打印結(jié)果

root
 |-- user_id: BIGINT
 |-- item_id: BIGINT
 |-- behavior: STRING
 |-- ts: TIMESTAMP(3)

3> 1,1,normal,2021-01-26T10:25:44

關(guān)于Flink中如何進行TableAPI 、SQL 與 Kafka 消息獲取就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

文章名稱:Flink中如何進行TableAPI、SQL與Kafka消息獲取
當(dāng)前網(wǎng)址:http://chinadenli.net/article28/jiisjp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供營銷型網(wǎng)站建設(shè)商城網(wǎng)站、軟件開發(fā)、域名注冊搜索引擎優(yōu)化、Google

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司