1. 程式人生 > >Elasticsearch資料全量匯入HBase,scroll的正確使用姿勢,HBase資料到Hive

Elasticsearch資料全量匯入HBase,scroll的正確使用姿勢,HBase資料到Hive

1、程式碼

只貼出Es用scroll方式讀取資料以及批量寫入HBase的核心程式碼,其他工具類、方法,比如es、HBase配置、client、connection獲取就不貼了。

1-1、es獲取資料

package ipl.restapi.service.bigdata.es;

import ipl.restapi.util.EsOpenCloseUtils;
import ipl.restapi.util.EsPropertiesUtils;
import ipl.restapi.util.HbaseApiUtils;
import org.apache.hadoop.hbase.client.Connection;
import
org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHits; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import
java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; /** * <p>pakage: ipl.restapi.service.bigdata.es</p> * * descirption: es檢索某一所以全量資料,匯入HBase * * @author wanghai * @version V1.0 * @since <pre>2018/8/15 下午9:03</pre> */ public class
ReadFromEs {
private static final Logger LOGGER = LoggerFactory.getLogger("es"); private static final int SCROLL_SIZE = 10000; private static final int HBASE_PUT_SIZE = 1000; /** * es資料匯入HBase * @param tableName 資料匯入到HBase的那一張表 */ public void putAllDataToHbase(String tableName) { Map<String, Object> hashMap = EsPropertiesUtils.getConf(); TransportClient esClient = EsOpenCloseUtils.getInstance(hashMap); // 建立查詢體 Map<String, Object> queryParams = new HashMap<>(); queryParams.put("index", "papper_little"); queryParams.put("type", "automatic"); // index String indexName = queryParams.get("index").toString(); // type String type = queryParams.get("type").toString(); scrollDataToHbase(esClient, indexName, type, tableName); } /** * 允許我們做一個初始搜尋並且持續批量從Elasticsearch里拉取另一部分資料,結果直到沒有結果剩下,類似於資料庫的遊標, * 為了避免資料量過大,每次從上次scroll的位置繼續獲取資料獲取的資料寫入HBase。 * * @param esClient es客戶端 * @param indexName 索引名 * @param typeName es type * @param tableName 資料匯入到HBase的那一張表 */ public void scrollDataToHbase(Client esClient, String indexName, String typeName, String tableName) { // TODO:通訊過程中getScrollId丟失怎麼辦?比如說kafka,就有多種機制驗證處理請求者的請求資料的偏移量 int baseRowNum = 0; SearchResponse scrollResp = esClient.prepareSearch(indexName) .setTypes(typeName) .setScroll(new TimeValue(300000)) // 每次返回10000條資料(如果夠) .setSize(SCROLL_SIZE).get(); // arraylist放1000個map,一個map為一條論文資料,map中存放key-value對,是存入HBase的列名和值 ArrayList<Map<String, Object>> hit1000List = new ArrayList<>(1024); Connection connection = HbaseApiUtils.getConnection(); do { SearchHits searchHits = scrollResp.getHits(); long num = searchHits.getHits().length; System.out.println("數量:" + num); for (int i = 0; i < num; ) { hit1000List.add(searchHits.getAt(i).getSourceAsMap()); i++; if (i % HBASE_PUT_SIZE == 0) { baseRowNum++; try { // TODO:網路不好,1000條可以考慮非同步讀寫 HbaseApiUtils.putListByMap(connection, tableName, hit1000List, baseRowNum, "pappers_info", "papper_", HBASE_PUT_SIZE); hit1000List.clear(); } catch (IOException e) { LOGGER.error("es批量插入hbase異常-1!"); LOGGER.error(e.getMessage()); } } } System.out.println("目前完成 " + baseRowNum * HBASE_PUT_SIZE); // 處理不足1000的資料 if (!hit1000List.isEmpty()) { baseRowNum++; // 避免不足1000的資料覆蓋之前的 try { HbaseApiUtils.putListByMap(connection, tableName, hit1000List, baseRowNum, "pappers_info", "papper_", HBASE_PUT_SIZE); hit1000List.clear(); System.out.println("down!"); } catch (IOException e) { LOGGER.error("es批量插入hbase異常-2!"); LOGGER.error(e.getMessage()); } } scrollResp = esClient.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet(); } while (scrollResp.getHits().getHits().length != 0); } }

1-2、批量寫入HBase

    /**
     * 批量匯入資料到HBase table
     *
     * @param tableName 要匯入資料的 表名
     * @param arraylist 封裝資料的arraylist
     * @param baseRowNum 傳入的是第幾輪的資料
     * @param filedFamily 列族
     * @param keyPrefix rowKey字首
     * @param putSize 每一輪傳入的資料量(最後一次也許除外)
     * @throws IOException
     */
    public static void putListByMap(Connection connection, String tableName, ArrayList<Map<String, Object>> arraylist, int baseRowNum, String filedFamily, String keyPrefix, int putSize) throws IOException {

        Table table = connection.getTable(TableName.valueOf(tableName));
        List<Put> puts = new ArrayList<>();
        int rowNum = 0;
        baseRowNum = (baseRowNum - 1) * putSize;
        for (Map<String, Object> hitMap : arraylist) {
            rowNum++;
            // 傳輸時,資料需要序列化——Bytes.toBytes
            for (Map.Entry<String, Object> entry : hitMap.entrySet()) {
                Put put = new Put(Bytes.toBytes(keyPrefix + (rowNum + baseRowNum)));
                put.addColumn(Bytes.toBytes(filedFamily), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue().toString()));
                puts.add(put);
            }
        }
        table.put(puts);
        LOGGER.info("表:{}已使用putListByMap方法批量更新!!!{}", tableName, baseRowNum);
    }
    public static Connection getConnection() {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "your_ipaddress");
        config.set("hbase.zookeeper.property.clientPort", "2181");
        // 建立一個連線到叢集的connection
        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(config);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return connection;
    }

2、思路

2-1、es中取資料

  • es取資料的方式很多,但是這裡是取全量資料,所以不需要檢索詞,定義index、type即可。
  • 也有其他不少部落格寫es獲取全量資料,但是大多有問題,大致可以分為兩類問題:
    1、資料量不夠大,可以一次性讀進記憶體;
    2、程式碼有bug,獲取下一部分資料時,有丟失或者重複,或者說簡單使用setFrom,並不是真正的部分讀取。

對於資料不能全部讀進記憶體的情況,我們可以使用es中的scroll進行“下標選擇“。允許我們做一個初始搜尋並且持續批量從Elasticsearch里拉取另一部分資料,結果直到沒有結果剩下,類似於資料庫的遊標,為了避免資料量過大,每次從上次scroll的位置繼續獲取資料。

scroll 並不適合用來做實時搜尋,而更適用於後臺批處理任務。

scroll獲取資料大致可分為初始化和遍歷兩個階段,初始化時將所有符合搜尋條件的搜尋結果快取起來,可以想象成快照,然後更新scroll_id遍歷,從這個快照裡繼續取資料。

核心是
scrollResp = esClient.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();while (scrollResp.getHits().getHits().length != 0);

2-2、資料封裝

es獲取的資料封裝進Map,key-value剛好作為column-value。由於HBase稀疏列的特性,各條記錄列數不同也是ok的。

Map資料封裝進Arraylist,批量put進HBase。

3、mac 修改hosts檔案,老是自動恢復原樣

冥思不得解,發現每次都是連線了學校的VPN後發生的。於是查詢瞭解到/private/etc/pulse-hosts.bak,/etc/hosts在pulse
啟動後或者使用中被重置

解決辦法:
在/private/etc/pulse-hosts.bak中填寫hosts即可(類似白名單的概念吧),當然,加入白名單後,/etc/hosts中還是需要填寫的

4、HBase資料到Hive【整合】

5、TODO

1、通訊過程中getScrollId丟失怎麼辦?比如說kafka,就有多種機制驗證處理請求者的請求資料的偏移量
2、優化:非同步,讀es與資料存HBase執行緒分開,以減少處理時間。

參考