Environment: Flink 1.7.2
Add the following jars to Flink 1.7.2's lib directory, otherwise you will get ClassNotFoundException errors:
avro-1.8.2.jar
flink-connector-kafka-0.10_2.12-1.7.2.jar
flink-connector-kafka-base_2.12-1.7.2.jar
flink-json-1.7.2.jar
kafka-clients-0.11.0.0.jar
flink-avro-1.7.2.jar
flink-connector-kafka-0.11_2.12-1.7.2.jar
flink-core-1.7.2.jar
flink-python_2.12-1.7.2.jar
log4j-1.2.17.jar
flink-cep_2.12-1.7.2.jar
flink-connector-kafka-0.9_2.12-1.7.2.jar
flink-dist_2.12-1.7.2.jar
flink-table_2.12-1.7.2.jar
slf4j-log4j12-1.7.15.jar

The source table is then declared in the SQL client's environment YAML file (e.g. conf/sql-client-defaults.yaml):

tables:
  - name: myTable
    type: source
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: im-message-topic2
      startup-mode: earliest-offset
      properties:
        - key: bootstrap.servers
          value: kafkaip:9092
        - key: group.id
          value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(sessionId STRING, fromUid STRING, toUid STRING, chatType STRING, type STRING, msgId STRING, msg STRING, timestampSend STRING)"
    schema:
      - name: sessionId
        type: STRING
      - name: fromUid
        type: STRING
      - name: toUid
        type: STRING
      - name: chatType
        type: STRING
      - name: type
        type: STRING
      - name: msgId
        type: STRING
      - name: msg
        type: STRING
      - name: rowTime
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "timestampSend"
          watermarks:
            type: "periodic-bounded"
            delay: "60"
      - name: procTime
        type: TIMESTAMP
        proctime: true
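In the rowTime entry above, watermarks of type "periodic-bounded" with delay: "60" keep the watermark 60 ms behind the largest event timestamp seen so far, which is how out-of-order events are tolerated. A minimal, Flink-free sketch of that rule (the timestamps below are invented for illustration):

```java
import java.util.List;

public class BoundedWatermark {

    // "periodic-bounded" watermark with delay d: the watermark trails the
    // maximum event timestamp observed so far by d milliseconds.
    static long watermark(List<Long> seenTimestamps, long delayMs) {
        long max = Long.MIN_VALUE;
        for (long ts : seenTimestamps) {
            max = Math.max(max, ts);
        }
        return max - delayMs;
    }

    public static void main(String[] args) {
        // Events arrive out of order: 1000, 1100, 1050.
        // With delay 60 the watermark is 1100 - 60 = 1040, so a late event
        // with timestamp >= 1040 is still accepted as on time.
        System.out.println(watermark(List.of(1000L, 1100L, 1050L), 60L)); // prints 1040
    }
}
```

A larger delay tolerates more disorder but delays the emission of MATCH_RECOGNIZE results, since pattern output is only finalized once the watermark passes the matched rows.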
Start the SQL client and check that the table is readable:
./bin/sql-client.sh embedded
select * from myTable;
Then run a query that uses MATCH_RECOGNIZE:
SELECT * FROM myTable MATCH_RECOGNIZE (
  PARTITION BY sessionId
  ORDER BY rowTime
  MEASURES
    e2.procTime AS answerTime,
    LAST(e1.procTime) AS customer_event_time,
    e2.fromUid AS empUid,
    e1.procTime AS askTime,
    1 AS total_talk
  ONE ROW PER MATCH
  AFTER MATCH SKIP TO LAST e2
  PATTERN (e1 e2)
  DEFINE
    e1 AS e1.type = 'yonghu',
    e2 AS e2.type = 'guanjia'
);
The above runs entirely in the SQL client, with no code to write. Of course you can also do the same programmatically; the corresponding program is below.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    tableEnv.connect(new Kafka()
            .version("0.11")
            .topic("im-message-topic3")
            //.property("zookeeper.connect", "")
            .property("bootstrap.servers", "kafkaip:9092")
            .startFromEarliest()
            .sinkPartitionerRoundRobin() // map Flink partitions to Kafka partitions round-robin
    ).withFormat(new Json()
            .failOnMissingField(false)
            .deriveSchema()
    ).withSchema(new Schema()
            .field("sessionId", Types.STRING).from("sessionId")
            .field("fromUid", Types.STRING).from("fromUid")
            .field("toUid", Types.STRING).from("toUid")
            .field("chatType", Types.STRING).from("chatType")
            .field("type", Types.STRING).from("type")
            .field("msgId", Types.STRING).from("msgId")
            .field("msg", Types.STRING).from("msg")
            // .field("timestampSend", Types.SQL_TIMESTAMP)
            .field("rowtime", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime()
                    .timestampsFromField("timestampSend")
                    .watermarksPeriodicBounded(1000))
            .field("proctime", Types.SQL_TIMESTAMP).proctime()
    ).inAppendMode().registerTableSource("myTable");

    Table tb2 = tableEnv.sqlQuery(
            "SELECT " +
            "  answerTime, customer_event_time, empUid, noreply_counts, total_talk " +
            "FROM myTable " +
            "MATCH_RECOGNIZE ( " +
            "  PARTITION BY sessionId " +
            "  ORDER BY rowtime " +
            "  MEASURES " +
            "    e2.rowtime AS answerTime, " +
            "    LAST(e1.rowtime) AS customer_event_time, " +
            "    e2.fromUid AS empUid, " +
            "    1 AS noreply_counts, " +
            "    e1.rowtime AS askTime, " +
            "    1 AS total_talk " +
            "  ONE ROW PER MATCH " +
            "  AFTER MATCH SKIP TO LAST e2 " +
            "  PATTERN (e1 e2) " +
            "  DEFINE " +
            "    e1 AS e1.type = 'yonghu', " +
            "    e2 AS e2.type = 'guanjia' " +
            ")");

    DataStream<Row> appendStream = tableEnv.toAppendStream(tb2, Row.class);
    System.out.println("schema is:");
    tb2.printSchema();
    appendStream.writeAsText("/usr/local/whk", WriteMode.OVERWRITE);
    logger.info("stream end"); // logger: an slf4j Logger field on the enclosing class

    Table tb3 = tableEnv.sqlQuery("SELECT sessionId, type FROM myTable");
    DataStream<Row> temp = tableEnv.toAppendStream(tb3, Row.class);
    tb3.printSchema();
    temp.writeAsText("/usr/local/whk2", WriteMode.OVERWRITE);

    env.execute("msg test");
}

That completes the example, though there are plenty of pitfalls along the way.
Note: if you use TimeCharacteristic.EventTime, do not also use procTime.
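To make the match semantics concrete, here is a rough, Flink-free simulation of what PATTERN (e1 e2) with AFTER MATCH SKIP TO LAST e2 produces within a single session. The Event/Match types and the event data are invented for illustration; in the real job this matching is done by Flink's CEP engine:

```java
import java.util.ArrayList;
import java.util.List;

public class PatternSketch {
    record Event(String type, long rowtime) {}
    record Match(long askTime, long answerTime) {}

    // PATTERN (e1 e2): a 'yonghu' (user) event immediately followed by a
    // 'guanjia' (agent) event. AFTER MATCH SKIP TO LAST e2 resumes the
    // search at the matched e2 row rather than skipping past it.
    static List<Match> findMatches(List<Event> events) {
        List<Match> out = new ArrayList<>();
        int i = 0;
        while (i < events.size() - 1) {
            Event e1 = events.get(i);
            Event e2 = events.get(i + 1);
            if (e1.type().equals("yonghu") && e2.type().equals("guanjia")) {
                out.add(new Match(e1.rowtime(), e2.rowtime()));
                i = i + 1; // skip to last e2
            } else {
                i = i + 1;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> session = List.of(
                new Event("yonghu", 1000),
                new Event("guanjia", 2000),
                new Event("yonghu", 3000),
                new Event("yonghu", 4000),
                new Event("guanjia", 5000));
        // Two matches: ask 1000 -> answer 2000, and ask 4000 -> answer 5000.
        // The lone 'yonghu' at 3000 is not followed by a 'guanjia', so it
        // produces no row.
        System.out.println(findMatches(session));
    }
}
```

Note that only adjacent user-then-agent pairs match; this is why ORDER BY rowtime (event time) matters, since a different ordering would pair up different rows.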
Article: flink sql-client MATCH_RECOGNIZE kafka example
Source: http://chinadenli.net/article22/jighjc.html