1. 程式人生 > >zookeeper開源客戶端Curator典型應用場景之-訊息佇列(十二)

zookeeper開源客戶端Curator典型應用場景之-訊息佇列(十二)

Curator框架也有分散式佇列實現。 利用ZK的PERSISTENT SEQUENTIAL(持久順序)節點,可以保證放入到佇列中的專案是按照順序排隊的。並且宕機重啟並不丟失訊息, 如果單一的消費者從佇列中取資料, 那麼它是先入先出的,這也是佇列的特點。 如果你嚴格要求順序,你就的使用單一的消費者,可以使用leader選舉只讓leader作為唯一的消費者。

但是,我們在閱讀官網的時候,發現每一個Queue文章的開頭都有建議我們不要使用ZooKeeper做Queue,詳細內容可以看 Tech Note 4, 原因有五:

  1. ZK有1MB 的傳輸限制。 實踐中ZNode必須相對較小,而佇列包含成千上萬的訊息,非常的大。
  2. 如果有很多節點,ZK啟動時相當的慢。 而使用queue會導致好多ZNode. 你需要顯著增大 initLimit 和 syncLimit.
  3. ZNode很大的時候很難清理。Netflix不得不建立了一個專門的程式做這事。
  4. 當很大量的包含成千上萬的子節點的ZNode時, ZK的效能變得不好
  5. ZK的資料庫完全放在記憶體中。 大量的Queue意味著會佔用很多的記憶體空間。

儘管如此, Curator還是建立了各種Queue的實現。 如果Queue的資料量不太多,資料量不太大的情況下,酌情考慮,還是可以使用的。

DistributedQueue

DistributedQueue是最普通的一種佇列。 它設計以下四個類:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

建立佇列使用QueueBuilder,它也是其它佇列的建立類。

public static <T> QueueBuilder<T> builder(CuratorFramework client,
                                          QueueConsumer<T> consumer,
                                          QueueSerializer<
T>
serializer, java.lang.String queuePath);
QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedQueue<MessageType queue = builder.build();

建立好queue就可以往裡面放入資料了:

queue.put(aMessage);

QueueSerializer提供了對佇列中的物件的序列化和反序列化。
QueueConsumer是消費者, 它可以接收佇列的資料。 處理佇列中的資料的程式碼邏輯可以放在QueueConsumer.consumeMessage()中。
正常情況下先將訊息從佇列中移除,再交給消費者消費。 但這是兩個步驟,不是原子的。 可以呼叫Builder的lockPath()消費者加鎖, 當消費者消費資料時持有鎖,這樣其它消費者不能消費此訊息。 如果消費失敗或者程序死掉,訊息可以交給其它程序。這會帶來一點效能的損失。 最好還是單消費者模式使用佇列。

列子:
public class DistributedQueueExample {


    private static final String PATH = "/example/queue";
    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });
            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i);
                Thread.sleep((long)(3 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>(){
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }
            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }
    private static QueueConsumer<String> createQueueConsumer() {
        return new QueueConsumer<String>(){
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }
            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }

}
DistributedIdQueue

DistributedIdQueue和上面的佇列類似, 但是可以為佇列中的每一個元素設定一個ID。 可以通過ID把佇列中任意的元素移除。
它涉及幾個類:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

通過下面方法建立:

builder.buildIdQueue()

放入元素時:

queue.put(aMessage, messageId);

移除元素時:

int numberRemoved = queue.remove(messageId);

在這個例子中, 有些元素還沒有被消費者消費時就移除了,這樣消費者不會收到刪除的訊息。

public class DistributedIdQueueExample {

    private static final String PATH = "/example/queue";
    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });
            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long)(50 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>(){
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }
            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }
    private static QueueConsumer<String> createQueueConsumer() {
        return new QueueConsumer<String>(){
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }
            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }

}
DistributedPriorityQueue

優先順序佇列對佇列中的元素按照優先順序進行排序。 Priority越小, 元素月靠前, 越先被消費掉。
它涉及下面幾個類:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

通過builder.buildPriorityQueue(minItemsBeforeRefresh)方法建立。
當優先順序佇列得到元素增刪訊息時,它會暫停處理當前的元素佇列,然後重新整理佇列。minItemsBeforeRefresh指定重新整理前當前活動的佇列的最小數量。 主要設定你的程式可以容忍的不排序的最小值。在重新整理項列表之前要處理的最小項。
放入佇列時需要指定優先順序:

queue.put(aMessage, priority);
例子:
public class DistributedPriorityQueueExample {

    private static final String PATH = "/example/queue";
    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });
            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int)(Math.random() * 100);
                queue.put("test-" + i +"-"+ priority, priority);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>(){
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }
            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }
    private static QueueConsumer<String> createQueueConsumer() {
        return new QueueConsumer<String>(){
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }
            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }

}
DistributedDelayQueue

DistributedDelayQueue是延時佇列。元素有個delay值, 消費者隔一段時間才能收到元素。
涉及到下面四個類。

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

通過下面的語句建立:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

放入元素時可以指定delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

注意delayUntilEpoch不是離現在的一個時間間隔,而是未來的一個時間戳,如 :
System.currentTimeMillis() + 10秒。
如果delayUntilEpoch的時間已經到達,訊息會立刻被消費者接收。

例子
public class DistributedDelayQueueExample {

    private static final String PATH = "/example/queue";
    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });
            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            
            
           

相關推薦

zookeeper開源客戶Curator典型應用場景-訊息佇列()

Curator框架也有分散式佇列實現。 利用ZK的PERSISTENT SEQUENTIAL(持久順序)節點,可以保證放入到佇列中的專案是按照順序排隊的。並且宕機重啟並不丟失訊息, 如果單一的消費者從佇列中取資料, 那麼它是先入先出的,這也是佇列的特點。 如果

zookeeper開源客戶Curator典型應用場景-分散式計數器(四)

之前我們瞭解了基於Corator的分散式鎖之後,我們就很容易基於其實現一個分散式計數器,顧名思義,計數器是用來計數的, 利用ZooKeeper可以實現一個叢集共享的計數器。 只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證

zookeeper開源客戶Curator典型應用場景-Master選舉()

在生產環境中,一般要保證服務的高可用,有時候只需要選出一臺機器來執行,其餘機器處於備用狀態,比如,在分散式系統中很常見的一個問題就是定時任務的執行。如果多臺機器同時執行相同的定時任務,業務複雜則可能出現災難性的後果。我使用的是噹噹網的elastic-job分散

zookeeper開源客戶Curator典型應用場景-服務註冊與發現(一)

隨著業務增加,以前簡單的系統已經變得越來越複雜,單純的提升伺服器效能也不是辦法,而且程式碼也是越來越龐大,維護也變得越來越困難,這一切都催生了新的架構設計風格 – 微服務架構的出現。 微服務給我們帶來了很多好處,例如:獨立可擴充套件、易維護。但是隨著應用的分解

zookeeper開源客戶Curator典型應用場景-Barrier屏障(十三)

什麼是Barrier Barrier是這樣的:Barrier是一個同步點,每一個程序到達此點都要等待,直到某一個條件滿足,然後所有的節點繼續進行。 比如:賽跑大家都知道,所有比賽人員都會在起跑線外等待,直到教練員的槍響之後,所有參賽者立刻開始賽跑。 JDK的併

zookeeper開源客戶Curator介紹(六)

上一篇文章介紹了zookeeper原生API的使用,使用過原生API不得不說,有很多的問題,比如:不能遞迴建立和刪除節點、Watcher只能使用一次、還有很多可以解決分散式應用問題的api(比如分散式鎖,leader選舉等),但由於ZooKeeper提供的原始

Zookeeper開源客戶Curator基本功能講解

簡介 Curator是Netflix公司開源的一套Zookeeper客戶端框架。瞭解過Zookeeper原生API都會清楚其複雜度。Curator幫助我們在其基礎上進行封裝、實現一些開發細節,包括接連重連、反覆註冊Watcher和NodeExistsExcept

Zookeeper開源客戶框架Curator簡介

curator簡介 Netflix curator 是Netflix公司開源的一個Zookeeper client library,用於簡化zookeeper客戶端程式設計,包含一下幾個模組: curator-client - zookeeper client封裝,用於

【hadoop zookeeperZookeeper開源客戶框架Curator簡介

Curator是Netflix開源的一套ZooKeeper客戶端框架. Netflix在使用ZooKeeper的過程中發現ZooKeeper自帶的客戶端太底層, 應用方在使用的時候需要自己處理很多事情, 於是在它的基礎上包裝了一下, 提供了一套更好用的客戶端框架. Netf

Zookeeper 開源客戶 ZkClient 版本 api介紹和示例

ZkClient是由Datameer的工程師開發的開源客戶端,對Zookeeper的原生API進行了包裝,實現了超時重連、Watcher反覆註冊等功能。 ZKClient版本及原始碼 maven依賴 ZkClient目前有兩個不同artifactId的系列。  其中最早

ZooKeeper典型應用場景Master選舉。

        Master選舉是一個分散式系統中非常常見的應用場景。分散式最核心的特性就是能夠將具有獨立計算能力的系統單元部署在不同的機器上,構成一個完整的分散式系統。而與此同時,實際場景中往往也需要在這些分佈在不同機器上的獨立系統單元中選出一個所謂的“老大”,在電腦科學

開源客戶Curator 使用(上)

一 介紹 Curator是Netflix公司開源的一款Zookeeper客戶端框架,Curator解決了很多Zookeeper客戶端非常底層的細節開發工作,包括連線重連、反覆註冊Watcher等,實現Fluent風格的API介面,目前已經成為Apache的頂級專案,是全世界

ZooKeeper典型應用場景名稱空間。

        命名服務(Name Service)也是分散式系統中比較常見的一類場景,在《Java網路高階程式設計》一書中提到,命名服務是分散式系統最基本的公共服務之一。在分散式系統中,被命名的實體通常可以是叢集中的機器、提供的服務地址或遠端物件等——這些我們都可以統稱他

Curator典型使用場景事件監聽。

        ZooKeeper原生支援通過註冊Watcher來進行事件監聽,但是其使用並不是特別方便,需要開發人員自己反覆註冊Watcher,比較繁瑣。Curator引入了Cache來實現對ZooKeeper服務端事件的監聽。Cache是Curator中對事件監聽的包裝

Hive典型應用場景行列轉換

在使用Hive處理資料時,經常遇到行列轉換的場景,本文將對Hive的行列轉換操作做詳細的說明。 行轉列 1)多行轉多列 假設資料表 row2col: col1 col2 col3 a c 1 a d

Zookeeper學習六】——開源客戶ZKClient和Curator介紹與應用

前言 在真正的專案中通常使用的是zkclient和curator,而不是原生的zookeeper客戶端,因為zookeeper原生的客戶端存在一定的侷限性,本篇小編主要講解一下這兩種zookeeper客戶端的使用! 內容 1.1zk原生api不足之

7.5 zookeeper客戶curator的基本使用

serve server 超時 one c-c tlist result 強制 car 使用zookeeper原生API實現一些復雜的東西比較麻煩。所以,出現了兩款比較好的開源客戶端,對zookeeper的原生API進行了包裝:zkClient和curator。後者是Net

ZooKeeper典型應用場景

拉取 ons 執行 全局 進行 創建失敗 消息通知 防止 成了 《從Paxos到Zookeeper 分布式一致性原理與實踐》讀書筆記 本文:總結腦圖地址:腦圖 前言 所有的典型應用場景,都是利用了ZK的如下特性: 強一致性:在高並發情況下,能夠保證節點的創建一定是

zookeeper客戶應用

什麼zookeeper?   ZooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要元件。它是一個為分散式應用提供一致性服務的軟體,提供的功能包括:配置維護、域名服務、分散式同步、組服務等。 ZooKeeper

Zookeeper分散式及客戶Curator的API簡單使用

最近公司專案中使用了分散式Zookeeper及Dubbo,為了弄清楚這些框架在專案中的使用,在我業餘時間中學習了一些Zookeeper的簡單用法,分享出來,若有不足之處,望大家給與建議...... 一、什麼是分散式系統? 我的理解:將原有的系統拆分為多個子系統組成一個龐大的系統,這個龐大