如何寫一個 SkyWalking trace 外掛

javaagent 原理

美團技術團隊-Java 動態除錯技術原理及實踐

類圖

實現

ConsumeMessageConcurrentlyInstrumentation

  1. public class ConsumeMessageConcurrentlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
  2. // 需要增強的類
  3. private static final String ENHANCE_CLASS = "com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently";
  4. // 需要增強的方法
  5. private static final String CONSUMER_MESSAGE_METHOD = "consumeMessage";
  6. // 增加的方法對應的攔截器
  7. private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.ons.v1.MessageConcurrentlyConsumeInterceptor";
  8. // 構造器不需要攔截
  9. @Override
  10. public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
  11. return new ConstructorInterceptPoint[0];
  12. }
  13. @Override
  14. public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
  15. return new InstanceMethodsInterceptPoint[] {
  16. // 新增一個攔截器
  17. new InstanceMethodsInterceptPoint() {
  18. @Override
  19. public ElementMatcher<MethodDescription> getMethodsMatcher() {
  20. // 方法匹配
  21. return named(CONSUMER_MESSAGE_METHOD);
  22. }
  23. @Override
  24. public String getMethodsInterceptor() {
  25. return INTERCEPTOR_CLASS;
  26. }
  27. @Override
  28. public boolean isOverrideArgs() {
  29. return false;
  30. }
  31. }
  32. };
  33. }
  34. @Override
  35. protected ClassMatch enhanceClass() {
  36. // 需要增強的類
  37. return HierarchyMatch.byHierarchyMatch(new String[] {ENHANCE_CLASS});
  38. }
  39. }

AbstractMessageConsumeInterceptor

  1. public abstract class AbstractMessageConsumeInterceptor implements InstanceMethodsAroundInterceptor {
  2. public static final String CONSUMER_OPERATION_NAME_PREFIX = "OnsRocketMQ/";
  3. // 在方法前增強
  4. @Override
  5. public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
  6. Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
  7. // 拿到方法引數,轉換成訊息列表
  8. List<MessageExt> msgs = (List<MessageExt>) allArguments[0];
  9. // 從訊息中中獲取TraceId等Context資訊
  10. ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0));
  11. // 建立一個entry span
  12. AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + msgs.get(0)
  13. .getTopic() + "/Consumer", contextCarrier);
  14. span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
  15. SpanLayer.asMQ(span);
  16. for (int i = 1; i < msgs.size(); i++) {
  17. ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
  18. }
  19. }
  20. // 異常處理
  21. @Override
  22. public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
  23. Class<?>[] argumentsTypes, Throwable t) {
  24. ContextManager.activeSpan().log(t);
  25. }
  26. private ContextCarrier getContextCarrierFromMessage(MessageExt message) {
  27. ContextCarrier contextCarrier = new ContextCarrier();
  28. CarrierItem next = contextCarrier.items();
  29. while (next.hasNext()) {
  30. next = next.next();
  31. next.setHeadValue(message.getUserProperty(next.getHeadKey()));
  32. }
  33. return contextCarrier;
  34. }
  35. }

MessageConcurrentlyConsumeInterceptor

  1. public class MessageConcurrentlyConsumeInterceptor extends AbstractMessageConsumeInterceptor {
  2. // 在方法後處理
  3. @Override
  4. public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
  5. Object ret) throws Throwable {
  6. // 獲取消費狀態
  7. ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus) ret;
  8. if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
  9. // 消費狀態為重試,則設定span出現錯誤
  10. AbstractSpan activeSpan = ContextManager.activeSpan();
  11. activeSpan.errorOccurred();
  12. Tags.MQ_STATUS.set(activeSpan, status.name());
  13. }
  14. // 停止span
  15. ContextManager.stopSpan();
  16. return ret;
  17. }
  18. }

專案:apm-ons-1.x-plugin

參考文件

  1. apm-ons-1.x-plugin
  2. 美團技術團隊-Java 動態除錯技術原理及實踐

分享並記錄所學所見