1. 程式人生 > >【Elasticsearch】Java Client連線池程式碼實現

【Elasticsearch】Java Client連線池程式碼實現

用過Elasticsearch API的都知道,在Java端使用是ES服務需要建立Java Client,但是每一次連線都例項化一個client,對系統的消耗很大,而且最令人頭疼的是它的連線非常慢。所以為了解決上述問題並提高client利用率,用池化技術複用client,第一次用去建立client,後面使用就直接去池子裡面拿就可以。但是第一次的連線還是很慢,在這裡就不做過多闡述。

第一步,建立一個PoolableUserFactory,需要繼承BasePooledObjectFactory:

package com.yuguo.es.poolutils;

@Slf4j
public class PoolableUserFactory extends BasePooledObjectFactory<EsClient>{

	private EsEnv esEnv = null;
	
	public PoolableUserFactory(EsEnv esEnv) {
		super();
		this.esEnv = esEnv;
	}
	
	@Override
	public void destroyObject(PooledObject<EsClient> poolObject) throws Exception {
		EsClient esClient=poolObject.getObject();
		esClient.close();
		log.info("destroyObject EsClient! " + esClient);
		super.destroyObject(poolObject);
	}
	
	@Override
	public void passivateObject(PooledObject<EsClient> p) throws Exception {
		// TODO Auto-generated method stub
		super.passivateObject(p);
	}
	
	@Override
	public EsClient create() throws Exception {
		EsClient esClient = new EsClient(esEnv);
		log.info("create EsClient! " + esClient);
		System.out.println("create EsClient! " + esClient);
		return esClient;
	}
	
	@Override
	public PooledObject<EsClient> wrap(EsClient esClient) {
		return new DefaultPooledObject<EsClient>(esClient);
	}
}

第二步,建立獲取ES Client例項物件:

package com.yuguo.es.poolutils;

/**
 * 獲取ES Client單例
 * 
 * @ClassName: ClusterClient
 */
@Slf4j
public  class EsClient {
	
	private EsEnv esEnv;
	private Client client;
	
	protected EsClient(String clusterName, int numberOfShards, int numberOfReplicas) {
		Settings settings = Settings.builder().put("cluster.name", esEnv.getClusterName()).build();
		client = new PreBuiltTransportClient(settings);
	}
	
	protected EsClient addTransport(String host, int port) {
		((TransportClient) client).addTransportAddress(new TransportAddress(new InetSocketAddress(host, port)));
		return this;
	}
	
	private void buildClient(){
		Builder builder = Settings.builder();
		builder.put("cluster.name",  esEnv.getClusterName());
		
		String[] arrIp = esEnv.getIp().split(",");
		String[] arrPort = esEnv.getPort().split(",");
		
		TransportAddress[] addressArr = new TransportAddress[arrPort.length];
		for (int i = 0 ,size = arrIp.length; i < size; i++) {
			String objIp = arrIp[i];
			int port = 9300;
			try {
				port = Integer.valueOf(arrPort[i]);
			} catch (NumberFormatException e) {
				log.error("port trans error !");
			}
			addressArr[i] = new TransportAddress(new InetSocketAddress(objIp, port));
		}
		try {
			Settings settings = Settings.builder().put("cluster.name", esEnv.getClusterName()).build();
			client = new PreBuiltTransportClient(settings).addTransportAddresses(addressArr);
		} catch (Exception e) {
			e.printStackTrace();
		}  
		log.info("開闢叢集連線,address:"+"\t連線物件"+client);
	}
	
	public Client getClient() {
		return client;
	}

	public void rebuildClient(){
		log.info("上次client連線發生錯誤,重新開闢連線!");
		if(client != null){
			close();
		}
		buildClient();
	}
	
	public EsClient(EsEnv esEnv) {
		this.esEnv = esEnv;
		buildClient();
	}
	
	public BulkRequestBuilder getBulkRequestBuilder(){
		return client.prepareBulk();
	}
	
	public IndexRequestBuilder getIndexRequestBuilder(){
		return client.prepareIndex();
	}
	
	public SearchRequestBuilder getEsSearch(){
		return client.prepareSearch();
	}
	
	public void esRefresh(){
		client.admin().indices().prepareRefresh().execute().actionGet();
	}
	
	/**
	 * 獲取ES伺服器的所有開啟的索引
	 * 
	 * @Description: 
	 * @return
	 */
	public ClusterHealthResponse getClusterHealthResponse(){
		return client.admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus()).actionGet();
	}
	
	/**
	 * 獲取ES伺服器的所有索引(包括開啟和關閉的索引)
	 * 
	 * @Description: 
	 * @return
	 */
	public MetaData getMetaData(){
		ClusterState state = client.admin().cluster().prepareState().execute().actionGet().getState();
		return state.getMetaData();
	}
	
	
	public boolean isExists(String indexName){
		return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists();
	}
	
	public boolean closeIndex(String indexName){
		return client.admin().indices().prepareClose(indexName).execute().actionGet().isAcknowledged();
	}
	
	public boolean deleteIndex(String indexName){
		return client.admin().indices().prepareDelete(indexName).execute().actionGet().isAcknowledged();
	}
	
	public boolean createIndex(Settings settings, String indexName){
		return client.admin().indices().prepareCreate(indexName).setSettings(settings).execute().actionGet().isAcknowledged();
	}
	
	public void createAlias(String indexName, String aliasName){
		client.admin().indices().prepareAliases().addAlias(indexName, aliasName).execute().actionGet();
	}
	
	public boolean putMapping(PutMappingRequest mappingRequest){
		return client.admin().indices().putMapping(mappingRequest).actionGet().isAcknowledged();
	}
	
	
	/**
	 * 關閉ES客戶端
	 * 
	 * @Description:
	 */
	public void close(){
		log.info("\t關閉連線物件"+client);
		client.close(); 
	}	
}

第三步,建立連線池物件:

package com.yuguo.es.poolutils;

@Slf4j
public class EsClientPool {
	
	@Getter
	@Setter
	private String clusterName;
	
	@Getter
	@Setter
	private String ip;
	
	@Getter
	@Setter
	private String port;
	
	@Getter
	@Setter
	private int keepClienNum;
	
	private ObjectPool<EsClient> pool = null;
	
	public EsClientPool(String clusterName, String ip, String port,int keepClienNum) {
		super();
		this.clusterName = clusterName;
		this.ip = ip;
		this.port = port;
		this.keepClienNum = keepClienNum;
		
		EsEnv esEnv = new EsEnv();
	    	esEnv.setClusterName(clusterName);
	    	esEnv.setIp(ip);
	    	esEnv.setPort(port);
	    	PoolableUserFactory poolFactory=new PoolableUserFactory(esEnv);
	    	GenericObjectPoolConfig config=new GenericObjectPoolConfig();
	    	config.setMaxTotal(keepClienNum);
	    	pool = new GenericObjectPool<EsClient>(poolFactory, config);
	}
    
	public EsClient getEsClient(){
    	EsClient esClient = null;
    	try {
			esClient = pool.borrowObject();
		} catch (Exception e) {
			log.error("create Client error!" , e);
		}
    	return esClient;
    }
    
    public EsClient removeEsClient(EsClient esClient){
    	try {
			pool.returnObject(esClient);
		} catch (Exception e) {
			log.error("Client return to pool error!" , e);
		}
    	return esClient;
    }
    
}

然後寫一個utils類來獲取elasticsearch連線:

private static EsClientPool pool;
	
public static EsClientPool esClientPool(ElasticsearchConfig esConfig){
	if(pool == null){
		pool = new EsClientPool(getEsClusterName(esConfig), getEsIp(esConfig), 	getEsPort(esConfig).toString(), 5);
	}
	return pool;
}

到此,一個簡單的Elasticsearch連線池就寫好了!