做网站原价商品打个横线,做网站的时候网站的第一个字母怎么在网站标题前面显示 比如谷歌g一样,html设计软件,濮阳的网站建设如何为启用重试和死信发布的消费者的 Spring Kafka 实现编写集成测试。
Kafka 非阻塞重试
Kafka 中的非阻塞重试是通过为主主题配置重试主题来完成的。如果需要#xff0c;还可以配置其他死信主题。如果所有重试均已用尽#xff0c;事件将转发至 DLT。公共领域提供了大量资… 如何为启用重试和死信发布的消费者的 Spring Kafka 实现编写集成测试。
Kafka 非阻塞重试
Kafka 中的非阻塞重试是通过为主主题配置重试主题来完成的。如果需要还可以配置其他死信主题。如果所有重试均已用尽事件将转发至 DLT。公共领域提供了大量资源来了解技术细节。
要测试什么
在代码中为重试机制编写集成测试时这可能是一项具有挑战性的工作。
如何测试该事件是否已重试所需的次数 如何测试仅在发生某些异常时才执行重试而对于其他异常则不执行重试如果上次重试中异常已解决如何测试是否未进行另一次重试在n-1次重试尝试失败后如何测试重试中的第n次尝试是否成功当所有重试尝试都用完后如何测试事件是否已发送到死信队列
让我们看一些代码。您可以找到很多很好的文章展示如何使用 Spring Kafka 设置非阻塞重试。下面给出了一种这样的实现。这是使用Spring-Kafka 的RetryableTopic和DltHandler 注释来完成的。
设置可重试消费者
Slf4j
Component
RequiredArgsConstructor
public class CustomEventConsumer {private final CustomEventHandler handler;RetryableTopic(attempts ${retry.attempts},backoff Backoff(delayExpression ${retry.delay},multiplierExpression ${retry.delay.multiplier}),topicSuffixingStrategy TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,dltStrategy FAIL_ON_ERROR,autoStartDltHandler true,autoCreateTopics false,include {CustomRetryableException.class})KafkaListener(topics ${topic}, id ${default-consumer-group:default})public void consume(CustomEvent event, Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {try {log.info(Received event on topic {}, topic);handler.handleEvent(event);} catch (Exception e) {log.error(Error occurred while processing event, e);throw e;}}DltHandlerpublic void listenOnDlt(Payload CustomEvent event) {log.error(Received event on dlt.);handler.handleEventFromDlt(event);}}
如果您注意到上面的代码片段include参数包含CustomRetryableException.class. 这告诉使用者仅在该方法抛出 CustomRetryableException 时才重试CustomEventHandler#handleEvent。您可以根据需要添加任意数量。还有一个排除参数但一次可以使用其中任何一个参数。
${retry.attempts}在发布到 DLT 之前事件处理应重试最多次数。
设置测试基础设施
要编写集成测试您需要确保拥有一个正常运行的 Kafka 代理首选嵌入式和一个功能齐全的发布者。让我们设置我们的基础设施
EnableKafka
SpringBootTest
DirtiesContext(classMode DirtiesContext.ClassMode.AFTER_CLASS)
EmbeddedKafka(partitions 1,brokerProperties {listeners ${kafka.broker.listeners}, port ${kafka.broker.port}},controlledShutdown true,topics {test, test-retry-0, test-retry-1, test-dlt}
)
ActiveProfiles(test)
class DocumentEventConsumerIntegrationTest {Autowiredprivate KafkaTemplateString, CustomEvent testKafkaTemplate;// tests}
** 配置是从 application-test.yml 文件导入的。
使用嵌入式 kafka 代理时重要的是要提及要创建的主题。它们不会自动创建。在本例中我们创建四个主题即
test, test-retry-0, test-retry-1, test-dlt
我们已将最大重试尝试次数设置为 3 次。每个主题对应于每次重试尝试。因此如果 3 次重试都用尽则应将事件转发到 DLT。
测试用例
如果第一次尝试消费成功则不应重试。
CustomEventHandler#handleEvent这可以通过该方法仅被调用一次的事实来测试。还可以添加对 Log 语句的进一步测试。 Testvoid test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {CustomEvent event new CustomEvent(Hello);// GIVENdoNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send(test, event).get();// THENverify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}
如果引发不可重试的异常则不应重试。
在这种情况下该CustomEventHandler#handleEvent方法应该只调用一次 Testvoid test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {CustomEvent event new CustomEvent(Hello);// GIVENdoThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send(test, event).get();// THENverify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}
如果抛出 a则重试配置的最大次数RetryableException并在重试用完后将其发布到死信主题。
在这种情况下该CustomEventHandler#handleEvent方法应被调用三次maxRetries次并且CustomEventHandler#handleEventFromDlt该方法应被调用一次。 Testvoid test_should_retry_maximum_times_and_publish_to_dlt_if_retryable_exception_raised() throws ExecutionException, InterruptedException {CustomEvent event new CustomEvent(Hello);// GIVENdoThrow(CustomRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send(test, event).get();// THENverify(customEventHandler, timeout(10000).times(maxRetries)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(1)).handleEventFromDlt(any(CustomEvent.class));}
**在验证阶段添加了相当大的超时以便在测试完成之前可以考虑指数退避延迟。这很重要如果设置不当可能会导致断言失败。
应该重试直到RetryableException解决并且如果引发不可重试的异常或消费最终成功则不应继续重试。
测试已设置为RetryableException先抛出 a 然后再抛出 a NonRetryable exception以便重试一次。 Testvoid test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,InterruptedException {CustomEvent event new CustomEvent(Hello);// GIVENdoThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send(test, event).get();// THENverify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}Testvoid test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,InterruptedException {CustomEvent event new CustomEvent(Hello);// GIVENdoThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send(test, event).get();// THENverify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}
结论
因此您可以看到集成测试是策略、超时、延迟和验证的混合和匹配以确保 Kafka 事件驱动架构的重试机制万无一失。