欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

FlinkSQL中窗口的功能及實(shí)例用法

這篇文章主要介紹“FlinkSQL中窗口的功能及實(shí)例用法”,在日常操作中,相信很多人在FlinkSQL中窗口的功能及實(shí)例用法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”FlinkSQL中窗口的功能及實(shí)例用法”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

創(chuàng)新互聯(lián)公司是一家專業(yè)從事網(wǎng)站建設(shè)、做網(wǎng)站的網(wǎng)絡(luò)公司。作為專業(yè)網(wǎng)站設(shè)計(jì)公司,創(chuàng)新互聯(lián)公司依托的技術(shù)實(shí)力、以及多年的網(wǎng)站運(yùn)營經(jīng)驗(yàn),為您提供專業(yè)的成都網(wǎng)站建設(shè)、成都營銷網(wǎng)站建設(shè)及網(wǎng)站設(shè)計(jì)開發(fā)服務(wù)!

前言

時(shí)間語義,要配合窗口操作才能發(fā)揮作用。最主要的用途,當(dāng)然就是開窗口、根據(jù)時(shí)間段做計(jì)算了。下面我們就來看看 Table API 和 SQL  中,怎么利用時(shí)間字段做窗口操作。在 Table API 和 SQL 中,主要有兩種窗口:Group Windows 和 Over  Windows

一、分組窗口(Group Windows) 分組窗口(Group  Windows)會根據(jù)時(shí)間或行計(jì)數(shù)間隔,將行聚合到有限的組(Group)中,并對每個(gè)組的數(shù)據(jù)執(zhí)行一次聚合函數(shù)。 Table API 中的 Group  Windows 都是使用.window(w:GroupWindow)子句定義的,并且必須由 as 子句指定一個(gè)別名。為了按窗口對表進(jìn)行分組,窗口的別名必須在  group by 子句中,像常規(guī)的分組字段一樣引用。例子:

val table = input .window([w: GroupWindow] as 'w) .groupBy('w, 'a) .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)

Table API 提供了一組具有特定語義的預(yù)定義 Window 類,這些類會被轉(zhuǎn)換為底層DataStream 或 DataSet 的窗口操作。

Table API 支持的窗口定義,和我們熟悉的一樣,主要也是三種:滾動(dòng)(Tumbling)、滑動(dòng)(Sliding和 會話(Session)。

1.1 滾動(dòng)窗口

滾動(dòng)窗口(Tumbling windows)要用 Tumble 類來定義,另外還有三個(gè)方法:

  • over:定義窗口長度

  • on:用來分組(按時(shí)間間隔)或者排序(按行數(shù))的時(shí)間字段

  • as:別名,必須出現(xiàn)在后面的 groupBy 中

實(shí)現(xiàn)案例

1.需求

設(shè)置滾動(dòng)窗口為10秒鐘統(tǒng)計(jì)id出現(xiàn)的次數(shù)。

2.數(shù)據(jù)準(zhǔn)備

sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

3.代碼實(shí)現(xiàn)

package windows  import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble} import org.apache.flink.types.Row  /**  * @Package Windows  * @File :FlinkSQLTumBlingTie.java  * @author 大數(shù)據(jù)老哥  * @date 2020/12/25 21:58  * @version V1.0  *          設(shè)置滾動(dòng)窗口  */ object FlinkSQLTumBlingTie {   def main(args: Array[String]): Unit = {     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1)     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)      val settings = EnvironmentSettings.newInstance()       .useBlinkPlanner()       .inStreamingMode()       .build()     val tableEnv = StreamTableEnvironment.create(env, settings)      // 讀取數(shù)據(jù)     val inputPath = "./data/sensor.txt"     val inputStream = env.readTextFile(inputPath)          // 先轉(zhuǎn)換成樣例類類型(簡單轉(zhuǎn)換操作)     val dataStream = inputStream       .map(data => {         val arr = data.split(",")         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)       })       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L       })      val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)     // 注冊表     tableEnv.createTemporaryView("sensor", sensorTable)     // table 實(shí)現(xiàn)     val resultTable = sensorTable       .window(Tumble over 10.seconds on 'ts as 'tw) // 每10秒統(tǒng)計(jì)一次,滾動(dòng)時(shí)間窗口       .groupBy('id, 'tw)       .select('id, 'id.count, 'tw.end)     //sql 實(shí)現(xiàn)     val sqlTable = tableEnv.sqlQuery(       """         |select         |id,         |count(id) ,         |tumble_end(ts,interval '10' second)         |from sensor         |group by         |id,         |tumble(ts,interval '10' second)         |""".stripMargin)      /***      * .window(Tumble over 10.minutes on 'rowtime as 'w) (事件時(shí)間字段 rowtime)      * .window(Tumble over 10.minutes on 'proctime as 'w)(處理時(shí)間字段 proctime)      * .window(Tumble over 10.minutes on 'proctime as 'w) (類似于計(jì)數(shù)窗口,按處理時(shí)間排序,10 行一組)      */     resultTable.toAppendStream[Row].print("talbe")     sqlTable.toRetractStream[Row].print("sqlTable")          env.execute("FlinkSQLTumBlingTie")   }    case class SensorReading(id: String, timestamp: Long, temperature: Double)  }

運(yùn)行結(jié)果

FlinkSQL中窗口的功能及實(shí)例用法

1.2 滑動(dòng)窗口

滑動(dòng)窗口(Sliding windows)要用 Slide 類來定義,另外還有四個(gè)方法:

  • over:定義窗口長度

  • every:定義滑動(dòng)步長

  • on:用來分組(按時(shí)間間隔)或者排序(按行數(shù))的時(shí)間字段

  • as:別名,必須出現(xiàn)在后面的 groupBy 中

實(shí)現(xiàn)案例

1.需求描述

設(shè)置窗口大小為10秒鐘設(shè)置滑動(dòng)距離為5秒鐘,統(tǒng)計(jì)id的出現(xiàn)的次數(shù)。

2.數(shù)據(jù)準(zhǔn)備

sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

3.實(shí)現(xiàn)代碼

package windows  import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.{EnvironmentSettings, Slide, Table} import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row import windows.FlinkSQLTumBlingTie.SensorReading  /**  * @Package windows  * @File :FlinkSQLSlideTime.java  * @author 大數(shù)據(jù)老哥  * @date 2020/12/27 22:19  * @version V1.0  *          滑動(dòng)窗口  */ object FlinkSQLSlideTime {   def main(args: Array[String]): Unit = {     //構(gòu)建運(yùn)行環(huán)境     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1) // 設(shè)置分區(qū)為1 方便后面測試     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件時(shí)間      val settings = EnvironmentSettings.newInstance()       .useBlinkPlanner()       .inStreamingMode()       .build()     // 創(chuàng)建表env     val tableEnv = StreamTableEnvironment.create(env, settings)      // 讀取數(shù)據(jù)     val inputPath = "./data/sensor.txt"     val inputStream = env.readTextFile(inputPath)      // 先轉(zhuǎn)換成樣例類類型(簡單轉(zhuǎn)換操作)     val dataStream = inputStream       .map(data => {         val arr = data.split(",")         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)       })       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L       })      val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)     // 注冊表     tableEnv.createTemporaryView("sensor", sensorTable)     // table API 實(shí)現(xiàn)     val tableApi = sensorTable.window(Slide over 10.seconds every 5.seconds on 'ts as 'w)       .groupBy('w, 'id)       .select('id, 'id.count, 'w.end)     val tableSql = tableEnv.sqlQuery(       """         |select         |id,         |count(id),         |HOP_END(ts,INTERVAL '10' SECOND, INTERVAL '5' SECOND )as w         |from sensor         |group by         |HOP(ts,INTERVAL '10' SECOND, INTERVAL '5' SECOND),id         |""".stripMargin)      tableApi.toAppendStream[Row].print("tableApi")     tableSql.toAppendStream[Row].print("tableSql")     /** .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w) (事件時(shí)間字段 rowtime) .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w) (處理時(shí)間字段 proctime)  .window(Slide over 10.rows every 5.rows on 'proctime as 'w) (類似于計(jì)數(shù)窗口,按處理時(shí)間排序,10 行一組)    **/     env.execute("FlinkSQLSlideTime")   } }

4.運(yùn)行結(jié)果

FlinkSQL中窗口的功能及實(shí)例用法

1.3 會話窗口

會話窗口(Session windows)要用 Session 類來定義,另外還有三個(gè)方法:

  • withGap:會話時(shí)間間隔

  • on:用來分組(按時(shí)間間隔)或者排序(按行數(shù))的時(shí)間字段

  • as:別名,必須出現(xiàn)在后面的 groupBy 中實(shí)現(xiàn)案例

1.需求描述

設(shè)置一個(gè)session 為10秒鐘 統(tǒng)計(jì)id的個(gè)數(shù)

2.準(zhǔn)備數(shù)據(jù)

sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

3.編寫代碼

package windows  import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.{EnvironmentSettings, Session, Table} import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row import windows.FlinkSQLTumBlingTie.SensorReading  /**  * @Package windows  * @File :FlinkSqlSessionTime.java  * @author 大數(shù)據(jù)老哥  * @date 2020/12/27 22:52  * @version V1.0  */ object FlinkSqlSessionTime {   def main(args: Array[String]): Unit = {     //構(gòu)建運(yùn)行環(huán)境     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1) // 設(shè)置分區(qū)為1 方便后面測試     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件時(shí)間      val settings = EnvironmentSettings.newInstance()       .useBlinkPlanner()       .inStreamingMode()       .build()     // 創(chuàng)建表env     val tableEnv = StreamTableEnvironment.create(env, settings)      // 讀取數(shù)據(jù)     val inputPath = "./data/sensor.txt"     val inputStream = env.readTextFile(inputPath)      // 先轉(zhuǎn)換成樣例類類型(簡單轉(zhuǎn)換操作)     val dataStream = inputStream       .map(data => {         val arr = data.split(",")         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)       })       .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {         override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L       })      val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)     // 注冊表     tableEnv.createTemporaryView("sensor", sensorTable)      val tableApi = sensorTable.       window(Session withGap 10.seconds on 'ts as 'w)       .groupBy('id, 'w)       .select('id, 'id.count, 'w.end)     val tableSQL = tableEnv.sqlQuery(       """         |SELECT         |id,         |COUNT(id),         |SESSION_END(ts, INTERVAL '10' SECOND) AS w         |FROM sensor         |GROUP BY         |id,         |SESSION(ts, INTERVAL '10' SECOND)         |""".stripMargin)     tableApi.toAppendStream[Row].print("tableApi")     tableSQL.toAppendStream[Row].print("tableSQL")      /**      * .window(Session withGap 10.minutes on 'rowtime as 'w) 事件時(shí)間字段 rowtime)      * .window(Session withGap 10.minutes on 'proctime as 'w) 處理時(shí)間字段 proctime)      */     env.execute("FlinkSqlSessionTime")   } }

4.運(yùn)行結(jié)果

FlinkSQL中窗口的功能及實(shí)例用法

二、 Over Windows

Over window 聚合是標(biāo)準(zhǔn) SQL 中已有的(Over 子句),可以在查詢的 SELECT 子句中定義。Over  window 聚合,會針對每個(gè)輸入行,計(jì)算相鄰行范圍內(nèi)的聚合。Over windows使用.window(w:overwindows*)子句定義,并在  select()方法中通過別名來引用。例子:

val table = input .window([w: OverWindow] as 'w) .select('a, 'b.sum over 'w, 'c.min over 'w)

Table API 提供了 Over 類,來配置 Over 窗口的屬性。可以在事件時(shí)間或處理時(shí)間,以及指定為時(shí)間間隔、或行計(jì)數(shù)的范圍內(nèi),定義 Over  windows。

無界的 over window 是使用常量指定的。也就是說,時(shí)間間隔要指定 UNBOUNDED_RANGE,或者行計(jì)數(shù)間隔要指定  UNBOUNDED_ROW。而有界的 over window 是用間隔的大小指定的。

2.1 無界的 over window

// 無界的事件時(shí)間 over window (時(shí)間字段 "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) //無界的處理時(shí)間 over window (時(shí)間字段"proctime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w) // 無界的事件時(shí)間 Row-count over window (時(shí)間字段 "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) //無界的處理時(shí)間 Row-count over window (時(shí)間字段 "rowtime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

2.2 有界的 over window

// 有界的事件時(shí)間 over window (時(shí)間字段 "rowtime",之前 1 分鐘) .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w) // 有界的處理時(shí)間 over window (時(shí)間字段 "rowtime",之前 1 分鐘) .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w) // 有界的事件時(shí)間 Row-count over window (時(shí)間字段 "rowtime",之前 10 行) .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w) // 有界的處理時(shí)間 Row-count over window (時(shí)間字段 "rowtime",之前 10 行) .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

2.3 代碼練習(xí)

我們可以綜合學(xué)習(xí)過的內(nèi)容,用一段完整的代碼實(shí)現(xiàn)一個(gè)具體的需求。例如,統(tǒng)計(jì)每個(gè)sensor每條數(shù)據(jù),與之前兩行數(shù)據(jù)的平均溫度。

數(shù)據(jù)準(zhǔn)備

sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

代碼分析:

package windows  import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.{EnvironmentSettings, Over, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row  /** * @Package windows * @File :FlinkSqlTumBlingOverTime.java * @author 大數(shù)據(jù)老哥 * @date 2020/12/28 21:45 * @version V1.0 */ object FlinkSqlTumBlingOverTime {  def main(args: Array[String]): Unit = {    // 構(gòu)建運(yùn)行環(huán)境    val env = StreamExecutionEnvironment.getExecutionEnvironment    env.setParallelism(1) // 設(shè)置并行度為1方便后面進(jìn)行測試    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 設(shè)置事件時(shí)間     val settings = EnvironmentSettings.newInstance()      .useBlinkPlanner()      .inStreamingMode()      .build()    //構(gòu)建table Env    val tableEnv = StreamTableEnvironment.create(env, settings)     // 讀取數(shù)據(jù)    val inputPath = "./data/sensor.txt"    val inputStream = env.readTextFile(inputPath)    // 先轉(zhuǎn)換成樣例類類型(簡單轉(zhuǎn)換操作)    // 解析數(shù)據(jù) 封裝成樣例類    val dataStream = inputStream      .map(data => {        val arr = data.split(",")        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)      })      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L      })    // 將數(shù)據(jù)注冊成一張臨時(shí)表    val dataTable = tableEnv.fromDataStream(dataStream,'id, 'temperature, 'timestamp.rowtime as 'ts)    tableEnv.createTemporaryView("sensor",dataTable)    var tableRes= dataTable.window( Over partitionBy 'id orderBy  'ts preceding 2.rows as 'ow)     .select('id,'ts,'id.count over 'ow, 'temperature.avg over 'ow)    var tableSql= tableEnv.sqlQuery(      """        |select        |id,        |ts,        |count(id) over ow,        |avg(temperature) over ow        |from sensor        |window ow as(        | partition by id        | order by ts        | rows between 2 preceding and current row        |)        |""".stripMargin)     tableRes.toAppendStream[Row].print("tableRes")    tableSql.toAppendStream[Row].print("tableSql")    env.execute("FlinkSqlTumBlingOverTime")  }  case class SensorReading(id: String, timestamp: Long, temperature: Double)  }

FlinkSQL中窗口的功能及實(shí)例用法

運(yùn)行結(jié)果

到此,關(guān)于“FlinkSQL中窗口的功能及實(shí)例用法”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

分享名稱:FlinkSQL中窗口的功能及實(shí)例用法
當(dāng)前URL:http://chinadenli.net/article6/goghog.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作網(wǎng)站建設(shè)外貿(mào)建站網(wǎng)站維護(hù)網(wǎng)站改版

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)