Rabbitmq配置说明

rabbitmq模式选型

路由模式

注:p 消费者,x 交换机, 橙色、黑色、绿色 三种路由键,Q 队列,C 消费者

说明:各生产者把消息发送到统一的路由交换机,通过路由器把消息发到一条或多条消费者对应的队列,实现异步; 当消费者在业务处理时报错,可统一把信息转发到延迟交换机(可设置延迟时间,不延迟也可以),再有延迟交换机发送回对应队列。

各服务配置

一、市场服务(生产者)->云工厂服务(消费者)

  1. 业务场景:市场服务修改市场名称后,云工厂服务需要修改对应的市场名称冗余字段
  2. 交换机 路由交换机名(exchange):direct.exchange 延迟交换机名(exchange):delay.exchange
  3. 队列(query): market.sync.factory
  4. 绑定路由键(routingKey): markets
  5. 绑定延迟路由键(routingKey): market.sync.factory.delay
  6. 消息内容(json格式): id:市场id type:操作类型(edit) data:拓展消息内容 例:{“id”:201601249,“type”:“edit”,“data”:{“marketName”:“广达大饭店”}}

代码实现例子(会员服务(生产者))

1、pom文件添加

<!-- mq -->
<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、application配置

###################rabbitmq#########################
spring:
  rabbitmq:
		host: 192.168.0.12
		port: 5672
		username: whrp_mq
		password: KxkpdPHTp#mt@d@X
		virtual-host: message
		listener:
			simple:
				acknowledge-mode: manual

3、生产者发消息代码

@Component
public class FactorySender {

		@Autowired
		private AmqpTemplate rabbitTemplate;

		ObjectMapper mapper = new ObjectMapper();

		public void send() {
				try {
						//传输数据创建
						Map<String, Object> map = new HashMap<>();
						map.put("id",1086090301232549888L);
						map.put("type","edit");
						Map<String, Object> data = new HashMap<>();
						data.put("nickName","广达大饭店");
						data.put("avatar","/pic/user/head/20190212174953b779522a57884cf485be7ddbaaad78e1.png");
						map.put("data",data);
						String message = mapper.writeValueAsString(map);
						//参数1:路由名 参数2:routingKey 参数3:传输内容
						rabbitTemplate.convertAndSend("direct.exchange", "member", message);
				} catch (JsonProcessingException e) {
						e.printStackTrace();
				}
		}

}

代码实现例子(评价服务(消费者))

1、pom文件添加

<!-- mq -->
<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、application配置

###################rabbitmq#########################
spring:
  rabbitmq:
		host: 192.168.0.12
		port: 5672
		username: whrp_mq
		password: KxkpdPHTp#mt@d@X
		virtual-host: message
		listener:
			simple:
				acknowledge-mode: manual

3、新增Config类

@Configuration
public class RoutingConfig {

		/**
		 * 定义路由交换器
		 * @return
		 */
		@Bean
		public DirectExchange directExchange() {
				return new DirectExchange("direct.exchange");
		}

		/**
		 * 定义延迟交换机
		 * @return
		 */
		@Bean
		CustomExchange customExchange() {
				Map<String, Object> args = new HashMap<>();
				args.put("x-delayed-type", "direct");
				//参数二为类型:必须是x-delayed-message
				return new CustomExchange("delay.exchange", "x-delayed-message", true, false, args);
		}

		/**
		 * 会员信息同步评价队列
		 * @return
		 */
		@Bean
		public Queue memberSyncEvaluationQueue() {
				return new Queue("member.sync.evaluation");
		}

		/**
		 * memberSyncEvaluationQueue队列绑定到directExchange交换机上,路由键为member
		 * @param directExchange
		 * @param factoryEsQueue
		 * @return
		 */
		@Bean
		public Binding bindingFactorySyncEs(DirectExchange directExchange, Queue memberSyncEvaluationQueue) {
				return BindingBuilder.bind(memberSyncEvaluationQueue).to(directExchange).with("member");
		}

		/**
		 * 实现延迟队列,错误重试
		 * memberSyncEvaluationQueue队列绑定到customExchange交换机上,路由键为member.sync.evaluation.delay
		 * @param directExchange
		 * @param factoryEsQueue
		 * @return
		 */
		@Bean
		public Binding bindingFactoryEsQueueDelay(CustomExchange customExchange, Queue memberSyncEvaluationQueue) {
				return BindingBuilder.bind(memberSyncEvaluationQueue).to(customExchange).with("member.sync.evaluation.delay").noargs();
		}

}

4、消费者

@Component
public class FactoryReceiver {

		@Autowired
		private AmqpTemplate rabbitTemplate;

		@RabbitListener(queues = "member.sync.evaluation")
		public void esReceiver(Message message, Channel channel, @Header(value = "count", defaultValue = "0") int count) {
				try {
						//对应业务代码
						System.out.println("esReceiver++++++++++:" + message);

						//ack确认
						channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
				} catch (Exception e) {
						e.printStackTrace();
						try {
								//重试次数设置
								if(count <= 3){
										Map<String, Object> headers = new HashMap<>(1);
										//重试次数累加
										headers.put("count", count + 1);
										//延迟时间设置(毫秒)
										headers.put("x-delay", 5*1000);
										//参数1:路由名 参数2:延迟队列routingKey
										channel.basicPublish("delay.exchange","member.sync.evaluation.delay", false, new AMQP.BasicProperties.Builder()
														.headers(headers)
														.build(), message.getBody());
								}
								channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
						} catch (Exception e1) {
								e1.printStackTrace();
						}
				}
		}

}
作者:崔启旭  创建时间:2023-04-03 16:04
 更新时间:2024-11-15 23:02