Stream是一个只能追加内容的数据类型。也就是说Stream这种数据类型,我们对他的添加操作,只能是向Stream的末尾追加内容,不能在头部或者中间插入内容。那追加的是什么内容呢?Stream中追加的内容其实就是一个或多个key-value pair。这些键值对不必遵循相同的结构。每一次追加的键值对都可以不同。例如第一次追加name=hello的键值对,第二次也可以变成追加desc=word的键值对。
关于Redis Stream的文章满天飞,这里不再重复,但是这些文章只是告诉你如何发消息,收消息,但是没有说如果我们的业务处理过程中出现了问题,这些没有处理的消息,怎么再次处理?
在之前消息队列时提过一个死信队列,Kafka 消费端消费重试和死信队列 https://www.javacui.com/tool/686.html ,那么Redis中,如何处理一些异常消息呢?
要理解下面的代码,首先要连接如下一些命令,为了解决这个问题,我也是查询了官方文档后,写出了下面处理代码。
XADD key ID field string [field string ...]
将指定的流条目追加到指定key的流中。 如果key不存在,作为运行这个命令的副作用,将使用流的条目自动创建key。
一个条目是由一组键值对组成的,它基本上是一个小的字典。 键值对以用户给定的顺序存储,并且读取流的命令(如XRANGE 或者 XREAD) 可以保证按照通过XADD添加的顺序返回。
XADD是唯一可以向流添加数据的Redis命令,但是还有其他命令, 例如XDEL和XTRIM,他们能够从流中删除数据。
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XREADGROUP命令是XREAD命令的特殊版本,支持消费者组。
如果没有消费者组,仅使用XREAD,所有客户端都将获得所有到达流的条目。相反,如果使用带有XREADGROUP的消费者组,则可以创建不同的客户端组来消费到达给定流的不同的部分。例如,如果流获得新的条目A,B和C,并且有两个消费者通过消费者组读取流,其中一个客户端将会得到例如,消息A和C,另外一个客户端得到消息B,等等,以此类推。
XPENDING key group [start end count] [consumer]
通过消费者组从流中获取数据,而不是确认这些数据,具有创建待处理条目的效果。
XACK命令会立即从待处理条目列表(PEL)中移除待处理条目,因为一旦消息被成功处理,消费者组就不再需要跟踪它并记住消息的当前所有者。
XRANGE key start end [COUNT count]
此命令返回流中满足给定ID范围的条目。范围由最小和最大ID指定。所有ID在指定的两个ID之间或与其中一个ID相等(闭合区间)的条目将会被返回。
特殊ID:- 和 +,特殊ID-和+分别表示流中可能的最小ID和最大ID
XACK key group ID [ID ...]
XACK命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。
一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理, 且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存。
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
该命令用于管理流数据结构关联的消费者组。
这里使用Jedis进行编码测试,收到消息后没有手动ACK确认,用于演示处理此类数据,直接看代码
package com.example.springboot; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import redis.clients.jedis.Jedis; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.params.XAddParams; import redis.clients.jedis.params.XPendingParams; import redis.clients.jedis.params.XReadGroupParams; import redis.clients.jedis.resps.StreamEntry; import redis.clients.jedis.resps.StreamPendingEntry; import java.time.LocalDateTime; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.IntStream; /** * 使用jedis实现监听stream消息 */ public class JedisStreamMQTest { private static String host = "127.0.0.1"; private static int port = 6379; private static String password = "l52u27lv1Jur"; // Stream名称 private static String streamKeyName = "Javacui"; // 组名称 private static String groupName = "g1"; // 消费者名称 private static String[] consumerNames = {"c1", "c2"}; public static void main(String[] args) { JedisStreamMQTest test = new JedisStreamMQTest(); //创建群组 test.createGroup(streamKeyName, groupName); // 写入测试数据 new Thread(() -> { Jedis jedis = new Jedis(host,port); jedis.auth(password); while (true) { try { Thread.sleep(1000L); Map<String, String> map = new HashMap<>(); map.put("CurrentTime", LocalDateTime.now().toString()); map.put("BlogUrl", "www.javacui.com"); map.put("BlogAuth", "java小强"); jedis.xadd(streamKeyName, map, XAddParams.xAddParams()); } catch (Exception e) { e.printStackTrace(); } } }).start(); // 处理业务数据 test.listenerByGroup(streamKeyName, groupName, consumerNames); // 处理未ACK的数据 test.listenerNoAck(streamKeyName, groupName); } /** * 使用群组和消费者监听,该监听可以确保消息不会重复消费,因为每个组每个用户只会消费一次消息 * 使用群组和消费者的概念读取,多个线程会重复消费数据 */ private void listenerByGroup(String keyName, String groupName, String... consumerNames) { Map<String, StreamEntryID> entryIDMap = new HashMap<>(); entryIDMap.put(keyName, StreamEntryID.UNRECEIVED_ENTRY); // 创建两个线程用于演示 IntStream.range(0, 2).forEach(i -> { new Thread(() -> { Jedis jedis = new Jedis(host,port); jedis.auth(password); while (true) { try { //下面的 xreadGroup 方法等同与redis中的xreadgroup命令,将block阻塞时间设置为0表示一直阻塞知道收到消息,然后上面StreamEntryID设置为接收最新值 List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(groupName, consumerNames[i], XReadGroupParams.xReadGroupParams().block(0), entryIDMap); if (null != entries && !entries.isEmpty()) { for (Map.Entry<String, List<StreamEntry>> entry : entries) { System.out.println("收到数据:" + JSON.toJSONString(entry)); } } // 这里没有ACK,用于演示业务处理过程中产生异常后,由其他线程处理这些数据 } catch (Exception e) { e.printStackTrace(); } } }).start(); }); } /** * 用于处理已经读取但是没有被ACK的数据 */ private void listenerNoAck(String keyName, String groupName) { new Thread(() -> { Jedis jedis = new Jedis(host,port); jedis.auth(password); while (true) { // 通过消费者组从流中获取数据,而不是确认这些数据,具有创建待处理条目的效果 // 查询待ACK的条目,注意必须是有客户端XREAD过但是没有ACK,这里取一条,会返回待处理所有数据的最前面一条 List<StreamPendingEntry> list = jedis.xpending(keyName, groupName, XPendingParams.xPendingParams("-", "+", 1)); if(null != list && !list.isEmpty()) { // 通过最前面一条数据的ID,然后查该ID到最后的所有待ACK数据,一次提取10条 List<StreamEntry> streamEntryList = jedis.xrange(keyName, list.get(0).getID().getTime() + "", "+", 10); for (StreamEntry entry : streamEntryList) { System.out.println("收到待处理数据:" + JSONObject.toJSONString(entry)); // 确认消息,一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理 // 且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存 jedis.xack(keyName, groupName, entry.getID()); } } } }).start(); } /** * 创建Group */ private void createGroup(String keyName, String groupName) { Jedis jedis = new Jedis(host,port); jedis.auth(password); try { jedis.xgroupCreate(keyName, groupName, StreamEntryID.LAST_ENTRY, true); } catch (Exception e) { //这里捕获异常的原因是可能创建时群组已经存在 } } }
END
Java小强
未曾清贫难成人,不经打击老天真。
自古英雄出炼狱,从来富贵入凡尘。
发表评论: