優雅實現延時任務之zookeeper篇
前言
在《 ofollow,noindex" target="_blank">優雅實現延時任務之Redis篇 》一文中提到,實現延時任務的關鍵點,是要儲存任務的描述和任務的執行時間,還要能根據任務執行時間進行排序,那麼我們可不可以使用zookeeper來實現延時任務呢?答案當然是肯定的。要知道,zookeeper的znode同樣可以用來儲存資料,那麼我們就可以利用這一點來實現延時任務。實際上,著名的zookeeper客戶端curator就提供了基於zookeeper的延時任務API,今天就從原始碼的角度帶大家瞭解下curator是如何使用zookeeper實現延時任務的。不過需要提前說明的是,使用zookeeper實現延時任務不是一個很好的選擇,至少稱不上優雅,標題中的優雅實現延時任務只是為了和前文呼應,關於使用zookeeper實現延時任務的弊端,後文我會詳細解釋。
上手curator
關於zookeeper的安裝和使用這裡就不介紹了,之前也推送過相關文章了,如果對zookeeper不瞭解的,可以翻下歷史記錄看下。接下來直接進入主題,首先來體驗一把curator的延時任務API。
首先是任務消費者:
public class DelayTaskConsumer implements QueueConsumer<String>{
@Override
System.out.println(MessageFormat.format("釋出資訊。id - {0} , timeStamp - {1} , " +
public void consumeMessage(String message) throws Exception {
"threadName - {2}",message,System.currentTimeMillis(),Thread.currentThread().getName()));
} @Override public void stateChanged(CuratorFramework client, ConnectionState newState) {
}
System.out.println(MessageFormat.format("State change . New State is - {0}",newState));
}
curator的消費者需要實現QueueConsumer介面,在這裡我們做的邏輯就是拿到任務描述(這裡簡單起見,任務描述就是資訊id),然後釋出相應的資訊。
接下來看下任務生產者:
public class DelayTaskProducer {
private static final String CONNECT_ADDRESS="study-machine:32783";
private static final int SESSION_OUTTIME = 5000;private static final String QUEUE_PATH = "/queue";
private static final String NAMESPACE = "delayTask"; private static final String LOCK_PATH = "/lock";RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
private CuratorFramework curatorFramework; private DistributedDelayQueue<String> delayQueue; {.sessionTimeoutMs(SESSION_OUTTIME).retryPolicy(retryPolicy)
curatorFramework= CuratorFrameworkFactory.builder().connectString(CONNECT_ADDRESS) .namespace(NAMESPACE).build(); curatorFramework.start();}catch (Exception e){
delayQueue= QueueBuilder.builder(curatorFramework, new DelayTaskConsumer(), new DelayTaskSerializer(), QUEUE_PATH).lockPath(LOCK_PATH).buildDelayQueue(); try { delayQueue.start(); e.printStackTrace(); } }}
public void produce(String id,long timeStamp){ try { delayQueue.put(id,timeStamp); }catch (Exception e){ e.printStackTrace(); } }任務生產者主要有2個邏輯,一個是在構造程式碼塊中初始化curator的延時任務佇列,另一個是提供一個produce方法供外部往佇列裡放延時任務。
在初始化延時任務時,需要傳入一個位元組陣列與任務描述實體之間的序列化器,這裡簡單地將任務描述處理成字串:
public class DelayTaskSerializer implements QueueSerializer<String> {
@Override public byte[] serialize(String item) {
public String deserialize(byte[] bytes) {
return item.getBytes(); } @Override return new String(bytes); }
}
最後寫一個客戶端測一下:
public class DelayTaskTest {
public static void main(String[] args) throws Exception{
DelayTaskProducer producer=new DelayTaskProducer();
System.out.println(MessageFormat.format("start time - {0}",now));
long now=new Date().getTime();
producer.produce("2",now+TimeUnit.SECONDS.toMillis(10));
producer.produce("1",now+TimeUnit.SECONDS.toMillis(5));
producer.produce("4",now+TimeUnit.SECONDS.toMillis(20));
producer.produce("3",now+TimeUnit.SECONDS.toMillis(15));
}
producer.produce("5",now+TimeUnit.SECONDS.toMillis(2000)); TimeUnit.HOURS.sleep(1);
}
客戶端比較簡單,就是往延時佇列裡放5個任務,其中最後一個任務的執行時間比較晚,主要是為了觀察curator到底往zookeeper裡放了些啥。執行程式,結果如下:

可以看到,前4個任務都在特定的時間點執行了,最後一個任務執行時間比較晚,這裡就不驗證了。
接下來我們看下zookeeper裡到底存了哪些資訊:
[zk: localhost(CONNECTED) 2] ls /
[delayTask, zookeeper]
其中,zookeeper節點是zookeeper自帶的,除了zookeeper之後,還有一個delayTask節點,這個節點就是我們在生產者裡設定的名稱空間NAMESPACE。因為同一個zookeeper叢集可能會被用於不同的延時佇列,NAMESPACE的作用就是用來區分不同延時佇列的。再看看NAMESPACE裡是啥:
[zk: localhost(CONNECTED) 3] ls /delayTask
[lock, queue]
可以看到,有2個子節點:lock跟queue,分別是我們在生產者中設定的分散式鎖路徑LOCK_PATH和佇列路徑QUEUE_PATH。因為同一個延時佇列可能會被不同執行緒監聽,所以為了保證任務只被一個執行緒執行,zookeeper在任務到期的時候需要申請到分散式鎖後才能執行任務。接下來我們重點看下queue節點下有什麼:
[zk: localhost(CONNECTED) 7] ls /delayTask/queue
[queue-|165B92FCD69|0000000014]
發現裡面只有一個子節點,我們猜想應該就是我們剛剛放到延時佇列裡面的還未執行的任務,我們接著看看這個子節點下面還有沒有子節點:
[zk: localhost(CONNECTED) 8] ls /delayTask/queue/queue-|165B92FCD69|0000000014
[]
發現沒有了。
那我們就看看queue-|165B92FCD69|0000000014這個節點裡面放了什麼資料:
[zk: localhost(CONNECTED) 9] get /delayTask/queue/queue-|165B92FCD69|0000000014
5 cZxid = 0x3d ctime = Sat Sep 08 12:20:41 GMT 2018 mZxid = 0x3d
aclVersion = 0
mtime = Sat Sep 08 12:20:41 GMT 2018 pZxid = 0x3d cversion = 0 dataVersion = 0 ephemeralOwner = 0x0
numChildren = 0
dataLength = 11
可以發現放的是任務描述,也就是資訊id——5。到這裡我們就會知道了,zookeeper把任務描述放到了相應任務節點下了,那麼任務執行時間放到哪裡了呢?由於queue-|165B92FCD69|0000000014並沒有子節點,所以我們可以猜想任務執行時間放在了節點名稱上了。觀察節點名稱,queue只是一個字首,沒什麼資訊量。0000000014應該是節點序號(這裡也可以猜測zookeeper用來存放任務的節點是順序節點)。那麼就只剩下165B92FCD69了,這個看上去並不像時間戳或者日期,但是裡面有字母,可以猜測會不會是時間戳的十六進位制表示。我們將其轉化為十進位制看下:
@Test
public void test(){
long number = Long.parseLong("165B92FCD69", 16);
System.out.println(number);
}
System.out.println(new Date(number));
執行結果如下:
可以看到確實可以轉化為十進位制,然後將十進位制數轉化成日期,確實也是我們在一開始設定的任務執行時間。這樣一來就大概清楚了curator是怎麼利用zookeeper來儲存延時任務的了:將任務執行時間儲存在節點名稱中,將任務描述儲存在節點相應的資料中。
那麼到底是不是這樣的呢?接下來我們看下curator的原始碼就知道了。
curator原始碼解析
1.DistributedDelayQueue類
curator延時任務的入口就是DistributedDelayQueue類的start方法了。我們先不說start方法,先來看看DistributedDelayQueue類有哪些屬性:
private final DistributedQueue<T> queue;
DistributedDelayQueue (
QueueConsumer<T> consumer,
CuratorFramework client,
ThreadFactory threadFactory,
QueueSerializer<T> serializer, String queuePath,
String lockPath,
Executor executor, int minItemsBeforeRefresh, int maxItems,
Preconditions.checkArgument(minItemsBeforeRefresh >= 0, "minItemsBeforeRefresh cannot be negative");
boolean putInBackground, int finalFlushMs ) { queue = new DistributedQueue<T> (
minItemsBeforeRefresh,
client, consumer, serializer, queuePath, threadFactory, executor, true, lockPath,
protected long getDelay(String itemNode)
maxItems, putInBackground, finalFlushMs ) { @Override {
private long getDelay(String itemNode, long sortTime)
return getDelay(itemNode, System.currentTimeMillis()); } { long epoch = getEpoch(itemNode);
final long sortTime = System.currentTimeMillis();
return epoch - sortTime; } @Override protected void sortChildren(List<String> children) { Collections.sort (
long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
children, new Comparator<String>() { @Override public int compare(String o1, String o2) {
}
return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0); } } ); }
};
這裡擷取一部分程式碼出來。實際上DistributedDelayQueue裡只有一個queue屬性,queue屬性是DistributedQueue類的例項,從名字可以看到其是一個分散式佇列。不過DistributedDelayQueue裡的queue比較特殊, 其是DistributedQueue類的匿名內部類的例項,這個匿名子類重寫了DistributedQueue的部分方法,如:getDelay、sortChildren等。 這一點很重要,後面的程式碼會用到這2個方法。
2.DistributedDelayQueue的入口start方法
接下來我們就來看下DistributedDelayQueue的入口start方法:
/**
* Start the queue. No other methods work until this is called
* * @throws Exception startup errors */ @Override
}
public void start() throws Exception {
queue.start();
可以看到,其呼叫的是queue的start方法。我們跟進去看看:
@Override
public void start() throws Exception
{
if ( !state.compareAndSet(State.LATENT, State.STARTED) )
{ throw new IllegalStateException(); } try {
client.create().creatingParentContainersIfNeeded().forPath(queuePath);
} catch ( KeeperException.NodeExistsException ignore ) { // this is OK } if ( lockPath != null ) { try {
catch ( KeeperException.NodeExistsException ignore )
client.create().creatingParentContainersIfNeeded().forPath(lockPath); } { // this is OK } } if ( !isProducerOnly || (maxItems != QueueBuilder.NOT_SET) ) {
}
childrenCache.start(); } if ( !isProducerOnly ) { service.submit ( new Callable<Object>() { @Override public Object call() { runLoop(); return null; } } );
}
這個方法首先是檢查狀態,然後建立一些必須的節點,如前面的queue節點和lock節點就是在這裡建立的。
由於我們建立queue的時候有傳入了消費者,所以這裡isProducerOnly為true,故以下2個分支的程式碼都會執行:
if ( !isProducerOnly || (maxItems != QueueBuilder.NOT_SET) )
{ childrenCache.start(); } if ( !isProducerOnly ) {
public Object call()
service.submit ( new Callable<Object>() { @Override { runLoop();
}
return null; } }
);
2.1.childrenCache.start()
先來看看第一個分支:
childrenCache.start();
從名字上看,這個childrenCache應該是子節點的快取,我們進到start方法裡看看:
void start() throws Exception
{ sync(true);
}
調的是sync方法,我們跟進去看看:
private synchronized void sync(boolean watched) throws Exception
{ if ( watched ) {
client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
} else { client.getChildren().inBackground(callback).forPath(path); }
}
這裡watched為true,所以會走第一個分支。第一個分支程式碼的作用是在後臺去拿path路徑下的子節點,這裡的path就是我們配置的queue_path。拿到子節點後,會呼叫callback裡的回撥方法。我們看下這裡的callback做了什麼:
private final BackgroundCallback callback = new BackgroundCallback()
{ @Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{ if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
};
setNewChildren(event.getChildren()); }
}
可以看到,當有子節點時,會去呼叫setNewChildren方法。我們繼續跟進去:
private synchronized void setNewChildren(List<String> newChildren)
{ if ( newChildren != null ) {
children.set(new Data(newChildren, currentData.version + 1));
Data currentData = children.get(); notifyFromCallback(); }
}
這裡就是把子節點放到快取裡,並呼叫notifyFromCallback方法:
private synchronized void notifyFromCallback()
{ notifyAll();
}
這裡就是喚醒所有等待執行緒。既然有喚醒,那麼就一定有等待。繼續看ChildrenCache類的其他方法,發現在blockingNextGetData方法中,呼叫了wait方法:
synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException
{ long startMs = System.currentTimeMillis(); boolean hasMaxWait = (unit != null);
if ( hasMaxWait )
long maxWaitMs = hasMaxWait ? unit.toMillis(maxWait) : -1; while ( startVersion == children.get().version ) { {
wait(thisWaitMs);
long elapsedMs = System.currentTimeMillis() - startMs; long thisWaitMs = maxWaitMs - elapsedMs; if ( thisWaitMs <= 0 ) { break; } } else { wait(); } }
}
return children.get();
當blockingNextGetData方法被呼叫時,會先睡眠,當有子節點到來時,等待執行緒才會被喚醒,進而返回當前的子節點。這個blockingNextGetData方法後面還會看到。
2.2.runLoop方法
接下來我們看下start方法的最後一段程式碼:
service.submit
(
new Callable<Object>()
{ @Override
runLoop();
public Object call() {
);
return null; }
}
這段程式碼主要是向執行緒池提交了一個Callable,主要邏輯是runLoop方法。我們進到runLoop方法裡看看:
private void runLoop()
{
long currentVersion = -1;
long maxWaitMs = -1; try {
while ( state.get() == State.STARTED )
{ try {
ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
currentVersion = data.version; List<String> children = Lists.newArrayList(data.children); sortChildren(children); // makes sure items are processed in the correct order
catch ( InterruptedException e )
if ( children.size() > 0 ) { maxWaitMs = getDelay(children.get(0)); if ( maxWaitMs > 0 ) { continue; } } else { continue; } processChildren(children, currentVersion); } {
}
// swallow the interrupt as it's only possible from either a background // operation and, thus, doesn't apply to this loop or the instance // is being closed in which case the while test will get it } } catch ( Exception e ) {
}
log.error("Exception caught in background handler", e);
}
可以看到,runLoop方法就是一個死迴圈,只要與伺服器的狀態一直是STARTED,這個迴圈就不會退出。
首先看這句程式碼:
ChildrenCache.Data data = (maxWaitMs > 0) ?
childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) :
childrenCache.blockingNextGetData(currentVersion);
這行程式碼比較長,我把他拆成多行了。這句程式碼主要是去獲取子節點,前面說了,當呼叫blockingNextGetData方法時,會先等待,直到有新的子節點時,才會呼叫notifyAll喚醒等待執行緒。
拿到子節點後就對子節點列表進行排序:
sortChildren(children); // makes sure items are processed in the correct order
sortChildren方法是DistributedQueue類的方法,在一開始分析DistributedDelayQueue類的時候說到,DistributedDelayQueue類中的queue是一個匿名內部類例項,其重寫了getDelay和sortChildren等方法,因此我們要看經過重寫的getDelay和sortChildren是怎樣的,由於sortChildren方法依賴getDelay方法,因此我們先看看getDelay方法:
@Override
protected long getDelay(String itemNode)
{
return getDelay(itemNode, System.currentTimeMillis());
}
其會去呼叫getDelay私有方法,同時傳入當前時間戳:
private long getDelay(String itemNode, long sortTime)
{ long epoch = getEpoch(itemNode);
}
return epoch - sortTime;
getDelay私有方法又會去呼叫getEpoch方法:
private static long getEpoch(String itemNode)
{
int index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1;
int index2 = itemNode.lastIndexOf(SEPARATOR);
{
if ( (index1 > 0) && (index2 > (index1 + 1)) ) try {
return Long.parseLong(epochStr, 16);
String epochStr = itemNode.substring(index1 + 1, index2); } catch ( NumberFormatException ignore ) {
}
// ignore } }
return 0;
getEpoch方法其實就是去解析子節點名稱的,前面帶大家看了zookeeper佇列路徑下的子節點名稱,是這種形式的:queue-|165B92FCD69|0000000014。這個方法的作用就是將其中的任務執行的時間戳給解析出來,也就是中間的那段字串。拿到字串後再將十六進位制轉化為十進位制:
Long.parseLong(epochStr, 16);
這樣驗證了我們之前的猜想:curator會把任務執行時間編碼成十六進位制放到節點名稱裡。至於為什麼要編碼成十六進位制,個人認為應該是為了節省字串長度。
我們再回到私有方法getDelay:
private long getDelay(String itemNode, long sortTime)
{ long epoch = getEpoch(itemNode);
}
return epoch - sortTime;
拿到延時任務執行時間戳後,再跟當前時間戳相減,得出任務執行時間戳跟當前時間戳的差值,這個差值決定了這個任務要不要立即執行,如果說這個差值小於或等於0,說明任務已經到了執行時間,那麼就會執行相應的任務。當然這個差值還有一個用途,就是用於排序,具體在sortChildren方法裡面:
@Override
protected void sortChildren(List<String> children)
{
Collections.sort
final long sortTime = System.currentTimeMillis(); (
public int compare(String o1, String o2)
children, new Comparator<String>() { @Override {
return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime); } } );
}
這個sortChildren方法是經過重寫了的匿名內部類的方法,其根據任務執行時間與當前時間戳的差值進行排序,越早執行的任務排在前面,這樣就可以保證延時任務是按執行時間從早到晚排序的了。
分析完了getDelay和sortChildren,我們再回到runLoop方法:
ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
currentVersion = data.version; List<String> children = Lists.newArrayList(data.children); sortChildren(children); // makes sure items are processed in the correct orderprocessChildren(children, currentVersion);
if ( children.size() > 0 ) { maxWaitMs = getDelay(children.get(0)); if ( maxWaitMs > 0 ) { continue; } } else { continue; }在對子節點按執行時間進行升序排序後,會先拿到排在最前面的子節點,判斷該子節點的執行時間與當前時間戳的差值是否小於0,如果小於0,則說明到了執行時間,那麼就會呼叫下面這行程式碼:
processChildren(children, currentVersion);
我們跟進去看看:
private void processChildren(List<String> children, long currentVersion) throws Exception
{ final Semaphore processedLatch = new Semaphore(0);
for ( final String itemNode : children )
final boolean isUsingLockSafety = (lockPath != null); int min = minItemsBeforeRefresh; {
if ( !itemNode.startsWith(QUEUE_ITEM_NAME) )
if ( Thread.currentThread().isInterrupted() ) { processedLatch.release(children.size()); break; } {
if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) )
log.warn("Foreign node in queue path: " + itemNode); processedLatch.release(); continue; } if ( min-- <= 0 ) { {
new Runnable()
processedLatch.release(children.size()); break; } } if ( getDelay(itemNode) > 0 ) { processedLatch.release(); continue; } executor.execute ( {
processNormally(itemNode, ProcessType.NORMAL);
@Override public void run() { try { if ( isUsingLockSafety ) { processWithLockSafety(itemNode, ProcessType.NORMAL); } else { } } catch ( Exception e ) {
processedLatch.acquire(children.size());
ThreadUtils.checkInterrupted(e); log.error("Error processing message at " + itemNode, e); } finally { processedLatch.release(); } } } ); }
}
這裡用訊號量Semaphore保證了只有當所有子節點都被遍歷並處理了或者執行緒被中斷了,這個方法才會返回。如果這段程式是單執行緒執行的,那麼不需要使用訊號量也能做到這一點。但是大家看程式碼就知道,這個方法在執行到期的延時任務的時候是放到執行緒池裡面執行的,所以才需要使用訊號量來保證當所有任務被遍歷並處理了,這個方法才返回。
我們重點關注延時任務的執行部分:
executor.execute
(
{
new Runnable()
public void run()
@Override { try {
{
if ( isUsingLockSafety )
processWithLockSafety(itemNode, ProcessType.NORMAL);
} else { processNormally(itemNode, ProcessType.NORMAL); } }
log.error("Error processing message at " + itemNode, e);
catch ( Exception e ) { ThreadUtils.checkInterrupted(e); } finally {
);
processedLatch.release(); } }
}
由於我們在初始化延時佇列的時候傳入了lockPath ,所以實際上會走到下面這個分支:
processWithLockSafety(itemNode, ProcessType.NORMAL);
從方法名可以看到,這個方式是使用鎖的方式來處理延時任務。這裡順便提一句,好的程式碼是自解釋的,我們僅僅看方法名就可以大概知道這個方法是做什麼的,這一點大家平時在寫程式碼的時候要時刻牢記,因為我在公司的老系統上已經看到不少method1、method2之類的方法命名了。這裡略去1萬字……
我們進到processWithLockSafety方法裡面去:
@VisibleForTesting
protected boolean processWithLockSafety(String itemNode, ProcessType type) throws Exception
{ String lockNodePath = ZKPaths.makePath(lockPath, itemNode);
client.create().withMode(CreateMode.EPHEMERAL).forPath(lockNodePath);
boolean lockCreated = false; try { lockCreated = true;
byte[] bytes = null;
String itemPath = ZKPaths.makePath(queuePath, itemNode); boolean requeue = false;
requeue = (processMessageBytes(itemNode, bytes) == ProcessMessageBytesCode.REQUEUE);
if ( type == ProcessType.NORMAL ) { bytes = client.getData().forPath(itemPath); } if ( requeue ) {
.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(makeRequeueItemPath(itemPath), bytes)
client.inTransaction() .delete().forPath(itemPath) .and() .and() .commit(); } else { client.delete().forPath(itemPath);
catch ( KeeperException.NoNodeException ignore )
} return true; } catch ( KeeperException.NodeExistsException ignore ) { // another process got it } {
if ( lockCreated )
// another process got it } catch ( KeeperException.BadVersionException ignore ) { // another process got it } finally { {
}
client.delete().guaranteed().forPath(lockNodePath); } }
return false;
這個方法首先會申請分散式鎖:
client.create().withMode(CreateMode.EPHEMERAL).forPath(lockNodePath);
這裡申請鎖是通過建立臨時節點的方式實現的,一個任務只對應一個節點,所以只有一個zk客戶端能夠建立成功,也就是說只有一個客戶端可以拿到鎖。
拿到鎖後就是處理任務了,最後在finally塊中釋放分散式鎖。
我們重點看下處理任務那一塊:
requeue = (processMessageBytes(itemNode, bytes) == ProcessMessageBytesCode.REQUEUE);
我們進到processMessageBytes裡面去:
private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception
{ ProcessMessageBytesCode resultCode = ProcessMessageBytesCode.NORMAL; MultiItem<T> items; try
ThreadUtils.checkInterrupted(e);
{ items = ItemSerializer.deserialize(bytes, serializer); } catch ( Throwable e ) {
T item = items.nextItem();
log.error("Corrupted queue item: " + itemNode, e); return resultCode; } for(;;) { if ( item == null ) { break; } try {
log.error("Exception processing queue item: " + itemNode, e);
consumer.consumeMessage(item); } catch ( Throwable e ) { ThreadUtils.checkInterrupted(e); if ( errorMode.get() == ErrorMode.REQUEUE ) {
}
resultCode = ProcessMessageBytesCode.REQUEUE; break; } } }
return resultCode;
千呼萬喚始出來,總算看到任務消費的程式碼了:
consumer.consumeMessage(item);
這裡的consumer就是我們初始化延時任務佇列時傳入的任務消費者了。到這裡curator延時任務的處理邏輯就全部講完了。其他細節大家可以自己去看下原始碼,這裡就不細講了。
總結
這裡簡單回顧下curator實現延時任務的邏輯:首先在生產任務的時候,將所有任務都放到同一個節點下面,其中任務執行時間放到子節點的名稱中,任務描述放到子節點的data中。後臺會有一個執行緒去掃相應佇列節點下的所有子節點,客戶端拿到這些子節點後會將執行時間和任務描述解析出來,再按任務執行時間從早到晚排序,再依次處理到期的任務,處理完再刪除相應的子節點。這就是curator處理延時任務的大致流程了。
前面說了,curator實現延時任務不是很優雅,具體不優雅在哪裡呢?首先,curator對任務執行時間的排序不是在zookeeper服務端完成的,而是在客戶端進行,假如說有人一次性往zookeeper裡放了100萬個延時任務,那麼curator也會全部拿到客戶端進行排序,這在任務數多的時候肯定是有問題的。再者,zookeeper的主要用途不是用於儲存的,他不像SQL/">MySQL或者Redis一樣,被設計成儲存系統,zookeeper更多地是作為分散式協調系統,儲存不是他的強項,所以如果你要儲存的延時任務很多,用zookeeper來做也是不合適的。
之所以花了這麼大的篇幅來介紹curator如何利用zookeeper來實現延時任務,是為了告訴大家,不是隻要有輪子就可以直接拿來用的,如果不關心輪子是怎麼實現的,那有一天出了問題就無從下手了。
原文釋出時間為:2018-09-10
本文作者:不才黃某