1. 程式人生 > >hadoop連線mysql資料庫執行資料讀寫資料庫操作

hadoop連線mysql資料庫執行資料讀寫資料庫操作

    為了方便 MapReduce 直接訪問關係型資料庫(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat兩個類。通過DBInputFormat類把資料庫表資料讀入到HDFS,根據DBOutputFormat類把MapReduce產生的結果集匯入到資料庫表中。     執行MapReduce時候報錯:java.io.IOException: com.mysql.jdbc.Driver,一般是由於程式找不到mysql驅動包。解決方法是讓每個tasktracker執行MapReduce程式時都可以找到該驅動包。

新增包有兩種方式:

(1)在每個節點下的${HADOOP_HOME}/lib下新增該包。重啟叢集,一般是比較原始的方法。

(2)a)把包傳到叢集上: Hadoop fs -put MySQL-connector-Java-5.1.0- bin.jar /hdfsPath/

       b)在mr程式提交job前,新增語句:DistributedCache.addFileToClassPath(new Path(“/hdfsPath/mysql- connector-java-5.1.0-bin.jar”),conf);

mysql資料庫儲存到hadoop hdfs

mysql表建立和資料初始化

[sql]
 view plain copy  print?
  1. DROPTABLE IF EXISTS `wu_testhadoop`;  
  2. CREATETABLE `wu_testhadoop` (  
  3.   `id` int(11) NOTNULL AUTO_INCREMENT,  
  4.   `title` varchar(255) DEFAULTNULL,  
  5.   `content` varchar(255) DEFAULTNULL,  
  6.   PRIMARYKEY (`id`)  
  7. ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;  
  8. -- ----------------------------
  9. -- Records of wu_testhadoop
  10. -- ----------------------------
  11. INSERTINTO `wu_testhadoop` VALUES ('1''123''122312');  
  12. INSERTINTO `wu_testhadoop` VALUES ('2''123''123456');  

定義hadoop資料訪問

mysql表建立完畢後,我們需要定義hadoop訪問mysql的規則;

hadoop提供了org.apache.hadoop.io.Writable介面來實現簡單的高效的可序列化的協議,該類基於DataInput和DataOutput來實現相關的功能。

hadoop對資料庫訪問也提供了org.apache.hadoop.mapred.lib.db.DBWritable介面,其中write方法用於對PreparedStatement物件設定值,readFields方法用於對從資料庫讀取出來的物件進行列的值繫結;

以上兩個介面的使用如下(內容是從原始碼得來)

writable

[java] view plain copy  print?
  1. publicclass MyWritable implements Writable {  
  2.       // Some data     
  3.       privateint counter;  
  4.       privatelong timestamp;  
  5.       publicvoid write(DataOutput out) throws IOException {  
  6.         out.writeInt(counter);  
  7.         out.writeLong(timestamp);  
  8.       }  
  9.       publicvoid readFields(DataInput in) throws IOException {  
  10.         counter = in.readInt();  
  11.         timestamp = in.readLong();  
  12.       }  
  13.       publicstatic MyWritable read(DataInput in) throws IOException {  
  14.         MyWritable w = new MyWritable();  
  15.         w.readFields(in);  
  16.         return w;  
  17.       }  
  18.     }  


DBWritable

[java] view plain copy  print?
  1. publicclass MyWritable implements Writable, DBWritable {  
  2.    // Some data     
  3.    privateint counter;  
  4.    privatelong timestamp;  
  5.    //Writable#write() implementation
  6.    publicvoid write(DataOutput out) throws IOException {  
  7.      out.writeInt(counter);  
  8.      out.writeLong(timestamp);  
  9.    }  
  10.    //Writable#readFields() implementation
  11.    publicvoid readFields(DataInput in) throws IOException {  
  12.      counter = in.readInt();  
  13.      timestamp = in.readLong();  
  14.    }  
  15.    publicvoid write(PreparedStatement statement) throws SQLException {  
  16.      statement.setInt(1, counter);  
  17.      statement.setLong(2, timestamp);  
  18.    }  
  19.    publicvoid readFields(ResultSet resultSet) throws SQLException {  
  20.      counter = resultSet.getInt(1);  
  21.      timestamp = resultSet.getLong(2);  
  22.    }   
  23.  }  

資料庫對應的實現

[java] view plain copy  print?
  1. package com.wyg.hadoop.mysql.bean;  
  2. import java.io.DataInput;  
  3. import java.io.DataOutput;  
  4. import java.io.IOException;  
  5. import java.sql.PreparedStatement;  
  6. import java.sql.ResultSet;  
  7. import java.sql.SQLException;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.io.Writable;  
  10. import org.apache.hadoop.mapred.lib.db.DBWritable;  
  11. publicclass DBRecord implements Writable, DBWritable{  
  12.     privateint id;  
  13.     private String title;  
  14.     private String content;  
  15.     publicint getId() {  
  16.         return id;  
  17.     }  
  18.     publicvoid setId(int id) {  
  19.         this.id = id;  
  20.     }  
  21.     public String getTitle() {  
  22.         return title;  
  23.     }  
  24.     publicvoid setTitle(String title) {  
  25.         this.title = title;