欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

基于Flink+kafka實時告警-創(chuàng)新互聯(lián)

引出問題

項目使用告警系統(tǒng)的邏輯是將實時數(shù)據(jù)保存到本地數(shù)據(jù)庫再使用定時任務(wù)做判斷,然后產(chǎn)生告警數(shù)據(jù)。這種方式存在告警的延時實在是太高了。數(shù)據(jù)從產(chǎn)生到保存,從保存到判斷都會存在時間差,按照保存數(shù)據(jù)定時5分鐘一次,定時任務(wù)5分鐘一次。最高會產(chǎn)生10分鐘的誤差,這種告警就沒什么意義了。

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:域名注冊虛擬主機、營銷軟件、網(wǎng)站建設(shè)、鹽湖網(wǎng)站維護、網(wǎng)站推廣。demo設(shè)計

為了簡單的還原業(yè)務(wù)場景,做了簡單的demo假設(shè)

實現(xiàn)一個對于學(xué)生成績評價的實時處理程序
數(shù)學(xué)成績,基準(zhǔn)范圍是90-140,超出告警
物理成績,基準(zhǔn)范圍是60-95,超出告警

環(huán)境搭建

使用windows環(huán)境演示

準(zhǔn)備工作

1、安裝jdk

2、安裝zookeeper

解壓壓縮包

zoo_sample.cfg將它重命名為zoo.cfg

修改配置 dataDir=D://tools//apache-zookeeper-3.5.10-bin//data

配置環(huán)境變量

3、安裝kafka

解壓壓縮包

修改config/server.properties

log.dirs=D://tools//kafka_2.11-2.1.0//log

flink程序代碼

pom

org.apache.flinkflink-java1.13.0org.apache.flinkflink-streaming-java_2.121.13.0org.apache.flinkflink-clients_2.121.13.0org.apache.flinkflink-connector-kafka_2.121.13.0org.projectlomboklombok1.18.12providedcom.alibabafastjson1.2.62org.apache.flinkflink-connector-kafka_2.111.10.0

主程序

public class StreamAlertDemo {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
		Properties properties = new Properties();
		properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		FlinkKafkaConsumerkafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
		DataStreamSourceinputDataStream = env.addSource(kafkaConsumer);

		DataStreamresultStream = inputDataStream.flatMap(new AlertFlatMapper());
		resultStream.print().setParallelism(4);

		resultStream.addSink(new FlinkKafkaProducer<>("demo",new SimpleStringSchema(),properties));
		env.execute();
	}

}
主程序,配置告警規(guī)則后期可以使用推送或者拉去方式獲取數(shù)據(jù)
public class RuleMap {

	private RuleMap(){}

	public final static MapinitialRuleMap;

	private static List ruleList = new ArrayList<>();

	private static ListruleStringList = new ArrayList<>(Arrays.asList(
			"{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"You Math score is too low\"}",
			"{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"You Math score is too high\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"You Physics score is too low\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"You Physics score is too high\"}"));

	static {
		for (String i : ruleStringList) {
			ruleList.add(JSON.parseObject(i, AlertRule.class));
		}
		initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget));
	}


}

AlertFlatMapper,處理告警邏輯

public class AlertFlatMapper implements FlatMapFunction{

	@Override
	public void flatMap(String inVal, Collectorout) throws Exception {
		Achievement user = JSON.parseObject(inVal, Achievement.class);
		MapinitialRuleMap = RuleMap.initialRuleMap;
		List resList = new ArrayList<>();
		List mathRule = initialRuleMap.get("MathVal");
		for (AlertRule rule : mathRule) {
			if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		List physicsRule = initialRuleMap.get("PhysicsVal");
		for (AlertRule rule : physicsRule) {
			if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		String result = JSON.toJSONString(resList);
		out.collect(result);
	}

	private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) {
		switch (type) {
			case 0:
				return actVal< targetVal;
			case 1:
				return actVal.equals(targetVal);
			case 2:
				return actVal >targetVal;
			default:
				return false;
		}
	}
}

三個實體類

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class Achievement implements Serializable {

    private static final long serialVersionUID = -1L;

    private String name;

    private Integer mathVal;

    private Integer physicsVal;

}

@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertInfo implements Serializable {
    
    private static final long serialVersionUID = -1L;

    private String name;

    private String descInfo;

}

@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertRule implements Serializable {

	private static final long serialVersionUID = -1L;

	private String target;

	//0小于 1等于 2大于
	private Integer type;

	private Integer criticalVal;

	private String descInfo;
}
項目演示

創(chuàng)建kafka生產(chǎn)者 test
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

創(chuàng)建kafka消費者 demo
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning

啟動flink應(yīng)用

給topic test發(fā)送消息

{"name":"liu","MathVal":45,"PhysicsVal":76}

消費topic demo

告警系統(tǒng)架構(gòu)

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

本文名稱:基于Flink+kafka實時告警-創(chuàng)新互聯(lián)
標(biāo)題來源:http://chinadenli.net/article36/ddgosg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站維護定制網(wǎng)站、網(wǎng)站改版網(wǎng)站營銷、軟件開發(fā)動態(tài)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)