1. 程式人生 > >小規模的流處理框架.Part 1: thread pools

小規模的流處理框架.Part 1: thread pools

原文連結 作者:Tomasz Nurkiewicz 譯者:simonwang
(譯者:強力推薦這篇文章,作者設計了一個用於小流量的流式資料處理框架,並詳細給出了每一個需要注意的設計細節,對比了不同設計方案的優缺點,能夠讓你對流處理過程,某些設計模式和設計原則以及指標度量工具有一個更深刻的認識!)
GeeCON 2016上我為我的公司準備了一個程式設計競賽,這次的任務是設計並實現一個能夠滿足以下要求的系統:

系統能夠每秒處理1000個任務,每一個Event至少有2個屬性:

  • clientId-我們希望每一秒有多個任務是在同一個客戶端下處理的(譯者:不同的clientId對應不同的ClientProjection,即對應不同的一系列操作)
  • UUID-全域性唯一的

消費一個任務要花費10毫秒,為這樣的流設計一個消費者:

  1. 能夠實時的處理任務
  2. 和同一個客戶端有關的任務應該被有序地處理,例如你不能對擁有同一個clientId的任務序列使用並行處理
  3. 如果10秒內出現了重複的UUID,丟棄它。假設10秒後不會重複

有幾個關於以上要求的重要細節:

  1. 1000events/s的任務量,消耗一個event要10ms,1s內能消耗100個event,那麼為了保證實時性,就需要10個併發的消費者。
  2. events擁有聚集的ID(clientId),在1s內我們希望多個event能夠被指定到同一個給定的client上,並且我們不能夠併發地或無序地處理這些event。
  3. 我們必須以某種方式忽略重複的資訊,最可能的方法就是記住最近10s內所有的ID,這就需要暫時儲存一萬個UUID。

在這篇文章中,我會引導你們使用一些成功的方案並做一些小小的突破,你將要學習如何使用精確地有針對性的度量器來解決問題。

Naive sequential processing

我們可以在迭代器中處理這個問題,首先我們可以對API做一些假設,想象一下它會是這個樣子:

interface EventStream {

    void consume(EventConsumer consumer);

}

@FunctionalInterface
interface EventConsumer {
    Event consume(Event event);
}

@Value
class Event {

    private final Instant created = Instant.now();
    private final int clientId;
    private final UUID uuid;

}

一個典型的推送式API,和JMS很像。需要注意的是EventConsumer是阻塞的,這就意味著它不會返回新的Event,除非前一個已經被處理完畢了。這僅僅是我做出的一個假設,而且它沒有太大的違反之前的要求,這也是JMS中訊息偵聽者的工作機制。下面是一個簡單的實現,這個實現只是簡單的添加了一個工作間隔為10ms的偵聽器:

class ClientProjection implements EventConsumer {

    @Override
    public Event consume(Event event) {
        Sleeper.randSleep(10, 1);//譯者:這裡只是用睡眠來代替實際程式設計中一些耗時的操作
        return event;
    }

}

當然在現實生活中這個consumer可能會在資料庫中做一些儲存操作,或者進行遠端呼叫等等。我在睡眠時間的分佈上添加了一些隨機性,目的是使得手動測試更加貼近實際情況(譯者:實際情況中耗時操作的用時不盡相同,所以要隨機化):

class Sleeper {

    private static final Random RANDOM = new Random();

    static void randSleep(double mean, double stdDev) {
        final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);
        try {
            TimeUnit.MICROSECONDS.sleep((long) micros);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

//...

EventStream es = new EventStream();  //some real implementation here
es.consume(new ClientProjection());

以上的程式碼能夠編譯並執行,但為了滿足設計要求我們必須要插入一些度量器。最重要的度量器就是有關於資訊消費的潛伏期,這個潛伏期指的是從資訊的產生到開始處理的這段時間。我們使用 Dropwizard Metrics來實現這個潛伏期的度量:

class ClientProjection implements EventConsumer {

    private final ProjectionMetrics metrics;

    ClientProjection(ProjectionMetrics metrics) {
        this.metrics = metrics;
    }

    @Override
    public Event consume(Event event) {
        metrics.latency(Duration.between(event.getCreated(), Instant.now()));
        Sleeper.randSleep(10, 1);
        return event;
    }

}

ProjectionMetrics類的功能如下(主要就是將event的潛伏期用柱狀圖的形式表現出來):

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

@Slf4j
class ProjectionMetrics {

    private final Histogram latencyHist;

    ProjectionMetrics(MetricRegistry metricRegistry) {
        final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry)
                .outputTo(log)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build();
        reporter.start(1, TimeUnit.SECONDS);
        latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class, "latency"));
    }

    void latency(Duration duration) {
        latencyHist.update(duration.toMillis());
    }
}

現在當你執行這個解決方案時,你很快就會發現潛伏期的中值和第99.9%的值(分別指的是第count/2個值和第99.9%*count個值)都在無限增長:

type=HISTOGRAM, [...] count=84,   min=0,  max=795,   mean=404.88540608274104, [...]
    median=414.0,   p75=602.0,   p95=753.0,   p98=783.0,   p99=795.0,   p999=795.0
type=HISTOGRAM, [...] count=182,  min=0,  max=1688,  mean=861.1706371990878,  [...]
    median=869.0,   p75=1285.0,  p95=1614.0,  p98=1659.0,  p99=1678.0,  p999=1688.0

[...30 seconds later...]

type=HISTOGRAM, [...] count=2947, min=14, max=26945, mean=15308.138585757424, [...]
    median=16150.0, p75=21915.0, p95=25978.0, p98=26556.0, p99=26670.0, p999=26945.0

在運行了30s之後我們的應用程式處理event會出現平均15s的延遲,因此它並不具備完整的實時性,顯然缺少併發才是原因所在。我們的ClientProjection事件消費者會花費10ms去完成事件處理,所以它每秒最多可以處理100個event,然而我們需要更多的處理量。我們必須要增強ClientProjection同時不違反其他的設計要求!

Naive thread pool

最顯而易見的解決方法是對EventConsumer使用多執行緒技術,最簡單的實現途徑就是利用ExecutorService:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class NaivePool implements EventConsumer, Closeable {

    private final EventConsumer downstream;
    private final ExecutorService executorService;

    NaivePool(int size, EventConsumer downstream) {
        this.executorService = Executors.newFixedThreadPool(size);
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        executorService.submit(() -> downstream.consume(event));
        return event;
    }

    @Override
    public void close() throws IOException {
        executorService.shutdown();
    }
}

這裡我們使用了裝飾者模式。最初的ClientProjection實現EventConsumer是可行的,但我們利用加入了併發的另一個EventConsumer實現對ClientProjection進行包裝。這就允許我們能夠將更復雜的行為組合起來而不用更改ClientProjection本身,這種設計可以:

  • 解耦:不同的EventConsumer互不影響,但它們卻可以自由地組合在一起,在同一個執行緒池中工作
  • 單一職責:每個EventConsumer只做一項工作,並將自己委託給下一個元件即執行緒池
  • 開放/關閉原則:我們可以改變系統的行為卻不用修改現有實現

開放/關閉原則通常可以通過注入策略模式和模板方法模式來實現,這很簡單。整體的程式碼如下:

MetricRegistry metricRegistry =
        new MetricRegistry();
ProjectionMetrics metrics =
        new ProjectionMetrics(metricRegistry);
ClientProjection clientProjection =
        new ClientProjection(metrics);
NaivePool naivePool =
        new NaivePool(10, clientProjection);
EventStream es = new EventStream();
es.consume(naivePool);

我們寫的度量器顯示這種改良的方案確實表現的更好:

type=HISToOGRAM, count=838, min=1, max=422, mean=38.80768197277468, [...]
    median=37.0, p75=45.0, p95=51.0, p98=52.0, p99=52.0, p999=422.0
type=HISTOGRAM, count=1814, min=1, max=281, mean=47.82642776789085, [...]
    median=51.0, p75=57.0, p95=61.0, p98=62.0, p99=63.0, p999=65.0

[...30 seconds later...]

type=HISTOGRAM, count=30564, min=5, max=3838, mean=364.2904915942238, [...]
    median=352.0, p75=496.0, p95=568.0, p98=574.0, p99=1251.0, p999=3531.0

我們可以看到延遲雖然也在增長但規模卻小得多,30s後潛伏期達到了364ms。這種潛伏期增長是系統問題,我們需要更多的度量器。注意到NaivePool(你會明白為什麼這裡是naive-初級的)會開啟10條執行緒,這應該足以處理1000個event,每個要花費10ms。在實際情況下,我們需要一點額外的處理容量來避免因垃圾回收或小規模峰值負荷所帶來的問題。為了證明執行緒池才是我們的瓶頸,我們要監控它內部的佇列,這需要一點小小的工作量:

class NaivePool implements EventConsumer, Closeable {

    private final EventConsumer downstream;
    private final ExecutorService executorService;

    NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        String name = MetricRegistry.name(ProjectionMetrics.class, "queue");
        Gauge<Integer> gauge = queue::size;
        metricRegistry.register(name, gauge);
        this.executorService =
                new ThreadPoolExecutor(
                        size, size, 0L, TimeUnit.MILLISECONDS, queue);
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        executorService.submit(() -> downstream.consume(event));
        return event;
    }

    @Override
    public void close() throws IOException {
        executorService.shutdown();
    }
}

這裡使用ThreadPoolExecutor的目的是為了能夠提供自定義的LinkedBlockingQueue例項,接下來就可以監控佇列的長度(see:ExecutorService – 10 tips and tricks)。Gauge會週期性地呼叫queue::size,你需要的時候就會提供佇列的長度。度量器顯示執行緒池的大小確實是一個問題:

type=GAUGE, name=[...].queue, value=35
type=GAUGE, name=[...].queue, value=52

[...30 seconds later...]

type=GAUGE, name=[...].queue, value=601

不斷增長的佇列長度進一步加劇了佇列內正在等待著的task的潛伏期,將執行緒池的大小增加到10到20之間,最終佇列的長度顯示合理並且沒有失控。然而我們仍然沒有解決重複ID問題,並且也沒有解決同一個clientId可能會對它的events進行併發處理的問題。

Obscure locking

讓我們從避免對擁有相同clientId的events使用並行處理開始。如果兩個有相同clientId的event一個接一個地來,相繼進入執行緒池佇列,那麼NaivePool會幾乎同時將它們取出佇列實現並行處理。開始的時候我們可能會想到對每一個clientId加一個Lock:

@Slf4j
class FailOnConcurrentModification implements EventConsumer {

    private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
    private final EventConsumer downstream;

    FailOnConcurrentModification(EventConsumer downstream) {
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        Lock lock = findClientLock(event);
        if (lock.tryLock()) {
            try {
                downstream.consume(event);
            } finally {
                lock.unlock();
            }
        } else {
            log.error("Client {} already being modified by another thread", event.getClientId());
        }
        return event;
    }

    private Lock findClientLock(Event event) {
        return clientLocks.computeIfAbsent(
                event.getClientId(),
                clientId -> new ReentrantLock());
    }

}

以上的程式碼完全搞錯方向了,這種設計太過於複雜,但執行程式碼至少會發現一個問題。events的處理過程就像下面這樣,由一個裝飾者包裹著另一個:

ClientProjection clientProjection =
        new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification failOnConcurrentModification =
        new FailOnConcurrentModification(clientProjection);
NaivePool naivePool =
        new NaivePool(10, failOnConcurrentModification, metricRegistry);
EventStream es = new EventStream();

es.consume(naivePool);

一旦執行過一會兒錯誤資訊就會彈出來,告訴我們在其他執行緒中已經在處理擁有相同clientId的event。我們為每一個clientId都綁定了一個Lock,這樣做的目的是為了弄清楚如果其他的執行緒沒有處理的時候client的狀態。這種醜陋的方法讓我們的方案變得慘不忍睹,與其因獲取不到Lock而丟擲錯誤資訊,還不如等待一下,等待Lock被釋放:

@Slf4j
class WaitOnConcurrentModification implements EventConsumer {

    private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
    private final EventConsumer downstream;
    private final Timer lockWait;

    WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        lockWait = metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification.class, "lockWait"));
    }

    @Override
    public Event consume(Event event) {
        try {
            final Lock lock = findClientLock(event);
            final Timer.Context time = lockWait.time();
            try {
                final boolean locked = lock.tryLock(1, TimeUnit.SECONDS);
                time.stop();
                if(locked) {
                    downstream.consume(event);
                }
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            log.warn("Interrupted", e);
        }
        return event;
    }

    private Lock findClientLock(Event event) {
        return clientLocks.computeIfAbsent(
                event.getClientId(),
                clientId -> new ReentrantLock());
    }

}

這次的設計和之前的很像,但不同的是tryLock()會持續1s的時間以等待指定client的Lock被釋放。如果兩個有相同clientId的event相繼出現,其中一個會獲取到Lock進行處理,而另一個會一直阻塞直到unlock()被呼叫。
這段程式碼不僅複雜,而且在某些微妙的情況下可能會發生不可預知的錯誤。例如,如果兩個有相同clientId的event幾乎在同一時刻出現,那麼誰將會是第一個?兩個event會在同一時刻請求Lock,這時我們並不能保證哪一個event會第一個得到非公平鎖,處理event的順序可能就會發生混亂。肯定會有更好的方法…

Dedicated threads

讓我們退一步,深吸一口氣。你會怎樣確保事情不會並行發生?僅僅使用一個執行緒就行了!事實上這是我們最開始的做法,但它的處理流量並不理想。我們不用關心不同clientIds的併發情況,我們只需要確保有相同clientId的events由一個專有執行緒處理就行。
你可能會想到使用一個map將clientId對映到Thread,當然這太簡單了。我們可能會創造上千個執行緒,而它們大多數的時候可能都處於空閒狀態(對於給定的clientId每秒可能只處理少數幾個event)。一個很好的折中是使用固定大小的執行緒池,每個執行緒負責指定的一些clientId。在這種方法中,兩個不同的clientId可能會在同一個執行緒中完成處理,但相同的clientId總是在同一個執行緒中處理。如果兩個有相同clientId的event出現了,它們都會被送去同一個執行緒,因此為了避免併發處理,以下實現相當簡單:

class SmartPool implements EventConsumer, Closeable {

    private final List<ExecutorService> threadPools;
    private final EventConsumer downstream;

    SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        List<ExecutorService> list = IntStream
                .range(0, size)
                .mapToObj(i -> Executors.newSingleThreadExecutor())
                .collect(Collectors.toList());
        //譯者:這裡使用CopyOnWriteArrayList是為了保證訪問threadPools裡面元素時是執行緒安全的
        this.threadPools = new CopyOnWriteArrayList<>(list);
    }

    @Override
    public void close() throws IOException {
        threadPools.forEach(ExecutorService::shutdown);
    }

    @Override
    public Event consume(Event event) {
        final int threadIdx = event.getClientId() % threadPools.size();
        final ExecutorService executor = threadPools.get(threadIdx);
        executor.submit(() -> downstream.consume(event));
        return event;
    }
}

關鍵點是最後的那部分:

int threadIdx = event.getClientId() % threadPools.size();
ExecutorService executor = threadPools.get(threadIdx);

這個簡單的演算法總是為相同的clientId使用同一個ExecutorService單執行緒,不同的ID可能會在同一個執行緒內處理,例如當threadPools的大小為20時,Id為7, 27, 47的client都會在索引為7的執行緒內處理。雖然一個執行緒會對應多個clientId,但只要一個clientId在同一個執行緒內處理就行了。基於這點,鎖就不需要了,順序呼叫也就得到了保障。邊注:一個clientId對應一個執行緒可能產生無法預估的後果,但一個actor對應一個clientId(例如在Akka裡面就是如此)就簡單許多。
順便為了保證安全,我為每一個執行緒池都插入了度量器以監控它們的佇列長度,實現如下:

class SmartPool implements EventConsumer, Closeable {

    private final List<LinkedBlockingQueue<Runnable>> queues;
    private final List<ExecutorService> threadPools;
    private final EventConsumer downstream;

    SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        this.queues = IntStream
                .range(0, size)
                .mapToObj(i -> new LinkedBlockingQueue<Runnable>())
                .collect(Collectors.toList());
        List<ThreadPoolExecutor> list = queues
                .stream()
                .map(q -> new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q))
                .collect(Collectors.toList());
        this.threadPools = new CopyOnWriteArrayList<>(list);
        metricRegistry.register(MetricRegistry.name(ProjectionMetrics.class, "queue"), (Gauge<Double>) this::averageQueueLength);
    }

    private double averageQueueLength() {
        double totalLength =
            queues
                .stream()
                .mapToDouble(LinkedBlockingQueue::size)
                .sum();
        return totalLength / queues.size();
    }

    //...

}

如果你是偏執狂的話,可以為每一個LinkedBlockingQueue都加入一個度量器(metric)。

Deduplication and idempotency

在分散式環境中,當你的生產者有至少一次保證時(指的是將event傳送到系統的這個動作保證會發生一次),接收重複事件的情況就會經常發生。產生這種現象的原因已經超出了本文的範疇,但我們必須要學會如何處理這種問題。一種方法是為每一個資訊新增一個全域性唯一的ID(UUID),並且確保在消費者那端對同一個UUID的資訊不會處理兩次。每一個Event都有一個UUID,在滿足我們要求的情況下最直接的辦法就是簡單地將處理過的UUID儲存起來,並且對每一個新來的UUID都進行驗證。隨著時間的推移,ID會越積越多,如果使用像ConcurrentHashMap(在JDK中沒有ConcurrentHashSet)這樣的資料結構將會導致記憶體洩漏,這就是為什麼我們專門只針對10s內的重複事件進行處理。當遭遇衝突時,從技術上你可以利用ConcurrentHashMap將UUID對映到時間戳,然後使用後臺執行緒將超過10s的元素移除。當然如果你是一個Guava擁護者,Cache會使用宣告收回政策做一些小把戲:

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

class IgnoreDuplicates implements EventConsumer {

    private final EventConsumer downstream;

    private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .build();

    IgnoreDuplicates(EventConsumer downstream) {
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        final UUID uuid = event.getUuid();
        if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {
            return downstream.consume(event);
        } else {
            return event;
        }
    }
}

再一次為了保險起見,我認為應該加入兩個有用的度量器:cache的大小以及發現的重複事件數量,讓我們插入這些度量器:

class IgnoreDuplicates implements EventConsumer {

    private final EventConsumer downstream;
    private final Meter duplicates;

    private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .build();

    IgnoreDuplicates(EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        duplicates = metricRegistry.meter(MetricRegistry.name(IgnoreDuplicates.class, "duplicates"));
        metricRegistry.register(MetricRegistry.name(IgnoreDuplicates.class, "cacheSize"), (Gauge<Long>) seenUuids::size);
    }

    @Override
    public Event consume(Event event) {
        final UUID uuid = event.getUuid();
        if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {
            return downstream.consume(event);
        } else {
            duplicates.mark();
            return event;
        }
    }
}

終於,我們擁有了所有的碎片來構建我們的框架,其核心思想就是利用EventConsumer的例項相互包裝以組成管道:

  1. 首先我們應用IgnoreDuplicates以排除重複的event
  2. 然後我們呼叫SmartPool,它總是會將給定的clientId送到指定的單執行緒池,接著在那個執行緒完成後續操作
  3. 最終ClientProjection被呼叫以執行真正的業務邏輯

你可以選擇(當然也可以不這麼做)將FailOnConcurrentModification放到SmartPool與ClientProjection之間,這樣做是為了保證額外安全(設計中併發修改不應該發生)(譯者:加鎖可能會影響效能,這純粹是為了保險起見,在SmartPool中併發修改的情況幾乎不可能發生):

ClientProjection clientProjection =
        new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification concurrentModification =
        new FailOnConcurrentModification(clientProjection);
SmartPool smartPool =
        new SmartPool(12, concurrentModification, metricRegistry);
IgnoreDuplicates withoutDuplicates =
        new IgnoreDuplicates(smartPool, metricRegistry);
EventStream es = new EventStream();
es.consume(withoutDuplicates);

我們花了很多的精力提出相對簡單但卻有良好結構(我希望你同意這點)的解決方案。最後處理併發問題的最好途徑就是…避免併發,並且將可能會產生資源競爭的程式碼放到一個執行緒中執行。這也是Akka actors(一個資訊由一個actor處理)和RxJava(一個資訊由Subscriber處理)所使用的思想。在下一部分我們將會了解RxJava中的宣告式解決方案。

See also: