欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

FlinkSQL如何連接Hive并寫(xiě)入/讀取數(shù)據(jù)

這篇文章主要介紹Flink SQL如何連接Hive并寫(xiě)入/讀取數(shù)據(jù),文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

成都創(chuàng)新互聯(lián)公司是一家專(zhuān)業(yè)提供廣德企業(yè)網(wǎng)站建設(shè),專(zhuān)注與成都網(wǎng)站建設(shè)、成都網(wǎng)站制作、HTML5建站、小程序制作等業(yè)務(wù)。10年已為廣德眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專(zhuān)業(yè)網(wǎng)站設(shè)計(jì)公司優(yōu)惠進(jìn)行中。

1. 添加依賴(lài)

    <properties>
        <flink.version>1.11.2</flink.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--       添加flink table api 集成Hive的依賴(lài)-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Hive Dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.6.5-7.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.4.0</version>
        </dependency>
    </dependencies>

2. 創(chuàng)建blink版本的批處理Table執(zhí)行環(huán)境

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
  • 經(jīng)過(guò)實(shí)際測(cè)試,目前HiveTableSink 不支持流式寫(xiě)入(未實(shí)現(xiàn) AppendStreamTableSink),必須是批處理環(huán)境才可以往hive里面寫(xiě)入數(shù)據(jù),而不能將流式數(shù)據(jù)寫(xiě)入hive。例如將kafka創(chuàng)建一張臨時(shí)表,然后將表中的數(shù)據(jù)流持續(xù)插入hive,這是不可以的,官網(wǎng)上1.11版本通過(guò)flink sql-client可以實(shí)現(xiàn)hive的流式寫(xiě)入,還有待驗(yàn)證。

3. 連接文件系統(tǒng),創(chuàng)建hive catalog,對(duì)表進(jìn)行操作,類(lèi)似于Spark on Hive,flink可以直接獲取Hive的元數(shù)據(jù),并使用flink進(jìn)行計(jì)算。

        // 連接外部文件
        bbTableEnv.connect(new FileSystem().path("file:///E:/d.txt"))
                .withFormat(new Csv().fieldDelimiter(','))
                .withSchema(new Schema().field("id", DataTypes.STRING()))
                .createTemporaryTable("output");

        // 設(shè)置 hive 方言
        bbTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // 獲取hive-site.xml目錄
        String hiveConfDir = Thread.currentThread().getContextClassLoader().getResource("").getPath().substring(1);
        HiveCatalog hive = new HiveCatalog("hive", "warningplatform", hiveConfDir);
        bbTableEnv.registerCatalog("hive", hive);

        bbTableEnv.useCatalog("hive");
        bbTableEnv.useDatabase("warningplatform");

        bbTableEnv.executeSql("insert into  test select id from    default_catalog.default_database.output");
  • 通過(guò)bbTableEnv.connect()去創(chuàng)建臨時(shí)表的方式已經(jīng)過(guò)時(shí)了,建議使用bbTableEnv.executeSql()的方式,通過(guò)DDL去創(chuàng)建臨時(shí)表,臨時(shí)表到底是屬于哪一個(gè)catalog目前還不太確定,到底是什么規(guī)則目前還不清楚。 查資料得知,臨時(shí)表與單個(gè)Flink會(huì)話(huà)的生命周期相關(guān),臨時(shí)表始終存儲(chǔ)在內(nèi)存中。 永久表需要一個(gè)catalog來(lái)管理表對(duì)應(yīng)的元數(shù)據(jù),比如hive metastore,該表將一直存在,直到明確刪除該表為止。 因此猜測(cè):default_catalog是存儲(chǔ)在內(nèi)存中,如果在切換成hive catalog之前創(chuàng)建臨時(shí)表,那我們就可以使用default_catalog.default_database.tableName來(lái)獲取這個(gè)臨時(shí)表。 如果切換了catalog再去創(chuàng)建臨時(shí)表,那我們就無(wú)法獲取到臨時(shí)表了,因?yàn)樗辉赿efault_catalog中,而且保存在內(nèi)存里面,直接查詢(xún)臨時(shí)表會(huì)去當(dāng)前的catalog里面去查找臨時(shí)表,因此一定要在default_catalog 里面創(chuàng)建臨時(shí)表。 而臨時(shí)視圖好像是存儲(chǔ)在當(dāng)前的catalog里面

  • 通過(guò)bbTableEnv.createTemporaryView()創(chuàng)建的視圖則是屬于當(dāng)前的database的

    bbTableEnv.createTemporaryView("output",bbTableEnv.sqlQuery("select * from default_catalog.default_database.output"));

  • 注意1.11版本的執(zhí)行sql的方法發(fā)生了改變,通過(guò)執(zhí)行環(huán)境的executeSql(),executeInsert()等來(lái)進(jìn)行插入或者執(zhí)行sql語(yǔ)句

以上是“Flink SQL如何連接Hive并寫(xiě)入/讀取數(shù)據(jù)”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

分享題目:FlinkSQL如何連接Hive并寫(xiě)入/讀取數(shù)據(jù)
本文網(wǎng)址:http://chinadenli.net/article16/iecpdg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)建站、網(wǎng)站制作小程序開(kāi)發(fā)、網(wǎng)站維護(hù)網(wǎng)站營(yíng)銷(xiāo)、手機(jī)網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)