Spring-Kafka provides a consumption retry mechanism: when a message fails to be consumed, Spring-Kafka redelivers it to the Consumer so the Consumer can process it again.
By default, once the configured number of retries has been reached (the interval between retries is also configurable) and the Consumer still fails, the message is routed to a dead-letter queue.
Spring-Kafka wraps both retry and dead-letter handling: a message that cannot be consumed normally is called a dead-letter message (Dead-Letter Message), and the special topic that stores such messages is called a dead-letter queue (Dead-Letter Queue).
An application can monitor the dead-letter queue and re-send its messages so that consumer instances process them again; because of this redelivery, the consumer side must handle messages idempotently.
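Idempotent handling can be as simple as remembering which message IDs have already been processed and skipping duplicates. A minimal plain-Java sketch (the IdempotentHandler class and its method names are illustrative, not part of Spring-Kafka; in production the seen-ID store would typically live in Redis or a database rather than in memory):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers processed message IDs so that a redelivered
// (retried, or re-sent from the DLT) message is handled at most once.
class IdempotentHandler {

    // Thread-safe set of message IDs that have already been processed.
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /**
     * Runs the business logic only if this message ID has not been seen before.
     * @return true if the message was handled, false if it was a duplicate
     */
    public boolean process(String messageId, Runnable businessLogic) {
        // add() returns false when the ID is already present, so duplicates are skipped
        if (!processed.add(messageId)) {
            return false;
        }
        businessLogic.run();
        return true;
    }
}
```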

Add the Maven dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.6</version>
</dependency>
YAML configuration
Kafka is also installed locally here; see the Kafka-on-Windows installation guide: http://www.javacui.com/tool/667.html .
# Web configuration
server:
  servlet:
    context-path: /
  port: 1088
# Kafka configuration
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      enable-auto-commit: true # whether to auto-commit offsets
      auto-commit-interval: 100 # commit delay (how long after receiving a message the offset is committed)
      # earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning
      # latest: if a partition has a committed offset, consume from it; otherwise consume only newly produced data
      # none: consume from the committed offset when every partition has one; throw an exception if any partition lacks one
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # accept deserialization of any class; this can be restricted to specific packages
      properties:
        spring:
          json:
            trusted:
              packages: '*'
    producer:
      retries: 0 # number of retries
      # Ack level: how many partition replicas must confirm the write before the producer receives an ack (0, 1, all/-1)
      # 0: the producer does not wait for the broker's ack; the broker returns before the record hits disk, so data may be lost if the broker fails
      # 1: the producer waits for the partition leader to persist the record; data is lost if the leader fails before followers sync (fast, for less critical data)
      # -1/all: the producer waits until the leader and all followers have persisted the record; data is rarely lost, but latency is higher (for critical data)
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    listener:
      missing-topics-fatal: false # by default, startup fails when a listened-to topic does not exist; false suppresses that error
Configure an ErrorHandler to customize the retry count and interval
package com.example.springboot.config.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
/**
 * Spring-Kafka intercepts consumption exceptions through a custom SeekToCurrentErrorHandler:
 * while the retry count is below the maximum, the message is redelivered to the Consumer;
 * once the maximum is reached and the Consumer still fails, the message is sent to the dead-letter queue.
 * Naming rule: the dead-letter topic is the original topic name plus the .DLT suffix.
 */
@Configuration
public class KafkaConfiguration {
private Logger logger = LoggerFactory.getLogger(getClass());
@Bean
@Primary
public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
logger.warn("kafkaErrorHandler begin to Handle");
// <1> Create the DeadLetterPublishingRecoverer, which publishes the message to the dead-letter topic once retries are exhausted
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// <2> Create a FixedBackOff: 10-second interval between retries, maximum 1 retry
// Note: 1 initial delivery + 1 retry = 2 attempts in total
BackOff backOff = new FixedBackOff(10 * 1000L, 1L);
// <3> Create the SeekToCurrentErrorHandler from the recoverer and the back-off policy
return new SeekToCurrentErrorHandler(recoverer, backOff);
}
}
Producer code
package com.example.springboot.controller;
import cn.hutool.core.date.DateUtil;
import com.example.springboot.model.Blog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@RestController
@RequestMapping("/test/kafka")
public class MessSendController {
@Autowired
private KafkaTemplate kafkaTemplate;
private static final String messTopic = "test";
@RequestMapping("/send")
public String sendMess() {
Blog blog = Blog.builder().id(1).name("测试").isDel(false).birthday(new Date()).build();
kafkaTemplate.send(messTopic, blog);
System.out.println("Client: message sent");
return DateUtil.now();
}
}
Requesting http://localhost:1088/test/kafka/send sends one message to the topic.
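Note that kafkaTemplate.send(...) is asynchronous; the controller returns before the broker acknowledges the record. If delivery feedback is needed, a callback can be attached. A hedged sketch, assuming Spring-Kafka 2.8 where send() returns a ListenableFuture (newer versions return CompletableFuture instead), and the same kafkaTemplate and messTopic as in the controller above:

```java
// Sketch: attach success/failure callbacks to the asynchronous send.
kafkaTemplate.send(messTopic, blog).addCallback(
        result -> System.out.println("Sent to partition "
                + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset()),
        ex -> System.err.println("Send failed: " + ex.getMessage()));
```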
Consumer code
package com.example.springboot.listener;
import com.alibaba.fastjson.JSON;
import com.example.springboot.model.Blog;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaMessListener {
private static final String messTopic = "test";
@KafkaListener(id="KafkaMessListener", topics = messTopic, groupId = "javagroup")
public void messListener(Blog blog) {
System.out.println("Consumer received message: " + JSON.toJSONString(blog));
// Simulate a consumption failure by throwing an exception
throw new RuntimeException("MOCK Handle Exception Happened");
}
@KafkaListener(id="KafkaMessListener.DLT", topics = messTopic + ".DLT", groupId = "javagroup.DLT")
public void messListenerDLT(Blog blog) {
System.out.println("DLT consumer received message: " + JSON.toJSONString(blog));
}
}
The listener throws a RuntimeException while consuming the message, simulating a consumption failure.
One listener consumes the original topic and the other the dead-letter topic; the dead-letter topic name is the business topic name plus the .DLT suffix.
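Note that SeekToCurrentErrorHandler is deprecated as of Spring-Kafka 2.8 in favor of DefaultErrorHandler. A hedged sketch of the equivalent bean on 2.8+, using the same recoverer and back-off as the configuration class above:

```java
// Equivalent configuration with the non-deprecated DefaultErrorHandler (Spring-Kafka 2.8+).
@Bean
public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
    // Publish to <topic>.DLT once retries are exhausted
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    // 10-second interval, 1 retry (2 delivery attempts in total)
    return new DefaultErrorHandler(recoverer, new FixedBackOff(10 * 1000L, 1L));
}
```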
Hit the Controller endpoint to send a message, then check the console output:
2022-08-15 14:45:04.433 INFO 13976 --- [nio-1088-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.0.0
2022-08-15 14:45:04.433 INFO 13976 --- [nio-1088-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8cb0a5e9d3441962
2022-08-15 14:45:04.434 INFO 13976 --- [nio-1088-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1660545904433
2022-08-15 14:45:04.440 INFO 13976 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: P4ATYaB0QtS5e-6rpsHbUQ
Client: message sent
2022-08-15 14:45:42.400 INFO 13976 --- [sListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup-1, groupId=javagroup] Successfully joined group with generation Generation{generationId=7, memberId='consumer-javagroup-1-ebfc0c1a-f644-4181-9e38-fca7550431c9', protocol='range'}
2022-08-15 14:45:42.403 INFO 13976 --- [sListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup-1, groupId=javagroup] Finished assignment for group at generation 7: {consumer-javagroup-1-ebfc0c1a-f644-4181-9e38-fca7550431c9=Assignment(partitions=[test-0])}
2022-08-15 14:45:42.416 INFO 13976 --- [tener.DLT-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup.DLT-2, groupId=javagroup.DLT] Successfully joined group with generation Generation{generationId=2, memberId='consumer-javagroup.DLT-2-d69e1eee-a261-4912-bd62-8eb2cad01c9d', protocol='range'}
2022-08-15 14:45:42.416 INFO 13976 --- [tener.DLT-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup.DLT-2, groupId=javagroup.DLT] Finished assignment for group at generation 2: {consumer-javagroup.DLT-2-d69e1eee-a261-4912-bd62-8eb2cad01c9d=Assignment(partitions=[test.DLT-0])}
2022-08-15 14:45:42.434 INFO 13976 --- [sListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup-1, groupId=javagroup] Successfully synced group in generation Generation{generationId=7, memberId='consumer-javagroup-1-ebfc0c1a-f644-4181-9e38-fca7550431c9', protocol='range'}
2022-08-15 14:45:42.434 INFO 13976 --- [tener.DLT-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup.DLT-2, groupId=javagroup.DLT] Successfully synced group in generation Generation{generationId=2, memberId='consumer-javagroup.DLT-2-d69e1eee-a261-4912-bd62-8eb2cad01c9d', protocol='range'}
2022-08-15 14:45:42.434 INFO 13976 --- [sListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup-1, groupId=javagroup] Notifying assignor about the new Assignment(partitions=[test-0])
2022-08-15 14:45:42.434 INFO 13976 --- [tener.DLT-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup.DLT-2, groupId=javagroup.DLT] Notifying assignor about the new Assignment(partitions=[test.DLT-0])
2022-08-15 14:45:42.437 INFO 13976 --- [tener.DLT-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup.DLT-2, groupId=javagroup.DLT] Adding newly assigned partitions: test.DLT-0
2022-08-15 14:45:42.437 INFO 13976 --- [sListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup-1, groupId=javagroup] Adding newly assigned partitions: test-0
2022-08-15 14:45:42.452 INFO 13976 --- [sListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup-1, groupId=javagroup] Setting offset for partition test-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[cuisuqiang:9092 (id: 0 rack: null)], epoch=0}}
2022-08-15 14:45:42.452 INFO 13976 --- [tener.DLT-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-javagroup.DLT-2, groupId=javagroup.DLT] Setting offset for partition test.DLT-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[cuisuqiang:9092 (id: 0 rack: null)], epoch=0}}
2022-08-15 14:45:42.465 INFO 13976 --- [sListener-0-C-1] o.s.k.l.KafkaMessageListenerContainer : javagroup: partitions assigned: [test-0]
2022-08-15 14:45:42.465 INFO 13976 --- [tener.DLT-0-C-1] o.s.k.l.KafkaMessageListenerContainer : javagroup.DLT: partitions assigned: [test.DLT-0]
Consumer received message: {"birthday":"2022-08-15 14:45:04","del":false,"id":1,"name":"测试"}
2022-08-15 14:45:52.624 INFO 13976 --- [sListener-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-javagroup-1, groupId=javagroup] Seeking to offset 3 for partition test-0
2022-08-15 14:45:52.628 ERROR 13976 --- [sListener-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.springboot.listener.KafkaMessListener.messListener(com.example.springboot.model.Blog)' threw exception; nested exception is java.lang.RuntimeException: MOCK Handle Exception Happened; nested exception is java.lang.RuntimeException: MOCK Handle Exception Happened
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.ErrorHandlerAdapter.handleRemaining(ErrorHandlerAdapter.java:141) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2682) [spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2563) [spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) [spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) [spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) [spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) [spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) [spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) [spring-kafka-2.8.6.jar:2.8.6]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_51]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_51]
at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_51]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.springboot.listener.KafkaMessListener.messListener(com.example.springboot.model.Blog)' threw exception; nested exception is java.lang.RuntimeException: MOCK Handle Exception Happened; nested exception is java.lang.RuntimeException: MOCK Handle Exception Happened
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2695) [spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) [spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) [spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) [spring-kafka-2.8.6.jar:2.8.6]
... 10 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) [spring-kafka-2.8.6.jar:2.8.6]
Caused by: java.lang.RuntimeException: MOCK Handle Exception Happened
at com.example.springboot.listener.KafkaMessListener.messListener(KafkaMessListener.java:17) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_51]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_51]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_51]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_51]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.15.jar:5.3.15]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.15.jar:5.3.15]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.6.jar:2.8.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) [spring-kafka-2.8.6.jar:2.8.6]
... 12 common frames omitted
Consumer received message: {"birthday":"2022-08-15 14:45:04","del":false,"id":1,"name":"测试"}
DLT consumer received message: {"birthday":"2022-08-15 14:45:04","del":false,"id":1,"name":"测试"}
After the retry is consumed and fails again, the message enters the dead-letter queue.
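As mentioned at the start, messages in the dead-letter queue can be monitored and re-sent to the original topic for another attempt. A minimal sketch under this article's topic names (the DltRedriveListener class and its re-publishing logic are illustrative, not a prescribed pattern):

```java
import com.example.springboot.model.Blog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Sketch: instead of only logging, re-publish dead letters to the original
// topic so the normal consumer gets another chance. Because the same message
// may now arrive more than once, the consumer must be idempotent.
@Component
public class DltRedriveListener {

    @Autowired
    private KafkaTemplate<String, Blog> kafkaTemplate;

    @KafkaListener(id = "DltRedrive", topics = "test.DLT", groupId = "javagroup.redrive")
    public void redrive(Blog blog) {
        // In practice, carry a redelivery counter (e.g. in a record header)
        // to avoid an endless fail -> DLT -> re-send loop
        kafkaTemplate.send("test", blog);
    }
}
```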
END
Java小强