1. 程式人生 > >spark處理資料寫入kafka

spark處理資料寫入kafka

  1. 首先,我們需要將KafkaProducer利用lazy val的方式進行包裝如下:
package com.eitcloud.util

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import scala.collection.JavaConversions._

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V
]) extends Serializable { /* This is the key idea that allows us to work around running into NotSerializableExceptions. */ lazy val producer = createProducer() def send(topic: String, key: K, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, key, value)) def
send(topic: String, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, value)) } object KafkaSink { def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = { val createProducerFunc = () => { val producer = new KafkaProducer[K, V](config) sys.addShutdownHook
{ // Ensure that, on executor JVM shutdown, the Kafka producer sends // any buffered messages to Kafka before shutting down. producer.close() } producer } new KafkaSink(createProducerFunc) } def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap) }
  1. 2、之後我們利用廣播變數的形式,將KafkaProducer廣播到每一個executor,在每個executor中愉快的將資料輸入到kafka當中:
package com.eitcloud.Entrance

import java.util.Properties

import breeze.numerics.log
import com.eitcloud.util.{KafkaOut, KafkaSink}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test2 {

  def main(args: Array[String]): Unit = {
    //取消列印多餘日誌
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf = new SparkConf()
    conf.setExecutorEnv("SPARK_JAVA_OPTS", " -Xms8024m -Xmx12040m -XX:MaxPermSize=30840m")
    conf.setMaster("local[4]")
    conf.setAppName(s"${this.getClass.getSimpleName}")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.parallelize(Array("1","2","4","5","6"))
//KafkaOut.outPut(rdd,sc)
//    rdd.collect()
     //廣播KafkaSink
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", "192.168.2.116:9092")
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      sc.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    //輸出到kafka
rdd.foreach(record=>{
      kafkaProducer.value.send("lili", record)
    })

  }

}

相關推薦

spark處理資料寫入kafka

首先,我們需要將KafkaProducer利用lazy val的方式進行包裝如下:package com.eitcloud.util import java.util.concurrent.Future import org.apache.kafka.clients.pro

使用spark資料寫入Hbase

--------------組裝xml並捕獲異常------------------- package wondersgroup_0628.com import java.io.{IOException, PrintWriter, StringReader, StringWriter} imp

flume讀取日誌資料寫入kafka 然後kafka+storm整合

一、flume配置 flume要求1.6以上版本 flume-conf.properties檔案配置內容,sinks的輸出作為kafka的product a1.sources = r1 a1.sinks = k1 a1.cha

flume讀取日誌資料寫入kafka

一、flume配置 flume要求1.6以上版本 flume-conf.properties檔案配置內容,sinks的輸出作為kafka的product a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/

spark資料寫入ES(ElasticSearch)終極總結

簡介 spark接入ES可以使用多種方式,常見型別如下。 將Map物件寫入ElasticSearch 將case class 類物件寫入ElasticSearch 將Json的字串寫入ElasticSearch 本文主要介紹將case class 類物件寫入El

Spark資料寫入Hbase以及從Hbase讀取資料

本文將介紹 1、spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset將RDD寫入hbase 2、spark從hbase中讀取資料並轉化為RDD 操作方式為在eclipse本地執行spark連線到遠端的hbase。 ja

Spark SQL大資料處理寫入Elasticsearch

1 # coding: utf-8 2 import sys 3 import os 4 5 pre_current_dir = os.path.dirname(os.getcwd()) 6 sys.path.append(pre_current_dir) 7 from pyspark.sq

spark streaming 接收kafka資料寫入Hive分割槽表

直接上程式碼 object KafkaToHive{ def main(args: Array[String]){ val sparkConf = new SparkConf().setAppName("KafkaToHive") val sc = new SparkConte

Spark Streaming的foreachRDD把處理後的資料寫入外部儲存系統中

1.程式碼 dstream.foreachRDD { rdd =>   rdd.foreachPartition { partitionOfRecords =>     // ConnectionPool is a static, lazily initial

spark讀取kafka資料寫入hbase

package com.prince.demo.test import java.util.UUID import com.typesafe.config.{Config, ConfigFactory} import org.apache.hadoop.hbase.HBa

Spark SQL大數據處理寫入Elasticsearch

可能 value exc ima dirname .py _file__ down show SparkSQL(Spark用於處理結構化數據的模塊) 通過SparkSQL導入的數據可以來自MySQL數據庫、Json數據、Csv數據等,通過load這些數據可以對其做一系列計算

38套大資料,雲端計算,架構,資料分析師,Hadoop,Spark,Storm,Kafka,人工智慧,機器學習,深度學習,專案實戰視訊教程

38套大資料,雲端計算,架構,資料分析師,Hadoop,Spark,Storm,Kafka,人工智慧,機器學習,深度學習,專案實戰視訊教程 視訊課程包含: 38套大資料和人工智慧高階課包含:大資料,雲端計算,架構,資料探勘實戰,實時推薦系統實戰,電視收視率專案實戰,實時流統計專案實戰,離線電

使用spark將記憶體中的資料寫入到hive表中

使用spark將記憶體中的資料寫入到hive表中 hive-site.xml <?xml version="1.0" encoding="UTF-8" standalone="no"?> <?xml-stylesheet type="text/xsl" href="configurati

相同資料來源情況下,使用Kafka實時消費資料 vs 離線環境下全部落表後處理資料,結果存在差異

原因分析: 當某個consumer宕機時,消費位點(例如2s提交一次)尚未提交到zookeeper,此時Kafka叢集自動rebalance後另一consumer來接替該宕機consumer繼續消費,因為先前宕機consumer最近的消費位點尚未提交,導致資料重複消費 突發流量、跨機房(網路請求延

Spark Streaming+Kafka spark 寫入 kafka

目錄 前言 在WeTest輿情專案中,需要對每天千萬級的遊戲評論資訊進行詞頻統計,在生產者一端,我們將資料按照每天的拉取時間存入了Kafka當中,而在消費者一端,我們利用了spark streaming從kafka中不斷拉取資料進行詞頻統計。本文首先對spark stre

Spark SQL將資料寫入Mysql表的一些坑

轉自:https://blog.csdn.net/dai451954706/article/details/52840011/  最近,在使用Spark SQL分析一些資料,要求將分析之後的結果資料存入到相應的MySQL表中。     但是將資料處理完了之後,存

如何使用Spark快速將資料寫入Elasticsearch

如何使用Spark快速將資料寫入Elasticsearch 說到資料寫入Elasticsearch,最先想到的肯定是Logstash。Logstash因為其簡單上手、可擴充套件、可伸縮等優點被廣大使用者接受。但是尺有所短,寸有所長,Logstash肯定也有它無法適用的應用場景,比如:

簡單實現kafka資料寫入hbase

測試資料格式 19392963501,17816115082,2018-09-18 16:19:44,1431 19392963501,17816115082,2018-09-18 16:19:44,1431 14081946321,13094566759,2018-05-23

使用spark資料以bulkload的方式寫入Hbase時報錯

Exception in thread "main" java.io.IOException: Trying to load more than 32 hfiles to one family of one region 從報錯日誌中可以很明顯看出因為Hfiles的個數超出了32預設的時32

spark處理資料的幾個例項介紹

在叢集中跑應用,而不是在shell中感受寫spark應用的過程 整個過程為:1、案例分析:要用哪些spark的RDD的API2、程式設計實現: 用到scala3、提交到叢集執行:如何提交到叢集,檔案是否先傳到HDFS上4、監控執行結果: 通過web可以看到 介紹了四個案例: