下载的网站模板如何安装,学网站开发容易吗,pexels免费素材网站,wordpress 分页 未找到页面背景
Skywalking默认场景下#xff0c;Tracing对于消息队列的发送场景#xff0c;无法将TraceId传递到下游消费者#xff0c;但对于微服务场景下#xff0c;是有大量消息队列的业务场景的#xff0c;这显然无法满足业务预期。
解决方案
Skywalking的官方社区中#xf…背景
Skywalking默认场景下Tracing对于消息队列的发送场景无法将TraceId传递到下游消费者但对于微服务场景下是有大量消息队列的业务场景的这显然无法满足业务预期。
解决方案
Skywalking的官方社区中有用户提出了该场景问题Skywalking在补充工具包中提供了对Kafka的tracing支持。 代码实现
dependencygroupIdorg.apache.skywalking/groupIdartifactIdapm-toolkit-kafka/artifactIdversion${skywalking.version}/version/dependency对于该工具包默认情况下是针对KafkaTemplate进行trace即如果使用KafkaTemplate发送消息代码层面无需做任何改动。
如果没有使用KafkaTemplate的场景toolkit也提供的了注解的支持
public class ConsumerThread2 extends Thread {Overridepublic void run() {Properties consumerProperties new Properties();//...consumerProperties.put()KafkaConsumerString, String consumer new KafkaConsumer(consumerProperties);consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());while (true) {if (pollAndInvoke(consumer)) break;}consumer.close();}KafkaPollAndInvokeprivate boolean pollAndInvoke(KafkaConsumerString, String consumer) {try {Thread.sleep(1000);} catch (InterruptedException e) {}ConsumerRecordsString, String records consumer.poll(100);if (!records.isEmpty()) {OkHttpClient client new OkHttpClient.Builder().build();Request request new Request.Builder().url(http://localhost:8080/kafka-scenario/case/kafka-thread2-ping).build();Response response null;try {response client.newCall(request).execute();} catch (IOException e) {}response.body().close();return true;}return false;}
}异步线程Tracing
对于Kafka消息的发送经常会配合异步线程池的场景使用Tracing的基本原理是基于ThreadLocal进行实现的那么对于异步场景是会丢失TraceId通常的解决方式是需要手动将主线程的TraceId手动赋值给子线程但这种方式需要手动代码侵入并不友好。
幸运的是Skywalking的toolkit中提供了对于异步线程tracing的支持。
dependencygroupIdorg.apache.skywalking/groupIdartifactIdapm-toolkit-trace/artifactIdversion${skywalking.version}/version
/dependency推荐用法
ExecutorService executorService Executors.newFixedThreadPool(1);
executorService.execute(RunnableWrapper.of(new Runnable() {Override public void run() {//your code}
}));或者 TraceCrossThreadpublic static class MyCallableString implements CallableString {Overridepublic String call() throws Exception {return null;}}
...ExecutorService executorService Executors.newFixedThreadPool(1);executorService.submit(new MyCallable());PS事实上RunnableWrapper也是基于TraceCrossThread实现。
相关文档 https://skywalking.apache.org/docs/skywalking-java/v8.16.0/en/setup/service-agent/java-agent/application-toolkit-kafka/
https://skyapm.github.io/document-cn-translation-of-skywalking/zh/6.1.0/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.html
https://blog.51cto.com/knifeedge/5268667
https://blog.csdn.net/lijunwyf/article/details/107954543