Stream是一个只能追加内容的数据类型。也就是说Stream这种数据类型,我们对他的添加操作,只能是向Stream的末尾追加内容,不能在头部或者中间插入内容。那追加的是什么内容呢?Stream中追加的内容其实就是一个或多个key-value pair。这些键值对不必遵循相同的结构。每一次追加的键值对都可以不同。例如第一次追加name=hello的键值对,第二次也可以变成追加desc=word的键值对。

关于Redis Stream的文章满天飞,这里不再重复,但是这些文章只是告诉你如何发消息,收消息,但是没有说如果我们的业务处理过程中出现了问题,这些没有处理的消息,怎么再次处理?
在之前消息队列时提过一个死信队列,Kafka 消费端消费重试和死信队列 https://www.javacui.com/tool/686.html ,那么Redis中,如何处理一些异常消息呢?
要理解下面的代码,首先要连接如下一些命令,为了解决这个问题,我也是查询了官方文档后,写出了下面处理代码。
XADD key ID field string [field string ...]
将指定的流条目追加到指定key的流中。 如果key不存在,作为运行这个命令的副作用,将使用流的条目自动创建key。
一个条目是由一组键值对组成的,它基本上是一个小的字典。 键值对以用户给定的顺序存储,并且读取流的命令(如XRANGE 或者 XREAD) 可以保证按照通过XADD添加的顺序返回。
XADD是唯一可以向流添加数据的Redis命令,但是还有其他命令, 例如XDEL和XTRIM,他们能够从流中删除数据。
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XREADGROUP命令是XREAD命令的特殊版本,支持消费者组。
如果没有消费者组,仅使用XREAD,所有客户端都将获得所有到达流的条目。相反,如果使用带有XREADGROUP的消费者组,则可以创建不同的客户端组来消费到达给定流的不同的部分。例如,如果流获得新的条目A,B和C,并且有两个消费者通过消费者组读取流,其中一个客户端将会得到例如,消息A和C,另外一个客户端得到消息B,等等,以此类推。
XPENDING key group [start end count] [consumer]
通过消费者组从流中获取数据,而不是确认这些数据,具有创建待处理条目的效果。
XACK命令会立即从待处理条目列表(PEL)中移除待处理条目,因为一旦消息被成功处理,消费者组就不再需要跟踪它并记住消息的当前所有者。
XRANGE key start end [COUNT count]
此命令返回流中满足给定ID范围的条目。范围由最小和最大ID指定。所有ID在指定的两个ID之间或与其中一个ID相等(闭合区间)的条目将会被返回。
特殊ID:- 和 +,特殊ID-和+分别表示流中可能的最小ID和最大ID
XACK key group ID [ID ...]
XACK命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息。
一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理, 且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存。
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
该命令用于管理流数据结构关联的消费者组。
这里使用Jedis进行编码测试,收到消息后没有手动ACK确认,用于演示处理此类数据,直接看代码
package com.example.springboot;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.resps.StreamEntry;
import redis.clients.jedis.resps.StreamPendingEntry;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
/**
* 使用jedis实现监听stream消息
*/
public class JedisStreamMQTest {
private static String host = "127.0.0.1";
private static int port = 6379;
private static String password = "l52u27lv1Jur";
// Stream名称
private static String streamKeyName = "Javacui";
// 组名称
private static String groupName = "g1";
// 消费者名称
private static String[] consumerNames = {"c1", "c2"};
public static void main(String[] args) {
JedisStreamMQTest test = new JedisStreamMQTest();
//创建群组
test.createGroup(streamKeyName, groupName);
// 写入测试数据
new Thread(() -> {
Jedis jedis = new Jedis(host,port);
jedis.auth(password);
while (true) {
try {
Thread.sleep(1000L);
Map<String, String> map = new HashMap<>();
map.put("CurrentTime", LocalDateTime.now().toString());
map.put("BlogUrl", "www.javacui.com");
map.put("BlogAuth", "java小强");
jedis.xadd(streamKeyName, map, XAddParams.xAddParams());
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
// 处理业务数据
test.listenerByGroup(streamKeyName, groupName, consumerNames);
// 处理未ACK的数据
test.listenerNoAck(streamKeyName, groupName);
}
/**
* 使用群组和消费者监听,该监听可以确保消息不会重复消费,因为每个组每个用户只会消费一次消息
* 使用群组和消费者的概念读取,多个线程会重复消费数据
*/
private void listenerByGroup(String keyName, String groupName, String... consumerNames) {
Map<String, StreamEntryID> entryIDMap = new HashMap<>();
entryIDMap.put(keyName, StreamEntryID.UNRECEIVED_ENTRY);
// 创建两个线程用于演示
IntStream.range(0, 2).forEach(i -> {
new Thread(() -> {
Jedis jedis = new Jedis(host,port);
jedis.auth(password);
while (true) {
try {
//下面的 xreadGroup 方法等同与redis中的xreadgroup命令,将block阻塞时间设置为0表示一直阻塞知道收到消息,然后上面StreamEntryID设置为接收最新值
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(groupName, consumerNames[i],
XReadGroupParams.xReadGroupParams().block(0), entryIDMap);
if (null != entries && !entries.isEmpty()) {
for (Map.Entry<String, List<StreamEntry>> entry : entries) {
System.out.println("收到数据:" + JSON.toJSONString(entry));
}
}
// 这里没有ACK,用于演示业务处理过程中产生异常后,由其他线程处理这些数据
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
});
}
/**
* 用于处理已经读取但是没有被ACK的数据
*/
private void listenerNoAck(String keyName, String groupName) {
new Thread(() -> {
Jedis jedis = new Jedis(host,port);
jedis.auth(password);
while (true) {
// 通过消费者组从流中获取数据,而不是确认这些数据,具有创建待处理条目的效果
// 查询待ACK的条目,注意必须是有客户端XREAD过但是没有ACK,这里取一条,会返回待处理所有数据的最前面一条
List<StreamPendingEntry> list = jedis.xpending(keyName, groupName, XPendingParams.xPendingParams("-", "+", 1));
if(null != list && !list.isEmpty()) {
// 通过最前面一条数据的ID,然后查该ID到最后的所有待ACK数据,一次提取10条
List<StreamEntry> streamEntryList = jedis.xrange(keyName, list.get(0).getID().getTime() + "", "+", 10);
for (StreamEntry entry : streamEntryList) {
System.out.println("收到待处理数据:" + JSONObject.toJSONString(entry));
// 确认消息,一旦消费者成功地处理完一条消息,它应该调用XACK,这样这个消息就不会被再次处理
// 且作为一个副作用,关于此消息的PEL条目也会被清除,从Redis服务器释放内存
jedis.xack(keyName, groupName, entry.getID());
}
}
}
}).start();
}
/**
* 创建Group
*/
private void createGroup(String keyName, String groupName) {
Jedis jedis = new Jedis(host,port);
jedis.auth(password);
try {
jedis.xgroupCreate(keyName, groupName, StreamEntryID.LAST_ENTRY, true);
} catch (Exception e) {
//这里捕获异常的原因是可能创建时群组已经存在
}
}
}END
Java小强
未曾清贫难成人,不经打击老天真。
自古英雄出炼狱,从来富贵入凡尘。
发表评论: