Skip to content

Commit

Permalink
SpringBoot2.0 整合 RocketMQ ,实现请求异步处理
Browse files Browse the repository at this point in the history
  • Loading branch information
cicadasmile committed Jun 11, 2019
1 parent 8845a6b commit c7362ce
Show file tree
Hide file tree
Showing 11 changed files with 348 additions and 0 deletions.
55 changes: 55 additions & 0 deletions ware-rocket-queue/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>middle-ware-parent</artifactId>
<groupId>com.boot.parent</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.rocket.queue</groupId>
<artifactId>ware-rocket-queue</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<version>${spring-boot.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.rocket.queue;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RocketApplication {
public static void main(String[] args) {
SpringApplication.run(RocketApplication.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.rocket.queue.controller;

import com.rocket.queue.service.FeePlatMqService;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class RocketController {

@Resource
private FeePlatMqService feePlatMqService ;

@RequestMapping("/sendMsg")
public SendResult sendMsg (){
String msg = "OpenAccount Msg";
SendResult sendResult = null;
try {
sendResult = feePlatMqService.openAccountMsg(msg) ;
} catch (Exception e) {
e.printStackTrace();
}
return sendResult ;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.rocket.queue.rocket;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
* RocketMQ 消费者配置
*/
@Configuration
public class ConsumerConfig {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;
@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.consumer.groupName}")
private String groupName;
@Value("${rocketmq.consumer.consumeThreadMin}")
private int consumeThreadMin;
@Value("${rocketmq.consumer.consumeThreadMax}")
private int consumeThreadMax;
@Value("${rocketmq.consumer.topics}")
private String topics;
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;
@Resource
private RocketMsgListener msgListener;
@Bean
public DefaultMQPushConsumer getRocketMQConsumer(){
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.registerMessageListener(msgListener);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
try {
String[] topicTagsArr = topics.split(";");
for (String topicTags : topicTagsArr) {
String[] topicTag = topicTags.split("~");
consumer.subscribe(topicTag[0],topicTag[1]);
}
consumer.start();
}catch (MQClientException e){
e.printStackTrace();
}
return consumer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.rocket.queue.rocket;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* RocketMQ 生产者配置
*/
@Configuration
public class ProducerConfig {

private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;
@Value("${rocketmq.producer.groupName}")
private String groupName;
@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.producer.maxMessageSize}")
private Integer maxMessageSize ;
@Value("${rocketmq.producer.sendMsgTimeout}")
private Integer sendMsgTimeout;
@Value("${rocketmq.producer.retryTimesWhenSendFailed}")
private Integer retryTimesWhenSendFailed;

@Bean
public DefaultMQProducer getRocketMQProducer() {
DefaultMQProducer producer;
producer = new DefaultMQProducer(this.groupName);
producer.setNamesrvAddr(this.namesrvAddr);
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
if(this.maxMessageSize!=null){
producer.setMaxMessageSize(this.maxMessageSize);
}
if(this.sendMsgTimeout!=null){
producer.setSendMsgTimeout(this.sendMsgTimeout);
}
//如果发送消息失败,设置重试次数,默认为2次
if(this.retryTimesWhenSendFailed!=null){
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
}
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
return producer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.rocket.queue.rocket;

import com.rocket.queue.service.impl.ParamConfigService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
/**
* 消息消费监听
*/
@Component
public class RocketMsgListener implements MessageListenerConcurrently {
private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;
@Resource
private ParamConfigService paramConfigService ;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(list)){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = list.get(0);
LOG.info("接受到的消息为:"+new String(messageExt.getBody()));
int reConsume = messageExt.getReconsumeTimes();
// 消息已经重试了3次,如果不需要再次消费,则返回成功
if(reConsume ==3){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
if(messageExt.getTopic().equals(paramConfigService.feePlatTopic)){
String tags = messageExt.getTags() ;
switch (tags){
case "FeeAccountTag":
LOG.info("开户 tag == >>"+tags);
break ;
default:
LOG.info("未匹配到Tag == >>"+tags);
break;
}
}
// 消息消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.rocket.queue.service;

import org.apache.rocketmq.client.producer.SendResult;

public interface FeePlatMqService {
SendResult openAccountMsg (String msgInfo) ;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.rocket.queue.service.impl;

import com.rocket.queue.service.FeePlatMqService;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class FeePlatMqServiceImpl implements FeePlatMqService {
@Resource
private DefaultMQProducer defaultMQProducer;
@Resource
private ParamConfigService paramConfigService ;
@Override
public SendResult openAccountMsg(String msgInfo) {
// 可以不使用Config中的Group
defaultMQProducer.setProducerGroup(paramConfigService.feePlatGroup);
SendResult sendResult = null;
try {
Message sendMsg = new Message(paramConfigService.feePlatTopic,
paramConfigService.feeAccountTag,
"fee_open_account_key", msgInfo.getBytes());
sendResult = defaultMQProducer.send(sendMsg);
} catch (Exception e) {
e.printStackTrace();
}
return sendResult ;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.rocket.queue.service.impl;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class ParamConfigService {
@Value("${fee-plat.fee-plat-group}")
public String feePlatGroup ;
@Value("${fee-plat.fee-plat-topic}")
public String feePlatTopic ;
@Value("${fee-plat.fee-account-tag}")
public String feeAccountTag ;
}
34 changes: 34 additions & 0 deletions ware-rocket-queue/src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

rocketmq:
# 生产者配置
producer:
isOnOff: on
# 发送同一类消息的设置为同一个group,保证唯一
groupName: FeePlatGroup
# 服务地址
namesrvAddr: 10.1.20.107:9876
# 消息最大长度 默认1024*4(4M)
maxMessageSize: 4096
# 发送消息超时时间,默认3000
sendMsgTimeout: 3000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 消费者配置
consumer:
isOnOff: on
# 官方建议:确保同一组中的每个消费者订阅相同的主题。
groupName: FeePlatGroup
# 服务地址
namesrvAddr: 10.1.20.107:9876
# 接收该 Topic 下所有 Tag
topics: FeePlatTopic~*;
consumeThreadMin: 20
consumeThreadMax: 64
# 设置一次消费消息的条数,默认为1条
consumeMessageBatchMaxSize: 1

# 配置 Group Topic Tag
fee-plat:
fee-plat-group: FeePlatGroup
fee-plat-topic: FeePlatTopic
fee-account-tag: FeeAccountTag
18 changes: 18 additions & 0 deletions ware-rocket-queue/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Tomcat
server:
tomcat:
uri-encoding: UTF-8
max-threads: 1000
min-spare-threads: 30
port: 7002
connection-timeout: 5000ms
servlet:
context-path: /rocket-mq

spring:
application:
name: rocket-mq
profiles:
active: dev
mvc:
throw-exception-if-no-handler-found: true

0 comments on commit c7362ce

Please sign in to comment.