spring batch中基于RabbitMQ遠(yuǎn)程分區(qū)Step是怎樣的,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。
十年的樂昌網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。營銷型網(wǎng)站的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整樂昌建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)公司從事“樂昌網(wǎng)站設(shè)計(jì)”,“樂昌網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。
前言碎語
小編構(gòu)建的實(shí)例可為主服務(wù),從服務(wù),主從混用等模式,可以大大提高spring batch在單機(jī)處理時(shí)的時(shí)效。
項(xiàng)目源碼:https://gitee.com/kailing/partitionjob
spring batch遠(yuǎn)程分區(qū)Step的原理
master節(jié)點(diǎn)將數(shù)據(jù)根據(jù)相關(guān)邏輯(ID,hash),拆分成一段一段要處理的數(shù)據(jù)集,然后將數(shù)據(jù)集放到消息中間件中(ActiveMQ,RabbitMQ ),從節(jié)點(diǎn)監(jiān)聽到消息,獲取消息,讀取消息中的數(shù)據(jù)集處理并發(fā)回結(jié)果。如下圖:

下面按原理分步驟實(shí)施,完成springbatch的遠(yuǎn)程分區(qū)實(shí)例
第一步,首先引入相關(guān)依賴
見:https://gitee.com/kailing/partitionjob/blob/master/pom.xml
分區(qū)job主要依賴為:spring-batch-integration,提供了遠(yuǎn)程通訊的能力
第二步,Master節(jié)點(diǎn)數(shù)據(jù)分發(fā)
@Profile({"master", "mixed"})
@Bean
public Job job(@Qualifier("masterStep") Step masterStep) {
return jobBuilderFactory.get("endOfDayjob")
.start(masterStep)
.incrementer(new BatchIncrementer())
.listener(new JobListener())
.build();
}
@Bean("masterStep")
public Step masterStep(@Qualifier("slaveStep") Step slaveStep,
PartitionHandler partitionHandler,
DataSource dataSource) {
return stepBuilderFactory.get("masterStep")
.partitioner(slaveStep.getName(), new ColumnRangePartitioner(dataSource))
.step(slaveStep)
.partitionHandler(partitionHandler)
.build();
}master節(jié)點(diǎn)關(guān)鍵部分是,他的Step需要設(shè)置從節(jié)點(diǎn)Step的Name,和一個(gè)數(shù)據(jù)分區(qū)器,數(shù)據(jù)分區(qū)器需要實(shí)現(xiàn)Partitioner接口,它返回一個(gè)Map<String, ExecutionContext>的數(shù)據(jù)結(jié)構(gòu),這個(gè)結(jié)構(gòu)完整的描述了每個(gè)從節(jié)點(diǎn)需要處理的分區(qū)片段。ExecutionContext保存了從節(jié)點(diǎn)要處理的數(shù)據(jù)邊界,當(dāng)然,ExecutionContext里的參數(shù)是根據(jù)你的業(yè)務(wù)來的,我這里,已數(shù)據(jù)ID為邊界劃分了每個(gè)區(qū)。具體的Partitioner實(shí)現(xiàn)如下:
/**
* Created by kl on 2018/3/1.
* Content :根據(jù)數(shù)據(jù)ID分片
*/
public class ColumnRangePartitioner implements Partitioner {
private JdbcOperations jdbcTemplate;
ColumnRangePartitioner(DataSource dataSource){
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
int min = jdbcTemplate.queryForObject("SELECT MIN(arcid) from kl_article", Integer.class);
int max = jdbcTemplate.queryForObject("SELECT MAX(arcid) from kl_article", Integer.class);
int targetSize = (max - min) / gridSize + 1;
Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
int number = 0;
int start = min;
int end = start + targetSize - 1;
while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if (end >= max) {
end = max;
}
value.putInt("minValue", start);
value.putInt("maxValue", end);
start += targetSize;
end += targetSize;
number++;
}
return result;
}
}第三步,Integration配置
spring batch Integration提供了遠(yuǎn)程分區(qū)通訊能力,Spring Integration擁有豐富的通道適配器(例如JMS和AMQP),基于ActiveMQ,RabbitMQ等中間件都可以實(shí)現(xiàn)遠(yuǎn)程分區(qū)處理。本文使用RabbitMQ來做為通訊的中間件。關(guān)于RabbitMQ的安裝等不在本篇范圍,下面代碼描述了如何配置MQ連接,以及spring batch分區(qū)相關(guān)隊(duì)列,消息適配器等。
/**
* Created by kl on 2018/3/1.
* Content :遠(yuǎn)程分區(qū)通訊
*/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbit")
public class IntegrationConfiguration {
private String host;
private Integer port=5672;
private String username;
private String password;
private String virtualHost;
private int connRecvThreads=5;
private int channelCacheSize=10;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(connRecvThreads);
executor.initialize();
connectionFactory.setExecutor(executor);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setChannelCacheSize(channelCacheSize);
return connectionFactory;
}
@Bean
public MessagingTemplate messageTemplate() {
MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());
messagingTemplate.setReceiveTimeout(60000000l);
return messagingTemplate;
}
@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "outboundRequests")
public AmqpOutboundEndpoint amqpOutboundEndpoint(AmqpTemplate template) {
AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(template);
endpoint.setExpectReply(true);
endpoint.setOutputChannel(inboundRequests());
endpoint.setRoutingKey("partition.requests");
return endpoint;
}
@Bean
public Queue requestQueue() {
return new Queue("partition.requests", false);
}
@Bean
@Profile({"slave","mixed"})
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inboundRequests());
adapter.afterPropertiesSet();
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("partition.requests");
container.setAutoStartup(false);
return container;
}
@Bean
public PollableChannel outboundStaging() {
return new NullChannel();
}
@Bean
public QueueChannel inboundRequests() {
return new QueueChannel();
}第四步,從節(jié)點(diǎn)接收分區(qū)信息并處理
@Bean
@Profile({"slave","mixed"})
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
stepLocator.setBeanFactory(this.applicationContext);
stepExecutionRequestHandler.setStepLocator(stepLocator);
stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);
return stepExecutionRequestHandler;
}
@Bean("slaveStep")
public Step slaveStep(MyProcessorItem processorItem,
JpaPagingItemReader reader) {
CompositeItemProcessor itemProcessor = new CompositeItemProcessor();
List<ItemProcessor> processorList = new ArrayList<>();
processorList.add(processorItem);
itemProcessor.setDelegates(processorList);
return stepBuilderFactory.get("slaveStep")
.<Article, Article>chunk(1000)//事務(wù)提交批次
.reader(reader)
.processor(itemProcessor)
.writer(new PrintWriterItem())
.build();
}從節(jié)點(diǎn)最關(guān)鍵的地方在于StepExecutionRequestHandler,他會(huì)接收MQ消息中間件中的消息,并從分區(qū)信息中獲取到需要處理的數(shù)據(jù)邊界,如下ItemReader:
@Bean(destroyMethod = "")
@StepScope
public JpaPagingItemReader<Article> jpaPagingItemReader(
@Value("#{stepExecutionContext['minValue']}") Long minValue,
@Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
System.err.println("接收到分片參數(shù)["+minValue+"->"+maxValue+"]");
JpaPagingItemReader<Article> reader = new JpaPagingItemReader<>();
JpaNativeQueryProvider queryProvider = new JpaNativeQueryProvider<>();
String sql = "select * from kl_article where arcid >= :minValue and arcid <= :maxValue";
queryProvider.setSqlQuery(sql);
queryProvider.setEntityClass(Article.class);
reader.setQueryProvider(queryProvider);
Map queryParames= new HashMap();
queryParames.put("minValue",minValue);
queryParames.put("maxValue",maxValue);
reader.setParameterValues(queryParames);
reader.setEntityManagerFactory(entityManagerFactory);
return reader;
}中的minValuemin,maxValue,正是前文中Master節(jié)點(diǎn)分區(qū)中設(shè)置的值
如上,已經(jīng)完成了整個(gè)spring batch 遠(yuǎn)程分區(qū)處理的實(shí)例,需要注意的是,一個(gè)實(shí)例,即可主可從可主從,是有spring profile來控制的,細(xì)心的人可能會(huì)發(fā)現(xiàn)@Profile({"master", "mixed"})等注解,所以如果你在測試的時(shí)候,別忘了在spring boot中配置好spring.profiles.active=slave等。
看完上述內(nèi)容,你們掌握spring batch中基于RabbitMQ遠(yuǎn)程分區(qū)Step是怎樣的的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!
網(wǎng)頁題目:springbatch中基于RabbitMQ遠(yuǎn)程分區(qū)Step是怎樣的
分享鏈接:http://chinadenli.net/article32/gspcpc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號(hào)、微信小程序、軟件開發(fā)、網(wǎng)站設(shè)計(jì)、電子商務(wù)、網(wǎng)站改版
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)