1. 程式人生 > >Hbase幾種資料入庫方式比較

Hbase幾種資料入庫方式比較


import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HBaseImport extends Configured implements Tool{
static final Log LOG = LogFactory.getLog(HBaseImport.class);

public static final String JOBNAME = "MRImport ";
public static class Map extends Mapper<LongWritable , Text, NullWritable, NullWritable>{
Configuration configuration = null;
HTable xTable = null;
private boolean wal = true;
static long count = 0;
@Override
protected void cleanup(Context context) throws IOException,

InterruptedException {

// TODO Auto-generated method stub

super.cleanup(context);

xTable.flushCommits();

xTable.close();

}

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String all[] = value.toString().split("/t");

If(all.length==2){

put = new Put(Bytes.toBytes(all[0]))); put.add(Bytes.toBytes("xxx"),Bytes.toBytes("20110313"),Bytes.toBytes(all[1]));

}

if (!wal) {

put.setWriteToWAL(false);

}

xTable.put(put);

if ((++count % 100)==0) {

context.setStatus(count +" DOCUMENTS done!");

context.progress();

System.out.println(count +" DOCUMENTS done!");

}

}

@Override

protected void setup(Context context) throws IOException,

InterruptedException {

// TODO Auto-generated method stub

super.setup(context);

configuration = context.getConfiguration();

xTable = new HTable(configuration,"testKang");

xTable.setAutoFlush(false);

xTable.setWriteBufferSize(12*1024*1024);

wal = true;

}

}

@Override

public int run(String[] args) throws Exception {

String input = args[0];

Configuration conf = HBaseConfiguration.create(getConf());

conf.set("hbase.master", "m0:60000");

Job job = new Job(conf,JOBNAME);

job.setJarByClass(HBaseImport.class);

job.setMapperClass(Map.class);

job.setNumReduceTasks(0);

job.setInputFormatClass(TextInputFormat.class);

TextInputFormat.setInputPaths(job, input);

job.setOutputFormatClass(NullOutputFormat.class);

return job.waitForCompletion(true)?0:1;

}

public static void main(String[] args) throws IOException {

Configuration conf = new Configuration();

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

int res = 1;

try {

res = ToolRunner.run(conf, new HBaseImport (), otherArgs);

} catch (Exception e) {

e.printStackTrace();

}

System.exit(res);

}

}

3. 通過Java程式入庫


import java.io.BufferedReader;

import java.io.File;

import java.io.FileReader;

import java.io.IOException;

import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

public class InsertContactJava {

public static long startTime;

public static long rowkey = 0; //起始rowkey

public static final int lineCount = 100000; //每次提交時錄入的行數

public static String tableName = "usercontact_kang"; //錄入目的表名

public static int countLie = 8; //表的列數

public static void main(String[] args) throws IOException {

startTime = System.currentTimeMillis() / 1000;

System.out.println("start time = " + startTime);

Thread t1 = new Thread() {

@Override

public void run() {

try {

insert_one("/run/jar/123");

//loadByLieWithVector("/run/jar/123");

//loadByLieWithArrayList("/run/jar/123");

} catch (IOException e) {

e.printStackTrace();

}

}

};

t1.start();

}

public static void insert_one(String path) throws IOException {

Configuration conf = HBaseConfiguration.create();

HTable table = new HTable(conf, tableName);

File f = new File(path);

ArrayList<Put> list = new ArrayList<Put>();

BufferedReader br = new BufferedReader(new FileReader(f));

String tmp = br.readLine();

int count = 0;

while (tmp != null) {

if (list.size() > 10000) {

table.put(list);

table.flushCommits();

list.clear();

} else {

String arr_value[] = tmp.toString().split("/t", 10);

String first[] = arr_value[0].split("~", 5);

String second[] = arr_value[1].split("~", 5);

String rowname = getIncreasRowKey();

String firstaccount = first[0];

String firstprotocolid = first[1];

String firstdomain = first[2];

String inserttime = Utils.getToday("yyyyMMdd");

String secondaccount = second[0];

String secondprotocolid = second[1];

String seconddomain = second[2];

String timescount = Integer.valueOf(arr_value[2]).toString();

Put p = new Put(rowname.getBytes());

p.add(("ucvalue").getBytes(), "FIRSTACCOUNT".getBytes(),

firstaccount.getBytes());

p.add(("ucvalue").getBytes(), "FIRSTDOMAIN".getBytes(),

firstdomain.getBytes());

p.add(("ucvalue").getBytes(), "FIRSTPROTOCOLID".getBytes(),

firstprotocolid.getBytes());

p.add(("ucvalue").getBytes(), "INSERTTIME".getBytes(),

inserttime.getBytes());

p.add(("ucvalue").getBytes(), "SECONDACCOUNT".getBytes(),

secondaccount.getBytes());

p.add(("ucvalue").getBytes(), "SECONDDOMAIN".getBytes(),

seconddomain.getBytes());

p.add(("ucvalue").getBytes(), "SECONDPROTOCOLID".getBytes(),

secondprotocolid.getBytes());

p.add(("ucvalue").getBytes(), "TIMESCOUNT".getBytes(),

timescount.getBytes());

list.add(p);

}

tmp = br.readLine();

count++;

}

if (list.size() > 0) {

table.put(list);

table.flushCommits();

}

table.close();

System.out.println("total = " + count);

long endTime = System.currentTimeMillis() / 1000;

long costTime = endTime - startTime;

System.out.println("end time = " + endTime);

System.out.println(path + ": cost time = " + costTime);

}

4. 入庫方式比較
Ø 生成HFile方式:

生成HFile的過程比較慢,生成HFile後寫入hbase非常快,基本上就是hdfs上的mv過程.對於生成HFile方式入庫的時候有一個改進的方案,就是先對資料排序,然後生成HFile。

HFile方式在所有的載入方案裡面是最快的,不過有個前提——資料是第一次匯入,表是空的。如果表中已經有了資料。HFile再匯入到hbase的表中會觸發split操作,最慢的時候這種操作會耗時1小時。


Ø MapReduce方式:

開始會很快,但是由於mr和hbase競爭資源,到一個特定的時間點會變很慢

Ø Java程式方式:

多客戶端,多執行緒同時入庫,目前看來是最好的方式,client和regionserver分開,硬碟讀寫分開,瓶頸只在網路和記憶體上。諮詢了一些牛人,大多推薦這種方式,並且一定要多客戶端,多執行緒。關於入庫效率的調優,在我另一篇部落格中有說明。

相關推薦

Hbase資料入庫方式比較

import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurati

Hbase資料入庫(load)方式比較

1. 預先生成HFile入庫 2. 通過MapReduce入庫 /* MapReduce 讀取hdfs上的檔案,以HTable.put(put)的方式在map中完成資料寫入,無reduce過程*/ import java.io.IOException; import or

SQL Server中遍歷方式比較

不同 .com font size 常用 分享 分享圖片 遊標 inf SQL遍歷解析   在SQL的存儲過程,函數中,經常需要使用遍歷(遍歷table),其中遊標、臨時表等遍歷方法很常用。面對小數據量,這幾種遍歷方法均可行,但是面臨大數據量時,就需要擇優選擇,不同的遍歷方

http協議基礎(三)資料傳輸方式

說說http協議的一些特點: 1)無狀態 http協議是一種自身不對請求和響應之間的通訊狀態進行儲存的協議,即無狀態協議。 這種設定的好處是:更快的處理更多的請求事務,確保協議的可伸縮性 不過隨著web的不斷髮展,有時候,需要將這種狀態進行保持,隨即,就引入了cookie技術,cookie技術通過在請

MySQL5.6建索引方式比較

mysql> select version(); +-----------+ | version() | +-----------+ | 5.6.17    | +-----------+ mysql> CREATE TABLE test AS SELECT *

linux下IPC通行方式比較

linux上面的IPC大多都是從UNIX上面繼承而來。         最初Unix IPC包括:管道、FIFO、訊號。System V IPC包括:System V訊息佇列、System V訊號燈、System V共享記憶體區。由於Unix版本的多樣性,電子電氣工程協會(

JS遍歷方式比較

數組 不能 left fff 支持 clas ffffff padding for in 幾種遍歷方式比較 for of 循環不僅支持數組、大多數偽數組對象,也支持字符串遍歷,此外還支持 Map 和 Set 對象遍歷。 for in 循環可以遍歷字符串、

Hive資料匯出方式

轉自http://www.iteblog.com/archives/955   寫在前面的話,學Hive這麼久了,發現目前國內還沒有一本完整的介紹Hive的書籍,而且網際網路上面的資料很亂,於是我決定寫一些關於《Hive的那些事》序列文章,分享給大家。我會在

std::vector的遍歷方式比較

std::vector是我在標準庫中實用最頻繁的容器。總結一下在遍歷和建立vector時需要注意的一些地方。         在不考慮執行緒安全問題的前提下,在C++11中有五種遍歷方式。 方式一 for (size_t i =0; i < vec.size(); i

java指定編碼的按行讀寫txt檔案(讀寫方式比較

輸入輸出的幾種形式 1.FileReader,FileWriter File r = new File("temp.txt") FileReader f = new FileReader(name);//讀取檔案name BufferedReader b = new Buf

從壹開始前後端分離【 .NET Core2.0 +Vue2.0 】框架之十二 || 三跨域方式比較,DTOs(資料傳輸物件)初探

更新反饋 1、博友@童鞋說到了,Nginx反向代理實現跨域,因為我目前還沒有使用到,給忽略了,這次記錄下,為下次補充。 程式碼已上傳Github+Gitee,文末有地址   今天忙著給小夥伴們提出的問題解答,時間上沒把握好,都快下班了,趕緊釋出:書說上文《從壹開始前

資料結構(一):常見排序演算法比較

排序 0. 常見排序演算法效率比較 時間複雜度及穩定性比較 排序方法 平均方法 最優複雜度 最壞複雜度 輔助空間 穩定性 氣泡排序 O(

postman的資料提交的方式,以及提交方式的介紹

    1、form-data:                     就是http請求中的multipart/form-data,它會將表單的資料處理為一

建立執行緒有不同的方式?哪一比較受歡迎?為什麼?

有三種方式: ①繼承Thread類(真正意義上的執行緒類),是Runnable介面的實現。 ②實現Runnable介面,並重寫裡面的run方法。 ③使用Executor框架建立執行緒池。Executor框架是juc裡提供的執行緒池的實現。呼叫執行緒的start():啟動此執行緒;呼叫相應的r

python: websocket獲取實時資料常見連結方式

第一種, 使用create_connection連結,需要pip install websocket-client (此方法不建議使用,連結不穩定,容易斷,並且連線很耗時) import time from websocket import create_co

Spark Streaming結合 Kafka 兩不同的資料接收方式比較

DirectKafkaInputDStream 只在 driver 端接收資料,所以繼承了 InputDStream,是沒有 receivers 的 在結合 Spark Streaming 及 Kafka 的實時應用中,我們通常使用以下兩個 API 來獲取最初的 DStream(這裡不關心這兩個 API 的

組合語言入門:定址方式比較

考慮到上一章的“定址方式”太重要了,遂單獨成章,作文於此 直接定址 適用於偏移地址為[idata]的情形 CS: IP存放程式碼指標(2000: 000E) 地址加法器合成C

GoLang讀檔案方式比較

    GoLang提供了很多讀檔案的方式,一般來說常用的有三種。使用Read加上buffer,使用bufio庫和ioutil 庫。 那他們的效率如何呢?用一個簡單的程式來評測一下: Go程式碼   package main  import(      "fmt"

資料處理框架的場景比較:傳統ETL工具、Mapreduce、Hive、Spark

ref: http://www.sohu.com/a/155141436_151779提起“大資料”就不得不提起有關資料的處理,雖然有人說過大資料在資料質量方面的要求不比傳統資料的要求那麼嚴格,當然這也是分場景的斷言,但是無論何時資料處理在大資料的生態中始終處於不可缺少的地位

技術實操丨HBase 2.X版本的元資料修復及一資料遷移方式

摘要:分享一個HBase叢集恢復的方法。 背景 在HBase 1.x中,經常會遇到元資料不一致的情況,這個時候使用HBCK的命令,可以快速修復元資料,讓叢集恢復正常。 另外HBase資料遷移時,大家經常使用到一種遷移方式是:拷貝HBase的資料目錄/hbase/data/default到新的叢集,然後在新叢集