1. 程式人生 > >SparkStreaming向Hbase中寫資料

SparkStreaming向Hbase中寫資料

在SparkStreaming中統計了資料之後,我們需要將結果寫入外部檔案系統。

首先,需要說一下,下面的這個方法。

foreachRDD(func)

最通用的輸出操作,把func作用於從stream生成的每一個RDD。

注意:這個函式是在 執行streaming程式的driver程序 中執行的。

下面跟著思路,看一下,怎麼優雅的向Hbase中寫入資料

向外部寫資料 常見的錯誤:

向外部資料庫寫資料,通常會建立連線,使用連線傳送資料(也就是儲存資料)。

開發者可能 在driver中建立連線,而在spark worker 中儲存資料

例如:
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // 這個會在driver中執行
  rdd.foreach { record =>
    connection.send(record) //這個會在 worker中執行
  }
}
上面這種寫法是錯誤的!上面的寫法,需要connection 物件被序列化,然後從driver傳送到worker。

這樣的connection是很少在機器之間傳輸的。知道這個問題後,我們可以寫出以下的,修改後的程式碼:
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}
很遺憾!這種寫法也是不對的。這會導致,對於每條資料,都建立一個connection(建立connection是消耗資源的)。

下面的方法會好一些:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
上面的方法,使用 rdd.foreachPartition 建立一個connection 物件, 一個RDD分割槽中的所有資料,都使用這一個connection。

是不是,很機智啊~~~

事實上,還可以更機智點

在多個RDD之間,connection物件是可以重用的,所以可以建立一個連線池。如下:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool是一個靜態的,延遲初始化的連線池
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // 返回到池中 以便別人使用  }
}

注意:連線池中的連線應該是,應需求而延遲建立,並且,如果一段時間沒用,就超時了(也就是關閉該連線)

到此,SparkStreaming向外部資料庫寫資料的原理就講完了。

補充

看大家對spark連線HBase的ConnectionPool工具類有需求,在這裡在補充一下:

Hbase通用連線類
Scala連線Hbase是通過zookeeper獲取資訊,所以在配置時需要提供zookeeper的相關資訊,如下:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.ConnectionFactory

object HbaseUtil extends Serializable {
  private val conf = HBaseConfiguration.create()
  private val para = Conf.hbaseConfig // Conf為配置類,獲取hbase的配置
  conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181"))
  conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1"))  // hosts
  private val connection = ConnectionFactory.createConnection(conf)

  def getHbaseConn: Connection = connection
}

Hbase並沒有使用連線池

Hbase輸出操作
以put操作為例,將上述設計模式應用到Hbase輸出操作當中:

dstream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
        val connection = HbaseUtil.getHbaseConn // 獲取Hbase連線
        partitionRecords.foreach(data => {
            val tableName = TableName.valueOf("tableName")
            val t = connection.getTable(tableName)
            try {
              val put = new Put(Bytes.toBytes(_rowKey_)) // row key
              // column, qualifier, value
              put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes)
              Try(t.put(put)).getOrElse(t.close())
              // do some log(顯示在worker上)
            } catch {
              case e: Exception =>
                // log error
                e.printStackTrace()
            } finally {
              t.close()
            }
      })
    })
    // do some log(顯示在driver上)
  }
})
關於Hbase的其他操作可以參考官方文件

重點記錄在連線Hbase過程中配置HConstants.ZOOKEEPER_QUORUM的問題:

由於Hbase的連線不能直接使用ip地址進行訪問,往往需要配置hosts,例如我在上述程式碼段中127-0-0-1(任意),我們在hosts中需要配置

127-0-0-1 127.0.0.1

在單機情況下,我們只需要配置一臺zookeeper所在Hbase的hosts即可,但是當切換到Hbase叢集是遇到一個詭異的bug
問題描述:在foreachRDD中將Dstream儲存到Hbase時會卡住,並且沒有任何錯誤資訊爆出(沒錯!它就是卡住,沒反應)
問題分析:由於Hbase叢集有多臺機器,而我們只配置了一臺Hbase機器的hosts,這樣導致Spark叢集在訪問Hbase時不斷的去尋找但卻找不到就卡在那裡
解決方式:對每個worker上的hosts配置了所有hbase的節點ip,問題解決


MySQL通用連線類

import java.sql.Connection
import java.util.Properties

import com.mchange.v2.c3p0.ComboPooledDataSource

class MysqlPool extends Serializable {
  private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
  private val conf = Conf.mysqlConfig
  try {
    cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8"));
    cpds.setDriverClass("com.mysql.jdbc.Driver");
    cpds.setUser(conf.get("username").getOrElse("root"));
    cpds.setPassword(conf.get("password").getOrElse(""))
    cpds.setMaxPoolSize(200)
    cpds.setMinPoolSize(20)
    cpds.setAcquireIncrement(5)
    cpds.setMaxStatements(180)
  } catch {
    case e: Exception => e.printStackTrace()
  }
  def getConnection: Connection = {
    try {
      return cpds.getConnection();
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        null
    }
  }
}
object MysqlManager {
  var mysqlManager: MysqlPool = _
  def getMysqlManager: MysqlPool = {
    synchronized {
      if (mysqlManager == null) {
        mysqlManager = new MysqlPool
      }
    }
    mysqlManager
  }
}

我們利用c3p0建立Mysql連線池,然後訪問的時候每次從連線池中取出連線用於資料傳輸。

Mysql輸出操作

同樣利用之前的foreachRDD設計模式,將Dstream輸出到mysql的程式碼如下:
dstream.foreachRDD(rdd => {
    if (!rdd.isEmpty) {
      rdd.foreachPartition(partitionRecords => {
        //從連線池中獲取一個連線
        val conn = MysqlManager.getMysqlManager.getConnection
        val statement = conn.createStatement
        try {
          conn.setAutoCommit(false)
          partitionRecords.foreach(record => {
            val sql = "insert into table..." // 需要執行的sql操作
            statement.addBatch(sql)
          })
          statement.executeBatch
          conn.commit
        } catch {
          case e: Exception =>
            // do some log
        } finally {
          statement.close()
          conn.close()
        }
      })
    }
})
值得注意的是:

我們在提交Mysql的操作的時候,並不是每條記錄提交一次,而是採用了批量提交的形式,所以需要將conn.setAutoCommit(false),這樣可以進一步提高mysql的效率。
如果我們更新Mysql中帶索引的欄位時,會導致更新速度較慢,這種情況應想辦法避免,如果不可避免,那就硬上吧(T^T)

pom依賴
提供一下Spark連線Mysql和Hbase所需要的jar包的maven配置:

<dependency><!-- Hbase -->
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.0.0</version>
</dependency>

<dependency><!-- Mysql -->
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>
<dependency>
    <groupId>c3p0</groupId>
    <artifactId>c3p0</artifactId>
    <version>0.9.1.2</version>
</dependency>