1. 程式人生 > >在不同版本hdfs叢集之間轉移資料

在不同版本hdfs叢集之間轉移資料

本文僅供記錄一下程式心得:
很多人會有這樣一個需求:將一個hdfs叢集上的資料寫入另一個hdfs叢集所在的hbase資料庫。通常情況下兩個hdfs叢集的版本差距並不大,這樣的程式會很容易寫。但有時會跨大版本。比如作者所在的廠子,資料都在基於hadoop0.19.2版本修改的hdfs叢集上,要將這樣的資料匯入版本為0.20.2+的hdfs叢集,就不能使用同一個hadoop jar包來完成了。如何實現呢?
最簡單的辦法就是把src叢集的資料導到本地,然後起另一個程序將本地資料傳到des叢集上去。
不過這有幾個問題:
[list]
[*] 效率降低
[*] 佔用本地磁碟空間
[*] 不能應付實時導資料需求
[*] 兩個程序需要協調,複雜度增加
[/list]
更好的辦法是在同一個程序內一邊讀src資料,一邊寫des叢集。不過這相當於在同一個程序空間內載入兩個版本的hadoop jar包,這就需要在程式中使用兩個classloader來實現。
以下程式碼可以實現classloader載入自定義的jar包,並生成需要的Configuration物件:
URL[] jarUrls = new URL[1];
jarUrls[0]=new File(des_jar_path).toURI().toURL();
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
Class Proxy = Class.forName("yourclass", true, jarloader);
Configuration conf = (Configuration)Proxy.newInstance();


但是由於在生成HTable物件時,需要使用這個conf物件,而載入這個conf物件的程式碼本身是由預設的classloader載入的,也就是0.19.2的jar包。所以在以上程式碼最後一行所強制轉換的Configuration物件仍然是0.19.2版本的。那怎麼辦呢?
琢磨了一會,發現如果要實現以上功能,必須將生成HTable物件,以及以後的所有hbase操作都使用這個新的classloader,因此這個新的classloader必須載入除了0.19.2的jar包外所有需要用到的jar包,然後把所有操作都封裝進去。在外面用反射來呼叫。
這樣的話,通常建構函式都不為空了,因此需要用到Constructor來構造一個自定義的建構函式
程式碼段如下:
main.java
void init(){
ClassLoader jarloader = generateJarLoader();
Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);
Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});
Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);
proxy = con.newInstance(new Object[]{path, tablename, autoflush});
}
void put(){
...
while((line = getLine()) != null) {
proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));
Method addPut = proxy.getClass().getMethod("addPut",
new Class[]{String.class, String.class, String.class});
addPut.invoke(proxy, new Object[]{field, column, encode});
proxy.getClass().getMethod("putLine").invoke(proxy);
}
}

ClassLoader generateJarLoader() throws IOException {
String libPath = System.getProperty("java.ext.dirs");
FileFilter filter = new FileFilter() {
@Override
public boolean accept(File pathname) {
if(pathname.getName().startsWith("hadoop-0.19.2"))
return false;
else
return pathname.getName().endsWith(".jar");
}
};
File[] jars = new File(libPath).listFiles(filter);
URL[] jarUrls = new URL[jars.length+1];

int k = 0;
for (int i = 0; i < jars.length; i++) {
jarUrls[k++] = jars[i].toURI().toURL();
}
jarUrls[k] = new File("hadoop-0.20.205.jar")
ClassLoader jarloader = new URLClassLoader(jarUrls, null);
return jarloader;
}


HBaseProxy.java
public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)
throws IOException{
Configuration conf = new Configuration();
conf.addResource(new Path(hbase_conf));
config = new Configuration(conf);
htable = new HTable(config, tableName);
admin = new HBaseAdmin(config);
htable.setAutoFlush(autoflush);
}
public void addPut(String field, String column, String encode) throws IOException {
try {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes(encode));
} catch (UnsupportedEncodingException e) {
p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
field.getBytes());
}

}
public void generatePut(String rowkey){
p = new Put(rowkey.getBytes());
}

public void putLine() throws IOException{
htable.put(p);
}

總之,在同一個程序中載入多個classloader時一定要注意,classloader A所載入的物件是不能轉換成classloader B的物件的,當然也不能使用。兩個空間的相互呼叫只能用java的基本型別或是反射。