
Real-Time Automated Operations of a SQL Service Based on Spark Streaming

Design Background

We currently run 10 spark thriftserver instances in production. Monitoring them by checking whether the port is alive has proven unreliable, because in many failures the process does not exit; and manually inspecting the logs and then restarting the service is inefficient. The design therefore uses Spark Streaming to consume the spark thriftserver logs in real time, decides from the log content whether the service has stopped serving, and triggers the corresponding automatic restart. This gives second-level, 7x24 uninterrupted monitoring and maintenance of the service.

Design Architecture

(Architecture diagram)

  • Deploy a flume agent on every spark thriftserver node to be monitored, tailing its log file (flume uses an interceptor to attach host information to each log event)
  • The log stream collected by flume is pushed into kafka
  • spark streaming consumes the log stream from kafka and scans the log content for user-defined keywords; if a keyword is hit, the service is considered unavailable and the host information carried by that log event is written to mysql
  • A shell script reads the host information from mysql and restarts the service

Software Versions and Configuration

spark 2.0.1, kafka 0.10, flume 1.7

1) Flume configuration and commands:

Edit flume-conf.properties:

agent.sources = sparkTS070
agent.channels = c
agent.sinks = kafkaSink
# For each one of the sources, the type is defined
agent.sources.sparkTS070.type = TAILDIR
agent.sources.sparkTS070.interceptors = i1
agent.sources.sparkTS070.interceptors.i1.type = host
agent.sources.sparkTS070.interceptors.i1.useIP = false
agent.sources.sparkTS070.interceptors.i1.hostHeader = agentHost
# The channel can be defined as follows.
agent.sources.sparkTS070.channels = c
agent.sources.sparkTS070.positionFile = /home/hadoop/xu.wenchun/apache-flume-1.7.0-bin/taildir_position.json
agent.sources.sparkTS070.filegroups = f1
agent.sources.sparkTS070.filegroups.f1 = /data1/spark/logs/spark-hadoop-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-hadoop070.dx.com.out
# Each sink's type must be defined
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.topic = mytest-topic1
agent.sinks.kafkaSink.kafka.bootstrap.servers = 10.87.202.51:9092
agent.sinks.kafkaSink.useFlumeEventFormat = true
# Specify the channel the sink should use
agent.sinks.kafkaSink.channel = c
# Each channel's type is defined.
agent.channels.c.type = memory

Run command:
nohup bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,LOGFILE &

2) Kafka configuration and commands:

Edit config/server.properties:

broker.id=1
listeners=PLAINTEXT://10.87.202.51:9092
log.dirs=/home/hadoop/xu.wenchun/kafka_2.11-0.10.0.1/kafka.log
zookeeper.connect=10.87.202.44:2181,10.87.202.51:2181,10.87.202.52:2181

Run command:
nohup bin/kafka-server-start.sh config/server.properties &

Spark Streaming run command:

/opt/spark-2.0.1-bin-2.6.0/bin/spark-submit --master yarn-cluster --num-executors 3 --class SparkTSLogMonitor /tmp/mavenSparkProject.jar 10.87.202.51:9092 mytest-topic1

3) Shell script

Write a shell script that reads the host information from mysql and restarts the service on those hosts; a minimal sketch is shown below.
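The original post does not include the script itself, so the following is only an illustrative sketch. The monitor(id, hostname) table layout matches the insert statement in the Spark job below; the MySQL host, credentials, database name, and the exact restart commands are assumptions (the Spark home path reuses the one from the spark-submit command above).

#!/bin/bash
# Illustrative sketch only; adjust MySQL connection details and restart commands to your environment.
# Assumed table: monitor(id VARCHAR, hostname VARCHAR), filled by the Spark Streaming job.

MYSQL="mysql -h mysql-host -u monitor_user -pmonitor_pwd monitor_db -N -s"

# Fetch the hosts flagged as unavailable by the Spark Streaming job.
hosts=$($MYSQL -e "SELECT DISTINCT hostname FROM monitor;")

for host in $hosts; do
    echo "$(date) restarting spark thriftserver on $host"
    # Restart the thriftserver remotely (Spark sbin path taken from the spark-submit command above).
    ssh "$host" "/opt/spark-2.0.1-bin-2.6.0/sbin/stop-thriftserver.sh; /opt/spark-2.0.1-bin-2.6.0/sbin/start-thriftserver.sh"
    # Clear the handled records so the same host is not restarted repeatedly.
    $MYSQL -e "DELETE FROM monitor WHERE hostname = '$host';"
done

Running this script periodically from cron (or in a loop) closes the monitoring-to-recovery cycle described in the architecture.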

Core code of the Spark Streaming monitoring job

Here is the Spark Streaming code for the monitoring job. The code below was worked out through a few pitfalls and has been verified to run.

      // Imports needed by this snippet:
      import java.text.SimpleDateFormat
      import java.util
      import java.util.Date

      import org.apache.avro.io.DecoderFactory
      import org.apache.avro.specific.SpecificDatumReader
      import org.apache.avro.util.Utf8
      import org.apache.flume.source.avro.AvroFlumeEvent

      stream.foreachRDD { rdd =>
        rdd.foreachPartition { rddOfPartition =>
          val conn = ConnectPool.getConnection
          println(" conn:" + conn)
          conn.setAutoCommit(false)  // commit manually after the whole partition is processed
          val stmt = conn.createStatement()
          rddOfPartition.foreach { event =>
            // The Kafka message body is a Flume event serialized as Avro
            // (useFlumeEventFormat = true in the sink config), so decode it back.
            val body = event.value().get()
            val decoder = DecoderFactory.get().binaryDecoder(body, null)
            val result = new SpecificDatumReader[AvroFlumeEvent](classOf[AvroFlumeEvent]).read(null, decoder)
            // The "agentHost" header is set by the Flume host interceptor.
            val hostname = result.getHeaders.get(new Utf8("agentHost"))
            val text = new String(result.getBody.array())

            // If the log line contains a failure keyword, record the host in MySQL.
            if (text.contains("Broken pipe") || text.contains("No active SparkContext")) {
              val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyyMMddhhmmssSSS")
              val id = dateFormat.format(new Date()) + "_" + (new util.Random).nextInt(999)
              stmt.addBatch("insert into monitor(id,hostname) values ('" + id + "','" + hostname + "')")
              println("insert into monitor(id,hostname) values ('" + id + "','" + hostname + "')")
            }
          }
          stmt.executeBatch()
          conn.commit()
          conn.close()
        }
      }
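The snippet above assumes a `stream` that has already been created from Kafka and a `ConnectPool` JDBC helper, neither of which is shown in the original post. Below is a minimal, illustrative sketch of how they might look with the spark-streaming-kafka-0-10 direct API; the batch interval, MySQL URL and credentials are placeholders, while the broker/topic arguments and the group id `myspark` match the spark-submit command and offset-check commands in this article.

import java.sql.{Connection, DriverManager}

import org.apache.kafka.common.serialization.{BytesDeserializer, StringDeserializer}
import org.apache.kafka.common.utils.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Stand-in for the author's ConnectPool helper: hands out JDBC connections to the
// MySQL instance that holds the monitor table (URL and credentials are placeholders;
// the MySQL JDBC driver must be on the classpath).
object ConnectPool {
  private val url = "jdbc:mysql://mysql-host:3306/monitor_db?useSSL=false"
  def getConnection: Connection = DriverManager.getConnection(url, "monitor_user", "monitor_pwd")
}

object SparkTSLogMonitor {
  def main(args: Array[String]): Unit = {
    val Array(brokers, topic) = args  // e.g. 10.87.202.51:9092 mytest-topic1, as passed by spark-submit above

    val ssc = new StreamingContext(new SparkConf().setAppName("SparkTSLogMonitor"), Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[BytesDeserializer], // Flume events arrive as Avro-encoded bytes
      "group.id" -> "myspark",
      "auto.offset.reset" -> "latest"
    )

    // Direct stream over the Kafka 0.10 consumer API; this is the `stream`
    // that the foreachRDD block above iterates over.
    val stream = KafkaUtils.createDirectStream[String, Bytes](
      ssc, PreferConsistent, Subscribe[String, Bytes](Seq(topic), kafkaParams))

    // ... stream.foreachRDD { ... } as shown above ...

    ssc.start()
    ssc.awaitTermination()
  }
}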

One special case this monitoring does not cover: when the service's JVM old generation fills up, no log lines are emitted at all, and the service registered on YARN simply dies.

Run the following commands on the Kafka machine to check consumer offsets:

  • bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group myspark --topic mytest-topic1 (ZooKeeper-based consumers)
  • bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myspark (non-ZooKeeper-based consumers)

(End)

The above is a typical introductory application of real-time processing. I happened to run into this kind of monitoring and operations problem in my own work, adopted this approach to handle it, and the results have been good.