
Querying Data with Multithreaded Callable Tasks

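When querying Elasticsearch with a terms query, a single request can take at most 1,000 values here. Looping over the batches on one thread is slow, so the values are split into batches and each batch is handed to its own task. The tasks run on a fixed-size thread pool, which keeps the thread count bounded even when a large number of terms produces many batches. This improves query performance noticeably, and the same pattern applies to similar scenarios.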
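The batching idea can be sketched independently of Elasticsearch. In the minimal example below, `BatchedLookup`, `partition`, and `queryInBatches` are hypothetical names, and the `lookup` function is a stand-in for the real query. It assumes a batch size of 1,000 and a pool of 5 threads, matching the values used later in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class BatchedLookup {

    /** Splits values into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> values, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < values.size(); from += batchSize) {
            int to = Math.min(from + batchSize, values.size());
            // Copy the subList view so each task owns its batch.
            batches.add(new ArrayList<>(values.subList(from, to)));
        }
        return batches;
    }

    /** Runs one lookup per batch on a fixed-size pool and merges the results in batch order. */
    static <T, R> List<R> queryInBatches(List<T> values, int batchSize, int poolSize,
                                         Function<List<T>, List<R>> lookup) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<List<R>>> tasks = new ArrayList<>();
            for (List<T> batch : partition(values, batchSize)) {
                tasks.add(() -> lookup.apply(batch));
            }
            List<R> merged = new ArrayList<>();
            // invokeAll blocks until every task has finished.
            for (Future<List<R>> future : pool.invokeAll(tasks)) {
                merged.addAll(future.get());
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            ids.add(i);
        }
        // Stand-in for the real query: echo each id back as a string.
        List<String> hits = queryInBatches(ids, 1000, 5, batch -> {
            List<String> out = new ArrayList<>();
            for (Integer id : batch) {
                out.add("doc-" + id);
            }
            return out;
        });
        // prints "3 batches, 2500 hits"
        System.out.println(partition(ids, 1000).size() + " batches, " + hits.size() + " hits");
    }
}
```

With 2,500 values this yields three batches, yet no more than five threads exist at any time, however many batches there are.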

Define the task class:

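import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;

import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
// ESUtils, DataOperateUtils and StaticParameterUtils are project-specific helpers;
// import them from your own packages.

/**
 * @Author: MR LIS
 * @Description: Multithreaded terms query implementation
 * @Date: Create in 17:05 2018/7/3
 * @Modified By:
 */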
@Service(value = "multiTermsESQueryThread")
@Scope(value="prototype")
public class MultiTermsESQueryThread implements Callable<List<Map<String, Object>>> {

    private List<String> indexs;
    private List<String> types;
    private String proName;
    private List<Object> proValueList;

    @Override
    public List<Map<String, Object>> call() throws Exception {
        TransportClient client = ESUtils.getClient();
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
        String[] indexNew = DataOperateUtils.tranListToArray(indexs);
        searchRequestBuilder.setIndices(indexNew);  // search several indices in one request (for searches across a broad category)
        if (types != null && !types.isEmpty()) {
            searchRequestBuilder.setTypes(types.toArray(new String[0]));
        }

        List<Map<String, Object>> datas = new ArrayList<>();
        searchRequestBuilder.setQuery(QueryBuilders.termsQuery(proName, proValueList));
        searchRequestBuilder.setSize(10000); // maximum number of records returned per request
        SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
        SearchHits hits = searchResponse.getHits();
        // IDs already collected in this batch; a Set makes the duplicate check O(1).
        java.util.Set<Long> seenIds = new java.util.HashSet<>();
        for (SearchHit searchHitFields : hits.getHits()) {
            Map<String, Object> sourceAsMap = searchHitFields.getSourceAsMap();
            DataOperateUtils.expendProperty(searchHitFields, sourceAsMap);
            // A hit is a node if it carries the node ID field; otherwise it is a relationship.
            boolean isNode = sourceAsMap.containsKey(StaticParameterUtils.ES_NODE_ID_NAME);
            Object rawId = isNode
                    ? sourceAsMap.get(StaticParameterUtils.ES_NODE_ID_NAME)
                    : sourceAsMap.get(StaticParameterUtils.ES_RELATIONSHIP_ID_NAME);
            Long id = Long.valueOf(rawId.toString());
            // Set.add returns false for an ID that was already collected; skip that hit.
            if (!seenIds.add(id)) {
                continue;
            }

            datas.add(sourceAsMap);
        }
        return datas;
    }

    public void setIndexs(List<String> indexs) {
        this.indexs = indexs;
    }

    public void setTypes(List<String> types) {
        this.types = types;
    }

    public void setProName(String proName) {
        this.proName = proName;
    }

    public void setProValueList(List<Object> proValueList) {
        this.proValueList = proValueList;
    }
}

call() returns the declared result type, here List<Map<String, Object>>.
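As a minimal illustration of how that typed result reaches the caller, the sketch below submits a Callable and reads its value through Future.get(). `CallableResultDemo`, `RowTask`, and `runOnce` are hypothetical names, and the task builds a row in memory instead of querying Elasticsearch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableResultDemo {

    /** Hypothetical task: returns one in-memory row instead of running a query. */
    static class RowTask implements Callable<List<Map<String, Object>>> {
        private final String name;

        RowTask(String name) {
            this.name = name;
        }

        @Override
        public List<Map<String, Object>> call() {
            Map<String, Object> row = new HashMap<>();
            row.put("name", name);
            return Collections.singletonList(row);
        }
    }

    /** Submits one task and blocks until its typed result is available. */
    static List<Map<String, Object>> runOnce(String name) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<List<Map<String, Object>>> future = pool.submit(new RowTask(name));
            // get() returns exactly the type declared in Callable<...>.
            return future.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnce("alice")); // prints [{name=alice}]
    }
}
```

Unlike Runnable, a Callable can also throw checked exceptions; Future.get() surfaces them wrapped in an ExecutionException.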

Define the method that splits the values into batches and runs each batch in its own task:

@Override
public List<Map<String, Object>> termsDataByProMulti(List<String> indexs, String type, String proName, List<Object> proValueList) {
    List<Map<String, Object>> datas = new ArrayList<>();
    // Create a fixed-size thread pool
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    try {

        List<String> types = new ArrayList<>();
        if (StringUtils.isNotBlank(type)) {
            types.add(type);
        }
        // Ceiling division in integer arithmetic; avoids float rounding on very large lists.
        int loopNum = (proValueList.size() + 999) / 1000;
        List<Object> subProValueList = null;
        List<Callable<List<Map<String, Object>>>> tasks = new ArrayList<>();

        for (int i = 0; i < loopNum; i++) {
            int fromIndex = i * 1000;
            // The last batch may be shorter than 1000, so clamp to the list size.
            int toIndex = Math.min(fromIndex + 1000, proValueList.size());
            subProValueList = proValueList.subList(fromIndex, toIndex);
            MultiTermsESQueryThread termsESQueryThread = (MultiTermsESQueryThread) SpringUtils.getBean("multiTermsESQueryThread");
            termsESQueryThread.setIndexs(indexs);
            termsESQueryThread.setProName(proName);
            termsESQueryThread.setTypes(types);
            termsESQueryThread.setProValueList(subProValueList);
            tasks.add(termsESQueryThread);
        }

        List<Future<List<Map<String, Object>>>> futures = threadPool.invokeAll(tasks);
        for (Future<List<Map<String, Object>>> future : futures) {
            List<Map<String, Object>> mapList = future.get();
            if (mapList != null) {
                datas.addAll(mapList);
            }
        }
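    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can see that the thread was interrupted.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (ExecutionException e) {
        // A batch failed; the cause is the exception thrown inside call().
        e.printStackTrace();
    } finally {
        // Shut down the thread pool
        threadPool.shutdown();
    }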

    return datas;
}
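Each task removes duplicates only within its own batch, so if a document can match values that fall into different batches, the merged list may still contain repeats. The sketch below shows one possible way to filter the merged rows afterwards. `MergeDedupDemo`, `dedupById`, and the `"id"` key are hypothetical; the real key would be whatever ID field the project's rows carry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MergeDedupDemo {

    /** Keeps the first row seen for each value of idKey; rows without the key are kept as-is. */
    static List<Map<String, Object>> dedupById(List<Map<String, Object>> rows, String idKey) {
        Set<String> seen = new HashSet<>();
        List<Map<String, Object>> unique = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Object id = row.get(idKey);
            // Compare IDs as strings so Integer 1 and Long 1 count as the same ID.
            // Set.add returns false when the ID was already present.
            if (id == null || seen.add(String.valueOf(id))) {
                unique.add(row);
            }
        }
        return unique;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> merged = new ArrayList<>();
        for (long id : new long[] {1L, 2L, 1L}) {
            Map<String, Object> row = new HashMap<>();
            row.put("id", id);
            merged.add(row);
        }
        System.out.println(dedupById(merged, "id").size()); // prints 2
    }
}
```

Whether this step is needed depends on the data: if every document holds a single value in the queried field, a document can match only one batch and the per-task check is enough.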