asp.net core microservices 架構之分散式自動計算(三)-kafka遷移至elasticsearch-kibana展示
一 kafka consumer準備
前面的章節進行了分散式job的自動計算的概念講解以及實踐。上次分散式日誌說過日誌寫進kafka,是需要進行處理,以便合理的進行展示,分散式日誌的量和我們對日誌的重視程度,決定了我們必須要有一個大資料檢索,和友好展示的需求。那麼自然就是elasticsearch和kibana,elasticsearch是可以檢索TB級別資料的一個分散式NOSQL資料庫,而kibana,不僅僅可以展示詳情,而且有針對不同展示需求的功能,並且定製了很多很多日誌格式的模板和採集資料的外掛,這裡不多介紹了,我自己感覺是比percona的pmm強大很多。
書歸正傳,我們這一節是要做同步前的準備工作。第一,對kafka的consumer進行封裝。第二,讀取kafka資料是需要一個後臺程式去處理,但是不需要job,我們上次做的框架是基於zookeeper的分散式job,而kafka的分散式是在伺服器端的,當然將job分散式設計方案用在輪詢或者阻塞方式的後臺程式,也是可以的,但是這次就不講解了。下面我們就將kafka分散式的原理分析下,kafka的客戶端有一個組的概念,broker端有一個topic的概念,producer在傳送訊息的時候,會有一個key值。因為kafka存資料就是以key-value的方式儲存資料的,所以broker就是用producer傳遞過來的這個key進行運算,合理的將資料儲存到某個topic的某個分割槽。而consumer端訂閱topic,可以訂閱多個topic,它的分派是這樣的,每一個topic下的分割槽會有多個consumer,但是這些consumer必須屬於不同的組,而每一個consumer可以訂閱多個topic下的分割槽,但是不能重複。下面看圖吧,以我們這次實際的日誌為例,在kafka中mylog topic有5個分割槽。
那麼如果我們有三個程式需要用這個mylog topic怎麼辦?而且我們需要很快的處理完這個資料,所以有可能這三個程式每一個程式都要兩臺伺服器。想著都很頭大,對吧?當然如果有我們前面講解的分散式job也可以處理,但是要把分散式的功能遷移到這個後臺程式,避免不了又大動干戈,開發,除錯,測試,修改bug,直到程式穩定,那又是一場苦功。但是在kafka這裡,不用擔心,三個程式,比如訂單,庫存,顧客,我們為這三個程式的kafka客戶端對應的設定為三個組,每一個組中consumer數量只要不超過5個,假如訂單需要用到名為mylog的topic的訊息,只要訂單處理這個topic的例項數量,必須不能超過5個,當然可以少於5個,也可以等於0個。而同時一個consumer又可以去訂閱多個topic,這也是kafka可以媲美rabbit的重要的一個原因,先天支援併發和擴充套件。我們看圖:
如果一個組的consumer數量沒有topic的分割槽多,kafka會自動分派給這個組的consumer,如果某一個consumer失敗,kafka也會自動的將這個consumer的offset記錄並且分派給另外一個consumer。
下面看看我們封裝的kafka客戶端方法:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Options;

namespace Walt.Framework.Service.Kafka
{
    /// <summary>
    /// Thin wrapper around a Confluent.Kafka producer/consumer pair, configured
    /// from <see cref="KafkaOptions"/> via the options monitor.
    /// </summary>
    public class KafkaService : IKafkaService
    {
        private KafkaOptions _kafkaOptions;
        private Producer _producer;
        private Consumer _consumer;

        // Callbacks the host application can assign to observe consumer events.
        public Action<Message> GetMessageDele { get; set; }
        public Action<Error> ErrorDele { get; set; }
        public Action<LogMessage> LogDele { get; set; }

        public KafkaService(IOptionsMonitor<KafkaOptions> kafkaOptions)
        {
            _kafkaOptions = kafkaOptions.CurrentValue;
            // NOTE(review): only the cached options object is refreshed on reload;
            // the producer/consumer created below are NOT rebuilt with new settings.
            kafkaOptions.OnChange((kafkaOpt, s) =>
            {
                _kafkaOptions = kafkaOpt;
                System.Diagnostics.Debug
                    .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt) + "---" + s);
            });
            _producer = new Producer(_kafkaOptions.Properties);
            _consumer = new Consumer(_kafkaOptions.Properties);
        }

        // NOTE(review): Encoding.Default is machine/locale dependent; UTF-8 would be
        // safer for cross-machine payloads — confirm with the consumer side first.
        private byte[] ConvertToByte(string str)
        {
            return System.Text.Encoding.Default.GetBytes(str);
        }

        /// <summary>
        /// Serializes <paramref name="t"/> to JSON and produces it to <paramref name="topic"/>.
        /// </summary>
        /// <param name="topic">Target topic; must be non-empty.</param>
        /// <param name="key">Partitioning key (may be null at broker's discretion).</param>
        /// <param name="t">Payload object; must be non-null.</param>
        /// <returns>The delivery report message returned by the broker.</returns>
        /// <exception cref="ArgumentNullException">topic is empty or t is null.</exception>
        public async Task<Message> Producer<T>(string topic, string key, T t)
        {
            // Fix: the original passed the message text as the paramName argument of
            // ArgumentNullException; report the offending parameter name correctly.
            if (string.IsNullOrEmpty(topic))
            {
                throw new ArgumentNullException(nameof(topic), "topic或者value不能為null.");
            }
            if (t == null)
            {
                throw new ArgumentNullException(nameof(t), "topic或者value不能為null.");
            }
            string data = Newtonsoft.Json.JsonConvert.SerializeObject(t);
            var task = await _producer.ProduceAsync(topic, ConvertToByte(key), ConvertToByte(data));
            return task;
        }

        // Hook producer-side error/log events to the private forwarders below.
        public void AddProductEvent()
        {
            _producer.OnError += new EventHandler<Error>(Error);
            _producer.OnLog += new EventHandler<LogMessage>(Log);
        }

        /// <summary>
        /// Subscribes to <paramref name="topics"/> and receives messages via events.
        /// </summary>
        public void AddConsumerEvent(IEnumerable<string> topics)
        {
            _consumer.Subscribe(topics);
            _consumer.OnMessage += new EventHandler<Message>(GetMessage);
            _consumer.OnError += new EventHandler<Error>(Error);
            _consumer.OnLog += new EventHandler<LogMessage>(Log);
        }

        // Forwarders: raise the user-assigned delegates only when they are set.
        private void GetMessage(object sender, Message mess)
        {
            if (GetMessageDele != null)
            {
                GetMessageDele(mess);
            }
        }

        private void Error(object sender, Error mess)
        {
            if (ErrorDele != null)
            {
                ErrorDele(mess);
            }
        }

        private void Log(object sender, LogMessage mess)
        {
            if (LogDele != null)
            {
                LogDele(mess);
            }
        }

        /// <summary>
        /// Polls the consumer for one message; returns default(Message) when nothing
        /// arrives within <paramref name="timeoutMilliseconds"/>.
        /// </summary>
        public Message Poll(int timeoutMilliseconds)
        {
            Message message = default(Message);
            _consumer.Consume(out message, timeoutMilliseconds);
            return message;
        }
    }
}
以事件激發的方式,因為是跨執行緒方式呼叫,而本例項是後臺方式執行,少不了多執行緒,所以還是採用輪詢的方式。採用輪詢的方式,這樣的程式需要放在哪裡呢?就是我們的後臺程式框架。
二 後臺程式管理框架開發
他的原理和job幾乎差不多,比job要簡單多了。看入口程式:
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using EnvironmentName = Microsoft.Extensions.Hosting.EnvironmentName;
using Walt.Framework.Log;
using Walt.Framework.Service;
using Walt.Framework.Service.Kafka;
using Walt.Framework.Configuration;
using MySql.Data.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using System.Threading;
using IApplicationLife = Microsoft.Extensions.Hosting;
using IApplicationLifetime = Microsoft.Extensions.Hosting.IApplicationLifetime;

namespace Walt.Framework.Console
{
    public class Program
    {
        /// <summary>
        /// Entry point of the background-task console host: builds a generic Host
        /// (configuration + logging + DI), starts the Kafka-to-Elasticsearch task,
        /// and runs a watchdog loop that logs task state every 2 seconds.
        /// </summary>
        public static void Main(string[] args)
        {
            // Program and worker configuration is assembled here.
            // NOTE(review): assmblyColl is populated nowhere in this file — presumably
            // reserved for dynamically loading task assemblies; confirm before relying on it.
            Dictionary<string, Assembly> assmblyColl = new Dictionary<string, Assembly>();
            var host = new HostBuilder()
                .UseEnvironment(EnvironmentName.Development)
                .ConfigureAppConfiguration((hostContext, configApp) =>
                {
                    // .NET Core supports multiple configuration sources, so this can be
                    // extended to a database or redis for centralized configuration.
                    // configApp.SetBasePath(Directory.GetCurrentDirectory());
                    configApp.AddJsonFile(
                        $"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json",
                        optional: true);
                    configApp.AddEnvironmentVariables("PREFIX_");
                    configApp.AddCommandLine(args);
                }).ConfigureLogging((hostContext, configBuild) =>
                {
                    configBuild.AddConfiguration(hostContext.Configuration.GetSection("Logging"));
                    configBuild.AddConsole();
                    // Project-specific logger that ships log entries to Kafka.
                    configBuild.AddCustomizationLogger();
                })
                .ConfigureServices((hostContext, service) =>
                {
                    service.Configure<HostOptions>(option =>
                    {
                        // Give in-flight tasks up to 10s to finish on shutdown.
                        option.ShutdownTimeout = System.TimeSpan.FromSeconds(10);
                    });
                    service.AddKafka(KafkaBuilder =>
                    {
                        KafkaBuilder.AddConfiguration(hostContext.Configuration.GetSection("KafkaService"));
                    });
                    service.AddElasticsearchClient(config =>
                    {
                        config.AddConfiguration(hostContext.Configuration.GetSection("ElasticsearchService"));
                    });
                    service.AddDbContext<ConsoleDbContext>(option =>
                        option.UseMySQL(hostContext.Configuration.GetConnectionString("ConsoleDatabase")),
                        ServiceLifetime.Transient, ServiceLifetime.Transient);
                    /// TODO: pull task definitions from the database and register them in DI.
                    service.AddSingleton<IConsole, KafkaToElasticsearch>();
                })
                .Build();
            // One shared token source: cancelling it stops the worker task and is
            // wired to host shutdown below.
            CancellationTokenSource source = new CancellationTokenSource();
            CancellationToken token = source.Token;
            var task = Task.Run(async () =>
            {
                IConsole console = host.Services.GetService<IConsole>();
                await console.AsyncExcute(source.Token);
            }, source.Token);
            // Registry of named worker tasks monitored by the watchdog loop.
            Dictionary<string, Task> dictTask = new Dictionary<string, Task>();
            dictTask.Add("kafkatoelasticsearch", task);
            int recordRunCount = 0;
            var fact = host.Services.GetService<ILoggerFactory>();
            var log = fact.CreateLogger<Program>();
            // Watchdog: every 2s, log any task that has terminated (for any reason).
            // NOTE(review): IsCompleted already covers IsCompletedSuccessfully, and a
            // successfully completed task is reported with the same warning as a
            // faulted one — confirm this is intentional.
            var disp = Task.Run(() =>
            {
                while (true)
                {
                    if (!token.IsCancellationRequested)
                    {
                        ++recordRunCount;
                        foreach (KeyValuePair<string, Task> item in dictTask)
                        {
                            if (item.Value.IsCanceled || item.Value.IsCompleted
                                || item.Value.IsCompletedSuccessfully || item.Value.IsFaulted)
                            {
                                log.LogWarning("console任務:{0},引數:{1},執行異常,task狀態:{2}", item.Key, "", item.Value.Status);
                                if (item.Value.Exception != null)
                                {
                                    log.LogError(item.Value.Exception, "task:{0},引數:{1},執行錯誤.", item.Key, "");
                                    // TODO: update task state in the database so it can be monitored.
                                }
                                // Update the database state here.
                            }
                        }
                    }
                    System.Threading.Thread.Sleep(2000);
                    log.LogInformation("迴圈:{0}次,接下來等待2秒。", recordRunCount);
                }
            }, source.Token);
            // Propagate host shutdown (Ctrl+C / SIGTERM) to the worker tasks.
            IApplicationLifetime appLiftTime = host.Services.GetService<IApplicationLifetime>();
            appLiftTime.ApplicationStopping.Register(() =>
            {
                log.LogInformation("程式停止中。");
                source.Cancel();
                log.LogInformation("程式停止完成。");
            });
            host.RunAsync().GetAwaiter().GetResult();
        }
    }
}
因為分散式job有quartz,是有自己的設計理念,但是這個console後臺框架不需要,是自己開發,所以完全和Host通用主機相容,所有的部件都可以DI。設計原理就是以資料庫的配置,構造Task,然後使用
CancellationTokenSource和TaskCompletionSource去管理Task。執行結果根據狀態去更新資料庫,以便監控。當然咱們這個例子功能沒實現全,後面可以完善 ,感興趣的可以去我的github上pull程式碼。咱們看任務中的例子程式碼:
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Nest;
using Walt.Framework.Log;
using Walt.Framework.Service.Elasticsearch;
using Walt.Framework.Service.Kafka;

namespace Walt.Framework.Console
{
    /// <summary>
    /// Background task that polls log messages from the kafka "mylog" topic and
    /// indexes them into Elasticsearch (one index per message OtherFlag).
    /// </summary>
    public class KafkaToElasticsearch : IConsole
    {
        ILoggerFactory _logFact;
        IConfiguration _config;
        IElasticsearchService _elasticsearch;
        IKafkaService _kafkaService;

        public KafkaToElasticsearch(ILoggerFactory logFact, IConfiguration config
            , IElasticsearchService elasticsearch
            , IKafkaService kafkaService)
        {
            _logFact = logFact;
            _config = config;
            _elasticsearch = elasticsearch;
            _kafkaService = kafkaService;
        }

        /// <summary>
        /// Runs until <paramref name="cancel"/> is signalled: poll kafka, decode the
        /// message, ensure the target index exists, then index the document.
        /// </summary>
        public async Task AsyncExcute(CancellationToken cancel = default(CancellationToken))
        {
            var log = _logFact.CreateLogger<KafkaToElasticsearch>();
            _kafkaService.AddConsumerEvent(new List<string>() { "mylog" });

            // Event-driven consumption (GetMessageDele etc.) was abandoned: the kafka
            // callbacks fire on another thread than this background task, so we poll.

            // Fix: the original tested the token ONCE and then entered while(true),
            // so cancellation was never honored; check it on every iteration.
            while (!cancel.IsCancellationRequested)
            {
                Message message = _kafkaService.Poll(2000);
                if (message == null)
                {
                    continue;
                }
                if (message.Error != null && message.Error.Code != ErrorCode.NoError)
                {
                    // Console (not the logger) is used on purpose: the custom logger
                    // itself writes to kafka, which could loop on consumer errors.
                    System.Console.WriteLine("consumer獲取message出錯,詳細資訊:{0}", message.Error);
                    System.Threading.Thread.Sleep(200);
                    continue;
                }
                // Key/value arrive as raw bytes; the producer side wrote JSON.
                var id = message.Key == null ? "" : System.Text.Encoding.Default.GetString(message.Key);
                var offset = string.Format("{0}---{1}", message.Offset.IsSpecial, message.Offset.Value);
                var topic = message.Topic;
                var topicPartition = message.TopicPartition.Partition.ToString();
                var topicPartitionOffsetValue = message.TopicPartitionOffset.Offset.Value;
                var val = System.Text.Encoding.Default.GetString(message.Value);
                EntityMessages entityMess = Newtonsoft.Json.JsonConvert.DeserializeObject<EntityMessages>(val);

                // One index per program category (OtherFlag); create lazily.
                await _elasticsearch.CreateIndexIfNoExists<LogElasticsearch>("mylog" + entityMess.OtherFlag);

                // Index the log entry as an Elasticsearch document.
                var addDocumentResponse = await _elasticsearch.CreateDocument<LogElasticsearch>(
                    "mylog" + entityMess.OtherFlag
                    , new LogElasticsearch()
                    {
                        Id = entityMess.Id,
                        Time = entityMess.DateTime,
                        LogLevel = entityMess.LogLevel,
                        Exception = entityMess.Message
                    });
                if (addDocumentResponse != null && !addDocumentResponse.ApiCall.Success)
                {
                    // Fix: failures were silently swallowed by an empty if-block;
                    // surface them so lost log entries are at least visible.
                    System.Console.WriteLine("寫入elasticsearch失敗,index:{0}", "mylog" + entityMess.OtherFlag);
                }
            }
            return;
        }
    }
}
三 elasticsearch 服務開發
服務已經開發很多了,主要就是構建和配置的設計,還有就是對元件的封裝,看程式結構:
配置:
{ "Logging": { "LogLevel": { "Default": "Information", "System": "Information", "Microsoft": "Information" }, "KafkaLog":{ "Prix":"console", //目前這個屬性,可以放程式類別,比如使用者中心,商品等。 "LogStoreTopic":"mylog" } }, "KafkaService":{ "Properties":{ "bootstrap.servers":"192.168.249.106:9092", "group.id":"group2" } }, "ConnectionStrings": { "ConsoleDatabase":"Server=192.168.249.106;Database=quartz;Uid=quartz;Pwd=quartz" }, "ElasticsearchService":{ "Host":["http://192.168.249.105:9200","http://localhost:9200"], "TimeOut":"10000", "User":"", "Pass":"" } }
服務類:這裡有必要說下,elasticsearch是基於api的介面,最底層就是http請求,在介面上,實現了一個高階的介面和一個低級別的介面,當然低級別的介面需要熟悉elasticsearch的協議,
而高級別的api,使用強型別去使用,對開發很有幫助。下面是封裝elasticsearch的服務類:
using System;
using System.Net.Http;
using Elasticsearch.Net;
using Microsoft.Extensions.Options;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Nest;

namespace Walt.Framework.Service.Elasticsearch
{
    /// <summary>
    /// Wrapper over the NEST high-level client: multi-node connection pool,
    /// lazy index creation and single-document indexing.
    /// </summary>
    public class ElasticsearchService : IElasticsearchService
    {
        private ElasticsearchOptions _elasticsearchOptions = null;
        private ElasticClient _elasticClient = null;
        private ILoggerFactory _loggerFac;

        public ElasticsearchService(IOptionsMonitor<ElasticsearchOptions> options
            , ILoggerFactory loggerFac)
        {
            _elasticsearchOptions = options.CurrentValue;
            // NOTE(review): as elsewhere, only the cached options are refreshed on
            // reload; the client built below keeps its original settings.
            options.OnChange((elasticsearchOpt, s) =>
            {
                _elasticsearchOptions = elasticsearchOpt;
                System.Diagnostics.Debug
                    .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(elasticsearchOpt) + "---" + s);
            });
            // Multiple nodes are supported to avoid a single point of failure.
            // (The unused low-level client the original created here was removed.)
            var urlColl = new Uri[_elasticsearchOptions.Host.Length];
            for (int i = 0; i < _elasticsearchOptions.Host.Length; i++)
            {
                urlColl[i] = new Uri(_elasticsearchOptions.Host[i]);
            }
            _loggerFac = loggerFac;
            var connectionPool = new SniffingConnectionPool(urlColl);
            // NOTE(review): TimeOut comes from config as "10000" and is fed to
            // FromMinutes — 10000 minutes looks like it was meant to be milliseconds;
            // confirm the intended unit before changing it.
            var settings = new ConnectionSettings(connectionPool)
                .RequestTimeout(TimeSpan.FromMinutes(_elasticsearchOptions.TimeOut))
                .DefaultIndex("mylogjob"); // default index when none is specified
            _elasticClient = new ElasticClient(settings);
        }

        /// <summary>
        /// Creates the index (with auto-mapped type from T) if it does not exist.
        /// The type name can be controlled with ElasticsearchTypeAttribute on T.
        /// </summary>
        /// <returns>true when the index exists or was created; false on failure.</returns>
        public async Task<bool> CreateIndexIfNoExists<T>(string indexName) where T : class
        {
            var log = _loggerFac.CreateLogger<ElasticsearchService>();
            var exists = await _elasticClient.IndexExistsAsync(Indices.Index(indexName));
            if (exists.Exists)
            {
                log.LogWarning("index:{0}已經存在", indexName.ToString());
                return await Task.FromResult(true);
            }
            // AutoMap maps T's properties onto the index type; attributes on T can
            // include/exclude individual properties.
            var response = await _elasticClient.CreateIndexAsync(indexName
                , c => c.Mappings(mm => mm.Map<T>(m => m.AutoMap())));
            log.LogInformation(response.DebugInformation);
            if (response.Acknowledged)
            {
                log.LogInformation("index:{0},建立成功", indexName.ToString());
                // Fix: the original returned false here, making a successful
                // creation indistinguishable from a failure for callers.
                return await Task.FromResult(true);
            }
            else
            {
                log.LogError(response.ServerError.ToString());
                log.LogError(response.OriginalException.ToString());
                return await Task.FromResult(false);
            }
        }

        /// <summary>
        /// Indexes one document of type T into <paramref name="indexName"/>.
        /// </summary>
        /// <returns>The create response on success; null on failure or null input.</returns>
        public async Task<ICreateResponse> CreateDocument<T>(string indexName, T t) where T : class
        {
            var log = _loggerFac.CreateLogger<ElasticsearchService>();
            if (t == null)
            {
                log.LogError("bulk 引數不能為空。");
                return null;
            }
            // Fix: the original built an IndexRequest carrying indexName but then
            // called CreateDocumentAsync(t), which ignored it and always wrote to
            // the default index; target the caller's index explicitly instead.
            var createResponse = await _elasticClient.CreateAsync<T>(t
                , c => c.Index(indexName).Type(TypeName.From<T>()));
            log.LogInformation(createResponse.DebugInformation);
            if (createResponse.ApiCall.Success)
            {
                log.LogInformation("index:{0},type:{1},建立成功", createResponse.Index, createResponse.Type);
                return createResponse;
            }
            else
            {
                log.LogError(createResponse.ServerError.ToString());
                log.LogError(createResponse.OriginalException.ToString());
                return null;
            }
        }
    }
}
poco型別,這個類會和index的typ相關聯的:
using System;
using Nest;

namespace Walt.Framework.Console
{
    // Document POCO stored in Elasticsearch; the attribute fixes the type (mapping)
    // name used when the index type is generated and queried.
    [ElasticsearchTypeAttribute(Name="LogElasticsearchDefaultType")]
    public class LogElasticsearch
    {
        public string Id { get; set; }          // log entry id (kafka message key upstream)
        public DateTime Time { get; set; }      // time the log entry was produced
        public string LogLevel { get; set; }    // severity as text
        public string Exception { get; set; }   // NOTE(review): the caller stores the log
                                                // Message here, not an exception — confirm
        public string Mess { get; set; }        // message text; unassigned by the current caller
    }
}
然後就是執行我們console後臺程式,就可以在kibana看到日誌被同步的情況:
所有程式都提交到github,如果除錯程式碼,再看這篇文章,或許理解能更快。