Scala精簡版Filebeat日誌採集
以 Scala 實作的多功能精簡版 Filebeat 日誌採集工具,可依需求自訂功能。
依賴包
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.7'
compile group: 'commons-io', name: 'commons-io', version: '2.6'
compile group: 'com.google.guava', name: 'guava', version: '27.0.1-jre'
App.scala
import java.io.File
import java.util.concurrent.{Executors, LinkedBlockingDeque, ScheduledExecutorService, TimeUnit}

import com.google.common.cache.LoadingCache
import org.apache.commons.io.filefilter.IOFileFilter

/**
  * Demo entry point: builds one log-collection job, runs it on the shared scheduler
  * for ten seconds and then shuts the scheduler down.
  */
object App {

  /** Scheduler shared by every job; created with five worker threads. */
  val scheduledThreadPool: ScheduledExecutorService = Executors.newScheduledThreadPool(5)

  /** Jobs registered by `main`, keyed by the name they were added under. */
  var instalnces = Map[String, JobThread]()

  def main(args: Array[String]): Unit = {
    // Filter priority: includeSuffix => ignoreOlder => includePaths => excludePaths
    val dbPath = "/Users/lake/dounine/git/sr-galaxy-serv-loghub/db"

    val job = JobUtil.createJob(
      workPath = dbPath,
      logPath = "/Users/lake/dounine/git/sr-galaxy-serv-loghub/logs",
      jobName = "test",            // job name
      includeSuffix = "log,txt",   // log file suffixes
      ignoreOlder = "24h",         // ignore files that have not been modified within this period
      intervalFileStatus = "1s",   // how often tracked files are checked for changes
      intervalScanFile = "30s",    // how often the directory is scanned for matching files
      includePaths = ".*",         // paths to match
      excludePaths = "",           // paths to exclude
      handlerFileClose = "24h"     // how long an inactive file handle is kept before being closed
    )

    instalnces += ("test" -> job)
    instalnces.values.foreach(jobThread => scheduledThreadPool.execute(jobThread))

    // Let the demo run for a while, then stop accepting and scheduling work.
    TimeUnit.SECONDS.sleep(10)
    scheduledThreadPool.shutdown()
  }
}

/**
  * Immutable description of one collection job together with its runtime state holders.
  *
  * @param jobName            job name; also used as the sub-directory of `workPath` holding `seek.txt`
  * @param workPath           base directory under which the job's seek file is stored
  * @param logPath            directory that is scanned for log files
  * @param intervalFileStatus duration alias (e.g. "1s") between checks of tracked files
  * @param intervalScanFile   duration alias (e.g. "30s") between directory scans
  * @param dirFilter          filter deciding which sub-directories are descended into
  * @param fileFilter         filter deciding which files are tracked
  * @param linesBlockQueue    queue carrying collected lines to the consumer
  * @param handlerFiles       currently tracked files, keyed by absolute path
  * @param seekDB             cache of per-file read offsets (file index cache)
  */
case class Job(
  jobName: String,
  workPath: String,
  logPath: String,
  intervalFileStatus: String,
  intervalScanFile: String,
  dirFilter: IOFileFilter,
  fileFilter: IOFileFilter,
  linesBlockQueue: LinkedBlockingDeque[String],
  handlerFiles: LoadingCache[String, File],
  seekDB: LoadingCache[String, java.lang.Long]
)
JobThread.scala
import java.io.{File, RandomAccessFile}
import java.util.concurrent.TimeUnit
import java.util.function.Consumer

import org.apache.commons.io.FileUtils

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.util.Try
import scala.util.control.NonFatal

/**
  * Runs one [[Job]] on `App.scheduledThreadPool`.
  *
  * `run()` schedules three tasks:
  *  - a periodic scan of `job.logPath` that registers matching files in `job.handlerFiles`;
  *  - a periodic collector that reads the bytes appended to each tracked file since the
  *    stored offset, pushes the resulting lines onto `job.linesBlockQueue` and persists
  *    the offsets to `<workPath>/<jobName>/seek.txt`;
  *  - a consumer that prints queued lines until the pool is shut down.
  *
  * Lines are produced by splitting the newly read bytes on `\n`, so a line that is still
  * being written when a read happens may be delivered in two pieces.
  *
  * @param job the job definition and its shared state holders
  */
class JobThread(job: Job) extends Runnable {

  /** How long the consumer blocks on an empty queue before re-checking for shutdown. */
  private val pollTimeoutMillis = 200L

  initSeekDb(job.jobName)

  override def run(): Unit = {
    val pool = App.scheduledThreadPool

    // Scan for matching log files.
    pool.scheduleAtFixedRate(
      guarded("scan")(scanLogFiles()),
      0, JobUtil.getSecondsByAlias(job.intervalScanFile), TimeUnit.SECONDS)

    // Read what was appended to the tracked files.
    pool.scheduleAtFixedRate(
      guarded("collect")(collectNewLines()),
      1, JobUtil.getSecondsByAlias(job.intervalFileStatus), TimeUnit.SECONDS)

    // Consume the collected lines.
    pool.schedule(guarded("consume")(consumeLines()), 1, TimeUnit.MILLISECONDS)
  }

  /**
    * Wraps `body` in a Runnable that reports non-fatal failures instead of rethrowing them.
    * An exception escaping a `scheduleAtFixedRate` task suppresses every later execution,
    * so one transient I/O error would otherwise stop the job silently.
    */
  private def guarded(taskName: String)(body: => Unit): Runnable = new Runnable {
    override def run(): Unit =
      try body
      catch {
        case NonFatal(e) =>
          System.err.println(s"[${job.jobName}] task '$taskName' failed: $e")
      }
  }

  /** Location of the file in which the read offsets of this job are persisted. */
  private def seekDbFile: File =
    FileUtils.getFile(s"${job.workPath}/${job.jobName}/seek.txt")

  /** Registers every file under `job.logPath` accepted by the job's filters. */
  private def scanLogFiles(): Unit = {
    val dirFile = FileUtils.getFile(job.logPath)
    FileUtils.listFiles(dirFile, job.fileFilter, job.dirFilter).asScala.foreach { logFile =>
      job.handlerFiles.put(logFile.getAbsolutePath, logFile)
    }
  }

  /** Collects new lines from every tracked file, then persists the offsets. */
  private def collectNewLines(): Unit = {
    job.handlerFiles.asMap().values().asScala.foreach { file =>
      // A failure on one file (e.g. it vanished mid-read) must not starve the others.
      try collectFile(file)
      catch {
        case NonFatal(e) =>
          System.err.println(s"[${job.jobName}] cannot read ${file.getAbsolutePath}: $e")
      }
    }
    flushCacheSeekToDb()
  }

  /**
    * Reads the bytes between the stored offset and the file length observed now,
    * queues the resulting lines and stores that length as the new offset.
    */
  private def collectFile(file: File): Unit = {
    val path = file.getAbsolutePath
    // Captured once: the same value bounds the read and becomes the new offset, so data
    // appended while we are reading is picked up next time instead of being read twice.
    val fileLength = file.length() // 0 when the file does not exist (any more)
    val storedSeek: Long = job.seekDB.get(path)

    // -1 marks a file never seen before; an offset beyond the current length means the
    // file was truncated or replaced, so it is read again from the beginning.
    val start = if (storedSeek < 0 || storedSeek > fileLength) 0L else storedSeek

    if (start < fileLength) {
      readRange(file, start, fileLength).foreach(line => job.linesBlockQueue.add(line))
    }
    job.seekDB.put(path, fileLength)
  }

  /** Prints queued lines until the shared pool is shut down. */
  private def consumeLines(): Unit = {
    try {
      while (!App.scheduledThreadPool.isShutdown) {
        // Timed poll: sleeps while the queue is empty instead of spinning on a core,
        // yet wakes up regularly to notice a shutdown.
        Option(job.linesBlockQueue.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS))
          .foreach(line => println(s"line = ${line}"))
      }
    } catch {
      case _: InterruptedException => Thread.currentThread().interrupt()
    }
  }

  /**
    * Splits a `path:offset` line of the seek file. The LAST colon is the separator so
    * that paths containing colons survive a round trip.
    *
    * @return the path and offset, or `None` when the line is not in that format
    */
  private def parseSeekLine(line: String): Option[(String, Long)] = {
    val sep = line.lastIndexOf(':')
    if (sep <= 0) None
    else Try(line.substring(sep + 1).trim.toLong).toOption.map(seek => line.substring(0, sep) -> seek)
  }

  /**
    * Persists the cached offsets to the seek file.
    *
    * Stored lines whose path is cached with a different offset are rewritten, cached paths
    * missing from the file are appended, and every other line (including ones that cannot
    * be parsed) is kept unchanged. The file is only written when something changed.
    */
  def flushCacheSeekToDb(): Unit = {
    val dbFile = seekDbFile
    // Snapshot of the cache; looking offsets up here avoids going through the cache
    // loader, which would re-read the whole seek file for every stored line.
    val cached: Map[String, Long] =
      job.seekDB.asMap().entrySet().asScala.map(e => e.getKey -> e.getValue.longValue).toMap

    val stored = FileUtils.readLines(dbFile, "utf-8").asScala.toList
    val storedPaths = stored.map(parseSeekLine).collect { case Some((path, _)) => path }.toSet

    val refreshed = stored.map { line =>
      parseSeekLine(line)
        .flatMap { case (path, seek) =>
          // Offset changed: rewrite the line.
          cached.get(path).filter(_ != seek).map(current => s"$path:$current")
        }
        .getOrElse(line)
    }
    // Files that have no line yet (new files).
    val added = cached.collect { case (path, seek) if !storedPaths(path) => s"$path:$seek" }

    val all = refreshed ++ added
    if (all != stored) {
      // Written with the same charset the file is read with.
      FileUtils.writeLines(dbFile, "utf-8", all.asJava, false)
    }
  }

  /**
    * Makes sure the job's directory and its seek file exist.
    *
    * @param dbName not used; the location is derived from `job.workPath` and `job.jobName`
    */
  def initSeekDb(dbName: String): Unit = {
    val dbFold = FileUtils.getFile(s"${job.workPath}/${job.jobName}")
    if (!dbFold.exists()) {
      dbFold.mkdirs()
    }
    val dbFile = seekDbFile
    if (!dbFile.exists()) {
      dbFile.createNewFile()
    }
  }

  /**
    * Reads `file` from byte offset `seek` to its current end and splits the UTF-8 text
    * on `\n`.
    *
    * @param seek byte offset to start reading at
    * @param file file to read
    * @return the lines read; empty when there is nothing after `seek`
    * @throws IllegalArgumentException when more than `Int.MaxValue` bytes would be read
    */
  def readLinesForSeek(seek: Long, file: File): Array[String] =
    readRange(file, seek, file.length())

  /** Reads the bytes in `[from, until)` of `file` and splits the UTF-8 text on `\n`. */
  private def readRange(file: File, from: Long, until: Long): Array[String] = {
    val size = until - from
    if (size <= 0) {
      // Nothing to read; also avoids "".split("\n"), which yields one empty line.
      Array.empty[String]
    } else {
      require(size <= Int.MaxValue, s"cannot read $size bytes of ${file.getAbsolutePath} at once")
      val bytes = new Array[Byte](size.toInt)
      val randomFile = new RandomAccessFile(file, "r")
      try {
        randomFile.seek(from)
        randomFile.readFully(bytes)
      } finally {
        randomFile.close() // released even when seek/readFully fails
      }
      new String(bytes, "utf-8").split("\n")
    }
  }
}
JobUtil.scala
import java.io.File
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
import java.util.regex.Pattern

import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.{FileFilterUtils, IOFileFilter}

import scala.collection.JavaConverters._
import scala.util.Try

/** Helpers for parsing duration aliases and assembling a [[JobThread]]. */
object JobUtil {

  /**
    * Converts a duration alias such as `30s`, `5m`, `24h` or `7d` to seconds.
    *
    * @param alias a whole number followed by one unit character: `s`, `m`, `h` or `d`
    * @return the duration in seconds
    * @throws IllegalArgumentException when the alias is empty, has no number or uses an
    *                                  unknown unit (a non-numeric amount surfaces as
    *                                  `NumberFormatException`, which is a subclass)
    */
  def getSecondsByAlias(alias: String): Long = {
    val text = Option(alias).map(_.trim).getOrElse("")
    require(text.length >= 2, s"duration alias must be <number><unit>, e.g. 30s, but was '$alias'")
    val amount = text.init.toLong
    text.last match {
      case 's' => amount
      case 'm' => amount * 60
      case 'h' => amount * 60 * 60
      case 'd' => amount * 60 * 60 * 24 // one day is 86400 seconds
      case unit =>
        // Previously the character code of the unit was returned as the duration.
        throw new IllegalArgumentException(
          s"unknown duration unit '$unit' in '$alias' (expected s, m, h or d)")
    }
  }

  /**
    * Splits a `path:offset` line of the seek file at its LAST colon, so that paths
    * containing colons are handled.
    *
    * @return the path and offset, or `None` when the line is not in that format
    */
  private def parseSeekLine(line: String): Option[(String, Long)] = {
    val sep = line.lastIndexOf(':')
    if (sep <= 0) None
    else Try(line.substring(sep + 1).trim.toLong).toOption.map(seek => line.substring(0, sep) -> seek)
  }

  /**
    * Builds the filters, caches and queue of a job and wraps them in a [[JobThread]].
    *
    * A file is collected when it has one of the suffixes, was modified within
    * `ignoreOlder`, matches one of `includePaths` (or none is given) and matches none of
    * `excludePaths`.
    *
    * @param workPath           base directory; offsets are kept in `<workPath>/<jobName>/seek.txt`
    * @param logPath            directory scanned for log files
    * @param jobName            job name
    * @param includeSuffix      comma-separated file-name suffixes
    * @param ignoreOlder        duration alias; files not modified within it are ignored
    * @param intervalFileStatus duration alias between checks of tracked files
    * @param intervalScanFile   duration alias between directory scans
    * @param includePaths       comma-separated regular expressions matched against the whole
    *                           absolute path; empty means every path is included
    * @param excludePaths       comma-separated regular expressions matched against the whole
    *                           absolute path; empty means nothing is excluded
    * @param handlerFileClose   duration alias after which a tracked file that was not
    *                           re-registered is dropped
    * @return the runnable job
    * @throws IllegalArgumentException when a duration alias or a regular expression is invalid
    */
  def createJob(workPath: String,
                logPath: String,
                jobName: String,
                includeSuffix: String,
                ignoreOlder: String,
                intervalFileStatus: String,
                intervalScanFile: String,
                includePaths: String,
                excludePaths: String,
                handlerFileClose: String): JobThread = {
    val ignoreOlderSeconds = getSecondsByAlias(ignoreOlder)

    val suffixTypeFilters = includeSuffix.split(",").map { suff =>
      FileFilterUtils.suffixFileFilter(suff)
    }

    val ignoreOlderFilter = new IOFileFilter {
      override def accept(file: File): Boolean =
        (System.currentTimeMillis() - file.lastModified()) / 1000 <= ignoreOlderSeconds

      override def accept(file: File, s: String): Boolean = true
    }

    // Compiled once here instead of on every file check. Empty entries are dropped:
    // an empty pattern can never match an absolute path.
    def compilePatterns(csv: String): Array[Pattern] =
      csv.split(",").filter(_.nonEmpty).map(regex => Pattern.compile(regex))

    val includePatterns = compilePatterns(includePaths)
    val excludePatterns = compilePatterns(excludePaths)

    val includeExcludeFilter = new IOFileFilter {
      override def accept(file: File): Boolean = {
        val path = file.getAbsolutePath

        def matchesAny(patterns: Array[Pattern]): Boolean =
          patterns.exists(_.matcher(path).matches())

        // Include first, then exclude: a path must be included AND not excluded.
        // (With `||` an excluded path was still accepted whenever it was included,
        // and a path that was not included was accepted unless it was excluded.)
        val included = includePatterns.isEmpty || matchesAny(includePatterns)
        included && !matchesAny(excludePatterns)
      }

      override def accept(file: File, s: String): Boolean = true
    }

    val fileFilter = FileFilterUtils.and(
      FileFilterUtils.or(suffixTypeFilters: _*), // file suffix match
      ignoreOlderFilter,                         // ignore files not modified within the period
      includeExcludeFilter
    )

    val handlerFiles = CacheBuilder.newBuilder()
      .expireAfterWrite(getSecondsByAlias(handlerFileClose), TimeUnit.SECONDS)
      .build[String, File](new CacheLoader[String, File] {
        // Keys are absolute paths; a CacheLoader must never return null.
        override def load(k: String): File = new File(k)
      })

    val seekDB = CacheBuilder.newBuilder()
      .expireAfterWrite(24, TimeUnit.HOURS)
      .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long] {
        // Looks the offset up in the seek file; -1 marks a path that has no entry yet.
        override def load(path: String): java.lang.Long = {
          val dbFile = FileUtils.getFile(s"${workPath}/${jobName}/seek.txt")
          val stored = FileUtils.readLines(dbFile, "utf-8").asScala.iterator
            .map(parseSeekLine)
            .collectFirst { case Some((storedPath, seek)) if storedPath == path => seek }
          java.lang.Long.valueOf(stored.getOrElse(-1L))
        }
      })

    new JobThread(Job(
      jobName,
      workPath,
      logPath,
      intervalFileStatus,
      intervalScanFile,
      FileFilterUtils.directoryFileFilter(),
      fileFilter,
      new LinkedBlockingDeque[String](),
      handlerFiles,
      seekDB
    ))
  }
// NOTE(review): the closing brace of object JobUtil is not visible in this excerpt — confirm it follows.