1. 程式人生 > HBase 建立表時預切割 region，並開啟多執行緒實現大批量插入

HBase 建立表時預切割 region，並開啟多執行緒實現大批量插入

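建立表的同時預先切割區域（region），可以防止資料傾斜：新建立的表預設只有一個 region，所有寫入都會集中到同一台 RegionServer；依 rowkey 範圍預先切割後，各執行緒負責的 rowkey 區間會落在不同的 region，寫入負載因此得以分散。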

------------------

# Split keys use the same 6-digit "rowNNNNNN" format as the generated rowkeys, giving regions
# [, row300000), [row300000, row600000), [row600000, ) that match the three writer ranges.
$hbase>create 'ns1:t3', 'f1', SPLITS => ['row300000', 'row600000']

/**
	 * Inserts rows for indices {@code [start, end)} into table {@code ns1:t3} using the
	 * client-side write buffer.
	 * <p>
	 * Each row key is {@code "row"} followed by the index zero-padded to six digits. Each row gets
	 * three cells in family {@code f1}: {@code id} (the int index), {@code name}
	 * ({@code "tom" + index}) and {@code age} (the int {@code index % 100}).
	 * <p>
	 * Every call opens its own {@link Connection} and table and always closes them, even when a
	 * write fails. Concurrent calls therefore share no connection or table objects.
	 *
	 * @param start first row index to write (inclusive)
	 * @param end   row index to stop at (exclusive)
	 * @throws Exception if the connection cannot be created or a put/flush fails
	 */
	public void bigput(int start, int end) throws Exception {
		// Six-digit padding keeps keys in lexicographic order only for indices 0..999999;
		// larger indices would produce 7-digit keys that sort out of numeric order.
		DecimalFormat df = new DecimalFormat("000000");
		// Column family and qualifiers never change inside the loop; encode them once.
		byte[] family = Bytes.toBytes("f1");
		byte[] colId = Bytes.toBytes("id");
		byte[] colName = Bytes.toBytes("name");
		byte[] colAge = Bytes.toBytes("age");

		Configuration conf = HBaseConfiguration.create();
		// try-with-resources closes the table and then the connection, also on exceptions.
		try (Connection conn = ConnectionFactory.createConnection(conf);
				HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t3"))) {
			// Disable auto-flush so puts accumulate in the client write buffer.
			table.setAutoFlushTo(false);
			int buffered = 0;
			for (int i = start; i < end; i++) {
				Put put = new Put(Bytes.toBytes("row" + df.format(i)));
				// Skips the write-ahead log: faster bulk load at the cost of durability
				// (edits not yet flushed can be lost on a RegionServer failure).
				put.setWriteToWAL(false);
				put.addColumn(family, colId, Bytes.toBytes(i));
				put.addColumn(family, colName, Bytes.toBytes("tom" + i));
				put.addColumn(family, colAge, Bytes.toBytes(i % 100));
				table.put(put);
				// Flush after every 2000 puts issued by this call, independent of where start lies.
				if (++buffered % 2000 == 0) {
					table.flushCommits();
				}
			}
			// Send whatever is still buffered.
			table.flushCommits();
		}
	}

	/**
	 * Writes row indices 0..999999 into {@code ns1:t3} with three concurrent {@link #bigput}
	 * calls and prints the elapsed wall-clock time in milliseconds.
	 * <p>
	 * The per-thread ranges are meant to line up with split keys {@code row300000} and
	 * {@code row600000}, so each thread writes to a different region. This is assumed from the
	 * table's creation command; confirm it against the actual table.
	 * <p>
	 * If any worker throws, the test fails with an {@link AssertionError} whose cause is that
	 * worker's exception. Failures from other workers are attached as suppressed exceptions.
	 *
	 * @throws InterruptedException if interrupted while waiting for the worker threads
	 */
	@Test
	public void testBigInsertMulti() throws InterruptedException {
		long start = System.currentTimeMillis();
		// [from, to) row-index ranges, one per worker thread.
		final int[][] ranges = { { 0, 300000 }, { 300000, 600000 }, { 600000, 1000000 } };
		// Each slot is written only by its own worker and read only after join(),
		// which guarantees the write is visible here.
		final Exception[] failures = new Exception[ranges.length];
		Thread[] workers = new Thread[ranges.length];
		for (int k = 0; k < ranges.length; k++) {
			final int idx = k;
			workers[k] = new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						bigput(ranges[idx][0], ranges[idx][1]);
					} catch (Exception e) {
						failures[idx] = e;
					}
				}
			}, "bigput-" + k);
		}

		for (Thread worker : workers) {
			worker.start();
		}
		for (Thread worker : workers) {
			worker.join();
		}

		System.out.println(System.currentTimeMillis() - start);

		// Fail the test on any worker error instead of only printing it,
		// so the test cannot pass when inserts did not happen.
		AssertionError error = null;
		for (int k = 0; k < failures.length; k++) {
			if (failures[k] == null) {
				continue;
			}
			if (error == null) {
				error = new AssertionError(
						"bigput(" + ranges[k][0] + ", " + ranges[k][1] + ") failed", failures[k]);
			} else {
				error.addSuppressed(failures[k]);
			}
		}
		if (error != null) {
			throw error;
		}
	}