1. 程式人生 > >小規模的流處理框架.Part 2: RxJava 1.x/2.x

小規模的流處理框架.Part 2: RxJava 1.x/2.x

原文連結 作者:Tomasz Nurkiewicz 譯者:simonwang
part 1: thread pools中,我們設計並實現了一個相對簡單的實時處理events的系統。在閱讀本文之前你應該確保已經讀懂了Part1的那篇文章,這裡重新闡述一遍系統的設計要求:

系統能夠每秒處理1000個任務,每一個Event至少有2個屬性:

  • clientId-我們希望每一秒有多個任務是在同一個客戶端下處理的(譯者:不同的clientId對應不同的ClientProjection,即對應不同的一系列操作)
  • UUID-全域性唯一的

消費一個任務要花費10毫秒,為這樣的流設計一個消費者:

  1. 能夠實時的處理任務
  2. 和同一個客戶端有關的任務應該被有序地處理,例如你不能對擁有同一個clientId的任務序列使用並行處理
  3. 如果10秒內出現了重複的UUID,丟棄它。假設10秒後不會重複

到目前為止我們提出了執行緒池和共享快取結合的設計,而在這篇文章中我們會使用RxJava進行實現。開始之前,我從沒有提到EventStream是如何實現的,僅僅是給出了API:

interface EventStream {

    void consume(EventConsumer consumer);

}

事實上為了能夠進行測試,我建立了一個RxJava流,它所有的行為都符合設計要求:

@Slf4j
class EventStream {

    void consume(EventConsumer consumer) {
        observe()
            .subscribe(
                consumer::consume,
                e -> log.error("Error emitting event", e)
        );
    }

    Observable<Event> observe() {
        return Observable
                .interval(1, TimeUnit.MILLISECONDS)
                .delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS))
                .map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID()))
                .flatMap(this::occasionallyDuplicate, 100)
                .observeOn(Schedulers.io());
    }

    private Observable<Event> occasionallyDuplicate(Event x) {
        final Observable<Event> event = Observable.just(x);
        if (Math.random() >= 0.01) {
            return event;
        }
        final Observable<Event> duplicated =
                event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);
        return event.concatWith(duplicated);
    }

}

雖然你們沒必要明白這個流模擬器是怎麼工作的,但它的工作過程相當有趣。首先我們使用interval()產生一個每毫秒輸出一個Long型值(0,1,2)的穩定流(這是因為設計要求系統每秒能處理1000個event)。然後我們使用delay()對每個event進行0到1000微秒內的隨機延遲,在這之後events出現的時機就變得不可預測,就更符合真實情況。最終我們使用map()將每個Long型值對映到一個隨機的Event上,每個Event都包含一個1000到1100(inclusive-exclusive)之間的clientId。
最後一點就有趣了,我們想模擬隨機的重複事件。為了做到這點,我們使用flatMap()將每個event對映到自身(99%情況下)。然而在剩下的1%情況中,我們將event返回兩次,第二次出現的時間延遲了10ms到5s。實際應用時,重複的event與第一次出現的event之間會相隔幾百個其他的event,這就使得流的行為更加符合真實情況。
有兩種方法可以與EventStream進行互動-基於回撥的consume()和基於流的observer()。我們可以利用Observable快速地建立處理管道,這種方法的功能和part1中的非常的像但更加簡單。

Missing backpressure

首先利用RxJava實現最初的方案非常簡短:

EventStream es = new EventStream();
EventConsumer clientProjection = new ClientProjection(
        new ProjectionMetrics(
                new MetricRegistry()));

es.observe()
        .subscribe(
                clientProjection::consume,
                e -> log.error("Fatal error", e)
        );

(ClientProjection,ProjectionMetrics等來自於part1).使用以上的程式碼幾乎會立刻丟擲MissingBackpressureException,這也是預料之中的。你們記得嗎,我們在part1中最初的方案會執行的越來越慢是因為處理event的潛伏期越來越長。RxJava會盡量避免這種情況,而且也會避免佇列溢位。之所以會丟擲MissingBackpressureException是因為消費者(ClientProjection)沒有能力實時地處理event。這是一個fail-fast機制。聰明的做法就是將處理的過程移到一個獨立的執行緒池,就像之前那樣,但這次要使用RxJava來實現:

EventStream es = new EventStream();
EventConsumer clientProjection = new FailOnConcurrentModification(
        new ClientProjection(
                new ProjectionMetrics(
                        new MetricRegistry())));

es.observe()
        .flatMap(e -> clientProjection.consume(e, Schedulers.io()))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );

EventConsumer中添加了一個輔助方法,它能夠利用提供的Scheduler非同步地處理event:

@FunctionalInterface
interface EventConsumer {
    Event consume(Event event);

    default Observable<Event> consume(Event event, Scheduler scheduler) {
        return Observable
                .fromCallable(() -> this.consume(event))
                .subscribeOn(scheduler);
    }

}

使用flatMap()在一個獨立的Scheduler.io()中處理event,這樣每一個消費過程都是非同步呼叫的。這次event的處理已經符合實時性的要求了,但還有一個更大的問題。我使用FailOnConcurrentModification對ClientProjection進行包裝是有原因的。events的處理都是彼此獨立的,所以對於同一個clientId有可能會併發地處理兩個event,這樣並不好。幸運的是比起使用執行緒來說,用RxJava解決這個問題要更加簡單:

es.observe()
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );

上面的程式碼改動的地方只有一點點。首先我們依據clientId對event進行分組,將單一的Observable流分割成多個流,每個名為byClient的子流都代表著擁有相同clientId的event。現在如果我們對子流進行對映,我們能夠確定有相同clientId的event是絕不會併發地被處理的。輸出流是惰性的,所以我們必須對流呼叫subscribe。與其對每一個event單獨地呼叫subscribe,我們選擇將每一秒內處理的event收集起來並對其計數。這樣一來每秒我們接收到的就是一個Integer型別的event,它代表著每秒內我們處理的event數量。

Impure, non-idiomatic, error-prone, unsafe solution of deduplication using global state

現在我們必須除去重複的UUID,最簡單也是最笨的做法就是利用全域性狀態。我們能夠簡單地利用filter()在cache中查詢重複的event:

final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
        .expireAfterWrite(10, TimeUnit.SECONDS)
        .build();

es.observe()
        .filter(e -> seenUuids.getIfPresent(e.getUuid()) == null)
        .doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid()))
        .subscribe(
                clientProjection::consume,
                e -> log.error("Fatal error", e)
        );

如果你想要監控上面程式碼的效果可以簡單的加入一個度量器:

Meter duplicates = metricRegistry.meter("duplicates");

es.observe()
        .filter(e -> {
            if (seenUuids.getIfPresent(e.getUuid()) != null) {
                duplicates.mark();
                return false;
            } else {
                return true;
            }
        })

在操作符內部訪問全域性的、尤其是可變的狀態時是非常危險的,並且這樣會破壞RxJava唯一的目的-簡單併發。雖然我們使用的是Guava中執行緒安全的Cache,但在很多情況下你很容易會忘記這個全域性共享的可變狀態是可以被多個執行緒訪問的,如果你發現你在操作符鏈中修改外部的一些變數的話,那就要非常小心了。

Custom distinct() operator in RxJava 1.x

RxJava 1.x有一個distinct()運算函式,它大概可以做如下的工作:

es.observe()
        .distinct(Event::getUuid)
        .groupBy(Event::getClientId)

不幸的是distinct()會在內部將所有的UUID都儲存在一個不斷增長的HashSet裡面,但我們只關心10s內的重複事件。通過複製貼上DistinctOperator的實現,我創造了DistinctEvent操作符,它利用了Guava的cache僅僅只儲存10s內的UUID。我故意將Event硬編碼在這個操作符內而不是將它寫成一般性的就是為了讓程式碼更易懂:

class DistinctEvent implements Observable.Operator<Event, Event> {
    private final Duration duration;

    DistinctEvent(Duration duration) {
        this.duration = duration;
    }

    @Override
    public Subscriber<? super Event> call(Subscriber<? super Event> child) {
        return new Subscriber<Event>(child) {
            final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder()
                    .expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS)
                    .<UUID, Boolean>build().asMap();

            @Override
            public void onNext(Event event) {
                if (keyMemory.put(event.getUuid(), true) == null) {
                    child.onNext(event);
                } else {
                    request(1);
                }
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onCompleted() {
                child.onCompleted();
            }

        };
    }
}

自定義的操作符使用起來非常簡單,實現如下:

es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );

事實上如果我們跳過每秒的logging實現可以變得更加簡單:

es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .subscribe(
                e -> {},
                e -> log.error("Fatal error", e)
        );

這個方案比之前的基於執行緒池和裝飾者模式的要更加簡短,其中唯一麻煩的部分就是在自定義的操作符中當儲存了太多的UUID之後會造成記憶體洩漏,幸好RxJava 2能解決這個問題。

RxJava 2.x and more powerful built-in distinct()

distinct()允許使用自定義的Collection而不必使用內建的HashSet(感覺2.x中可以使用自定義的資料結構後,1.x中的DistinctEvent就完全沒必要了)。不管你是否相信,依賴倒置不僅僅只出現在Spring框架或者Java EE中。當一個庫允許你提供它內部資料結構的自定義實現時,這就已經是依賴反轉。首先我創造了一個輔助方法,它能夠建立Set,Set由Map提供依賴,而Map則由Cache提供依賴。這就像委託一樣!

private Set<UUID> recentUuids() {
    return Collections.newSetFromMap(
            CacheBuilder.newBuilder()
                    .expireAfterWrite(10, TimeUnit.SECONDS)
                    .<UUID, Boolean>build()
                    .asMap()
    );
}

有了這個方法之後,我們就能利用以下的程式碼實現整個任務:

es.observe()
        .distinct(Event::getUuid, this::recentUuids)
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .subscribe(
                e -> {},
                e -> log.error("Fatal error", e)
        );

這段程式碼是如此的優雅、簡單、清晰!它的大致流程如下:

  • observe一個event流
  • 消除重複的UUID
  • 依據clientId對event分組
  • 對每一個client有序地處理event

希望你能喜歡這些方案,並能將之運用到你的日常生活中去。

See also: