使用Kafka3.0.0
master | slave1 | slave2 | |
---|---|---|---|
ip | 193.168.3.34 | 193.168.3.35 | 193.168.3.36 |
org.springframework.boot spring-boot-starter-web org.projectlombok lombok org.springframework.kafka spring-kafka
三、application配置spring:
kafka:
bootstrap-servers: 192.168.3.34:9092,192.168.3.35:9092,192.168.3.36:9092 # 指定 kafka 的地址
producer: #生產(chǎn)者
retries: 0 #重復(fù)次數(shù) ,失敗不重發(fā)
batch-size: 16384 #每次批量發(fā)送消息的數(shù)量
buffer-memory: 33554432 #緩存大小達(dá)到buffer.memory就發(fā)送數(shù)據(jù)
acks: 1 # 0=生產(chǎn)者將不會(huì)等待來(lái)自服務(wù)器的任何確認(rèn) 1=leader會(huì)將記錄寫入其本地日志,但無(wú)需等待所有副本服務(wù)器的完全確認(rèn)即可做出回應(yīng) -1 =leader將等待完整的同步副本集以確認(rèn)記錄
key-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 key 的序列化器
value-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 value 的序列化器
consumer:
group-id: nacl #指定消費(fèi)者組的 group_id
auto-offset-reset: earliest #latest 最新的位置 , earliest最早的位置
auto-commit-interval: 100 #自動(dòng)提交offset頻率 100毫秒
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 key 的反序列化器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 value 的反序列化器
listener:
concurrency: 3 #3個(gè)并行監(jiān)聽
四、SpringBoot-生產(chǎn)者import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@CrossOrigin
@RestController
public class ProducerController {// Kafka 模板用來(lái)向 kafka 發(fā)送數(shù)據(jù)
@Resource
private KafkaTemplatekafkaTemplate;
@RequestMapping("/kf")
public String data() {kafkaTemplate.send("first", "hello");
return "ok";
}
}
五、SpringBoot-消費(fèi)者import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class KafkaConsumer {// 指定要監(jiān)聽的 topic
@KafkaListener(topics = "first")
public void consumeTopic(String msg) {// 參數(shù): 收到的 value
System.out.println("收到的信息: " + msg);
}
}
六、SpringBoot-主題分區(qū)import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaTopic {@Bean
public NewTopic batchTopic() {//項(xiàng)目啟動(dòng)時(shí),自動(dòng)創(chuàng)建topic,指定分區(qū)和副本數(shù)量
return new NewTopic("first", 3, (short) 1);
}
}
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧
網(wǎng)頁(yè)名稱:【SpringBoot】整合Kafka集群-創(chuàng)新互聯(lián)
文章路徑:http://chinadenli.net/article10/dgphdo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、小程序開發(fā)、網(wǎng)站制作、網(wǎng)頁(yè)設(shè)計(jì)公司、網(wǎng)站維護(hù)、虛擬主機(jī)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容