Connecting Hadoop to a MySQL database for reads and writes
阿新 • Posted: 2019-01-21
To let MapReduce jobs access relational databases (MySQL, Oracle) directly, Hadoop provides two classes: DBInputFormat and DBOutputFormat. DBInputFormat reads database table rows into HDFS, and DBOutputFormat writes the result set produced by a MapReduce job back into a database table.
If the MapReduce job fails with java.io.IOException: com.mysql.jdbc.Driver, the program usually cannot find the MySQL driver jar. The fix is to make sure every tasktracker can locate the driver jar when it runs the MapReduce program.
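The failure mode can be reproduced outside Hadoop with a plain Class.forName check, which is what the input/output formats do internally with the configured driver class name. This is a minimal sketch (the class and method names here are illustrative, not part of Hadoop); whether the driver is found depends entirely on what is on the classpath:

```java
// Sketch: the "java.io.IOException: com.mysql.jdbc.Driver" message comes from
// Hadoop loading the configured driver class by name at task runtime and
// wrapping the ClassNotFoundException. Reproducing the lookup directly:
public class DriverCheck {
    static String checkDriver(String driver) {
        try {
            Class.forName(driver);
            return "driver found: " + driver;
        } catch (ClassNotFoundException e) {
            // the exception message is just the class name, which is why the
            // wrapped IOException looks so terse
            return "driver missing: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(checkDriver("com.mysql.jdbc.Driver"));
    }
}
```

If this prints "driver missing" on a task node, that node's classpath lacks the jar, and one of the two fixes below is needed.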
There are two ways to add the jar:
(1) Copy it into ${HADOOP_HOME}/lib on every node and restart the cluster. This is the more primitive approach.
(2) a) Upload the jar to the cluster: hadoop fs -put mysql-connector-java-5.1.0-bin.jar /hdfsPath/
    b) Before the MR program submits the job, add: DistributedCache.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf);
Storing MySQL data into HDFS
MySQL table creation and data initialization
[sql]
DROP TABLE IF EXISTS `wu_testhadoop`;

CREATE TABLE `wu_testhadoop` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of wu_testhadoop
-- ----------------------------
INSERT INTO `wu_testhadoop` VALUES ('1', '123', '122312');
INSERT INTO `wu_testhadoop` VALUES ('2', '123', '123456');
Defining the Hadoop data access classes
With the MySQL table in place, we need to define the rules by which Hadoop accesses it.
Hadoop provides the org.apache.hadoop.io.Writable interface as a simple, efficient serialization protocol, implemented in terms of DataInput and DataOutput.
For database access Hadoop also provides the org.apache.hadoop.mapred.lib.db.DBWritable interface: its write method sets values on a PreparedStatement, and its readFields method binds column values from the ResultSet read out of the database.
The two interfaces are used as follows (taken from the source code):
Writable
[java]
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class MyWritable implements Writable {
    // Some data
    private int counter;
    private long timestamp;

    public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    public void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
    }

    public static MyWritable read(DataInput in) throws IOException {
        MyWritable w = new MyWritable();
        w.readFields(in);
        return w;
    }
}
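Because write and readFields are symmetric, a record like this can be round-tripped through a byte buffer using nothing but the JDK. The following sketch (hypothetical class names, no Hadoop on the classpath) mirrors the fields and method bodies of the MyWritable example above:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Round-trip a Writable-style record through DataOutput/DataInput; the
// write/readFields bodies are the same as in the MyWritable example.
public class WritableRoundTrip {
    static class MyRecord {
        int counter;
        long timestamp;

        void write(DataOutput out) throws IOException {
            out.writeInt(counter);
            out.writeLong(timestamp);
        }

        void readFields(DataInput in) throws IOException {
            counter = in.readInt();
            timestamp = in.readLong();
        }
    }

    public static void main(String[] args) throws IOException {
        MyRecord original = new MyRecord();
        original.counter = 42;
        original.timestamp = 1548000000L;

        // serialize to an in-memory buffer, then deserialize into a fresh object
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        MyRecord copy = new MyRecord();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.counter + " " + copy.timestamp); // prints "42 1548000000"
    }
}
```

This is exactly what Hadoop does with Writable values when it shuffles them between map and reduce tasks, only with its own buffers and streams.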
DBWritable
[java]
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class MyWritable implements Writable, DBWritable {
    // Some data
    private int counter;
    private long timestamp;

    // Writable#write() implementation
    public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    // Writable#readFields() implementation
    public void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
    }

    // DBWritable#write() implementation: bind fields to the INSERT statement
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, counter);
        statement.setLong(2, timestamp);
    }

    // DBWritable#readFields() implementation: read fields from the query result
    public void readFields(ResultSet resultSet) throws SQLException {
        counter = resultSet.getInt(1);
        timestamp = resultSet.getLong(2);
    }
}
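The DBWritable half cannot run without a JDBC connection, but the parameter binding that write(PreparedStatement) performs can still be observed by backing the PreparedStatement interface with a java.lang.reflect.Proxy that records each setXxx call. This is a sketch for illustration only (the class name and recorded map are inventions of this example, not part of Hadoop or JDBC):

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import java.util.TreeMap;

// Observe the parameter binding a DBWritable#write(PreparedStatement) performs,
// using a dynamic proxy instead of a real database statement.
public class DBWritableDemo {
    static Map<Integer, Object> bindSample() throws SQLException {
        Map<Integer, Object> bound = new TreeMap<>();
        PreparedStatement stmt = (PreparedStatement) Proxy.newProxyInstance(
            PreparedStatement.class.getClassLoader(),
            new Class<?>[] { PreparedStatement.class },
            (proxy, method, args) -> {
                // record every setInt/setLong/... call as (parameter index -> value)
                if (method.getName().startsWith("set") && args != null && args.length == 2) {
                    bound.put((Integer) args[0], args[1]);
                }
                return null; // all setXxx methods return void
            });

        stmt.setInt(1, 42);     // what write(PreparedStatement) does for counter
        stmt.setLong(2, 1548L); // and for timestamp
        return bound;
    }

    public static void main(String[] args) throws SQLException {
        System.out.println(bindSample()); // prints {1=42, 2=1548}
    }
}
```

The recorded map corresponds one-to-one with the `?` placeholders in the INSERT statement that DBOutputFormat generates from the configured column list.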
The implementation corresponding to our table
[java]
package com.wyg.hadoop.mysql.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class DBRecord implements Writable, DBWritable {
    private int id;
    private String title;
    private String content;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;