Java小强个人技术博客站点    手机版
当前位置: 首页 >> DB >> Redis发布订阅subscribe/publish模式

Redis发布订阅subscribe/publish模式

9620 DB | 2023-3-8

在官网的文档介绍中有一行介绍:Redis是一个快速稳定的发布/订阅消息系统。
Redis提供了发布与订阅的功能,可以用于消息的传输,Redis的发布订阅机制包括三部分,发布者、订阅者和Channel(主题或者队列)。


其原生命令以及相关介绍
http://www.redis.cn/topics/pubsub.html  。

Redis可以提供基本的发布订阅功能,但毕竟不像消息队列那种专业级别,所以会存在以下缺点:
redis无法对消息持久化存储,消息一旦被发送,如果没有订阅者接收,数据会丢失。
消息队列提供了消息传输保障,当客户端连接超时或事物回滚的等情况发生时,消息会重新发布给订阅者。redis没有该保障,导致的结果就是在订阅者断线超时或其他异常情况时,将会丢失所有发布者发布的信息
若订阅者订阅了频道,但自己读取消息的速度很慢的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。


使用Jedis编码发布订阅功能

首先编写一个监听类PubSubListener,用于实现编码各种操作的业务逻辑

package com.example.springboot.listener;
import redis.clients.jedis.JedisPubSub;
public class PubSubListener extends JedisPubSub {
    // 取得订阅的消息后的处理
    public void onMessage(String channel, String message) {
        System.out.println("收到消息:" + channel + "=" + message);
    }
    // 初始化订阅时候的处理
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("初始订阅:" + channel + "=" + subscribedChannels);
    }
    // 取消订阅时候的处理
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println("取消订阅:" + channel + "=" + subscribedChannels);
    }
    // 初始化按表达式的方式订阅时候的处理
    public void onPSubscribe(String pattern, int subscribedChannels) {
        System.out.println("初始订阅P:" + pattern + "=" + subscribedChannels);
    }
    // 取消按表达式的方式订阅时候的处理
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("取消订阅P:" + pattern + "=" + subscribedChannels);
    }
    // 取得按表达式的方式订阅的消息后的处理
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("收到消息P:" + pattern + "=" + channel + "=" + message);
    }
}


其次编码测试类,用户启动订阅,和测试发布消息

package com.example.springboot;
import com.example.springboot.listener.PubSubListener;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;
public class RedisSubscribe {
    private static final String My_Blog = "Javacui";
    private static Jedis jedis;
    static {
        jedis = new Jedis("127.0.0.1",6379);
        jedis.auth("l52u27lv1Jur");
    }
    /**
     * 订阅给定的一个或多个频道的信息
     */
    @Test
    public void subscribe(){
        final PubSubListener listener = new PubSubListener();
        jedis.subscribe(listener, My_Blog);
    }
    /**
     * 将信息 message 发送到指定的频道 channel
     * 返回值:接收到信息 message 的订阅者数量
     */
    @Test
    public void publish(){
        jedis.publish(My_Blog, "java小强");
        System.out.println("客户端发布消息成功");
    }
}



集成到SpringBoot中进行编码实现

首先编码消息收到后的业务逻辑代码类

package com.example.springboot.listener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
 * Redis消息业务处理
 */
@Component
public class RedisMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String body = new String(message.getBody());
        String channel = new String(message.getChannel());
        System.out.println("接收消息:" + channel + " -> " + body);
    }
}


其次配置把监听类注入到RedisMessageListenerContainer中,这里可以编码多个监听

package com.example.springboot.config;
import com.example.springboot.listener.RedisMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
 * 配置Redis监听
 */
@Configuration
public class RedisMessageConfig {
    /**
     * 监听主题
     */
    private static final String My_Blog = "Javacui";
    /**
     * 注入消息监听容器
     */
    @Autowired
    private RedisMessageListener redisMessageListener;
    @Bean
    public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory){
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        //订阅主题
        redisMessageListenerContainer.addMessageListener(new MessageListenerAdapter(redisMessageListener), new PatternTopic(My_Blog));
        return redisMessageListenerContainer;
    }
}


编写一个Controller来模拟发送消息

package com.example.springboot.controller;
import cn.hutool.core.date.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/test")
public class MessSendController {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private static final String My_Blog = "Javacui";
    @RequestMapping("/redis/send")
    public String sendRedisMess() {
        stringRedisTemplate.convertAndSend(My_Blog, "Java小强 " + DateUtil.now());
        System.out.println("客户端 消息发送完成");
        return DateUtil.now();
    }
}


END

推荐您阅读更多有关于“ 队列 redis 消息 发布订阅 publish ”的文章

上一篇:Redis中处理处理没有ACK确认的Stream 下一篇:使用Redis集合List实现消息队列

猜你喜欢

发表评论: