1. 程式人生 > >Java分散式跟蹤系統Zipkin(二):Brave原始碼分析-Tracer和Span

Java分散式跟蹤系統Zipkin(二):Brave原始碼分析-Tracer和Span

Brave是Java版的Zipkin客戶端,它將收集的跟蹤資訊,以Span的形式上報給Zipkin系統。

(Zipkin是基於Google的一篇論文,名為Dapper,Dapper在荷蘭語裡是“勇敢的”的意思,這也是Brave的命名的原因)

我們一般不會手動編寫Trace相關的程式碼,Brave提供了一些開箱即用的庫,來幫助我們對某些特定的庫類來進行追蹤,比如servlet,springmvc,mysql,okhttp3,httpclient等,這些都可以在下面頁面中找到:

我們先來看看一個簡單的Demo來演示下Brave的基本使用,這對我們後續分析Brave的原理和其他類庫的使用有很大幫助

TraceDemo

package tracing;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.context.log4j2.ThreadContextCurrentTraceContext;
import brave.propagation.B3Propagation;
import brave.propagation.ExtraFieldPropagation;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.reporter.AsyncReporter;
import
zipkin2.reporter.Sender; import zipkin2.reporter.okhttp3.OkHttpSender; import java.util.concurrent.TimeUnit; public class TraceDemo { public static void main(String[] args) { Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans"); AsyncReporter asyncReporter = AsyncReporter.builder(sender) .closeTimeout(500
, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); Tracing tracing = Tracing.newBuilder() .localServiceName("tracer-demo") .spanReporter(asyncReporter) .propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name")) .currentTraceContext(ThreadContextCurrentTraceContext.create()) .build(); Tracer tracer = tracing.tracer(); Span span = tracer.newTrace().name("encode").start(); try { doSomethingExpensive(); } finally { span.finish(); } Span twoPhase = tracer.newTrace().name("twoPhase").start(); try { Span prepare = tracer.newChild(twoPhase.context()).name("prepare").start(); try { prepare(); } finally { prepare.finish(); } Span commit = tracer.newChild(twoPhase.context()).name("commit").start(); try { commit(); } finally { commit.finish(); } } finally { twoPhase.finish(); } sleep(1000); } private static void doSomethingExpensive() { sleep(500); } private static void commit() { sleep(500); } private static void prepare() { sleep(500); } private static void sleep(long milliseconds) { try { TimeUnit.MILLISECONDS.sleep(milliseconds); } catch (InterruptedException e) { e.printStackTrace(); } } }

啟動Zipkin,然後執行TraceDemo,在Zipkin的UI介面中能查到兩條跟蹤資訊

點選第一條跟蹤資訊,可以看到有一條Span(encode),耗時500ms左右
encode跟蹤資訊

本條跟蹤資訊對應的程式碼片段為:

Tracer tracer = tracing.tracer();
Span span = tracer.newTrace().name("encode").start();
try {
    doSomethingExpensive();
} finally {
    span.finish();
}

由Tracer建立一個新的Span,名為encode,然後呼叫start方法開始計時,之後執行一個比較耗時的方法doSomethingExpensive,最後呼叫finish方法結束計時,完成並記錄一條跟蹤資訊。

這段程式碼實際上向Zipkin上報的資料為:

[
  {
    "traceId": "16661f6cb5d58903",
    "id": "16661f6cb5d58903",
    "name": "encode",
    "timestamp": 1510043590522358,
    "duration": 499867,
    "binaryAnnotations": [
      {
        "key": "lc",
        "value": "",
        "endpoint": {
          "serviceName": "tracer-demo",
          "ipv4": "192.168.99.1"
        }
      }
    ]
  }
]

然後我們再來看第二條稍微複雜的跟蹤資訊,可以看到一條名為twoPhase的Span,總耗時為1000ms,它有2個子Span,分別名為prepare和commit,兩者分別耗時500ms

twoPhase跟蹤資訊

這條跟蹤資訊對應的程式碼片段為

Span twoPhase = tracer.newTrace().name("twoPhase").start();
try {
    Span prepare = tracer.newChild(twoPhase.context()).name("prepare").start();
    try {
    prepare();
    } finally {
    prepare.finish();
    }
    Span commit = tracer.newChild(twoPhase.context()).name("commit").start();
    try {
    commit();
    } finally {
    commit.finish();
    }
} finally {
    twoPhase.finish();
}

這段程式碼實際上向Zipkin上報的資料為:

[
  {
    "traceId": "89e051d5394b90b1",
    "id": "89e051d5394b90b1",
    "name": "twophase",
    "timestamp": 1510043591038983,
    "duration": 1000356,
    "binaryAnnotations": [
      {
        "key": "lc",
        "value": "",
        "endpoint": {
          "serviceName": "tracer-demo",
          "ipv4": "192.168.99.1"
        }
      }
    ]
  },
  {
    "traceId": "89e051d5394b90b1",
    "id": "60568c4903793b8d",
    "name": "prepare",
    "parentId": "89e051d5394b90b1",
    "timestamp": 1510043591039919,
    "duration": 499246,
    "binaryAnnotations": [
      {
        "key": "lc",
        "value": "",
        "endpoint": {
          "serviceName": "tracer-demo",
          "ipv4": "192.168.99.1"
        }
      }
    ]
  },
  {
    "traceId": "89e051d5394b90b1",
    "id": "ce14448169d01d2f",
    "name": "commit",
    "parentId": "89e051d5394b90b1",
    "timestamp": 1510043591539304,
    "duration": 499943,
    "binaryAnnotations": [
      {
        "key": "lc",
        "value": "",
        "endpoint": {
          "serviceName": "tracer-demo",
          "ipv4": "192.168.99.1"
        }
      }
    ]
  }
]

Span

首先看下Span的實現類RealSpan

該類依賴幾個核心類

Recorder,用於記錄Span

Reporter,用於上報Span給Zipkin

MutableSpan,Span的包裝類,提供各種API操作Span

MutableSpanMap,以TraceContext為Key,MutableSpan為Value的Map結構,用於記憶體中存放所有的Span

RealSpan兩個核心方法start, finish

public Span start(long timestamp) {
  recorder().start(context(), timestamp);
  return this;
}

public void finish(long timestamp) {
  recorder().finish(context(), timestamp);
}

分別呼叫Recorder的start和finish方法,獲取跟TraceContext繫結的Span資訊,記錄開始時間和結束時間,並在結束時,呼叫reporter的report方法,上報給Zipkin

public void start(TraceContext context, long timestamp) {
  if (noop.get()) return;
  spanMap.getOrCreate(context).start(timestamp);
} 

public void finish(TraceContext context, long finishTimestamp) {
  MutableSpan span = spanMap.remove(context);
  if (span == null || noop.get()) return;
  synchronized (span) {
    span.finish(finishTimestamp);
    reporter.report(span.toSpan());
  }
}

BoundedAsyncReporter

Reporter的實現類AsyncReporter,而AsyncReporter的實現類是BoundedAsyncReporter

static final class BoundedAsyncReporter<S> extends AsyncReporter<S> {
    static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName());
    final AtomicBoolean closed = new AtomicBoolean(false);
    final BytesEncoder<S> encoder;
    final ByteBoundedQueue pending;
    final Sender sender;
    final int messageMaxBytes;
    final long messageTimeoutNanos;
    final long closeTimeoutNanos;
    final CountDownLatch close;
    final ReporterMetrics metrics;

    BoundedAsyncReporter(Builder builder, BytesEncoder<S> encoder) {
      this.pending = new ByteBoundedQueue(builder.queuedMaxSpans, builder.queuedMaxBytes);
      this.sender = builder.sender;
      this.messageMaxBytes = builder.messageMaxBytes;
      this.messageTimeoutNanos = builder.messageTimeoutNanos;
      this.closeTimeoutNanos = builder.closeTimeoutNanos;
      this.close = new CountDownLatch(builder.messageTimeoutNanos > 0 ? 1 : 0);
      this.metrics = builder.metrics;
      this.encoder = encoder;
    }
}

BoundedAsyncReporter中的幾個重要的類:
- BytesEncoder - Span的編碼器,將Span編碼成二進位制,便於sender傳送給Zipkin
- ByteBoundedQueue - 類似於BlockingQueue,是一個既有數量限制,又有位元組數限制的阻塞佇列
- Sender - 將編碼後的二進位制資料,傳送給Zipkin
- ReporterMetrics - Span的report相關的統計資訊
- BufferNextMessage - Consumer,Span資訊的消費者,依靠Sender上報Span資訊

    public <S> AsyncReporter<S> build(BytesEncoder<S> encoder) {
      if (encoder == null) throw new NullPointerException("encoder == null");

      if (encoder.encoding() != sender.encoding()) {
        throw new IllegalArgumentException(String.format(
            "Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
      }

      final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);

      if (messageTimeoutNanos > 0) { // Start a thread that flushes the queue in a loop.
        final BufferNextMessage consumer =
            new BufferNextMessage(sender, messageMaxBytes, messageTimeoutNanos);
        final Thread flushThread = new Thread(() -> {
          try {
            while (!result.closed.get()) {
              result.flush(consumer);
            }
          } finally {
            for (byte[] next : consumer.drain()) result.pending.offer(next);
            result.close.countDown();
          }
        }, "AsyncReporter(" + sender + ")");
        flushThread.setDaemon(true);
        flushThread.start();
      }
      return result;
    }

當messageTimeoutNanos大於0時,啟動一個守護執行緒flushThread,一直迴圈呼叫BoundedAsyncReporter的flush方法,將記憶體中的Span資訊上報給Zipkin
而當messageTimeoutNanos等於0時,客戶端需要手動呼叫flush方法來上報Span資訊

再來看下BoundedAsyncReporter中的close方法

    @Override public void close() {
      if (!closed.compareAndSet(false, true)) return; // already closed
      try {
        // wait for in-flight spans to send
        if (!close.await(closeTimeoutNanos, TimeUnit.NANOSECONDS)) {
          logger.warning("Timed out waiting for in-flight spans to send");
        }
      } catch (InterruptedException e) {
        logger.warning("Interrupted waiting for in-flight spans to send");
        Thread.currentThread().interrupt();
      }
      int count = pending.clear();
      if (count > 0) {
        metrics.incrementSpansDropped(count);
        logger.warning("Dropped " + count + " spans due to AsyncReporter.close()");
      }
    }

這個close方法和FlushThread中while迴圈相呼應,在close方法中,首先將closed變數置為true,然後呼叫close.await(),等待close訊號量(CountDownLatch)的釋放,此處程式碼會阻塞,一直到FlushThread中finally中呼叫result.close.countDown();
而在close方法中將closed變數置為true後,FlushThread中的while迴圈將結束執行,然後執行finally程式碼塊,系統會將記憶體中還未上報的Span,新增到queue(result.pending)中,然後呼叫result.close.countDown(); close方法中阻塞的程式碼會繼續執行,將呼叫metrics.incrementSpansDropped(count)將這些Span的數量新增到metrics統計資訊中

@Override public void report(S span) {
  if (span == null) throw new NullPointerException("span == null");

  metrics.incrementSpans(1);
  byte[] next = encoder.encode(span);
  int messageSizeOfNextSpan = sender.messageSizeInBytes(Collections.singletonList(next));
  metrics.incrementSpanBytes(next.length);
  if (closed.get() ||
      // don't enqueue something larger than we can drain
      messageSizeOfNextSpan > messageMaxBytes ||
      !pending.offer(next)) {
    metrics.incrementSpansDropped(1);
  }
}

前面看到在Recorder的finish方法中,會呼叫Reporter的report方法,此處report方法,將span轉化成位元組陣列,然後計算出messageSize,新增到queue(pending)中,並記錄相應的統計資訊

接下來看看兩個flush方法,其中flush()方法,是public的,供外部手動呼叫,而flush(BufferNextMessage bundler)是在FlushThread中迴圈呼叫

@Override public final void flush() {
  flush(new BufferNextMessage(sender, messageMaxBytes, 0));
}

void flush(BufferNextMessage bundler) {
  if (closed.get()) throw new IllegalStateException("closed");

  //將佇列中的資料,全部提取到BufferNextMessage中,直到buffer(bundler)滿為止
  pending.drainTo(bundler, bundler.remainingNanos());

  // record after flushing reduces the amount of gauge events vs on doing this on report
  metrics.updateQueuedSpans(pending.count);
  metrics.updateQueuedBytes(pending.sizeInBytes);

  // loop around if we are running, and the bundle isn't full
  // if we are closed, try to send what's pending
  if (!bundler.isReady() && !closed.get()) return;

  // Signal that we are about to send a message of a known size in bytes
  metrics.incrementMessages();
  metrics.incrementMessageBytes(bundler.sizeInBytes());
  List<byte[]> nextMessage = bundler.drain();

  try {
    sender.sendSpans(nextMessage).execute();
  } catch (IOException | RuntimeException | Error t) {
    // In failure case, we increment messages and spans dropped.
    int count = nextMessage.size();
    Call.propagateIfFatal(t);
    metrics.incrementMessagesDropped(t);
    metrics.incrementSpansDropped(count);
    if (logger.isLoggable(FINE)) {
      logger.log(FINE,
          format("Dropped %s spans due to %s(%s)", count, t.getClass().getSimpleName(),
              t.getMessage() == null ? "" : t.getMessage()), t);
    }
    // Raise in case the sender was closed out-of-band.
    if (t instanceof IllegalStateException) throw (IllegalStateException) t;
  }
}

flush中大致分下面幾步
1. 先將佇列pending中的資料,全部提取到BufferNextMessage(bundler)中,直到bundler滿為止
2. 當bundler準備好,即isReady()返回true,將bundler中的message全部取出來
3. 將取出來的所有message,呼叫Sender的sendSpans方法,傳送到Zipkin

ByteBoundedQueue

類似於BlockingQueue,是一個既有數量限制,又有位元組數限制的阻塞佇列,提供了offer,drainTo,clear三個方法,供呼叫者向queue裡存放,提取和清空資料

final class ByteBoundedQueue {

  final ReentrantLock lock = new ReentrantLock(false);
  final Condition available = lock.newCondition();

  final int maxSize;
  final int maxBytes;

  final byte[][] elements;
  int count;
  int sizeInBytes;
  int writePos;
  int readPos;

  ByteBoundedQueue(int maxSize, int maxBytes) {
    this.elements = new byte[maxSize][];
    this.maxSize = maxSize;
    this.maxBytes = maxBytes;
  }
}

ByteBoundedQueue接受兩個int引數,maxSize是queue接受的最大數量,maxBytes是queue接受的最大位元組數
ByteBoundedQueue中使用一個二維byte陣列elements來儲存message,並使用writePos和readPos兩個遊標,分別記錄寫和讀的位置
ByteBoundedQueue中使用了最典型的可重入鎖ReentrantLock,使offer,drainTo,clear等方法是執行緒安全的

/**
 * Returns true if the element could be added or false if it could not due to its size.
 */
boolean offer(byte[] next) {
  lock.lock();
  try {
    if (count == elements.length) return false;
    if (sizeInBytes + next.length > maxBytes) return false;

    elements[writePos++] = next;

    if (writePos == elements.length) writePos = 0; // circle back to the front of the array

    count++;
    sizeInBytes += next.length;

    available.signal(); // alert any drainers
    return true;
  } finally {
    lock.unlock();
  }
}

offer方法是新增message到queue中,使用了標準的try-lock結構,即先獲取鎖,然後finally裡釋放鎖,在獲取鎖以後
當count等於elements.length時,意味著queue是滿的,則不能繼續新增
當sizeInBytes + next.length > maxBytes時,意味著該訊息加進佇列會超出佇列位元組大小限制,也不能新增新message
如果上面兩個條件都不滿足,則表明可以繼續新增message,將writePos+1,並將message放於writePos+1處
當writePos到達陣列尾部,則將writePos置為0,讓下一次新增從陣列頭部開始
然後將count計數器加1,並更新位元組總數
最後呼叫available.signal()來通知其他在lock上等待的執行緒(在drainTo方法中阻塞的執行緒)繼續競爭執行緒資源

/** Blocks for up to nanosTimeout for elements to appear. Then, consume as many as possible. */
int drainTo(Consumer consumer, long nanosTimeout) {
  try {
    // This may be called by multiple threads. If one is holding a lock, another is waiting. We
    // use lockInterruptibly to ensure the one waiting can be interrupted.
    lock.lockInterruptibly();
    try {
      long nanosLeft = nanosTimeout;
      while (count == 0) {
        if (nanosLeft <= 0) return 0;
        nanosLeft = available.awaitNanos(nanosLeft);
      }
      return doDrain(consumer);
    } finally {
      lock.unlock();
    }
  } catch (InterruptedException e) {
    return 0;
  }
}

drainTo方法是提取message到Consumer中消費,如果當時queue裡沒有訊息,則每次等待nanosTimeout,直到queue裡存入訊息為止
當while迴圈退出,表明queue中已經有新的message新增進來,可以消費,則呼叫doDrain方法。

int doDrain(Consumer consumer) {
  int drainedCount = 0;
  int drainedSizeInBytes = 0;
  while (drainedCount < count) {
    byte[] next = elements[readPos];

    if (next == null) break;
    if (consumer.accept(next)) {
      drainedCount++;
      drainedSizeInBytes += next.length;

      elements[readPos] = null;
      if (++readPos == elements.length) readPos = 0; // circle back to the front of the array
    } else {
      break;
    }
  }
  count -= drainedCount;
  sizeInBytes -= drainedSizeInBytes;
  return drainedCount;
}

doDrain裡依然是一個while迴圈,當drainedCount小於count,即提取的message數量總數小於queue裡訊息總數時,嘗試呼叫consumer.accept方法
如果accept方法返回true,則將drainedCount加1,並且drainedSizeInBytes加上當前訊息的位元組數
如果accept方法返回false,則跳出迴圈,將queue的count減掉提取的總訊息數drainedCount,sizeInBytes減去提取的總位元組數drainedSizeInBytes

int clear() {
  lock.lock();
  try {
    int result = count;
    count = sizeInBytes = readPos = writePos = 0;
    Arrays.fill(elements, null);
    return result;
  } finally {
    lock.unlock();
  }
}

clear方法,清空佇列,這個方法比較簡單,就是將所有東西清零,該方法在Reporter的close方法中會被使用

BufferNextMessage

BufferNextMessage是ByteBoundedQueue.Consumer的預設實現

final class BufferNextMessage implements ByteBoundedQueue.Consumer {
  private final Sender sender;
  private final int maxBytes;
  private final long timeoutNanos;
  private final List<byte[]> buffer = new LinkedList<>();

  long deadlineNanoTime;
  int sizeInBytes;
  boolean bufferFull;

  BufferNextMessage(Sender sender, int maxBytes, long timeoutNanos) {
    this.sender = sender;
    this.maxBytes = maxBytes;
    this.timeoutNanos = timeoutNanos;
  }
}

BufferNextMessage中使用一個LinkedList來儲存接收的messages

  @Override
  public boolean accept(byte[] next) {
    buffer.add(next); // speculatively add to the buffer so we can size it
    int x = sender.messageSizeInBytes(buffer);
    int y = maxBytes;
    int includingNextVsMaxBytes = (x < y) ? -1 : ((x == y) ? 0 : 1);

    // If we can fit queued spans and the next into one message...
    if (includingNextVsMaxBytes <= 0) {
      sizeInBytes = x;

      if (includingNextVsMaxBytes == 0) {
        bufferFull = true;
      }
      return true;
    } else {
      buffer.remove(buffer.size() - 1);
      return false; // we couldn't fit the next message into this buffer
    }
  }

accept方法,先將message放入buffer,然後呼叫sender的messageSizeInBytes方法統計下所有buffer訊息的總位元組數includingNextVsMaxBytes
當includingNextVsMaxBytes大於該buffer的最大位元組數maxBytes,則將加入到buffer的message移除
當includingNextVsMaxBytes等於該buffer的最大位元組數maxBytes,則將該buffer標記為已滿狀態,即bufferFull = true

  long remainingNanos() {
    if (buffer.isEmpty()) {
      deadlineNanoTime = System.nanoTime() + timeoutNanos;
    }
    return Math.max(deadlineNanoTime - System.nanoTime(), 0);
  }

  boolean isReady() {
    return bufferFull || remainingNanos() <= 0;
  }

remainingNanos方法中,當buffer為空,則重置一個deadlineNanoTime,其值為當前系統時間加上timeoutNanos,當系統時間超過這個時間或者buffer滿了的時候, isReady會返回true,即buffer為準備就緒狀態

  List<byte[]> drain() {
    if (buffer.isEmpty()) return Collections.emptyList();
    ArrayList<byte[]> result = new ArrayList<>(buffer);
    buffer.clear();
    sizeInBytes = 0;
    bufferFull = false;
    deadlineNanoTime = 0;
    return result;
  }

drain方法返回buffer裡的所有資料,並將buffer清空

isReady方法和drain方法,在BoundedAsyncReporter的flush方法中會被使用

void flush(BufferNextMessage bundler) {
    // ...
    if (!bundler.isReady() && !closed.get()) return;
    // ...
    List<byte[]> nextMessage = bundler.drain();
    // ...
    sender.sendSpans(nextMessage).execute();
}

因為flush是會一直不間斷被呼叫,而這裡先呼叫bundler.isReady()方法,當返回true後才取出所有堆積的訊息,一起打包傳送給zipkin提高效率

再回過頭來看看BoundedAsyncReporter裡手動flush方法

@Override public final void flush() {
  flush(new BufferNextMessage(sender, messageMaxBytes, 0));
}

在我們分析完BufferNextMessage原始碼後,我們很容易得出結論:這裡構造BufferNextMessage傳入的timeoutNanos為0,所以BufferNextMessage的isReady()方法會永遠返回true。
這意味著每次我們手動呼叫flush方法,會立即將queue的資料用BufferNextMessage填滿,並打包傳送給Zipkin,至於queue裡剩下的資料,需要等到下次FlushThread迴圈執行flush方法的時候被髮送

至此,我們已經分析過Tracer和Span相關的原始碼,這對我們後續看Brave和其他框架整合有很大幫助:
Span/RealSpan
Recorder
Reporter/AsyncReporter/BoundedAsyncReporter
BufferNextMessage
ByteBoundedQueue

在下一篇博文中,會繼續分析Tracing的初始化過程,以及相關原始碼

相關推薦

Java分散式跟蹤系統ZipkinBrave原始碼分析-TracerSpan

Brave是Java版的Zipkin客戶端,它將收集的跟蹤資訊,以Span的形式上報給Zipkin系統。 (Zipkin是基於Google的一篇論文,名為Dapper,Dapper在荷蘭語裡是“勇敢的”的意思,這也是Brave的命名的原因) 我們一般

Java分散式跟蹤系統ZipkinBrave原始碼分析-BraveSpringMVC整合

上一篇博文中,我們分析了Brave是如何在普通Web專案中使用的,這一篇博文我們繼續分析Brave和SpringMVC專案的整合方法及原理。 我們分兩個部分來介紹和SpringMVC的整合,及XML配置方式和Annotation註解方式 pom.xml新

Java分散式跟蹤系統ZipkinZipkin原始碼分析-KafkaCollector

前面幾篇博文中,都是使用OkHttpSender來上報Trace資訊給Zipkin,這在生產環境中,當業務量比較大的時候,可能會成為一個性能瓶頸,這一篇博文我們來使用KafkaSender將Trace資訊先寫入到Kafka中,然後Zipkin使用KafkaC

Java分散式跟蹤系統ZipkinZipkin原始碼分析-Zipkin原始碼結構

前面花了大量篇幅來介紹Brave的使用,一直把Zipkin當黑盒在使用,現在來逐漸撥開Zipkin的神祕面紗。 Zipkin的原始碼地址為:https://github.com/openzipkin/zipkin Zipkin的原始碼結構 - zi

Java視角理解系統結構CPU快取

從Java視角理解系統結構連載, 關注我的微博(連結)瞭解最新動態 眾所周知, CPU是計算機的大腦, 它負責執行程式的指令; 記憶體負責存資料, 包括程式自身資料. 同樣大家都知道, 記憶體比CPU慢很多. 其實在30年前, CPU的頻率和記憶體匯流排的頻率在同一個級別, 訪問記憶體只比訪問

Java 並發編程對象的不變性安全的公布對象

不一致 字段 更新 要求 nts ava 然而 caching mut 一、不變性 滿足同步需求的還有一種方法是使用不可變對象(Immutable Object)。到眼下為止,我們介紹了很多與原子性和可見性相關的問題,比如得到失效數據。丟失更新操作

JAVA並行框架Fork/Join同步異步

arp ont ack sso util private div ext string 在Fork/Join框架中,提交任務的時候,有同步和異步兩種方式。 invokeAll()的方法是同步的,也就是任務提交後,這個方法不會返回直到所有的任務都處理完了。 fork方法是異步

基於中臺思想的物流系統設計構建物流訂單能力

一、引言 物流訂單能力作為基礎能力,需要設計一套穩定的訂單模型,以及一套能夠在高併發環境下持續可用的介面。這些介面作為原子介面,供上層業務複用。上層業務無論多麼複雜,通過這些原子介面,最終都會收斂到穩定的訂單模型中來,這也是區分基礎能力和產品服務的一個重要的邊界。 本文通過以下5點來介紹如何構建一套物流訂

Java——深入理解Class物件Class物件的載入及其獲取方式

上一篇部落格Java——深入理解Class物件(一)帶大家簡單認識了一下Java中Class物件。 現在帶大家瞭解一下Class物件的載入及其獲取方式。 1.Class物件的載入 在Java——深入理解Class物件(一)我們已提到過,Class物件是由JVM載入的,那它必然不會是胡亂載

從新手到系統管理員Linux新手學習Shell指令碼程式設計的五個例子

本文由 [茶話匯] – [Qing] 編譯自 [Avishek Kumar] 轉載請註明出處 例子一:繪製特殊圖形 [code language=”bash”] #!/bin/bash MAX_NO=0 echo -n "Enter Number between (5 to 9) : " re

【網路爬蟲】【java】微博爬蟲如何抓取HTML頁面及HttpClient使用

一、寫在前面         上篇文章以網易微博爬蟲為例,給出了一個很簡單的微博爬蟲的爬取過程,大概說明了網路爬蟲其實也就這麼回事,或許初次看到這個例子覺得有些複雜,不過沒有關係,上篇文章給的例子只是讓大家對爬蟲過程有所瞭解。接下來的系列裡,將一步一步地剖析每個過程。 現

Java實現JVM支援介面、類物件

1. 概述我的 JVM 已經能夠執行HelloWorld了,並且有了基本的 JVM 骨架,包括執行時資料結構的定義(棧、棧幀、運算元棧等),執行時的邏輯控制等。但它還沒有類和物件的概念,比如無法執行下面這更復雜的HelloWorld:public interface SpeakerInterface {

JAVA多執行緒入門JAVA中如何寫多執行緒

第一種方式:繼承Thread 步驟: 1.建立執行緒類,繼承自Thread + 重寫run,run中寫執行緒體,執行緒體就是mian()函式裡面的寫法 2.使用執行緒: 2.1 建立執行緒物件 2.2 執行緒物件.start() 步驟展示: 1. public

Java中,IO流字元流

import java.io.*; /** * 字元流 1) 編碼問題 2)認識文字和文字檔案 java的文字(char)是16位無符號整數,是字元的unicode編碼(雙位元組編碼) 檔案是byte byte byte ...的資料序列 文字檔案是文字(char)序列

跟廠長學PHP內核源碼分析的環境與工具

compiler one upload info org print fin 圖形界面 waiting 本文主要介紹分析源碼的方式,其中包含環境的搭建、分析工具的安裝以及源碼調試的基本操作。 一、工具清單 PHP7.0.12 GDB CLion 二、源碼下載及安裝

faster rcnn pytorch 復現系列generate_anchors原始碼解析

目錄​ 1. 總函式 generate_anchors 2. 函式分功能寫,首先是ratios的實現,其次是scale的實現 3. anchor2WHXY函式+WsHsXsYs2anchors函式[s表示複數] 4.  _ratio_enum(anchor,r

白話Spring原始碼spring原始碼分享的思路

做事先列個大綱,這樣思路清晰了才不會亂。 這次spring原始碼系列的部落格每個字我都堅持自己手敲,然後文采不好,但是真實。希望大家喜歡。 大綱: 1.spring框架的理解 2.beanfactory:怎麼建立bean的,怎麼載入xml中bean的定義的 3.AOP 4.a

軟體安裝集合linux埠訪問telnetnc安裝使用

一、實現功能 兩個linux常用的埠資料傳送軟體的安裝和配置 二、安裝和配置 1.telnet (1)安裝 yum -y install telnet (2)使用 telnet ibeifeng.com 44444 2.nc (1)安裝 yum inst

零基礎從頭學習SwiftSwift中的變數常量

個人部落格站已經上線了,網址 www.llwjy.com ~歡迎各位吐槽~-------------------------------------------------------------------------------------------------   

NVIDIA Jetson TX2使用筆記使用JetPack刷機安裝Package

NVIDIA JetPack SDK is the most comprehensive solution for building AI applications. Use the JetPack installer to flash your Jetson Developer Kit with the