1. 前言

https://lolico.me/2020/06/28/Using-stream-to-implement-message-queue-in-springboot/
https://docs.spring.io/spring-data/redis/docs/2.3.6.RELEASE/reference/html/#redis.streams
https://gitee.com/chunanyong/springrain/tree/master/springrain-frame-cache-redis

Redis5新增了一个Stream的数据类型,这个类型作为消息队列来使用时弥补了List和Pub/Sub的不足并且提供了更强大的功能,比如ack机制以及消费者组等概念,在有轻量消息队列使用需求时,使用这个新类型那是再好不过了.
注意:SpringBoot版本需要大于2.2(即spring-boot-starter-data-redis需要大于2.2).Redis推荐6.0+

软件环节越少,运维成本越小,架构越简单,springrain中使用redis做了 缓存,分布式锁,原子计数器,消息队列.后期考虑基于redisearch实现全文检索.

2.项目实现

基于 spring-boot-starter-data-redis实现

1
2
3
4
5
6
7
##application.yml中配置redis
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:   ###密码,可以不设置####
    timeout: 7200000

pom中依赖spring-boot-starter-data-redis

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
 <!-- 整体依赖 springboot -->
<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.4.4</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>


<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- 使用fst序列化实现 -->
<dependency>
  <groupId>de.ruedigermoeller</groupId>
  <artifactId>fst</artifactId>
  <version>2.57</version>
 </dependency>

RedisCacheConfig的代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
 * 缓存的配置,自定义 cacheManager 用于实现替换.
 *
 * @author springrain
 */
@Configuration("configuration-RedisCacheConfig")
public class RedisCacheConfig {

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    // 序列化配置 解析任意对象
    public static FstSerializer fstSerializer =  new FstSerializer();
    // 2.序列化String类型
    public static StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

    /**
     * 实际使用的redisTemplate,可以注入到代码中,操作redis
     * @return
     */
    @Bean("redisTemplate")
    public RedisTemplate<String, Object> redisTemplate() {

        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        // 连接工厂
        redisTemplate.setConnectionFactory(redisConnectionFactory);


        //设置默认的序列化器
        redisTemplate.setDefaultSerializer(fstSerializer);

        // value序列化方式采用fstSerializer
        redisTemplate.setValueSerializer(fstSerializer);
        // hash的value序列化方式采用fstSerializer
        redisTemplate.setHashValueSerializer(fstSerializer);


        // key采用String的序列化方式
        redisTemplate.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        redisTemplate.setHashKeySerializer(stringRedisSerializer);


        redisTemplate.afterPropertiesSet();

        return redisTemplate;
    }



    /**
     * 基于redis的cacheManager,使用spring-data-redis的RedisCacheManager
     *
     * @return CacheManager 缓存管理器
     */

    @Bean("cacheManager")
    public CacheManager cacheManager() {
        RedisCacheManager redisCacheManager =
                RedisCacheManager.builder(redisConnectionFactory)
                        .cacheDefaults(defaultCacheConfig(-1))
                        .transactionAware()
                        .build();
        return  redisCacheManager;
    }

    /**
     * 默认的配置
     * @param millis 默认的超时时间,单位毫秒
     * @return RedisCacheConfiguration 默认的配置
     */
    private RedisCacheConfiguration defaultCacheConfig(long millis) {

        //默认配置
        RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig();

        //设置默认的失效时间,单位毫秒
        if (millis>0) {
            defaultCacheConfig.entryTtl(Duration.ofMillis(millis));
        }
        //设置序列化方式
        defaultCacheConfig.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(stringRedisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(fstSerializer))
                .disableCachingNullValues();
        return defaultCacheConfig;
    }
}

AbstractMessageProducerConsumerListener.java 抽象类实现的监听器,接口无法注入spring的bean,使用抽象类作为隔离Redis API的方式,方便后期更换MQ实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364

/**
 * 因为接口不能注入springBean,使用抽象类实现,主要用于隔离了Redis Stream API,方便后期更换MQ的实现.
 * 如果未确认消息消费,Redis Stream 暂时没有重试的API,项目启动时使用 retryFailMessage() 重试一次,业务代码可以自行调度retryFailMessage()方法
 * 使用生产消费的group模式,用于多个消费者并行消费,只有group模式才有ack应答.如果要订阅发布,每个客户端创建一个group变通实现.
 * 订阅发布模式,使用 $ 符号订阅最新的消息,目前监听器存在问题,不能正常消费,原因待查
 * 子类继承之后注入,需要使用IMessageProducerConsumerListener接口,例如
 * <code>
 * @Component("userMessageProducerConsumerListener")
 * public class UserMessageProducerConsumerListener extends AbstractMessageProducerConsumerListener<User>
 * </code>
 *
 * <code>
 * @Resource
 * IMessageProducerConsumerListener<User> userMessageProducerConsumerListener;
 * </code>
 * @param <T> 需要放入队列的对象
 */
public abstract class AbstractMessageProducerConsumerListener<T> implements StreamListener<String, ObjectRecord<String, T>>, IMessageProducerConsumerListener<T>,Closeable {
    private Logger logger = LoggerFactory.getLogger(getClass());
    //默认的线程池
    //private final Executor defaultExecutor = new SimpleAsyncTaskExecutor();

    // 默认batchSize
    private final int defaultBatchSize=100;

    //泛型的类型
    private final Class<T> genericClass = ClassUtils.getActualTypeGenericSuperclass(getClass());

    //监听的容器
    private StreamMessageListenerContainer<String, ObjectRecord<String, T>> container = null;

    @Resource
    private RedisConnectionFactory redisConnectionFactory;

    @Resource
    private RedisTemplate redisTemplate;

    /**
     * 消息队列的名称,redis里就是stream的名称
     *
     * @return
     */
    public abstract String getQueueName();

    /**
     * 批量消费的数量
     *
     * @return
     */
    public int getBatchSize() {
        return defaultBatchSize;
    }

    /**
     * 消费者的名称
     *
     * @return
     */
    public abstract String getConsumerName();

    /**
     * group的名称,如果为空,默认是 getQueueName()+"_defaultGroupName"
     *
     * @return
     */
    public String getGroupName() {
        return getQueueName() + "_defaultGroupName";
    }

    /**
     * 指定监听器的线程池
     *
     * @return
     */
    public Executor getExecutor() {
        return null;
    }

    /**
     * spring-data-redis 实现的 stream 原生消费者回调方法,依赖Redis ObjectRecord API,业务中不要直接调用!!!!!!.
     * 使用自行实现的onMessage(T value, String queueName, String messageId, Long messageTime) 方法
     *
     * @param message 需要消费者处理的消息
     */
    @Override
    public void onMessage(ObjectRecord<String, T> message) {
        try {
            RecordId recordId = messageSuccessRecordId(message);
            if (recordId != null) {
                //消息确认ack
                redisTemplate.opsForStream().acknowledge(getQueueName(), getGroupName(), recordId);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }


    }


    /**
     * 消费消息,隔离Redis API,如果返回true则自动应答,如果返回false,认为消息处理失败
     *
     * @param messageObjectDto
     * @return
     */
    @Override
    public abstract boolean onMessage(MessageObjectDto<T> messageObjectDto) throws Exception;

    /**
     * 初始化监听器
     */
    @PostConstruct
    private void registerConsumerListener() {
        try {

            String className = getClass().toString();
            if (StringUtils.isBlank(getQueueName())) {
                logger.error(className + "的getQueueName()为空,registerConsumerListener()方法执行失败.");
                return;
            }
            if (StringUtils.isBlank(getGroupName())) {
                logger.error(className + "的getGroupName()为空,registerConsumerListener()方法执行失败.");
                return;
            }
            if (StringUtils.isBlank(getConsumerName())) {
                logger.error(className + "的getConsumerName()为空,registerConsumerListener()方法执行失败.");
                return;
            }


            int batchSize = getBatchSize();
            if (batchSize < 1) {
                batchSize = defaultBatchSize;
            }

            Executor executor = getExecutor();
            if (executor == null) {
                executor = new SimpleAsyncTaskExecutor();
            }

            // 增加自定义的 BytesToTimestampConverter 类型转换器.
            // spring jdbc 把 datetime 类型解析成了 java.sql.timestamp,spring-data-redis并没用提供BytesToTimestampConverter,造成无法转换类型
            CustomConversions customConversions = new RedisCustomConversions(Arrays.asList(new BytesToTimestampConverter()));
            // 使用 ObjectHashMapper 构造函数 注册自定义的转换器
            ObjectHashMapper objectHashMapper= new ObjectHashMapper(customConversions);


            //监听器的配置项
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, T>> options =
                    StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                            .batchSize(batchSize) //一批次拉取的最大count数
                            .executor(executor)  //线程池
                            .pollTimeout(Duration.ZERO) //阻塞式轮询
                            //设置默认的序列化器,要和 redisTemplate 保持一致!!!!!!!!!!!!!!!!!!!!!
                            //默认 targetType 会设置序列化器是  RedisSerializer.byteArray,这里手动初始化objectMapper,并设置自定义转换器和序列化器.
                            .objectMapper(objectHashMapper)
                            .keySerializer(RedisCacheConfig.stringRedisSerializer)
                            .hashKeySerializer(RedisCacheConfig.stringRedisSerializer)
                            .hashValueSerializer(RedisCacheConfig.fstSerializer)
                            //.serializer(RedisCacheConfig.fstSerializer)
                            .targetType(genericClass) //目标类型(消息内容的类型),如果objectMapper为空,会设置默认的ObjectHashMapper
                            .build();
            container = StreamMessageListenerContainer.create(redisConnectionFactory, options);

            //检查创建group组
            prepareChannelAndGroup(redisTemplate.opsForStream(), getQueueName(), getGroupName());

            // 通过xread命令也就是非消费者组模式直接读取,或者使用xreadgroup命令在消费者组中命令一个消费者去消费一条记录,
            // 我们可以通过0.>.$分别表示第一条记录.最后一次未被消费的记录和最新一条记录,
            // 比如创建消费者组时不能使用>表示最后一次未被消费的记录,比如0表示从第一条开始并且包括第一条,
            // $表示从最新一条开始但并不是指当前Stream的最后一条记录,是表示下一个xadd添加的那一条记录,所以说$在非消费者组模式的阻塞读取下才有意义!


            // 消费者
            Consumer consumer = Consumer.from(getGroupName(), getConsumerName());

            // 需要手动回复应答 ACK
            // container.receive(consumer, StreamOffset.fromStart(getQueueName()), this);
            // container.receive(consumer, StreamOffset.create(getQueueName(),ReadOffset.latest()), this);
            container.receive(consumer, StreamOffset.create(getQueueName(), ReadOffset.lastConsumed()), this);
            container.start();


            //开启线程,重试异常的消息
            executor.execute(() -> {
                //重试失败的消息
                try {
                    retryFailMessage();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            });


        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

    }


    /**
     * 重试消息,项目启动时会重试一次,业务代码自行实现根据调度重试
     * 避免死循环,最多1000次.如果单次返回的所有消息都是异常的,终止重试.
     * 如果全部重试成功,返回null.如果还有部分失败,就返回失败的消息记录
     *
     * @return 返回重试失败的消息记录对象
     */
    @Override
    public List<MessageObjectDto<T>> retryFailMessage() throws Exception {

        int batchSize = getBatchSize();
        if (batchSize < 1) {
            batchSize = defaultBatchSize;
        }
        //消费者
        Consumer consumer = Consumer.from(getGroupName(), getConsumerName());
        //设置配置
        StreamReadOptions streamReadOptions = StreamReadOptions.empty().count(batchSize).block(Duration.ofSeconds(5));
        List<ObjectRecord<String, T>> retryFailMessageList = new ArrayList<>();
        //避免死循环,最多1000次.如果单次返回的所有消息都是异常的,退出循环
        for (int i = 0; i < 1000; i++) {
            List<ObjectRecord<String, T>> readList = redisTemplate.opsForStream().read(genericClass, consumer, streamReadOptions, StreamOffset.fromStart(getQueueName()));
            //如果已经没有异常的消息,退出循环
            if (CollectionUtils.isEmpty(readList)) {
                break;
            }
            //如果返回的消息全部都是异常的,退出循环
            if (retryFailMessageList.containsAll(readList)) {
                break;
            }

            // 遍历异常的消息
            for (ObjectRecord<String, T> message : readList) {
                RecordId recordId = messageSuccessRecordId(message);
                //处理成功
                if (recordId != null) {
                    //消息确认ack
                    redisTemplate.opsForStream().acknowledge(getQueueName(), getGroupName(), recordId);
                } else {//处理失败,记录下来
                    retryFailMessageList.add(message);
                }
            }
        }
        // 没有失败的消息记录
        if (CollectionUtils.isEmpty(retryFailMessageList)) {
            return null;
        }
        //返回处理异常的消息
        List<MessageObjectDto<T>> retryFailMessageObjectList = new ArrayList<>();
        for (ObjectRecord<String, T> message : retryFailMessageList) {
            retryFailMessageObjectList.add(objectRecord2MessageObject(message));
        }
        return retryFailMessageObjectList;
    }


    /**
     * 在初始化容器时,如果key对应的stream或者group不存在时会抛出异常,所以我们需要提前检查并且初始化.
     *
     * @param ops
     * @param queueName
     * @param group
     */
    private void prepareChannelAndGroup(StreamOperations<String, ?, ?> ops, String queueName, String group) {
        String status = "OK";
        try {
            StreamInfo.XInfoGroups groups = ops.groups(queueName);
            if (groups.stream().noneMatch(xInfoGroup -> group.equals(xInfoGroup.groupName()))) {
                //status = ops.createGroup(queueName, group);
                status = ops.createGroup(queueName, ReadOffset.from("0-0"), group);
            }
        } catch (Exception exception) {
            RecordId initialRecord = ops.add(ObjectRecord.create(queueName, "Initial Record"));
            Assert.notNull(initialRecord, "Cannot initialize stream with key '" + queueName + "'");
            status = ops.createGroup(queueName, ReadOffset.from(initialRecord), group);
        } finally {
            Assert.isTrue("OK".equals(status), "Cannot create group with name '" + group + "'");
        }
    }


    /**
     * 生产者向消息队列发送消息
     *
     * @param message
     * @return
     */
    @Override
    public MessageObjectDto<T> sendProducerMessage(T message) throws Exception {
        if (message == null) {
            return null;
        }

        try {
            ObjectRecord<String, T> record = Record.of(message).withStreamKey(getQueueName());
            //StreamRecords.newRecord()
            //ObjectRecord record = Record.of(message).withStreamKey(queueName);
            RecordId recordId = redisTemplate.opsForStream().add(record);
            // return recordId.getValue();
            return new MessageObjectDto<T>(message, getQueueName(), recordId.getValue(), recordId.getTimestamp());
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw e;
            //return null;
        }
    }

    /**
     * 消息消费是否成功,成功返回RecordId,失败返回null
     *
     * @param message
     * @return
     */
    private RecordId messageSuccessRecordId(ObjectRecord<String, T> message) {
        RecordId recordId = message.getId();
        MessageObjectDto<T> messageObjectRecord = objectRecord2MessageObject(message);
        try {
            boolean ok = onMessage(messageObjectRecord);
            if (ok) {
                return recordId;
            } else {
                return null;
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return null;
        }
    }


    /**
     * ObjectRecord2MessageObject 类型转换
     *
     * @param message
     * @return
     */
    private MessageObjectDto<T> objectRecord2MessageObject(ObjectRecord<String, T> message) {
        RecordId recordId = message.getId();
        String messageId = recordId.getValue();
        Long messageTime = recordId.getTimestamp();
        String queueName = message.getStream();
        T messageObject = message.getValue();

        MessageObjectDto<T> messageObjectRecord = new MessageObjectDto<>(messageObject, queueName, messageId, messageTime);


        return messageObjectRecord;

    }


    @Override
    public void close() throws IOException {
        if (container != null) {
            container.stop();
        }

    }

}

  1. 使用

子类监听器,声明为SpringBean,例如 @Component("userMessageProducerConsumerListener")

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component("userMessageProducerConsumerListener")
public class UserMessageProducerConsumerListener extends AbstractMessageProducerConsumerListener<User> {
    @Override
    public String getQueueName() {
        return "queue_user";
    }

    @Override
    public String getConsumerName() {
        return "consumer_user";
    }

    @Override
    public boolean onMessage(MessageObjectDto<User> messageObjectRecord) throws Exception {
        messageObjectRecord.getMessageObject();
        return true;
    }


}


Controller中注入子类的监听器,注意使用IMessageProducerConsumerListener接口声明

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
    @Resource
    private IMessageProducerConsumerListener<User> userMessageProducerConsumerListener;

    /**
     * 健康检查
     *
     * @return
     */
    @RequestMapping(value = "/checkHealth", method = RequestMethod.GET)
    public ReturnDatas<String> checkHealth() {
        User user=new User();
        user.setAccount("1111111111111");
        user.setPassword("2222222222222");
        //发送到消息队列
        userMessageProducerConsumerListener.sendProducerMessage(user);
       //重试消费失败的消息
       //userConsumerListener.retryFailMessage();
        return ReturnDatas.getSuccessReturnDatas();
    }

4.重试

Redis Stream 暂时没有重试的API,项目启动时使用 retryFailMessage() 重试一次,业务代码可以自行调度retryFailMessage()方法.
避免死循环,最多1000次.如果单次返回的所有消息都是异常的,终止重试.

1
2
  //重试消费失败的消息,返回重试再次失败的消息
  List<MessageObjectDto<User>> retryFailMessageObjectList=userMessageProducerConsumerListener.retryFailMessage();