【Elasticsearch】Java Client連線池程式碼實現
阿新 • • 發佈:2018-12-10
用過Elasticsearch API的都知道,在Java端使用是ES服務需要建立Java Client,但是每一次連線都例項化一個client,對系統的消耗很大,而且最令人頭疼的是它的連線非常慢。所以為了解決上述問題並提高client利用率,用池化技術複用client,第一次用去建立client,後面使用就直接去池子裡面拿就可以。但是第一次的連線還是很慢,在這裡就不做過多闡述。
第一步,建立一個PoolableUserFactory,需要繼承BasePooledObjectFactory:
package com.yuguo.es.poolutils; @Slf4j public class PoolableUserFactory extends BasePooledObjectFactory<EsClient>{ private EsEnv esEnv = null; public PoolableUserFactory(EsEnv esEnv) { super(); this.esEnv = esEnv; } @Override public void destroyObject(PooledObject<EsClient> poolObject) throws Exception { EsClient esClient=poolObject.getObject(); esClient.close(); log.info("destroyObject EsClient! " + esClient); super.destroyObject(poolObject); } @Override public void passivateObject(PooledObject<EsClient> p) throws Exception { // TODO Auto-generated method stub super.passivateObject(p); } @Override public EsClient create() throws Exception { EsClient esClient = new EsClient(esEnv); log.info("create EsClient! " + esClient); System.out.println("create EsClient! " + esClient); return esClient; } @Override public PooledObject<EsClient> wrap(EsClient esClient) { return new DefaultPooledObject<EsClient>(esClient); } }
第二步,建立獲取ES Client例項物件:
package com.yuguo.es.poolutils; /** * 獲取ES Client單例 * * @ClassName: ClusterClient */ @Slf4j public class EsClient { private EsEnv esEnv; private Client client; protected EsClient(String clusterName, int numberOfShards, int numberOfReplicas) { Settings settings = Settings.builder().put("cluster.name", esEnv.getClusterName()).build(); client = new PreBuiltTransportClient(settings); } protected EsClient addTransport(String host, int port) { ((TransportClient) client).addTransportAddress(new TransportAddress(new InetSocketAddress(host, port))); return this; } private void buildClient(){ Builder builder = Settings.builder(); builder.put("cluster.name", esEnv.getClusterName()); String[] arrIp = esEnv.getIp().split(","); String[] arrPort = esEnv.getPort().split(","); TransportAddress[] addressArr = new TransportAddress[arrPort.length]; for (int i = 0 ,size = arrIp.length; i < size; i++) { String objIp = arrIp[i]; int port = 9300; try { port = Integer.valueOf(arrPort[i]); } catch (NumberFormatException e) { log.error("port trans error !"); } addressArr[i] = new TransportAddress(new InetSocketAddress(objIp, port)); } try { Settings settings = Settings.builder().put("cluster.name", esEnv.getClusterName()).build(); client = new PreBuiltTransportClient(settings).addTransportAddresses(addressArr); } catch (Exception e) { e.printStackTrace(); } log.info("開闢叢集連線,address:"+"\t連線物件"+client); } public Client getClient() { return client; } public void rebuildClient(){ log.info("上次client連線發生錯誤,重新開闢連線!"); if(client != null){ close(); } buildClient(); } public EsClient(EsEnv esEnv) { this.esEnv = esEnv; buildClient(); } public BulkRequestBuilder getBulkRequestBuilder(){ return client.prepareBulk(); } public IndexRequestBuilder getIndexRequestBuilder(){ return client.prepareIndex(); } public SearchRequestBuilder getEsSearch(){ return client.prepareSearch(); } public void esRefresh(){ client.admin().indices().prepareRefresh().execute().actionGet(); } /** * 獲取ES伺服器的所有開啟的索引 * * @Description: * @return */ public ClusterHealthResponse getClusterHealthResponse(){ return client.admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus()).actionGet(); } /** * 獲取ES伺服器的所有索引(包括開啟和關閉的索引) * * @Description: * @return */ public MetaData getMetaData(){ ClusterState state = client.admin().cluster().prepareState().execute().actionGet().getState(); return state.getMetaData(); } public boolean isExists(String indexName){ return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists(); } public boolean closeIndex(String indexName){ return client.admin().indices().prepareClose(indexName).execute().actionGet().isAcknowledged(); } public boolean deleteIndex(String indexName){ return client.admin().indices().prepareDelete(indexName).execute().actionGet().isAcknowledged(); } public boolean createIndex(Settings settings, String indexName){ return client.admin().indices().prepareCreate(indexName).setSettings(settings).execute().actionGet().isAcknowledged(); } public void createAlias(String indexName, String aliasName){ client.admin().indices().prepareAliases().addAlias(indexName, aliasName).execute().actionGet(); } public boolean putMapping(PutMappingRequest mappingRequest){ return client.admin().indices().putMapping(mappingRequest).actionGet().isAcknowledged(); } /** * 關閉ES客戶端 * * @Description: */ public void close(){ log.info("\t關閉連線物件"+client); client.close(); } }
第三步,建立連線池物件:
package com.yuguo.es.poolutils; @Slf4j public class EsClientPool { @Getter @Setter private String clusterName; @Getter @Setter private String ip; @Getter @Setter private String port; @Getter @Setter private int keepClienNum; private ObjectPool<EsClient> pool = null; public EsClientPool(String clusterName, String ip, String port,int keepClienNum) { super(); this.clusterName = clusterName; this.ip = ip; this.port = port; this.keepClienNum = keepClienNum; EsEnv esEnv = new EsEnv(); esEnv.setClusterName(clusterName); esEnv.setIp(ip); esEnv.setPort(port); PoolableUserFactory poolFactory=new PoolableUserFactory(esEnv); GenericObjectPoolConfig config=new GenericObjectPoolConfig(); config.setMaxTotal(keepClienNum); pool = new GenericObjectPool<EsClient>(poolFactory, config); } public EsClient getEsClient(){ EsClient esClient = null; try { esClient = pool.borrowObject(); } catch (Exception e) { log.error("create Client error!" , e); } return esClient; } public EsClient removeEsClient(EsClient esClient){ try { pool.returnObject(esClient); } catch (Exception e) { log.error("Client return to pool error!" , e); } return esClient; } }
然後寫一個utils類來獲取elasticsearch連線:
private static EsClientPool pool;
public static EsClientPool esClientPool(ElasticsearchConfig esConfig){
if(pool == null){
pool = new EsClientPool(getEsClusterName(esConfig), getEsIp(esConfig), getEsPort(esConfig).toString(), 5);
}
return pool;
}
到此,一個簡單的Elasticsearch連線池就寫好了!