1. 程式人生 > >新一代分布式任務調度框架:當當elastic-job開源項目的10

新一代分布式任務調度框架:當當elastic-job開源項目的10

判斷 oba 早期 cep gin bean 社區 之前 都是

原文地址:https://www.cnblogs.com/qiumingcheng/p/5573845.html

作者簡介: 張亮,當當網架構師、當當技術委員會成員、消息中間件組負責人。對架構設計、分布式、優雅代碼等領域興趣濃厚。目前主導當當應用框架ddframe研發,並負責推廣及撰寫技術白皮書。 一、為什麽需要作業(定時任務)? 作業即定時任務。一般來說,系統可使用消息傳遞代替部分使用作業的場景。兩者確有相似之處。可互相替換的場景,如隊列表。將待處理的數據放入隊列表,然後使用頻率極短的定時任務拉取隊列表的數據並處理。這種情況使用消息中間件的推送模式可更好的處理實時性數據。而且基於數據庫的消息存儲吞吐量遠遠小於基於文件的順序追加消息存儲。

技術分享圖片

但在某些場景下則不能互換: 時間驅動 OR 事件驅動:內部系統一般可以通過事件來驅動,但涉及到外部系統,則只能使用時間驅動。如:抓取外部系統價格。每小時抓取,由於是外部系統,不能像內部系統一樣發送事件觸發事件。 批量處理 OR 逐條處理:批量處理堆積的數據更加高效,在不需要實時性的情況下比消息中間件更有優勢。而且有的業務邏輯只能批量處理,如:電商公司與快遞公司結算,一個月結算一次,並且根據送貨的數量有提成。比如,當月送貨超過1000則額外給快遞公司多1%優惠。 非實時性 OR 實時性:雖然消息中間件可以做到實時處理數據,但有的情況並不需要如的實時。如:VIP用戶降級,如果超過1年無購買行為,則自動降級。這類需求沒有強烈的時間要求,不需要按照時間精確的降級VIP用戶。 系統內部 OR 系統解耦。作業一般封裝在系統內部,而消息中間件可用於系統間解耦。 二、當當之前在使用什麽作業系統? 當當之前使用的作業系統比較散亂,各自為戰,大致分為以下4種: Quartz:Java事實上的定時任務標準。但Quartz關註點在於定時任務而非數據,並無一套根據數據處理而定制化的流程。雖然Quartz可以基於數據庫實現作業的高可用,但缺少分布式並行執行作業的功能。 TBSchedule:阿裏早期開源的分布式任務調度系統。代碼略陳舊,使用timer而非線程池執行任務調度。眾所周知,timer在處理異常狀況時是有缺陷的。而且TBSchedule作業類型較為單一,只能是獲取/處理數據一種模式。還有就是文檔缺失比較嚴重。 Crontab:Linux系統級的定時任務執行器。缺乏分布式和集中管理功能。 Perl:遺留系統使用,目前已不符合公司的Java化戰略。 三、elastic-job的來歷 elastic-job原本是當當Java應用框架ddframe的一部分,本名dd-job。 ddframe包括編碼規範,開發框架,技術規範,監控以及分布式組件。ddframe規劃分為4個演進階段,目前處於第2階段。3、4階段涉及的技術組件不代表當當沒有使用,只是ddframe還未統一規劃。

技術分享圖片

ddframe由各種模塊組成,均已dd-開頭,如dd-container,dd-soa,dd-rdb,dd-job等。當當希望將ddframe的各個模塊與公司環境解耦並開源以反饋社區。之前開源的Dubbo擴展版本DubboX即是dd-soa的核心模塊。而本次介紹的elastic-job則是dd-job的開源部分,其中監控(但開源了監控方法)和ddframe核心接入等部分並未開源。 四、elastic-job包含的功能 分布式:最重要的功能,如果任務不能在分布式的環境下執行,那麽直接使用Quartz就可以了。 任務分片:是elastic-job中最重要也是最難理解的概念。任務的分布式執行,需要將一個任務拆分為n個獨立的任務項,然後由分布式的服務器分別執行某一個或幾個分片項。 彈性擴容縮容:將任務拆分為n個任務項後,各個服務器分別執行各自分配到的任務項。一旦有新的服務器加入集群,或現有服務器下線,elastic-job將在保留本次任務執行不變的情況下,下次任務開始前觸發任務重分片。舉例說明:有3臺服務器,分為10個片。則分片項分配如下:{server1: [0,1,2], server2: [3,4,5], server3: [6,7,8,9]}。如果一臺服務器崩潰,則分片項分配如下:{server1: [0,1,2,3,4], server2: [5,6,7,8,9]}。如果新增一臺服務器,則分片項分配如下:{server1: [0,1], server2: [2,3] , server3: [4,5,6] , server4: [7,8,9]}。 穩定性:在服務器無波動的情況下,並不會重新分片;即使服務器有波動,下次分片的結果也會根據服務器IP和作業名稱哈希值算出穩定的分片順序,盡量不做大的變動。 高性能:elastic-job會將作業運行狀態的必要信息更新到註冊中心,但為了考慮性能問題,可以犧牲一些功能,而換取性能的提升。 冪等性:elastic-job可犧牲部分性能用以保證同一分片項不會同時在兩個服務器上運行。 失效轉移:彈性擴容縮容在下次作業運行前重分片,但本次作業執行的過程中,下線的服務器所分配的作業將不會重新被分配。失效轉移功能可以在本次作業運行中用空閑服務器抓取孤兒作業分片執行。同樣失效轉移功能也會犧牲部分性能。 狀態監控:監控作業的運行狀態,可以監控數據處理功能和失敗次數,作業運行時間等。是冪等性,失效轉移必須的功能。 多作業模式:作業可分為簡單和數據流處理兩種模式,數據流又分為高吞吐處理模式和順序性處理模式,其中高吞吐處理模式可以開啟足夠多的線程快速的處理數據,而順序性處理模式將每個分片項分配到一個獨立線程,用於保證同一分片的順序性,這點類似於kafka的分區順序性。 其他一些功能,如錯過任務重執行,單機並行處理,容錯處理,Spring命名空間支持,運維平臺等。 五、elastic-job的部署和使用 將使用elastic-job框架的jar/war連接同一個基於Zookeeper的註冊中心即可。

技術分享圖片

作業框架執行數據並不限於數據庫,且作業框架本身是不對數據進行關聯的。作業可以用於處理數據,文件,API等任何操作。 使用elastic-job所需要關註的僅僅是將業務處理邏輯和框架所分配的分片項匹配並處理,如:如果分片項是1,則獲取id以1結尾的數據處理。所以如果是處理數據的話,最佳實踐是將作業分片項規則和數據中間層規則對應。 通過上面的部署圖可以看出來,作業分片只是個邏輯概念,分片和實際數據其實框架是不做任何匹配關系的。而根據分片項和實際業務如何關聯,是成功使用elastic-job的關鍵所在。為了不讓代碼寫起來很無聊,看起來像if(shardingItem == 1) {do xxx} else if (shardingItem == 2) {do xxx},elastic-job提供了自定義參數,可將分片項序號和實際業務做映射。比如設置為1=北京,2=上海。那麽代碼中可以通過北京或是上海的枚舉,從業務中的北京倉庫或上海倉庫取數據。elastic-job更多的還是關註作業調度和分布式分配,處理數據還是交由數據中間層更好些。 誠如剛才所說,最佳實踐是將作業分片項規則和數據中間層規則對應,省去作業分片時,再次適配數據中間層的分片邏輯。 六、對開源產品的開發理念 為了讓感興趣的人放心使用,我想分享一下我們對開源產品的開發理念。 用心寫代碼。代碼是項目的唯一核心和產出,任何一行的代碼都需要用心思考優雅性,可讀性,合理性。優雅性看似簡單的幾個字,其實實現的難度非常大。每個人心中都有自己對代碼的理解,而elastic-job也好,ddframe也好,都不是出自一人之手。對代碼優雅性的權衡,是比較難把控的。後面幾項,可以理解為對第一項的補充,或具體的實現思路。 代碼整潔幹凈到極致。簡單點說就是重度代碼潔癖患者。只有代碼漂亮整潔,其他開源愛好者才願意閱讀代碼,進而找出項目中的bug和貢獻高質量代碼。 極簡代碼, 高度復用,無重復代碼和配置。Java生態圈的特點是高質量的開源產品極多。我們盡量考慮復用輪子,比如項目中大量用到lombok簡化代碼;但也不會無原則的使用開源產品,我們傾向於把開源產品分為積木類和大廈類。項目中一般只考慮使用積木類搭建屬於我們自己的大廈,而不會直接用其他已成型的大廈。java系的公司有兩種不同的聲音,擁抱開源,或完全不使用開源。我們的看法是既然選擇使用java,就應該遵循java的理念,去擁抱java這些年累積的成熟東西。java相比其他新興語言,在語法上可能沒什麽優勢,但在廣度上還是少有其他生態圈可比擬。 單一需求可不考慮擴展性;兩個類似需求時再提煉。為了不盲目追求所謂的極致,我們用這條規則,盡量提升交付的速度。 模塊抽象劃分合理。這點也很難用標準衡量。以elastic-job舉例:elastic-job核心代碼分為4塊,core,spring,console和example;分別用於放置核心,spring支持,控制臺和代碼示例。在項目級別上做拆分。而core中將包分為api,exception,plugin和internal。用於放置對外發布的接口、異常,向最終用戶提供的可擴展插件以及封裝好的內部實現。內部實現的任何改動,都不會影響對外接口的變動,用戶自定義的插件,也不會影響內部代碼的穩定性。 如無特殊理由, 測試需全覆蓋。elastic-job核心模塊的測試覆蓋率是95%以上。雖然單元測試覆蓋率在分布式的復雜環境中並無太大說服力,但至少證明項目中很少出現低級邏輯錯誤。 對質量的定義。代碼可讀性 > 代碼可測性 > 模塊解耦設計 > 功能正確性 > 性能 > 功能可擴展性。只有代碼可讀,可測試,可100%掌控,項目才可持續發展。功能有缺陷可以修復,性能不夠可以優化,而代碼不清晰則項目會漸漸變為黑盒。所以對於框架類產品,我們認為質量 > 時間 > 成本。 文檔清晰。 七、未來展望 監控體系有待提高,目前只能通過註冊中心做簡單的存活和數據積壓監控。未來需要做的監控部分有: 1. 增加可監控維度,如作業運行時間等。 2. 基於JMX的內部狀態監控。 3. 基於歷史的全量數據監控,將所有監控數據通過flume等形式發到外部監控中心,提供實時分析功能。 增加任務工作流,如任務依賴,初始化任務,清理任務等。 失效轉移功能的實時性提升。 更多作業類型支持,如文件,MQ等類型作業的支持。 更多分片策略支持。 項目的開源地址:https://github.com/dangdangdotcom/elastic-job 希望大家多關註,共同貢獻代碼。 Q&A Q1:請問失效轉移中如何判斷失效?對任務本身實現有什麽限制? 失效轉移目前通過Zookeeper監聽分片項臨時節點判斷。elastic-job會經過註冊中心會話過期時間才能感知任務掛掉。失效轉移有兩種形式:1、任務掛掉,elastic-job會找空閑的作業服務器(可能是未分配任務的,也可能是完成執行本次任務執行的)執行。2、如果當時沒有空閑服務器,則將在某服務器完成分配的任務時抓取未分配的分片項。 Q2:Zookeeper的作用是保存任務信息嗎,如果Zookeeper掛了會影響任務執行嗎? Zookeeper目前的znode分四類,config,servers,execution,leader。config用於保存分布式作業的全局控制,如,分多少片,要不要執行misfire,cron表達式。servers用於註冊作業服務器狀態和分片信息。execution以分片的維度存儲作業運行時狀態。leader用於存儲主節點。elastic-job作業執行是無中心化的,但主節點起到協調的作用,如:重分片、清理上次運行時信息等。 Q3:在任務處理上可以與spring batch集成嗎? spring batch之前關註過,但目前elastic-job還沒有集成。elastic-job的spring支持是自定義了job的命名空間,更簡化了基於spring的配置,並且可以使用spring註入的bean。spring batch也是很好的作業框架,包括spring-quartz也很不錯,但分布式功能並不成熟。所以在這之上改動難度比較大,而且elastic-job更希望做一個不依賴於spring,而是能融入spring的綠色產品。 Q4:針對簡單和數據流,能夠說說具體分片是怎麽處理的嗎? 簡單的作業就是未經過任何業務邏輯的封裝,只是提供了一個execute方法,定時觸發,但是增加了分布式分片功能。可以簡單理解為quartz的分布式版本。quartz雖然可以支持基於數據庫的分布式高可用,但不能分片。也就是說,兩臺服務器,只能一主一備,不能同時負載均衡的運行。數據流類型作業參照了阿裏之前開源的TBSchedule,將數據處理分為fetchData和processData。先將數據從數據庫,文件系統,或其他數據源取出來,然後processData集中處理,可以逐條處理,可以批量處理(這塊未來將加上)。processData是多線程執行的,數據流類型作業可再細分為兩種,一種是高吞吐,一種是順序性。高吞吐可以開啟任意多的線程並行執行數據處理,而順序執行會根據每個分片項一個線程,保證分片項之中的數據有序,這點參照了kafka的實現。數據流類型作業有isStreaming這個參數,用於控制是否流式不停歇的處理數據,類似永動機,只要有數據,則一直處理。但這種作業不適合每次fetchData都對數據庫造成壓力很大的場景。 Q5:請問如何實現一個任務僅僅只在一個節點執行一次? 目前的冪等性,是在execution的znode中增加了對分片項狀態的註冊,如果狀態是運行中,即使有別的服務器要運行這個分片項,elastic-job也會拒絕運行,而是等待這個狀態變為非運行的狀態。每個作業分片項啟動時會更新狀態。服務器沒有波動的情況下,是不存在一個分片被分到兩個服務器的情況。但一旦服務器波動,在分片的瞬間有可能出現這種情況。關於分片,其實是比較復雜的實現。目前分片是發現服務器波動,或修改分片總數,將記錄一個狀態,而非直接分片。分片將在下次作業觸發時執行,只有主節點可以分片,分片中從節點都將阻塞。無調度中心式分布式作業最大的一個問題是,無法保證主節點作業一定先於其他從節點觸發。所以很有可能從節點先觸發執行,而使用舊分片;然後主節點才重新分片,將造成這次作業分片可能不一致。這就需要execution節點來保證冪等性。下次執行時,只要無服務器波動,之前錯誤的分片自然會修正。 Q6:如果Zookeeper掛了,是否全部的任務都掛了不能運行包括已經運行過一次的,如果又恢復了,任務能正常運行嗎,還是業務應用服務也要重新啟動? 其實Zookeeper是不太容易掛的。畢竟Zookeeper是分布式高可用,一般不會是單臺。目前elastic-job做到的容錯是,連不上Zookeeper的作業服務器將立刻停止執行作業,防止主節點已重新分片,而腦裂的服務器還在執行。也就是說,Zookeeper掛掉,所有作業都將停止。而作業服務器一旦與Zookeeper恢復連接,作業也將恢復運行。所以Zookeeper掛掉不會影響數據,而Zookeeper恢復,作業會繼續跑,不用重啟。 Q7:可以具體到業務層面嗎?比如有個任務,是一樣發送100w的用戶郵件,這時候應該怎麽分片?針對分布式數據庫的分頁在咱們這裏又是怎麽處理的? 100W用戶的郵件,個人認為可以按照用戶id取模,比如分成100個分片,將整個userid % 100,然後每個分片發送userid結尾是取摸結果的郵件。詳細來說:分片1發送以01結尾的userid的郵件,…,分片99發送以99結尾的userid的郵件。分布式數據庫的分頁,理論上來說,不是作業框架處理的範疇,應由數據中間層處理。順便說下,ddframe的數據中間層部分,sharding-JDBC將於明年初開源。通過修改JDBC驅動實現分庫分表。非MyCat或cobar這種中間件方式;也非基於hibernate或mybatis這種ORM方式。sharding-JDBC相對輕量級,也更加容易適配各種數據庫和ORM Q8:ddframe是由很多組件組成?支持多語言嗎? ddframe是很多組件的總稱。分為核心模塊,分布式組件模塊和監控對接模塊等。核心模塊可以理解為spring-boot這種可快速啟動,快速搭建項目的東西。 分布式組件包括SOA調用的Dubbox,基於分布式作業的elastic-job,還有剛才提到的sharding-JDBC,以及近期暫無開源計劃的緩存、MQ、NoSQL等模塊。 監控模塊估計以後也不會開源,和公司本身的業務場景綁定太緊,不是不想開源,是無法開源。主要分為日誌中心,流量分析和系統關系調用圖。監控部分目前也還在做,不是很強大。 多語言方面,SOA模塊支持,Dubbox的REST擴展就是為了支持其他語言的調用。剩下的暫時不行。比如sharing-JDBC,主要是基於java的JDBC,如果多語言,中間層是個更好的方法。 ddframe的模塊名字都是dd-*,dd-soa,dd-rdb,dd-job,dd-log之類。elastic-job,sharding-JDBC等,是為開源而從ddframe抽離並重新起的名字。

新一代分布式任務調度框架:當當elastic-job開源項目的10