SODBASE CEP Learning, Advanced Series (2): Log Collection with Flume
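In IT system operations and performance monitoring, logs often need to be analyzed to locate a system's points of failure or performance bottlenecks. Off-the-shelf log analysis software usually focuses on monitoring node and network status, and almost none of it meets the needs of large application systems for failure-point or bottleneck analysis.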
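A few examples:
(1) Find the calling relationships between the upper and lower layers involved in a failure, and locate the low-level interface that corresponds to an application-layer failure
(2) Analyze the time difference between parent and child calls to find performance bottlenecks
(3) Analyze the response time of specified system calls and services, and whether they time out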
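As a rough illustration of examples (2) and (3), the sketch below computes how much of each call's elapsed time is not covered by its direct children, and lists the calls that exceed a timeout. The CallRecord fields (id, parentId, startMs, endMs) are hypothetical and are not a SODBASE log format; child calls are assumed to run one after another rather than in parallel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CallGapAnalysis {

    /** Hypothetical log record: one call, its parent, and timestamps in ms. */
    public static final class CallRecord {
        public final String id;
        public final String parentId; // null for a root call
        public final long startMs;
        public final long endMs;

        public CallRecord(String id, String parentId, long startMs, long endMs) {
            this.id = id;
            this.parentId = parentId;
            this.startMs = startMs;
            this.endMs = endMs;
        }

        public long duration() {
            return endMs - startMs;
        }
    }

    /** Self time = own duration minus the summed duration of direct children. */
    public static Map<String, Long> selfTimes(List<CallRecord> records) {
        Map<String, Long> self = new HashMap<>();
        for (CallRecord r : records) {
            self.put(r.id, r.duration());
        }
        for (CallRecord r : records) {
            if (r.parentId != null && self.containsKey(r.parentId)) {
                self.put(r.parentId, self.get(r.parentId) - r.duration());
            }
        }
        return self;
    }

    /** Ids of calls whose duration exceeds the timeout, in input order. */
    public static List<String> timedOut(List<CallRecord> records, long timeoutMs) {
        List<String> ids = new ArrayList<>();
        for (CallRecord r : records) {
            if (r.duration() > timeoutMs) {
                ids.add(r.id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<CallRecord> records = new ArrayList<>();
        records.add(new CallRecord("A", null, 0, 100)); // parent call
        records.add(new CallRecord("B", "A", 10, 40));  // child of A
        records.add(new CallRecord("C", "A", 50, 90));  // child of A
        System.out.println(selfTimes(records).get("A")); // prints 30
        System.out.println(timedOut(records, 35));       // prints [A, C]
    }
}
```

A parent with a large self time is spending its time outside the calls that were logged, which is usually where to look first for a bottleneck.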
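SODBASE CEP can handle many kinds of complex real-time log analysis and chart display. Users can define their own log service interface to collect log data, or use a log collection client such as flume or splunk to do the collecting.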
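1. Procedure
(1) On Windows (Linux is similar), install JDK 1.6+
(2) Unzip the package and double-click cepstudio.exe to open SODBASE Studio
In SODBASE Studio, choose "File" -> "Import" from the menu and select loganalysis.sod
Right-click an empty area of the workspace panel and choose Test Run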
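(3) Download apache-flume-1.5.2-bin.zip and unzip it to E:\software or another directory of your choice. Copy sodbase-cep-engine.jar, sodbase-studio.jar, sodbase-dataadaptor-socket.jar, and sodbase-dataadaptor-flume.jar from the SODBASE Studio lib directory into flume's lib directory
Open flume's bin/flume-win.bat in Notepad. Two settings need editing: FLUME_HOME, the directory where flume was unzipped, and JAVA_HOME, your own JDK installation directory.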
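(4) Run flume-win.bat
(5) Result output. The received log results can be seen in Studio, as shown in the figure below
(6) SODBASE Studio is mainly used for modeling and testing. To deploy the log analysis function to a server, please refer to
That completes the example. To learn how it works, read on.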
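2. How It Works
Compared with other log collection clients, flume has the advantages of being written in Java, cross-platform, and lightweight. This article uses flume as the log collection client to send log information to the SODBASE CEP engine. In the example, we implement a Sink that transfers data to the CEP engine over a socket. On the CEP engine side, a socket input adapter receives the data.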
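The sender/receiver split described above can be sketched with plain Java object streams. This is only an illustration of the general idea: the LogEvent class is a hypothetical stand-in for the engine's event type, and the actual wire format used by SocketUtil and the socket input adapter is not shown in this article, so standard Java serialization is an assumption here.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SocketTransferSketch {

    /** Hypothetical event type; not the SODBASE PrimitiveEvent class. */
    public static final class LogEvent implements Serializable {
        private static final long serialVersionUID = 1L;
        public final HashMap<String, String> attributes = new HashMap<>();
        public long startTs;
        public long endTs;
    }

    /** Sending side (the role of the sink): connect, write one event, close. */
    public static void sendOne(String host, int port, LogEvent event) throws IOException {
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
            out.writeObject(event);
            out.flush();
        }
    }

    /** Receiving side (the role of the input adapter): accept and read one event. */
    public static LogEvent receiveOne(ServerSocket server)
            throws IOException, ClassNotFoundException {
        try (Socket client = server.accept();
             ObjectInputStream in = new ObjectInputStream(client.getInputStream())) {
            return (LogEvent) in.readObject();
        }
    }

    /** Sends one log line to a local listener and returns what the listener read. */
    public static LogEvent roundTrip(String logLine) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Port 0 asks the operating system for any free port on the loopback interface.
        try (final ServerSocket server =
                     new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Future<LogEvent> received = pool.submit(new Callable<LogEvent>() {
                @Override
                public LogEvent call() throws Exception {
                    return receiveOne(server);
                }
            });
            LogEvent event = new LogEvent();
            event.attributes.put("flumeeventdata", logLine);
            event.startTs = System.currentTimeMillis();
            event.endTs = event.startTs;
            sendOne(server.getInetAddress().getHostAddress(), server.getLocalPort(), event);
            return received.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("round trip failed", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        LogEvent event = roundTrip("ERROR timeout in service A");
        System.out.println(event.attributes.get("flumeeventdata"));
    }
}
```

Because both sides must load the same class definition to read the object back, a scheme like this only works when sender and receiver ship compatible versions of the event class.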
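The required libraries are sodbase-cep-engine.jar, sodbase-studio.jar, sodbase-dataadaptor-socket.jar, and sodbase-dataadaptor-flume.jar; at run time, just place them in flume's lib directory. Note: the version of sodbase-cep-engine.jar used in flume must match the version in the CEP server so that objects can be deserialized correctly.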
package com.sodbase.dataadaptor.flume;

import java.io.IOException;
import java.util.Date;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import com.sodbase.outputadaptor.socket.SocketUtil;

import zstreamplus.eventbuffer.PrimitiveEvent;
import zstreamplus.eventbuffer.ValueType;

public class CEPServerSink extends AbstractSink implements Configurable
{
    private static final String DEFAULT_ENCODING = "UTF-8";

    private String CEPServerSocketIpPort;
    private String retrynum;
    private SocketUtil socketUtil = new SocketUtil();

    @Override
    public void configure(Context context)
    {
        // Both values are configured in flume-conf.properties
        CEPServerSocketIpPort = context.getString("CEPServerSocketIpPort",
                "localhost:12345");
        retrynum = context.getString("CEPServerSocketRetryNum", "30");
    }

    @Override
    public void start()
    {
        socketUtil.setRunning(true);
        super.start();
    }

    @Override
    public void stop()
    {
        socketUtil.setRunning(false);
        if (socketUtil.getClient() != null)
        {
            try
            {
                socketUtil.getClient().close();
            } catch (IOException e)
            {
                e.printStackTrace();
            }
        }
        super.stop();
    }

    @SuppressWarnings("unchecked")
    @Override
    public Status process() throws EventDeliveryException
    {
        Status status = null;
        // Start transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try
        {
            Event event = ch.take();
            if (event == null)
            {
                // The channel is empty: nothing to send, so back off
                txn.commit();
                status = Status.BACKOFF;
            } else
            {
                // Prepare the log data
                String eventData = new String(event.getBody(), DEFAULT_ENCODING);
                PrimitiveEvent primitiveEvent = new PrimitiveEvent();
                ValueType valueType = new ValueType(eventData, "string");
                primitiveEvent.getAttributeMap().put("flumeeventdata", valueType);
                long time = new Date().getTime();
                primitiveEvent.setStart_ts(time);
                primitiveEvent.setEnd_ts(time);
                // Transfer the data to the CEP server
                String[] address = CEPServerSocketIpPort.split(":");
                socketUtil.setIp(address[0]);
                socketUtil.setPort(address[1]);
                socketUtil.setRetrynum(retrynum);
                socketUtil.outputPrimitiveEvent(primitiveEvent);
                txn.commit();
                status = Status.READY;
            }
        } catch (Throwable t)
        {
            // Roll back so the event stays in the channel and is retried later
            txn.rollback();
            status = Status.BACKOFF;
            if (t instanceof Error)
            {
                throw (Error) t;
            }
        } finally
        {
            txn.close();
        }
        return status;
    }
}
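The sink splits CEPServerSocketIpPort on ":" and passes the two parts on without checking them, so a malformed value only fails later, inside process(). A minimal sketch of validating the value up front is shown below; the SocketAddressParser and HostPort names are hypothetical helpers and are not part of the SODBASE libraries.

```java
public class SocketAddressParser {

    /** Hypothetical value holder for a parsed "host:port" setting. */
    public static final class HostPort {
        public final String host;
        public final int port;

        public HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Parses "host:port" and rejects missing parts or out-of-range ports. */
    public static HostPort parse(String value) {
        if (value == null) {
            throw new IllegalArgumentException("address is null");
        }
        int idx = value.lastIndexOf(':');
        if (idx <= 0 || idx == value.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got: " + value);
        }
        String host = value.substring(0, idx).trim();
        if (host.isEmpty()) {
            throw new IllegalArgumentException("host is empty in: " + value);
        }
        int port;
        try {
            port = Integer.parseInt(value.substring(idx + 1).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port is not a number in: " + value, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range in: " + value);
        }
        return new HostPort(host, port);
    }

    public static void main(String[] args) {
        HostPort address = parse("localhost:12345");
        System.out.println(address.host + " " + address.port); // prints localhost 12345
    }
}
```

Calling a check like this from configure() would make a bad setting fail when the agent starts instead of on the first event.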
On Linux, flume can be started by calling the sh script. On Windows, refer to the script below
set FLUME_HOME=E:\software\apache-flume-1.5.2-bin\apache-flume-1.5.2-bin
set JAVA_HOME=D:\Program Files\Java\jdk1.7.0_51
set JAVA="%JAVA_HOME%\bin\java.exe"
set JAVA_OPTS=-Xmx256m
set CONF=%FLUME_HOME%\conf\flume-cep-conf.properties
set AGENT=agent
%JAVA% %JAVA_OPTS% -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 -Dlog4j.configuration=file:\\\%FLUME_HOME%\conf\log4j.properties -cp "%FLUME_HOME%\lib\*" org.apache.flume.node.Application -f %FLUME_HOME%\conf\flume-cep-conf.properties -n %AGENT%
flume-cep-conf.properties
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink
# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq
# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.loggerSink.type = logger
#Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink
# The exec source below replaces the seq generator defined above
agent.sources.seqGenSrc.channels = memoryChannel
agent.sources.seqGenSrc.type = exec
agent.sources.seqGenSrc.command = cmd.exe /c echo test
agent.sources.seqGenSrc.restart = true
agent.sources.seqGenSrc.restartThrottle = 1000
agent.sources.seqGenSrc.batchSize = 100
agent.sinks.loggerSink.type = com.sodbase.dataadaptor.flume.CEPServerSink
agent.sinks.loggerSink.channel = memoryChannel
agent.sinks.loggerSink.CEPServerSocketIpPort=localhost:12345
agent.sinks.loggerSink.CEPServerSocketRetryNum=2
agent.channels.memoryChannel.type = memory
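Several keys in the listing above appear more than once, for example agent.sources.seqGenSrc.type and agent.sinks.loggerSink.type. Assuming the file is read with standard java.util.Properties semantics, the last assignment of a key is the one that takes effect. The small sketch below demonstrates that behavior on a shortened copy of the configuration; the DuplicateKeyDemo class is only for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class DuplicateKeyDemo {

    /** Loads properties text the same way a .properties file would be loaded. */
    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String conf =
                "agent.sources.seqGenSrc.type = seq\n"
              + "agent.sinks.loggerSink.type = logger\n"
              + "agent.sources.seqGenSrc.type = exec\n"
              + "agent.sinks.loggerSink.type = com.sodbase.dataadaptor.flume.CEPServerSink\n";
        Properties props = load(conf);
        // Later assignments replace earlier ones for the same key.
        System.out.println(props.getProperty("agent.sources.seqGenSrc.type")); // prints exec
        System.out.println(props.getProperty("agent.sinks.loggerSink.type"));
    }
}
```

Keeping only the final assignment of each key in the real file would make it easier to see which source and sink are actually in use.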
The command can be a tail command that monitors a log file; a Python version of tail is available for Windows.
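For readers unfamiliar with what a tail-style command does, the sketch below shows the core idea in Java: remember the file offset already read and, on each poll, return only the lines appended since then. TailSketch is a hypothetical illustration, not a tool shipped with flume or SODBASE, and it assumes the log file is UTF-8 encoded.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TailSketch {
    private final String path;
    private long offset; // file position just after the data already reported

    public TailSketch(String path) {
        this.path = path;
    }

    /** Returns the lines appended to the file since the previous call. */
    public List<String> poll() throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            if (file.length() < offset) {
                offset = 0; // the file was truncated or rotated; start over
            }
            file.seek(offset);
            String raw;
            while ((raw = file.readLine()) != null) {
                // readLine maps each byte to one char, so re-decode as UTF-8
                byte[] bytes = raw.getBytes(StandardCharsets.ISO_8859_1);
                lines.add(new String(bytes, StandardCharsets.UTF_8));
            }
            offset = file.getFilePointer();
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        TailSketch tail = new TailSketch(args[0]);
        while (true) {
            for (String line : tail.poll()) {
                System.out.println(line);
            }
            Thread.sleep(1000); // poll once per second
        }
    }
}
```

A program like this writes each new log line to standard output, which is the form of input an exec source command is expected to produce.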
Once the log data has been collected, for data analysis and display please refer to