Rabbitmq配置说明
rabbitmq模式选型
路由模式
注:p 消费者,x 交换机, 橙色、黑色、绿色 三种路由键,Q 队列,C 消费者
说明:各生产者把消息发送到统一的路由交换机,通过路由器把消息发到一条或多条消费者对应的队列,实现异步; 当消费者在业务处理时报错,可统一把信息转发到延迟交换机(可设置延迟时间,不延迟也可以),再有延迟交换机发送回对应队列。
各服务配置
一、市场服务(生产者)->云工厂服务(消费者)
- 业务场景:市场服务修改市场名称后,云工厂服务需要修改对应的市场名称冗余字段
- 交换机 路由交换机名(exchange):direct.exchange 延迟交换机名(exchange):delay.exchange
- 队列(query): market.sync.factory
- 绑定路由键(routingKey): markets
- 绑定延迟路由键(routingKey): market.sync.factory.delay
- 消息内容(json格式): id:市场id type:操作类型(edit) data:拓展消息内容 例:{“id”:201601249,“type”:“edit”,“data”:{“marketName”:“广达大饭店”}}
代码实现例子(会员服务(生产者))
1、pom文件添加
<!-- mq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、application配置
###################rabbitmq#########################
spring:
rabbitmq:
host: 192.168.0.12
port: 5672
username: whrp_mq
password: KxkpdPHTp#mt@d@X
virtual-host: message
listener:
simple:
acknowledge-mode: manual
3、生产者发消息代码
@Component
public class FactorySender {
@Autowired
private AmqpTemplate rabbitTemplate;
ObjectMapper mapper = new ObjectMapper();
public void send() {
try {
//传输数据创建
Map<String, Object> map = new HashMap<>();
map.put("id",1086090301232549888L);
map.put("type","edit");
Map<String, Object> data = new HashMap<>();
data.put("nickName","广达大饭店");
data.put("avatar","/pic/user/head/20190212174953b779522a57884cf485be7ddbaaad78e1.png");
map.put("data",data);
String message = mapper.writeValueAsString(map);
//参数1:路由名 参数2:routingKey 参数3:传输内容
rabbitTemplate.convertAndSend("direct.exchange", "member", message);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
代码实现例子(评价服务(消费者))
1、pom文件添加
<!-- mq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、application配置
###################rabbitmq#########################
spring:
rabbitmq:
host: 192.168.0.12
port: 5672
username: whrp_mq
password: KxkpdPHTp#mt@d@X
virtual-host: message
listener:
simple:
acknowledge-mode: manual
3、新增Config类
@Configuration
public class RoutingConfig {
/**
* 定义路由交换器
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}
/**
* 定义延迟交换机
* @return
*/
@Bean
CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//参数二为类型:必须是x-delayed-message
return new CustomExchange("delay.exchange", "x-delayed-message", true, false, args);
}
/**
* 会员信息同步评价队列
* @return
*/
@Bean
public Queue memberSyncEvaluationQueue() {
return new Queue("member.sync.evaluation");
}
/**
* memberSyncEvaluationQueue队列绑定到directExchange交换机上,路由键为member
* @param directExchange
* @param factoryEsQueue
* @return
*/
@Bean
public Binding bindingFactorySyncEs(DirectExchange directExchange, Queue memberSyncEvaluationQueue) {
return BindingBuilder.bind(memberSyncEvaluationQueue).to(directExchange).with("member");
}
/**
* 实现延迟队列,错误重试
* memberSyncEvaluationQueue队列绑定到customExchange交换机上,路由键为member.sync.evaluation.delay
* @param directExchange
* @param factoryEsQueue
* @return
*/
@Bean
public Binding bindingFactoryEsQueueDelay(CustomExchange customExchange, Queue memberSyncEvaluationQueue) {
return BindingBuilder.bind(memberSyncEvaluationQueue).to(customExchange).with("member.sync.evaluation.delay").noargs();
}
}
4、消费者
@Component
public class FactoryReceiver {
@Autowired
private AmqpTemplate rabbitTemplate;
@RabbitListener(queues = "member.sync.evaluation")
public void esReceiver(Message message, Channel channel, @Header(value = "count", defaultValue = "0") int count) {
try {
//对应业务代码
System.out.println("esReceiver++++++++++:" + message);
//ack确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
try {
//重试次数设置
if(count <= 3){
Map<String, Object> headers = new HashMap<>(1);
//重试次数累加
headers.put("count", count + 1);
//延迟时间设置(毫秒)
headers.put("x-delay", 5*1000);
//参数1:路由名 参数2:延迟队列routingKey
channel.basicPublish("delay.exchange","member.sync.evaluation.delay", false, new AMQP.BasicProperties.Builder()
.headers(headers)
.build(), message.getBody());
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
作者:崔启旭 创建时间:2023-04-03 16:04
更新时间:2024-11-15 23:02
更新时间:2024-11-15 23:02