1. 程式人生 > >dubbo 叢集容錯模式原始碼學習---ForkingCluster

dubbo 叢集容錯模式原始碼學習---ForkingCluster

ForkingCluster 模式

思想:服務端多個節點並行響應(併發任務)某個服務的請求,將執行完的結果放在一個阻塞佇列中(FIFO)。第一個完成的任務的結果進入佇列,將會馬上返回,不會等待所有任務執行完成, 只要有一個任務完成,結果放到佇列中,佇列中彈出第一個(最快完成響應的那個節點)結果, 返回服務請求方。如果n 個併發任務都出錯了,拋異常,將最後一個異常放到佇列中。服務請求返回就是拋異常。通常用於實時性高的讀操作,相對耗費資源。最大並行數要可配置。

public class ForkCluster {

private final ExecutorService executor = Executors.newCachedThreadPool();

@Test
public void run() {
    String response = forkCluster();
    System.out.println(response);
}

/**
 * 思想:多個節點並行響應(併發任務)某個服務的請求,將執行完的結果放在一個阻塞佇列中(FIFO)。
 * 不會等待所有任務執行完成, 只要有一個任務完成,結果放到佇列中,佇列中彈出第一個(最快完成響應的那個節點)結果, 返回服務請求方。
 * 如果n 個併發任務都出錯了,拋異常,將最後一個異常放到佇列中。服務請求返回就是拋異常。
 *
 * 通常用於實時性高的讀操作,相對耗費資源。最大並行數要可配置。
 *
 *
 */
public String forkCluster() {
    Node node1 = new Node(1);
    Node node2 = new Node(2);
    Node node3 = new Node(3);
    Node node4 = new Node(4);
    List<Node> cluster = new ArrayList<>();
    cluster.add(node1);
    cluster.add(node2);
    cluster.add(node3);
    cluster.add(node4);
    BlockingQueue<Object> responses = new LinkedBlockingQueue<>();
    final AtomicInteger count = new AtomicInteger(); // 統計異常節點的個數.
    for (Node node : cluster) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String response = node.doSomething();
                    responses.offer(response);
                }catch (Throwable e) {
                    int value = count.incrementAndGet();
                    if (value >= cluster.size())
                        responses.offer(e);
                }
            }
        });
    }
    try {
        Object resp = responses.poll(1000, TimeUnit.MILLISECONDS);
        if (resp instanceof Throwable) {
            throw new RuntimeException("Cannot perform cluster fork. ");
        }
        return (String)resp;
    } catch (InterruptedException e) {
        throw new RuntimeException("Cannot get cluster fork response.");
    }
}

class Node {
    private int id;

    Node(int id) {
        this.id = id;
    }

    String doSomething() {
        return "Node " + id + "第一個完成響應";
    };
}

}