Java小强个人技术博客站点    手机版
当前位置: 首页 >> DB >> Redis中处理处理没有ACK确认的Stream

Redis中处理处理没有ACK确认的Stream

15980 DB | 2023-3-8

Stream是一个只能追加内容的数据类型。也就是说Stream这种数据类型,我们对他的添加操作,只能是向Stream的末尾追加内容,不能在头部或者中间插入内容。那追加的是什么内容呢?Stream中追加的内容其实就是一个或多个key-value pair。这些键值对不必遵循相同的结构。每一次追加的键值对都可以不同。例如第一次追加name=hello的键值对,第二次也可以变成追加desc=word的键值对。


关于Redis Stream的文章满天飞,这里不再重复,但是这些文章只是告诉你如何发消息,收消息,但是没有说如果我们的业务处理过程中出现了问题,这些没有处理的消息,怎么再次处理
在之前消息队列时提过一个死信队列,Kafka 消费端消费重试和死信队列 https://www.javacui.com/tool/686.html  ,那么Redis中,如何处理一些异常消息呢?
要理解下面的代码,首先要连接如下一些命令,为了解决这个问题,我也是查询了官方文档后,写出了下面处理代码。

XADD key ID field string [field string ...]
将指定的流条目追加到指定key的流中。 如果key不存在,作为运行这个命令的副作用,将使用流的条目自动创建key。
一个条目是由一组键值对组成的,它基本上是一个小的字典。 键值对以用户给定的顺序存储,并且读取流的命令(如XRANGE 或者 XREAD) 可以保证按照通过XADD添加的顺序返回。
XADD是唯一可以向流添加数据的Redis命令,但是还有其他命令, 例如XDEL和XTRIM,他们能够从流中删除数据。

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XREADGROUP命令是XREAD命令的特殊版本,支持消费者组。
如果没有消费者组,仅使用XREAD,所有客户端都将获得所有到达流的条目。相反,如果使用带有XREADGROUP的消费者组,则可以创建不同的客户端组来消费到达给定流的不同的部分。例如,如果流获得新的条目A,B和C,并且有两个消费者通过消费者组读取流,其中一个客户端将会得到例如,消息A和C,另外一个客户端得到消息B,等等,以此类推。

XPENDING key group [start end count] [consumer]
通过消费者组从流中获取数据,而不是确认这些数据,具有创建待处理条目的效果。
XACK命令会立即从待处理条目列表(PEL)中移除待处理条目,因为一旦消息被成功处理,消费者组就不再需要跟踪它并记住消息的当前所有者。

XRANGE key start end [COUNT count]
此命令返回流中满足给定ID范围的条目。范围由最小和最大ID指定。所有ID在指定的两个ID之间或与其中一个ID相等(闭合区间)的条目将会被返回。
特殊ID:- 和 +,特殊ID-和+分别表示流中可能的最小ID和最大ID

XACK key group ID [ID ...]
XACK命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。
一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理, 且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存

XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
该命令用于管理流数据结构关联的消费者组。


这里使用Jedis进行编码测试,收到消息后没有手动ACK确认,用于演示处理此类数据,直接看代码

package com.example.springboot;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.resps.StreamEntry;
import redis.clients.jedis.resps.StreamPendingEntry;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
/**
 * 使用jedis实现监听stream消息
 */
public class JedisStreamMQTest {
    private static String host = "127.0.0.1";
    private static int port = 6379;
    private static String password = "l52u27lv1Jur";
    // Stream名称
    private static String streamKeyName = "Javacui";
    // 组名称
    private static String groupName = "g1";
    // 消费者名称
    private static String[] consumerNames = {"c1", "c2"};
    public static void main(String[] args) {
        JedisStreamMQTest test = new JedisStreamMQTest();
        //创建群组
        test.createGroup(streamKeyName, groupName);
        // 写入测试数据
        new Thread(() -> {
            Jedis jedis = new Jedis(host,port);
            jedis.auth(password);
            while (true) {
                try {
                    Thread.sleep(1000L);
                    Map<String, String> map = new HashMap<>();
                    map.put("CurrentTime", LocalDateTime.now().toString());
                    map.put("BlogUrl", "www.javacui.com");
                    map.put("BlogAuth", "java小强");
                    jedis.xadd(streamKeyName, map, XAddParams.xAddParams());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        // 处理业务数据
        test.listenerByGroup(streamKeyName, groupName, consumerNames);
        // 处理未ACK的数据
        test.listenerNoAck(streamKeyName, groupName);
    }
    /**
     * 使用群组和消费者监听,该监听可以确保消息不会重复消费,因为每个组每个用户只会消费一次消息
     * 使用群组和消费者的概念读取,多个线程会重复消费数据
     */
    private void listenerByGroup(String keyName, String groupName, String... consumerNames) {
        Map<String, StreamEntryID> entryIDMap = new HashMap<>();
        entryIDMap.put(keyName, StreamEntryID.UNRECEIVED_ENTRY);
        // 创建两个线程用于演示
        IntStream.range(0, 2).forEach(i -> {
            new Thread(() -> {
                Jedis jedis = new Jedis(host,port);
                jedis.auth(password);
                while (true) {
                    try {
                        //下面的 xreadGroup 方法等同与redis中的xreadgroup命令,将block阻塞时间设置为0表示一直阻塞知道收到消息,然后上面StreamEntryID设置为接收最新值
                        List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(groupName, consumerNames[i],
                                XReadGroupParams.xReadGroupParams().block(0), entryIDMap);
                        if (null != entries && !entries.isEmpty()) {
                            for (Map.Entry<String, List<StreamEntry>> entry : entries) {
                                System.out.println("收到数据:" + JSON.toJSONString(entry));
                            }
                        }
                        // 这里没有ACK,用于演示业务处理过程中产生异常后,由其他线程处理这些数据
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        });
    }
    /**
     * 用于处理已经读取但是没有被ACK的数据
     */
    private void listenerNoAck(String keyName, String groupName) {
        new Thread(() -> {
            Jedis jedis = new Jedis(host,port);
            jedis.auth(password);
            while (true) {
                // 通过消费者组从流中获取数据,而不是确认这些数据,具有创建待处理条目的效果
                // 查询待ACK的条目,注意必须是有客户端XREAD过但是没有ACK,这里取一条,会返回待处理所有数据的最前面一条
                List<StreamPendingEntry> list = jedis.xpending(keyName, groupName, XPendingParams.xPendingParams("-", "+", 1));
                if(null != list && !list.isEmpty()) {
                    // 通过最前面一条数据的ID,然后查该ID到最后的所有待ACK数据,一次提取10条
                    List<StreamEntry> streamEntryList = jedis.xrange(keyName, list.get(0).getID().getTime() + "", "+", 10);
                    for (StreamEntry entry : streamEntryList) {
                        System.out.println("收到待处理数据:" + JSONObject.toJSONString(entry));
                        // 确认消息,一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理
                        // 且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存
                        jedis.xack(keyName, groupName, entry.getID());
                    }
                }
            }
        }).start();
    }
    /**
     * 创建Group
     */
    private void createGroup(String keyName, String groupName) {
        Jedis jedis = new Jedis(host,port);
        jedis.auth(password);
        try {
            jedis.xgroupCreate(keyName, groupName, StreamEntryID.LAST_ENTRY, true);
        } catch (Exception e) {
            //这里捕获异常的原因是可能创建时群组已经存在
        }
    }
}


END

推荐您阅读更多有关于“ 队列 stream redis 消息 确认 ”的文章

上一篇:HuTool工具箱验证JWT生成Token失败 下一篇:Redis发布订阅subscribe/publish模式

猜你喜欢

发表评论: