Skip to content

Commit

Permalink
SpringBoot2.0 整合 RocketMQ ,实现请求异步处理
Browse files Browse the repository at this point in the history
  • Loading branch information
cicadasmile committed Sep 14, 2019
1 parent 9617053 commit 900d609
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.rocket.queue.controller;

import com.rocket.queue.service.FeePlatMqService;
import com.rocket.queue.service.RocketMqService;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
Expand All @@ -10,14 +10,14 @@
public class RocketController {

@Resource
private FeePlatMqService feePlatMqService ;
private RocketMqService rocketMqService ;

@RequestMapping("/sendMsg")
public SendResult sendMsg (){
String msg = "OpenAccount Msg";
SendResult sendResult = null;
try {
sendResult = feePlatMqService.openAccountMsg(msg) ;
sendResult = rocketMqService.openAccountMsg(msg) ;
} catch (Exception e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeCo
if(reConsume ==3){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
if(messageExt.getTopic().equals(paramConfigService.feePlatTopic)){
if(messageExt.getTopic().equals(paramConfigService.rocketTopic)){
String tags = messageExt.getTags() ;
switch (tags){
case "FeeAccountTag":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

import org.apache.rocketmq.client.producer.SendResult;

public interface FeePlatMqService {
public interface RocketMqService {
SendResult openAccountMsg (String msgInfo) ;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import org.springframework.stereotype.Service;
@Service
public class ParamConfigService {
@Value("${fee-plat.fee-plat-group}")
public String feePlatGroup ;
@Value("${fee-plat.fee-plat-topic}")
public String feePlatTopic ;
@Value("${fee-plat.fee-account-tag}")
public String feeAccountTag ;
@Value("${rocket.group}")
public String rocketGroup ;
@Value("${rocket.topic}")
public String rocketTopic ;
@Value("${rocket.tag}")
public String rocketTag ;
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
package com.rocket.queue.service.impl;

import com.rocket.queue.service.FeePlatMqService;
import com.rocket.queue.service.RocketMqService;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class FeePlatMqServiceImpl implements FeePlatMqService {
public class RocketMqServiceImpl implements RocketMqService {
@Resource
private DefaultMQProducer defaultMQProducer;
@Resource
private ParamConfigService paramConfigService ;
@Override
public SendResult openAccountMsg(String msgInfo) {
// 可以不使用Config中的Group
defaultMQProducer.setProducerGroup(paramConfigService.feePlatGroup);
defaultMQProducer.setProducerGroup(paramConfigService.rocketGroup);
SendResult sendResult = null;
try {
Message sendMsg = new Message(paramConfigService.feePlatTopic,
paramConfigService.feeAccountTag,
Message sendMsg = new Message(paramConfigService.rocketTopic,
paramConfigService.rocketTag,
"fee_open_account_key", msgInfo.getBytes());
sendResult = defaultMQProducer.send(sendMsg);
} catch (Exception e) {
Expand Down
16 changes: 8 additions & 8 deletions ware-rocket-queue/src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ rocketmq:
producer:
isOnOff: on
# 发送同一类消息的设置为同一个group,保证唯一
groupName: FeePlatGroup
groupName: CicadaGroup
# 服务地址
namesrvAddr: 10.1.20.107:9876
namesrvAddr: 127.0.0.1:9876
# 消息最大长度 默认1024*4(4M)
maxMessageSize: 4096
# 发送消息超时时间,默认3000
Expand All @@ -17,9 +17,9 @@ rocketmq:
consumer:
isOnOff: on
# 官方建议:确保同一组中的每个消费者订阅相同的主题。
groupName: FeePlatGroup
groupName: CicadaGroup
# 服务地址
namesrvAddr: 10.1.20.107:9876
namesrvAddr: 127.0.0.1:9876
# 接收该 Topic 下所有 Tag
topics: FeePlatTopic~*;
consumeThreadMin: 20
Expand All @@ -28,7 +28,7 @@ rocketmq:
consumeMessageBatchMaxSize: 1

# 配置 Group Topic Tag
fee-plat:
fee-plat-group: FeePlatGroup
fee-plat-topic: FeePlatTopic
fee-account-tag: FeeAccountTag
rocket:
group: rocketGroup
topic: rocketTopic
tag: rocketTag

0 comments on commit 900d609

Please sign in to comment.