
Reading data from Kafka with Spark and writing it to HBase

package com.prince.demo.test

import java.util.UUID

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming job: consume JSON messages from Kafka and write them to HBase.
  * Created by prince on 2018/5/17.
  */
object ReadKafka {

  Logger.getLogger("org").setLevel(Level.WARN)
  implicit val conf: Config = ConfigFactory.load

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.appName("ReadKafka").master("local[*]").getOrCreate()
    val sparkContext = spark.sparkContext
    // Micro-batches every 10 seconds
    val ssc = new StreamingContext(sparkContext, Seconds(10))

    // Kafka consumer settings; broker list and consumer group are read from application.conf
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> conf.getString("kafka.brokers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> conf.getString("kafka.group"),
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val topic = "topic883"
    val topics = Array(topic)
    // Create a direct Kafka stream subscribed to the topic
    val stream = KafkaUtils
      .createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // HBase connection settings (ZooKeeper quorum and client port)
    val hbaseConf = HBaseConfiguration.create()

    hbaseConf.set("hbase.zookeeper.quorum", "slave5.hadoop,slave6.hadoop,slave7.hadoop")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")

    val tableName = "circle"

    // Hadoop job configuration used by saveAsHadoopDataset to write into the HBase table
    val jobConf = new JobConf(hbaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // Extract the JSON payload (the record value) from each Kafka message
    val input = stream.map(record => record.value)

    input.foreachRDD(rdd => {
      if (!rdd.isEmpty()){
        // Parse the JSON strings of this micro-batch into a DataFrame
        val spark1 = SparkSession.builder.getOrCreate
        val df = spark1.read.json(rdd)
        df.createOrReplaceTempView("temp")
        // Select the fields to persist; each row becomes (random UUID rowkey, time, token)
        val ans = spark1.sql("select time,token from temp").rdd.map(x => {
          (UUID.randomUUID.toString, x.getString(0), x.getString(1))
        })
        // Build one HBase Put per row: cf:id holds the time field, cf:token holds the token
        ans.map(line =>{
          val put = new Put(Bytes.toBytes(line._1))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("id"), Bytes.toBytes(line._2))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("token"), Bytes.toBytes(line._3))
          (new ImmutableBytesWritable, put)
        }).saveAsHadoopDataset(jobConf)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
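
The broker list and consumer group are not hard-coded; they are read through Typesafe Config (ConfigFactory.load), so an application.conf has to be on the classpath. A minimal sketch of what it might contain (the key names kafka.brokers and kafka.group come from the code above; the host names and group id below are placeholders):

kafka {
  brokers = "broker1:9092,broker2:9092,broker3:9092"   // placeholder broker addresses
  group   = "read-kafka-demo"                          // placeholder consumer group id
}

The HBase table must also exist before the job runs: with this code that is a table named circle with a single column family cf (for example created from the HBase shell with create 'circle', 'cf').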

The data transmitted through Kafka has the following format:

{"id":"b2e4a1bb-741b-4700-8b03-18c06a258","token":"h981bd53a475b4edc9b0ad5f72870b03","time":"1503364337536"}

Simulating the process of writing data to Kafka:

package com.prince.demo.kafka

import java.util.Properties

import com.typesafe.config.ConfigFactory
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.log4j.{Level, Logger}
import org.codehaus.jettison.json.JSONObject

import scala.util.Random

/**
  * Generates simulated data and writes it to Kafka.
  * Created by prince on 2017/9/11.
  */
object KafkaProducerData {

  Logger.getLogger("org").setLevel(Level.WARN)
  private val userid = Array(
    "b2e4a1bb-741b-4700-8b03-18c06a298", "b2e4a1bb-741b-4700-8b03-18c06a232",
    "b2e4a1bb-741b-4700-8b03-18c06a224", "b2e4a1bb-741b-4700-8b03-18c06a258",
    "b2e4a1bb-741b-4700-8b03-18c06a280", "b2e4a1bb-741b-4700-8b03-18c06a248",
    "b2e4a1bb-741b-4700-8b03-18c06a275", "b2e4a1bb-741b-4700-8b03-18c06a266",
    "b2e4a1bb-741b-4700-8b03-18c06a268", "b2e4a1bb-741b-4700-8b03-18c06a212"
  )

  private val tokens = Array(
    "v2ff37ca54eda70e0c1b8902626cb6dd", "fb751fb989ce159e3ee5149927176474",
    "af89557c629d6b7af43378df4b8f30d9", "n3f164f9e9999eefa13064ac1e864fd8",
    "zbd6f5791a99249c3a513b21ce835038", "dc6470493c3c891db6f63326b19ef482",
    "k2917b1b391186ff8f032f4326778ef7", "ca796f74ee74360e169fc290f1e720c7",
    "h981bd53a475b4edc9b0ad5f72870b03", "p4064d445c9f4ff4d536dfeae965aa95"
  )

  private val time = Array(
    "1503364335202", "1503364335776",
    "1503364336578", "1503364337536",
    "1503364336340", "1503364335832",
    "1503364336726", "1503364336387",
    "1503364336691", "1503364335857"
  )

  private val random = new Random()

  // Random index into the sample arrays above
  def point(): Int = {
    random.nextInt(10)
  }

  def getId(): String = {
    userid(point())
  }

  def getToken(): String = {
    tokens(point())
  }

  def getTime(): String = {
    time(point())
  }

  def main(args: Array[String]): Unit = {
    implicit val conf = ConfigFactory.load
    val topic = "topic883"
    val brokers = conf.getString("kafka.brokers")
    // Legacy Scala producer (kafka.producer) configuration
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    // Emit one random JSON event per second
    while (true) {
      val event = new JSONObject()
      event
        .put("id", getId())
        .put("token", getToken())
        .put("time", getTime())
      producer.send(new KeyedMessage[String, String](topic, "key", event.toString))
      println("Message sent: " + event)

      Thread.sleep(1000)
    }
  }
}
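
The producer above uses the legacy kafka.producer Scala API, which is deprecated and has been removed from recent Kafka releases. For reference, a minimal sketch of the same send with the current Java client (org.apache.kafka.clients.producer); the broker address is a placeholder and the topic is the same topic883 assumed above:

package com.prince.demo.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerDataNewApi {

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send one sample event; a real test would loop like the producer above
    val event = """{"id":"b2e4a1bb-741b-4700-8b03-18c06a258","token":"h981bd53a475b4edc9b0ad5f72870b03","time":"1503364337536"}"""
    producer.send(new ProducerRecord[String, String]("topic883", "key", event))

    producer.flush()
    producer.close()
  }
}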