1. 程式人生 > >asp.net core microservices 架構之 分散式自動計算(一)

asp.net core microservices 架構之 分散式自動計算(一)

 

 

    一:簡介                                   

        自動計算都是常駐記憶體的,沒有人機互動。我們經常用到的就是console job和sql job了。sqljob有自己的宿主,與資料庫產品有很關聯,暫時不提。console job使用quartz.net框架,目前3.x已經支援netcore。

        如果單臺伺服器執行計算,那就很簡單了,quartz很強大了,而且支援故障災難轉移叢集,docker做宿主,很容易實現。但是分散式就不可同日而語了。如果你的一個數據處理太慢,需要多程序多主機處理,那麼就需要多程序自動協調處理這一資料,比如如果你的訂單太多,而一個程序處理延遲是10秒,那使用者體驗就會非常不好,雖然非同步已經提高了你的吞吐量,但是延遲太久,對後續業務還是造成很大的干擾,時不時的會進行停頓。如果兩到三臺的機器進行處理,那延遲就會大大的減低,但是這兩到三臺伺服器如果分配處理任務?如何分割這個資料,多程序進行處理?就需要到這一篇講解的知識了。

        在多個job應用之間進行協調的工具,就是zookeeper了,zookeeper官方介紹:一個分散式應用協調服務。其實他也是一個類似檔案系統的寫一致的資料儲存軟體,我們可以使用它做分散式鎖,對應用進行協調控制。

       目前流行的這一類產品也比較多,但是我是熟悉quartz,至於切片的功能,在quartz之上可以進行封裝,因為所要封裝的功能不多,所以我還是選擇了quartz。

 

二 zookeeper 服務                                                  

         首先就是zookeeper服務,和前面log日誌一樣,首先建立構建和配置檔案類。

  首先看看配置類:

ZookeeperConfiguration.cs
using Microsoft.Extensions.Configuration;

namespace Walt.Framework.Configuration
{
    public class ZookeeperConfiguration
    { 
        public IConfiguration Configuration { get; }

        public ZookeeperConfiguration(IConfiguration configuration)
        {
            Configuration = configuration;
        }
    }
    
}
View Code ZookeeperConfigurationExtensioncs.cs
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using Walt.Framework.Service.Zookeeper;

namespace Walt.Framework.Configuration
{
    public static class ZookeeperConfigurationExtensioncs
    {
          public static IZookeeperBuilder AddConfiguration(this IZookeeperBuilder builder
          ,IConfiguration configuration)
          {
               
                InitService( builder,configuration); 
                return builder;
          }


          public static void InitService(IZookeeperBuilder builder,IConfiguration configuration)
          {
            builder.Services.TryAddSingleton<IConfigureOptions<ZookeeperOptions>>(
                  new ZookeeperConfigurationOptions(configuration));

            builder.Services.TryAddSingleton
            (ServiceDescriptor.Singleton<IOptionsChangeTokenSource<ZookeeperOptions>>(
                  new ConfigurationChangeTokenSource<ZookeeperOptions>(configuration)) );

            builder.Services
            .TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<ZookeeperOptions>>
            (new ConfigureFromConfigurationOptions<ZookeeperOptions>(configuration)));
            
             builder.Services.AddSingleton(new ZookeeperConfiguration(configuration));
          }
    }
} 
View Code ZookeeperConfigurationOptions.cs
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Walt.Framework.Service.Zookeeper;

namespace Walt.Framework.Configuration
{
    public class ZookeeperConfigurationOptions : IConfigureOptions<ZookeeperOptions>
    {

        private readonly IConfiguration _configuration;


        public ZookeeperConfigurationOptions(IConfiguration configuration)
        {
           _configuration=configuration;
        }


        public void Configure(ZookeeperOptions options)
        {
             System.Diagnostics.Debug.WriteLine("zookeeper配置類,適配方法。"
             +Newtonsoft.Json.JsonConvert.SerializeObject(options));
        }
    }
}
View Code

以上這三個類就是配置類,下面是構建類和配置資訊類:

ZookeeperBuilder.cs
using Microsoft.Extensions.DependencyInjection;

namespace Walt.Framework.Service.Zookeeper
{
    public class ZookeeperBuilder : IZookeeperBuilder
    {
        public IServiceCollection Services {get;}

        public ZookeeperBuilder(IServiceCollection services)
        {
            Services=services;
        }
    }
}
View Code ZookeeperOptions.cs
using System;
using System.Collections.Generic;

namespace Walt.Framework.Service.Zookeeper
{
    public class ZookeeperOptions
    {
        /// <param name="connectstring">comma separated host:port pairs, each corresponding to a zk server. 
        /// e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional chroot suffix is used the example would look like:
        /// "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a" where the client would be rooted at "/app/a" and all paths would 
        /// be relative to this root - ie getting/setting/etc... "/foo/bar" would result in operations being run on "/app/a/foo/bar" 
        /// (from the server perspective).</param>
        public string Connectstring{get;set;}

        public int SessionTimeout{get;set;}
 
        public int SessionId{get;set;}

        public string SessionPwd{get;set;}

        public bool IsReadOnly{get;set;}

    }
}
View Code ZookeeperService.cs 這個類中有兩個watch類。
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using org.apache.zookeeper;
using org.apache.zookeeper.data;
using static org.apache.zookeeper.ZooKeeper;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using static org.apache.zookeeper.Watcher.Event;
using Newtonsoft.Json;
using System.Collections.Concurrent;

namespace  Walt.Framework.Service.Zookeeper
{

    internal class  WaitLockWatch:Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        private string _path;

        private ZookeeperService _zookeeperService;

        public string _tempNode;

        public WaitLockWatch(AutoResetEvent autoResetEvent
        ,ZookeeperService zookeeperService
        ,ILogger logger,string path
        ,string tempNode)
        {
            _autoResetEvent=autoResetEvent;
            _zookeeperService=zookeeperService;
            _logger=logger;
            _path=path;
            _tempNode=tempNode;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("{0}節點下子節點發生改變,激發監視方法。",_path);
            var childList=_zookeeperService.GetChildrenAsync(_path,null,true).Result;
            if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("獲取子序列失敗,計數為零.path:{0}",_path);
                        return Task.FromResult(0);
                   }
                   var top=childList.Children.OrderBy(or=>or).First();
                   if(_path+"/"+top==_tempNode)
                   {
                        _logger.LogDebug("釋放阻塞");
                        _autoResetEvent.Set();
                   }
           
            return Task.FromResult(0);
       }
    }


    internal class WaitConnWatch : Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        public WaitConnWatch(AutoResetEvent autoResetEvent
        ,ILogger logger)
        {
            _autoResetEvent=autoResetEvent;
            _logger=logger;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("watch激發,回掉狀態:{0}",@event.getState().ToString());
            if(@event.getState()== KeeperState.SyncConnected
            ||@event.getState()== KeeperState.ConnectedReadOnly)
            {
                _logger.LogDebug("釋放阻塞");
                _autoResetEvent.Set();
            }
            return Task.FromResult(0);
       }
    }

    public class ZookeeperService : IZookeeperService
    {

        public List<string> requestLockSequence=new List<string>();
        private object _lock=new object();
        private ZookeeperOptions _zookeeperOptions;
        private ZooKeeper _zookeeper;

         private static readonly byte[] NO_PASSWORD = new byte[0];

         public Watcher Wathcer {get;set;}

         public ILoggerFactory LoggerFac { get; set; }

         private ILogger _logger;

         AutoResetEvent[] autoResetEvent=new AutoResetEvent[2]
         {new AutoResetEvent(false),new AutoResetEvent(false)};

        public ZookeeperService(IOptionsMonitor<ZookeeperOptions>  zookeeperOptions
        ,ILoggerFactory loggerFac)
        {
            LoggerFac=loggerFac;
            _logger=LoggerFac.CreateLogger<ZookeeperService>();
            _zookeeperOptions=zookeeperOptions.CurrentValue; 
            _logger.LogDebug("配置引數:{0}",JsonConvert.SerializeObject(_zookeeperOptions));
             zookeeperOptions.OnChange((zookopt,s)=>{
                _zookeeperOptions=zookopt; 
            });
            _logger.LogDebug("開始連線");
            Conn(_zookeeperOptions); 
        }

       

        private void Conn(ZookeeperOptions zookeeperOptions)
        {
            bool isReadOnly=default(Boolean);
            Wathcer=new WaitConnWatch(autoResetEvent[0],_logger);
            if(isReadOnly!=zookeeperOptions.IsReadOnly)
            {
                isReadOnly=zookeeperOptions.IsReadOnly;
            }

            
            byte[] pwd=new byte[0];
            //如果沒有密碼和sessionId
            if(string.IsNullOrEmpty(zookeeperOptions.SessionPwd)
            &&_zookeeperOptions.SessionId==default(int))
            {
             _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,isReadOnly);
            }
            else if (!string.IsNullOrEmpty(zookeeperOptions.SessionPwd))
            {
                pwd=System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd);
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,0,pwd,isReadOnly);
            }
            else
            {
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring
                 ,zookeeperOptions.SessionTimeout,Wathcer,zookeeperOptions.SessionId,pwd,isReadOnly);
            }
             if(_zookeeper.getState()==States.CONNECTING)
            {
                _logger.LogDebug("當前狀態:CONNECTING。阻塞等待");
                autoResetEvent[0].WaitOne();
            }
        }

        public Task<string> CreateZNode(string path,string data,CreateMode createMode,List<ACL> aclList)
        {
            ReConn();
            if(string.IsNullOrEmpty(path)||!path.StartsWith('/'))
            {
                _logger.LogDebug("path路徑非法,引數:path:{0}",path);
                return null;
            }
            byte[] dat=new byte[0];
            if(string.IsNullOrEmpty(data))
            { 
                dat=System.Text.Encoding.Default.GetBytes(data);
            }
            if(createMode==null)
            {
                 _logger.LogDebug("createMode為null,預設使用CreateMode.PERSISTENT");
                createMode=CreateMode.PERSISTENT;
            }
            return _zookeeper.createAsync(path,dat,aclList,createMode);
        }

        public Task<DataResult> GetDataAsync(string path,Watcher watcher,bool isSync)
        {
            ReConn();
            if(_zookeeper.existsAsync(path).Result==null )
            {
                _logger.LogDebug("path不存在");
                return null;
            }
            if(isSync)
            {
                 _logger.LogDebug("即將進行同步。"); 
                 var task=Task.Run(async ()=>{
                    await _zookeeper.sync(path);  
                 }); 
                task.Wait();
            }
           

            return _zookeeper.getDataAsync(path,watcher);
        }

         public Task<Stat> SetDataAsync(string path,string data,bool isSync)
        {
            ReConn();
            if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }
            byte[] dat=new byte[0];
            if(!string.IsNullOrEmpty(data))
            { 
                dat=System.Text.Encoding.Default.GetBytes(data);
            }
            return _zookeeper.setDataAsync(path,dat);
        }

         public async Task<ChildrenResult> GetChildrenAsync(string path,Watcher watcher,bool isSync) 
         {
             ReConn();
              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }
             if(isSync)
            {
                 _logger.LogDebug("即將進行同步。");
                 var task=Task.Run(async  ()=>{
                      _logger.LogDebug("開始同步");
                      await _zookeeper.sync(path);  
                 });
                task.Wait();
            }
             return await _zookeeper.getChildrenAsync(path,watcher);
         }

         public void DeleteNode(string path,String tempNode)
         {
             if(!string.IsNullOrEmpty(tempNode))
             {
                requestLockSequence.Remove(tempNode); 
             }
             ReConn();
              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return;
            }
            var  task=Task.Run(async ()=>{
                 _logger.LogDebug("刪除node:{0}",path);
                  await _zookeeper.deleteAsync(path);
            });
            task.Wait();
            var sequencePath=requestLockSequence.Where(w=>path==w).FirstOrDefault();
            if(sequencePath!=null)
            {
                requestLockSequence.Remove(sequencePath);
            }
         }

         public  string GetDataByLockNode(string path,string sequenceName,List<ACL> aclList,out string tempNodeOut)
         {
             _logger.LogInformation("獲取分散式鎖開始。");
             ReConn();
             string tempNode=string.Empty;
             tempNodeOut=string.Empty;

              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }

            
            try
            {
                _logger.LogDebug("開始鎖定語句塊");
                lock(_lock)
                {
                     _logger.LogDebug("鎖定,訪問requestLockSequence的程式碼應該同步。");
                    tempNode=requestLockSequence
                    .Where(w=>w.StartsWith(path+"/"+sequenceName)).FirstOrDefault();
                   
                    if(tempNode==null)
                    {
                        tempNode=CreateZNode(path+"/"+sequenceName,"",CreateMode.EPHEMERAL_SEQUENTIAL,aclList).Result;
                        _logger.LogDebug("建立節點:{0}",tempNode);
                        if(tempNode==null)
                        {
                            _logger.LogDebug("建立臨時序列節點失敗。詳細引數:path:{0},data:{1},CreateMode:{2}"
                            ,path+"/squence","",CreateMode.EPHEMERAL_SEQUENTIAL);
                            return null;
                        }
                         _logger.LogInformation("建立成功,加入requestLockSequence列表。");
                        requestLockSequence.Add(tempNode);
                    }
                    else
                    {
                        _logger.LogDebug("已經存在的鎖節點,返回null");
                        return null;
                    }
                }

                var childList= GetChildrenAsync(path,null,true).Result;
                   if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("獲取子序列失敗,計數為零.path:{0}",path);
                        return null;
                   }
                   _logger.LogDebug("獲取path:{0}的子節點:{1}",path,Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
                   var top=childList.Children.OrderBy(or=>or).First();
                   byte[] da=null;
                   if(path+"/"+top==tempNode)
                   {
                       tempNodeOut =tempNode;
                       da= GetDataAsync(path,null,true).Result.Data;
                        if(da==null||da.Length<1)
                        {
                            return string.Empty;
                        } 
                        return System.Text.Encoding.Default.GetString(da);
                   }
                   else
                   {
                    childList= GetChildrenAsync(path,new WaitLockWatch(autoResetEvent[1],this,_logger,path,tempNode),true).Result;
                    autoResetEvent[1].WaitOne();
                   }
                    _logger.LogDebug("繼續執行。");
                    tempNodeOut =tempNode;
                    da= GetDataAsync(path,null,true).Result.Data;
                    if(da==null||da.Length<1)
                    {
                         return string.Empty;
                    }
                    return System.Text.Encoding.Default.GetString(da);
            }
            catch(Exception ep)
            {
                 _logger.LogError(ep,"獲取同步鎖出現錯誤。");
                if(!string.IsNullOrEmpty(tempNode))
                {
                    DeleteNode(tempNode,tempNode);  
                }
            }
            return null;
         }

         private void ReConn()
         {
             _logger.LogInformation("檢查連線狀態");
             if(_zookeeper.getState()==States.CLOSED
             ||_zookeeper.getState()== States.NOT_CONNECTED)
             {
                 _logger.LogInformation("連線為關閉,開始重新連線。");
                Conn(_zookeeperOptions);
             }
         }

         public void Close(string tempNode)
         {
             var task=Task.Run(async ()=>{ 
             requestLockSequence.Remove(tempNode); 
              await _zookeeper.closeAsync();
             });
             task.Wait(); 
         }
 
    }
}
View Code

 

前面的類如果瞭解了netcore的擴充套件服務和配置機制,就很簡單理解了,我們主要是講解這個服務類的功能。

首先看服務類其中的一段程式碼:

 private void Conn(ZookeeperOptions zookeeperOptions)
        {
            bool isReadOnly=default(Boolean);
            Wathcer=new WaitConnWatch(autoResetEvent[0],_logger); //監控連線是否連線成功
            if(isReadOnly!=zookeeperOptions.IsReadOnly)
            {
                isReadOnly=zookeeperOptions.IsReadOnly;
            }

            
            byte[] pwd=new byte[0];
            //如果沒有密碼和sessionId
            if(string.IsNullOrEmpty(zookeeperOptions.SessionPwd)
            &&_zookeeperOptions.SessionId==default(int))
            {
             _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,isReadOnly);
            }
            else if (!string.IsNullOrEmpty(zookeeperOptions.SessionPwd))
            {
                pwd=System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd);
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,0,pwd,isReadOnly);
            }
            else
            {
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring
                 ,zookeeperOptions.SessionTimeout,Wathcer,zookeeperOptions.SessionId,pwd,isReadOnly);
            }
             if(_zookeeper.getState()==States.CONNECTING)
            {
                _logger.LogDebug("當前狀態:CONNECTING。阻塞等待");
                autoResetEvent[0].WaitOne();
            }
        }

這個方法是連線zookeeper,我們在建構函式中呼叫它,注意,zookeeper是非同步,所以需要watcher類監控和AutoResetEvent阻塞當前執行緒,因為如果不阻塞,在連線還沒建立的時候,後面呼叫,

會出現錯誤。在監控類被觸發的時候,執行取消阻塞。

 internal class WaitConnWatch : Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        public WaitConnWatch(AutoResetEvent autoResetEvent
        ,ILogger logger)
        {
            _autoResetEvent=autoResetEvent;
            _logger=logger;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("watch激發,回掉狀態:{0}",@event.getState().ToString());
            if(@event.getState()== KeeperState.SyncConnected
            ||@event.getState()== KeeperState.ConnectedReadOnly)
            {
                _logger.LogDebug("釋放阻塞");
                _autoResetEvent.Set();  //取消阻塞當前執行緒。
            }
            return Task.FromResult(0);
       }
    }

大家看zookeeper服務類,裡面很多方法在執行前需要執行conn方法,這只是一種失敗重連的機制,因為一般沒有執行緒池的,我都會給這個服務單例。哪有人會問,如果失敗重連,會不會阻塞當前應用,

這個不會,因為netcore是多執行緒的,但是會降低這個應用的生產力。我前面翻譯過一篇net的關於執行緒的知識,後面也會單獨一篇講解netcore的執行緒模型。還有程式碼中我很少用異常去處理容錯性,儘量丟擲原生的異常,使用日誌去記錄,然後為這個類返回個null,異常對效能也有一定的消耗,當然看自己習慣了,目的都是為了應用的健壯性。

其他方法是操作zookeeper的類,大家可以看我貼出來的程式碼。因為zookeeper最出名的估計就是分散式鎖了,所以就把這個功能加進來。

 public  string GetDataByLockNode(string path,string sequenceName,List<ACL> aclList,out string tempNodeOut)
         {
             _logger.LogInformation("獲取分散式鎖開始。");
             ReConn();
             string tempNode=string.Empty;
             tempNodeOut=string.Empty;

              if(_zookeeper.existsAsync(path).Result==null )
            {
                 _logger.LogDebug("path不存在");
                return null;
            }

            
            try
            {
                _logger.LogDebug("開始鎖定語句塊");
                lock(_lock)  //這是我為了防止重複提交做的防止併發,當然實際用的地方是協調應用之間的功能,而肯定不會用人機互動。你可以把這個看作多此一舉。
                {
                     _logger.LogDebug("鎖定,訪問requestLockSequence的程式碼應該同步。");
                    tempNode=requestLockSequence
                    .Where(w=>w.StartsWith(path+"/"+sequenceName)).FirstOrDefault();
                   
                    if(tempNode==null)
                    {
                        tempNode=CreateZNode(path+"/"+sequenceName,"",CreateMode.EPHEMERAL_SEQUENTIAL,aclList).Result;
                        _logger.LogDebug("建立節點:{0}",tempNode);
                        if(tempNode==null)
                        {
                            _logger.LogDebug("建立臨時序列節點失敗。詳細引數:path:{0},data:{1},CreateMode:{2}"
                            ,path+"/squence","",CreateMode.EPHEMERAL_SEQUENTIAL);
                            return null;
                        }
                         _logger.LogInformation("建立成功,加入requestLockSequence列表。");
                        requestLockSequence.Add(tempNode);
                    }
                    else
                    {
                        _logger.LogDebug("已經存在的鎖節點,返回null");
                        return null;
                    }
                }

                var childList= GetChildrenAsync(path,null,true).Result;  //首先獲取lock子節點。
                   if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("獲取子序列失敗,計數為零.path:{0}",path);
                        return null;
                   }
                   _logger.LogDebug("獲取path:{0}的子節點:{1}",path,Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
                   var top=childList.Children.OrderBy(or=>or).First();
                   byte[] da=null;
                   if(path+"/"+top==tempNode) //判斷是否是當前自己的節點在佇列頂端。
                   {
                       tempNodeOut =tempNode;
                       da= GetDataAsync(path,null,true).Result.Data;
                        if(da==null||da.Length<1)
                        {
                            return string.Empty;
                        } 
                        return System.Text.Encoding.Default.GetString(da);
                   }
                   else
                   {
                    childList= GetChildrenAsync(path,new WaitLockWatch(autoResetEvent[1],this,_logger,path,tempNode),true).Result; //如果自己不再佇列頂端,則加監聽等待這個節點有更改。
                    autoResetEvent[1].WaitOne();
                   }
                    _logger.LogDebug("繼續執行。");
                    tempNodeOut =tempNode;
                    da= GetDataAsync(path,null,true).Result.Data;
                    if(da==null||da.Length<1)
                    {
                         return string.Empty;
                    }
                    return System.Text.Encoding.Default.GetString(da);
            }
            catch(Exception ep)
            {
                 _logger.LogError(ep,"獲取同步鎖出現錯誤。");
                if(!string.IsNullOrEmpty(tempNode))
                {
                    DeleteNode(tempNode,tempNode);  
                }
            }
            return null;
         }

 

接下來看監聽類:

 internal class  WaitLockWatch:Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        private string _path;

        private ZookeeperService _zookeeperService;

        public string _tempNode;

        public WaitLockWatch(AutoResetEvent autoResetEvent
        ,ZookeeperService zookeeperService
        ,ILogger logger,string path
        ,string tempNode)
        {
            _autoResetEvent=autoResetEvent;
            _zookeeperService=zookeeperService;
            _logger=logger;
            _path=path;
            _tempNode=tempNode;
        }

       public override Task process(WatchedEvent @event)
       {
           _logger.LogDebug("{0}節點下子節點發生改變,激發監視方法。",_path);
            var childList=_zookeeperService.GetChildrenAsync(_path,null,true).Result;
            if(childList==null||childList.Children==null||childList.Children.Count<1)
                   {
                        _logger.LogDebug("獲取子序列失敗,計數為零.path:{0}",_path);
                        return Task.FromResult(0);
                   }
                   var top=childList.Children.OrderBy(or=>or).First();
                   if(_path+"/"+top==_tempNode)   //判斷當前節點是否佇列頂端
                   {
                        _logger.LogDebug("釋放阻塞");
                        _autoResetEvent.Set();
                   }
           
            return Task.FromResult(0);
       }
    }

 

這個類,在path節點每次更改或者子節點更改的時候都會激發,僅僅是判斷當前節點是不是列表的頂端,再執行 _autoResetEvent.Set();釋放阻塞讓繼續執行。

編譯提交nuget,然後整合到測試程式看效果。

首先看zookeeper:

目前節點下沒有一個子節點,再看看這個節點的值:

首先新建一個看看:

create -s -e /testzookeeper/sequence "" 

然後啟動程式,當程式提交一個子節點,他是排序的:

我們刪除我們剛剛用命令列建立的節點:

 delete /testzookeeper/sequence0000000072

然後再看值:

我們的客戶端就是獲取到鎖後更改這個值,增加了一段字串,客戶端呼叫如下:

 [HttpGet("{id}")]
        public ActionResult<string> Get(int id)
        { 
            string sequenceNode=string.Empty;
            string nodename=_zookeeperService.GetDataByLockNode("/testzookeeper","sequence",ZooDefs.Ids.OPEN_ACL_UNSAFE,out  sequenceNode);
            if(sequenceNode==null)
            {
                return "獲取分散式鎖失敗,請檢視日誌。";
            }
            
            _zookeeperService.SetDataAsync("/testzookeeper",nodename+"執行了"+DateTime.Now.ToString("yyyyMMddhhmmss"),false);
            if(!string.IsNullOrEmpty(sequenceNode))
            {
                _zookeeperService.DeleteNode(sequenceNode,sequenceNode);
                return "取得鎖並且成功處理資料,釋放鎖成功。";
            }

            return "出現錯誤,請查日誌。";
        }

當然這個是我做測試用的,實際開發,web就是用來做他的資料庫存取的,儘量不要去做一些額外的功能,因為i守護程序或者後臺服務和job有它自己的任務。

 下一節我們就實際 使用這個分散式鎖做quartz的切片功能。