Learning ZooKeeper by Example: Queues

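Curator also simplifies working with ephemeral nodes, and it provides an implementation of the ZooKeeper distributed queue recipe. By using ZooKeeper's PERSISTENT_SEQUENTIAL nodes, it guarantees that items put into the queue are ordered. If a single consumer takes items from the queue, they come out first in, first out, which is the defining property of a queue. If you need strict ordering, you must use a single consumer; you can use leader election so that only the leader acts as the sole consumer.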
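The ordering guarantee comes from how ZooKeeper names sequential nodes: it appends a zero-padded, 10-digit, monotonically increasing counter to the requested name, so sorting the children by that suffix recovers creation order. The following is a minimal JDK-only sketch of that idea. The class name, the `queue-` prefix and the sample names are made up for illustration, and Curator's actual node naming may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SequentialOrderSketch {
    // Extracts the 10-digit sequence suffix that ZooKeeper appends to sequential node names.
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.length() - 10));
    }

    // Returns the child names sorted by sequence number, i.e. in creation order.
    static List<String> inCreationOrder(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(SequentialOrderSketch::sequenceOf));
        return sorted;
    }

    public static void main(String[] args) {
        // Child listings come back in no guaranteed order; these names are hypothetical.
        List<String> children = Arrays.asList(
                "queue-0000000002", "queue-0000000000", "queue-0000000001");
        System.out.println(inCreationOrder(children));
    }
}
```

A single consumer that always processes the lowest sequence number first therefore sees the items first in, first out.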

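However, according to the author of Curator at Netflix, ZooKeeper is really not suited to being a queue, or rather, ZooKeeper does not make a good queue. See Tech Note 4 for details. There are five reasons: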

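  1. ZooKeeper has a 1MB transport limit. In practice ZNodes must be relatively small, but a queue can hold many thousands of messages and become very large.
  2. ZooKeeper starts up quite slowly when there are many nodes, and using a queue creates a lot of ZNodes. You will need to increase initLimit and syncLimit significantly.
  3. A ZNode that has grown very large is hard to clean up. Netflix had to write a special-purpose program to do it.
  4. ZooKeeper performs poorly when there are many ZNodes that each have thousands of children.
  5. The ZooKeeper database is kept entirely in memory, so a large number of queues takes up a lot of memory.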
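Given the size limit in the first point, one precaution is to check a message's serialized size before enqueueing it. The sketch below is only an illustration: the class name and the 512KB budget are assumptions chosen to stay well under the limit, not values taken from ZooKeeper or Curator.

```java
import java.nio.charset.StandardCharsets;

class PayloadSizeGuard {
    // Assumed budget, well below ZooKeeper's roughly 1MB limit; tune it for your deployment.
    static final int MAX_PAYLOAD_BYTES = 512 * 1024;

    // Returns the UTF-8 bytes of the message, or throws if they exceed the budget.
    static byte[] checkedPayload(String message) {
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > MAX_PAYLOAD_BYTES) {
            throw new IllegalArgumentException(
                    "payload of " + bytes.length + " bytes exceeds budget of " + MAX_PAYLOAD_BYTES);
        }
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(checkedPayload("test-0").length + " bytes, OK to enqueue");
    }
}
```

Rejecting oversized messages at the producer is simpler than cleaning up an oversized ZNode afterwards.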

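Even so, Curator still provides several queue implementations. If the queue holds few items and the items are small, it can still be a reasonable choice, used with discretion.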

DistributedQueue

DistributedQueue is the most basic kind of queue. It involves the following four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

Queues are created with QueueBuilder, which is also the builder for the other queue types.

// Signature of the factory method:
public static <T> QueueBuilder<T> builder(CuratorFramework client,
                                          QueueConsumer<T> consumer,
                                          QueueSerializer<T> serializer,
                                          java.lang.String queuePath)

// Typical usage:
QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedQueue<MessageType> queue = builder.buildQueue();

Once the queue is created, you can put data into it:

queue.put(aMessage);

QueueSerializer handles serialization and deserialization of the objects in the queue.
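For String messages, the serialization logic can be as small as the sketch below. It uses only the JDK, and the class name `Utf8StringCodec` is hypothetical. The two method bodies are what one might place inside the serialize and deserialize methods of a QueueSerializer<String>. Naming the charset explicitly is a design choice made here so that producers and consumers running on JVMs with different default charsets still agree on the bytes.

```java
import java.nio.charset.StandardCharsets;

class Utf8StringCodec {
    // Encodes with an explicit charset rather than the platform default.
    static byte[] serialize(String item) {
        return item.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes with the same charset that serialize() used.
    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "caf\u00e9";
        byte[] wire = serialize(original);
        // The round trip preserves the text, including the non-ASCII character.
        System.out.println(deserialize(wire).equals(original));
    }
}
```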

QueueConsumer is the consumer; it receives the data from the queue. The logic for processing queue items goes in QueueConsumer.consumeMessage().

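Normally a message is removed from the queue first and then handed to the consumer. These are two separate steps, so the operation is not atomic. You can call the builder's lockPath() to make consumers take a lock: a consumer holds the lock while it processes a message, so no other consumer can consume that message. If consumption fails or the process dies, the message can be handed to another process. This costs some performance, so it is still best to use the queue with a single consumer.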
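The difference between the two modes can be illustrated with an in-memory simulation. This is only a sketch of the semantics described above, built on java.util.ArrayDeque. It is not Curator's implementation, it involves no real locking, and all class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

class ConsumeModesSketch {
    // Default-style mode: the item leaves the queue before the consumer runs,
    // so a consumer failure means the item is gone.
    static void removeThenConsume(Deque<String> queue, Consumer<String> consumer) {
        String item = queue.pollFirst();
        if (item == null) {
            return;
        }
        try {
            consumer.accept(item);
        } catch (RuntimeException e) {
            // The item was already removed; there is nothing left to retry.
        }
    }

    // lockPath()-style mode: the item is removed only after the consumer returns normally,
    // so a failure leaves it at the head of the queue for another attempt.
    static void consumeThenRemove(Deque<String> queue, Consumer<String> consumer) {
        String item = queue.peekFirst();
        if (item == null) {
            return;
        }
        try {
            consumer.accept(item);
            queue.pollFirst();
        } catch (RuntimeException e) {
            // The item stays in the queue.
        }
    }

    public static void main(String[] args) {
        Consumer<String> failing = item -> {
            throw new IllegalStateException("consumer crashed on " + item);
        };

        Deque<String> first = new ArrayDeque<>();
        first.add("test-0");
        removeThenConsume(first, failing);
        System.out.println("remove-then-consume leaves " + first.size() + " item(s)");

        Deque<String> second = new ArrayDeque<>();
        second.add("test-0");
        consumeThenRemove(second, failing);
        System.out.println("consume-then-remove leaves " + second.size() + " item(s)");
    }
}
```

With the failing consumer, the first queue ends up empty while the second still holds the message, which is the behavior that makes redelivery to another process possible.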

Example:

package com.colobu.zkrecipe.queue;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class DistributedQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i);
                Thread.sleep((long)(3 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
            ex.printStackTrace(); // report failures instead of silently swallowing them
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>(){

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>(){

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);                
            }

        };
    }
}

DistributedIdQueue

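DistributedIdQueue is similar to the queue above, but lets you assign an ID to each element in the queue. Any element can then be removed from the queue by its ID. It involves these classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedIdQueue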

Create it with the following method:

builder.buildIdQueue()

To put an element:

queue.put(aMessage, messageId);

To remove an element:

int numberRemoved = queue.remove(messageId);

In this example, some elements are removed before the consumer has consumed them, so the consumer never receives those removed messages.

package com.colobu.zkrecipe.queue;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedIdQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class DistributedIdQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long)(50 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
            ex.printStackTrace(); // report failures instead of silently swallowing them
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>(){

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>(){

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);                
            }

        };
    }
}

DistributedPriorityQueue

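A priority queue orders its elements by priority. The smaller the priority value, the nearer the front the element sits and the sooner it is consumed. It involves the following classes: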

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

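Create it with builder.buildPriorityQueue(minItemsBeforeRefresh). When the priority queue is notified that elements have been added or removed, it stops processing the current list of elements and refreshes the queue. minItemsBeforeRefresh specifies the minimum number of items in the currently active list to process before refreshing. Set it to the amount of out-of-order processing your application can tolerate.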

A priority must be given when putting an element into the queue:

queue.put(aMessage, priority);
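The "smaller value first" rule can be checked locally with the JDK's own java.util.PriorityQueue. The sketch below is only an analogy for the ordering rule; the class names, messages and priority values are hypothetical, and it says nothing about how Curator stores priorities in ZooKeeper.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class PriorityOrderSketch {
    // A message paired with its priority.
    static final class Entry {
        final String message;
        final int priority;

        Entry(String message, int priority) {
            this.message = message;
            this.priority = priority;
        }
    }

    // Drains the entries so that the smallest priority value comes out first.
    static List<String> drainByPriority(List<Entry> entries) {
        PriorityQueue<Entry> heap =
                new PriorityQueue<>(Comparator.comparingInt((Entry e) -> e.priority));
        heap.addAll(entries);
        List<String> order = new ArrayList<>();
        while (!heap.isEmpty()) {
            order.add(heap.poll().message);
        }
        return order;
    }

    public static void main(String[] args) {
        List<Entry> entries = new ArrayList<>();
        entries.add(new Entry("test-0", 42));
        entries.add(new Entry("test-1", 7));
        entries.add(new Entry("test-2", 99));
        System.out.println(drainByPriority(entries)); // prints [test-1, test-0, test-2]
    }
}
```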

Example:

package com.colobu.zkrecipe.queue;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedPriorityQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class DistributedPriorityQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int)(Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long)(50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
            ex.printStackTrace(); // report failures instead of silently swallowing them
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>(){

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>(){

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);                
            }

        };
    }
}

DistributedDelayQueue

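The JDK also has a DelayQueue, which you may already know. DistributedDelayQueue provides similar functionality: each element has a delay value, and the consumer receives the element only after that time has passed. It involves the following four classes: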

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

Create it with the following statements:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

A delayUntilEpoch can be given when putting an element:

queue.put(aMessage, delayUntilEpoch);

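Note that delayUntilEpoch is not an interval relative to now, such as 20 milliseconds, but a timestamp in the future, such as System.currentTimeMillis() + 10 seconds. If the delayUntilEpoch time has already passed, the message is delivered to the consumer immediately.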
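Because passing a relative interval where an absolute timestamp is expected is an easy mistake, a small helper can do the conversion. This is a minimal sketch; the class name and the helper's signature are hypothetical and are not part of Curator.

```java
import java.util.concurrent.TimeUnit;

class DelayEpochSketch {
    // Converts a relative delay into an absolute epoch timestamp in milliseconds.
    static long delayUntilEpoch(long nowMillis, long delay, TimeUnit unit) {
        return nowMillis + unit.toMillis(delay);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long deliverAt = delayUntilEpoch(now, 10, TimeUnit.SECONDS);
        System.out.println("deliver in " + (deliverAt - now) + " ms"); // prints "deliver in 10000 ms"
        // deliverAt is the value one would pass as the second argument of queue.put(...).
    }
}
```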

Example:

package com.colobu.zkrecipe.queue;

import java.util.Date;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

public class DistributedDelayQueueExample {
    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {
            ex.printStackTrace(); // report failures instead of silently swallowing them
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

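Although the queues above come in several varieties, you may have noticed that none of them offers an interface like the JDK's. SimpleDistributedQueue provides an interface consistent with the JDK's (although it does not implement the Queue interface). Creating one is simple: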

public SimpleDistributedQueue(CuratorFramework client,
                              String path)

Adding an element:

public boolean offer(byte[] data) throws Exception

Removing an element:

public byte[] take() throws Exception

Other methods are also provided:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

Compared with the JDK Queue interface, there is no add method, and there is an extra take method.

take blocks until it can return successfully, whereas poll returns null immediately when the queue is empty.
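The same contrast exists in the JDK's java.util.concurrent.BlockingQueue, so it can be tried locally without a ZooKeeper server. The sketch below uses LinkedBlockingQueue purely as an analogy for the blocking behavior; it is not SimpleDistributedQueue, and the class and helper names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TakeVsPollSketch {
    // Offers one element from a background thread after delayMillis,
    // while the calling thread blocks in take() until that element arrives.
    static byte[] takeAfterDelayedOffer(BlockingQueue<byte[]> queue, byte[] data, long delayMillis) {
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                queue.offer(data);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            byte[] taken = queue.take(); // blocks until the producer's offer lands
            producer.join();
            return taken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for an element", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        // poll() does not wait: on an empty queue it returns null right away.
        System.out.println("poll on empty queue: " + queue.poll());
        byte[] taken = takeAfterDelayedOffer(queue, "test-0".getBytes(StandardCharsets.UTF_8), 200);
        System.out.println("take returned: " + new String(taken, StandardCharsets.UTF_8));
    }
}
```

Use poll when the caller has other work to do if the queue is empty, and take when the caller should simply wait for the next element.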
