1. 程式人生 > >logstash nested內嵌欄位 field protobuf解碼 codec 的解決辦法

logstash nested內嵌欄位 field protobuf解碼 codec 的解決辦法

logstash nested內嵌欄位 field protobuf解碼 codec 的解決辦法

主要需求

logstash-codec 下
https://www.elastic.co/guide/en/logstash/6.3/codec-plugins.html
此類解碼器

只能應用在原始資料上
比如 https://www.elastic.co/guide/en/logstash/6.3/plugins-codecs-protobuf.html
kafka

{
 zk_connect => "127.0.0.1"
 topic_id => "your_topic_goes_here"
 codec 
=> protobuf { class_name => "Animal::Unicorn" include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb'] } }

 

對從kafka獲取的原始資料進行解碼

無法對應原始資料內部的某個欄位解碼


現在有一個應用場景 內嵌的某個欄位是

encode_str= base64_encode(protobuf_encode())

{
    "name":"cclient",
    "encode_str":".....
" }

 


codec-plugins 只能應用在完整資料上,而無法應用在encode_str欄位上


官方提供一個相關功能的filter
https://www.elastic.co/guide/en/logstash/6.3/plugins-filters-json.html

對內嵌的json欄位串 進行json解析

首選的辦法

1 自已實現相關外掛,完成特定解碼工作,實際工作量不大,因為logstash的官方外掛都開源

https://github.com/logstash-plugins/logstash-codec-protobuf
https://github.com/logstash-plugins/logstash-filter-json
以我的需求為例,結合logstash-codec-protobuf 和 logstash-filter-json
就能實現一套對內嵌protobuf欄位的解碼工具
這也是首先想到的辦法,但沒有執行,主要考慮到,這種場景有一定的通用性
這裡是protobuf,如果換成avro,又需要另寫一個外掛
所以這個方案先放一放(放了一個晚上,同事就已經實現,一共也只花了幾小時)


2 這個方案初期只是猜想,並未驗證,思路是,既然官方已經提供了許多的codec,同時提供了ruby filter https://www.elastic.co/guide/en/logstash/6.3/plugins-filters-ruby.html,可否通過ruby filter,呼叫相應的codec來實現相應的解碼功能?


由於同事已經實現了方案一,我主要就把精力放在方案2的驗證上,先放結果

*可行。

畢竟不是資深的ruby開發,踩了一些坑,多花了些時間

驗證步驟

1 首先是驗證ruby是否可引用包

網上有很多引用包的資料,但主要是官方包

首先驗證json解析

def json_decode(bin)
  return  LogStash::Json.load(bin)
end

 

* 官方包可行。
* 第三方未知。

2 嘗試引用Protobuf的包,並看可否解碼


這裡參考了logstash-codec-protobuf的原始碼
實現
https://github.com/logstash-plugins/logstash-codec-protobuf/blob/master/lib/logstash/codecs/protobuf.rb

 

def register
    @metainfo_messageclasses = {}
    @metainfo_enumclasses = {}
    @metainfo_pb2_enumlist = []
    include_path.each { |path| load_protobuf_definition(path) }
    if @protobuf_version == 3   
      @pb_builder = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).msgclass
    else
      @pb_builder = pb2_create_instance(class_name)
    end
end

def decode(data)
    if @protobuf_version == 3
      decoded = @pb_builder.decode(data.to_s)
      h = pb3_deep_to_hash(decoded)
    else
      decoded = @pb_builder.parse(data.to_s)
      h = decoded.to_hash        
    end
    yield LogStash::Event.new(h) if block_given?
  rescue => e
    @logger.warn("Couldn't decode protobuf: #{e.inspect}.")
    if stop_on_error
      raise e
    end
end # def decode

 


測試用例
https://github.com/logstash-plugins/logstash-codec-protobuf/blob/master/spec/codecs/protobuf_spec.rb

    let(:plugin_unicorn) { LogStash::Codecs::Protobuf.new("class_name" => "Animal::Unicorn", "include_path" => [pb_include_path + '/pb2/unicorn.pb.rb'])  }
    before do
        plugin_unicorn.register      
    end

    it "should return an event from protobuf encoded data" do
    
      data = {:colour => 'rainbow', :horn_length => 18, :last_seen => 1420081471, :has_wings => true}
      unicorn = Animal::Unicorn.new(data)
        
      plugin_unicorn.decode(unicorn.serialize_to_string) do |event|
        expect(event.get("colour") ).to eq(data[:colour] )
        expect(event.get("horn_length") ).to eq(data[:horn_length] )
        expect(event.get("last_seen") ).to eq(data[:last_seen] )
        expect(event.get("has_wings") ).to eq(data[:has_wings] )
      end
    end # it

 

看到這裡就覺得方案大概率能行的通

 

嘗試引用第三方包

require "logstash/codecs/protobuf"
plugin_unicorn = LogStash::Codecs::Protobuf.new("class_name" => "Tutorial::Person", "include_path" => ["/usr/share/logstash/protobuf.pb.rb","/usr/share/logstash/addressbook.pb.rb"],"protobuf_version" => 2)
plugin_unicorn.register

def base64_decode(bin)
  return Base64.decode64(bin)
end

def protobuf_decode(bin)
  return protobuf_event=plugin_unicorn.decode(bin)
end

 

啟動成功
plugin_unicorn.register 是方法呼叫

*第三方包引用正常(前提是安裝了這個包,例require "logstash/codecs/protobuf" 前提是 logstash-plugin install logstash-codec-protobuf)

但執行時報錯,沒有plugin_unicorn 空引用,猜測是作用域問題

查了下ruby的資料,plugin_unicorn 改為$plugin_unicorn(表示全域性作用域),後沒有空引用錯誤

require "logstash/codecs/protobuf"
$plugin_unicorn = LogStash::Codecs::Protobuf.new("class_name" => "Tutorial::Person", "include_path" => ["/usr/share/logstash/protobuf.pb.rb","/usr/share/logstash/addressbook.pb.rb"],"protobuf_version" => 2)
$plugin_unicorn.register

def base64_decode(bin)
  return Base64.decode64(bin)
end

def protobuf_decode(bin)
  return $protobuf_event=plugin_unicorn.decode(bin)
end

 

新問題是protobuf_decode返回為null,主要時間都花在這個問題上

首先樣例資料是同事給的,確認了資料沒有問題,調整驗證了protobuf的版本,不行,最後試著自已生成了樣例資料,問題依舊


注意力都在decode 方法上

def decode(data)
    if @protobuf_version == 3
      decoded = @pb_builder.decode(data.to_s)
      h = pb3_deep_to_hash(decoded)
    else
      decoded = @pb_builder.parse(data.to_s)
      h = decoded.to_hash        
    end
    yield LogStash::Event.new(h) if block_given?
  rescue => e
    @logger.warn("Couldn't decode protobuf: #{e.inspect}.")
    if stop_on_error
      raise e
    end
end # def decode

 

試了多種情況都不成功,參照decode實現了decode_str

def decode_str(data_str)
    @logger.info("cdp source: #{data_str}.")
    if @protobuf_version == 3
      decoded = @pb_builder.decode(data_str)
      h = pb3_deep_to_hash(decoded)
      return h      
    else
      decoded = @pb_builder.parse(data_str)
      h = decoded.to_hash        
      @logger.info("cdp decoded: #{h}.")
      return h
    end
end # def decode

 

decode_str 成功解碼

就原始需求來說,問題解決了,但說白了這是在原始外掛上作定製,仍然是方案1,方案2的驗證,還未結束

測試改程式碼的時候突然注意到了一個小區別

yield 和 return

ruby的yield細節還不清楚,但任python和nodejs的經驗,yield和return的行為不一樣是一定的,yield不會返回結果,並通常有個類似next的方法,yield應該是結果null的原因

簡單瞭解ruby yield的相關資料

def protobuf_decode(bin)
  return $protobuf_event=plugin_unicorn.decode(bin)
end

 

改為

def protobuf_decode(bin)
  $plugin_unicorn.decode(bin) do |event|
    return event.to_hash
  end
end

 

方案二可行,完美

回頭再看,yield的坑,完全可以避免


官方原始碼裡的測試用例,有明顯的do |event|,因為不熟悉ruby 直覺的寫成了return,導致多花了很多時間排查

      plugin_unicorn.decode(unicorn.serialize_to_string) do |event|
        expect(event.get("colour") ).to eq(data[:colour] )
        expect(event.get("horn_length") ).to eq(data[:horn_length] )
        expect(event.get("last_seen") ).to eq(data[:last_seen] )
        expect(event.get("has_wings") ).to eq(data[:has_wings] )
      end

 

方案三
方案一是對單獨的codec進行外掛開發和定製
方案二驗證可行,已經完全滿足要求,且有一定的通用性,但是雖然不必為每種codec作定製,仍需簡單修改相關程式碼,作外掛的初始化(每個外掛的引數並不相同)

plugin_unicorn = LogStash::Codecs::Protobuf.new("class_name" => "Tutorial::Person", "include_path" => ["/usr/share/logstash/protobuf.pb.rb","/usr/share/logstash/addressbook.pb.rb"],"protobuf_version" => 2)

 

是否可以開發一個過濾器,通過配置(配置沿用官方外掛),完成對所有codes的解析?

這個待有精力再研究