Rocketmq广播消费模式怎么扩展消费者

试想一下:

        某消费者组中只有两个Consumer,他们订阅了同一个Topic,他们消费逻辑不同,使用的是广播消费模式,组中每个Consumer都会收到该topic下的消息。

        此时由于某种原因你想扩展一下这两个Consumer机器,提升一下其他代码(两台Consumer除消费消息外还有其他功能)性能,于是你一个Consumer加了一台。

        然后你会发现你Consumer重复消费消息了,如果消费逻辑不满足幂等性的话还会出重复消费错误,而且broker发送该topic消息多增加了一倍。也就是此时广播消费模式不利于消费者的可扩展性?所以我也网上找了一下,说广播模式多用于更新每台机器的内存级缓存(小白一个,还有其他功能,希望各位指教一下!!)。

        那有没有利于扩展的广播模式呢?有的,就是利用多个consumer组采用负载均衡消费模式订阅同一个topic,看下方:

  代码如下:

  1. @Component
  2. @RocketMQMessageListener(topic = "topic1",consumerGroup="group1")
  3. public class MessageListener1_1 implements RocketMQListener<String> {
  4. @Override
  5. public void onMessage(String str) {
  6. System.out.println(Thread.currentThread().getName()+"消费完成:"+str);
  7. }
  8. }
  1. @Component
  2. @RocketMQMessageListener(topic = "topic1",consumerGroup="group2")
  3. public class MessageListener1_2 implements RocketMQListener<String> {
  4. @Override
  5. public void onMessage(String str) {
  6. System.out.println(Thread.currentThread().getName()+"消费完成:"+str);
  7. }
  8. }

结果如下,可以看出不需要广播模式就能都消费到。

 如果两个消费者组订阅了一个topic,那么两个消费者组都能消费到同一条消息,就相当于广播模式了,且两个组中的消费逻辑可以不同,这样各组的横向扩展便有了意义。