
DataX Study Notes: Reader Plugin Development

Developing a DataX Reader plugin that reads data from ElasticSearch.

1. Check out the DataX source code (git clone https://github.com/alibaba/DataX.git DataX), import the project, and create a new Maven module named esreader in which to develop the plugin; a sketch of the dependencies the module needs is given below.
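Judging from the imports in the code below, the esreader module needs at least the DataX common classes, an ElasticSearch 2.x client (the TransportClient.builder() API used here belongs to the 2.x line), and Gson. The following is a minimal pom.xml dependency sketch; the version numbers are assumptions and should be aligned with your DataX checkout and your ES cluster, and the EsServerAddress helper imported from com.umeng.es is the author's own class, so it is not listed here.

<dependencies>
    <!-- DataX framework classes: Reader, RecordSender, Configuration, ... -->
    <dependency>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-common</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
    <!-- ElasticSearch 2.x client, matching the TransportClient.builder() API used in prepare() -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>2.4.6</version>
    </dependency>
    <!-- Gson, used in startRead() to serialize each hit's source to a JSON string -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>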

2. Under plugins/reader in the DataX installation directory, create an esreader directory containing plugin_job_template.json, plugin.json and esreader-0.0.1-SNAPSHOT.jar, plus a libs subdirectory holding the jars the plugin depends on (a sketch of the resulting layout follows).
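For reference, the deployed plugin directory should look roughly like this (file names taken from the step above; the exact jars under libs depend on your pom):

plugins/reader/esreader/
    plugin.json
    plugin_job_template.json
    esreader-0.0.1-SNAPSHOT.jar
    libs/
        (elasticsearch, gson and the other dependency jars)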

Related code (ESReader.java):

package com.alibaba.datax.plugin.reader.esreader;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.google.gson.Gson;
import com.umeng.es.config.EsServerAddress;

public class ESReader extends Reader {

	public static class Job extends Reader.Job {
		
		private Configuration originalConfiguration = null;
		
		@Override
		public void preCheck() {
			super.preCheck();
		}

		@Override
		public void preHandler(Configuration jobConfiguration) {
			super.preHandler(jobConfiguration);
		}
		
		@Override
		public void init() {
			this.originalConfiguration = super.getPluginJobConf();
		}
		
		@Override
		public void prepare() {
			super.prepare();
		}

		@Override
		public void post() {
			super.post();
		}
		
		@Override
		public void postHandler(Configuration jobConfiguration) {
			super.postHandler(jobConfiguration);
		}
		
		@Override
		public void destroy() {
		}

		@Override
		public List<Configuration> split(int adviceNumber) {
			List<Configuration> readerSplitConfigurations = new ArrayList<Configuration>();
			for (int i = 0; i < adviceNumber; i++) {
				Configuration readerSplitConfiguration = this.originalConfiguration.clone();
				readerSplitConfigurations.add(readerSplitConfiguration);
			}
			return readerSplitConfigurations;
		}
		
	}
	
	public static class Task extends Reader.Task {
		
		private Configuration readerSplitConfiguration = null;
		
		private String esClusterName = null;
		
		private String esClusterIP = null;

		private Integer esClusterPort = null;
		
		private String esIndex = null;
		
		private String esType = null;
		
		private Gson gson = null;
		
		private TransportClient client = null;
		
		private Integer batchSize = null;
		
		private static final Logger LOG = LoggerFactory.getLogger(Task.class);

		@Override
		public void preCheck() {
			super.preCheck();
		}
		
		@Override
		public void preHandler(Configuration jobConfiguration) {
			super.preHandler(jobConfiguration);
		}
		
		@Override
		public void init() {
			this.readerSplitConfiguration = super.getPluginJobConf();
			this.esClusterName = readerSplitConfiguration.getString(Key.esClusterName);
			this.esClusterIP = readerSplitConfiguration.getString(Key.esClusterIP);
			this.esClusterPort = readerSplitConfiguration.getInt(Key.esClusterPort, 9300);
			this.esIndex = readerSplitConfiguration.getString(Key.esIndex);
			this.esType = readerSplitConfiguration.getString(Key.esType);
			this.batchSize = readerSplitConfiguration.getInt(Key.batchSize, 1000);
			this.gson = new Gson();
		}
		
		@Override
		public void prepare() {
			super.prepare();
			// Note: the setting key is "client.transport.sniff" (enables node sniffing)
			Settings settings = Settings.builder().put("cluster.name", esClusterName)
					.put("client.transport.sniff", true).build();
			client = TransportClient.builder().settings(settings).build();
			List<EsServerAddress> serverAddress = new ArrayList<EsServerAddress>();
			// esClusterIP may be a single host or a comma-separated list of hosts
			String[] esClusterIPs = esClusterIP.split(",");
			for (int i = 0, len = esClusterIPs.length; i < len; i++) {
				serverAddress.add(new EsServerAddress(esClusterIPs[i], esClusterPort));
			}
			for (EsServerAddress address : serverAddress) {
				client.addTransportAddress(new InetSocketTransportAddress(
						new InetSocketAddress(address.getHost(), address.getPort())));
			}
		}
		
		@Override
		public void post() {
			super.post();
		}
		
		@Override
		public void postHandler(Configuration jobConfiguration) {
			super.postHandler(jobConfiguration);
		}

		@Override
		public void destroy() {
			client.close();
		}
		
		@Override
		public void startRead(RecordSender recordSender) {
			// Open a scroll query over the whole index/type, pulling documents in pages of batchSize
			SearchResponse response = client.prepareSearch(esIndex).setTypes(esType)
					.setQuery(QueryBuilders.matchAllQuery()).setSearchType(SearchType.QUERY_THEN_FETCH)
					.setScroll(new TimeValue(60000)).setSize(batchSize).setExplain(false).execute().actionGet();
			int totalSize = 0;
			Record record = null;
			while (true) {
				SearchHit[] hitArray = response.getHits().getHits();
				SearchHit hit = null;
				for (int i = 0, len = hitArray.length; i < len; i++) {
					record = recordSender.createRecord();
					hit = hitArray[i];
					// Serialize the document source to a JSON string and send it as a single column
					record.addColumn(new StringColumn(gson.toJson(hit.getSource())));
					recordSender.sendToWriter(record);
				}
				// An empty page means the scroll is exhausted
				if (hitArray.length == 0) break;
				totalSize += hitArray.length;
				// Fetch the next page, keeping the scroll context alive for another 60 seconds
				response = client.prepareSearchScroll(response.getScrollId())
						.setScroll(new TimeValue(60000)).execute().actionGet();
			}
			LOG.info("total size : " + totalSize);
		}
		
	}
	
}
Key.java

package com.alibaba.datax.plugin.reader.esreader;

public final class Key {
	
	/*
	 * @name:  esClusterName
	 * @description:  elastic search cluster name
	*/
	public final static String esClusterName = "esClusterName";
	
	/*
	 * @name:  esClusterIP
	 * @description:  elastic search cluster ip
	*/
	public final static String esClusterIP = "esClusterIP";
	
	/*
	 * @name:  esClusterPort
	 * @description:  elastic search cluster port
	*/
	public final static String esClusterPort = "esClusterPort";
	
	/*
	 * @name: esIndex 
	 * @description:  elastic search index
	 */
	public final static String esIndex = "esIndex";
	
	/*
	 * @name: esType
	 * @description:  elastic search type
	 */
	public final static String esType = "esType";
	
	/*
	 * @name: batchSize
	 * @description: elasticsearch batch size
	 */
	public final static String batchSize = "batchSize";
	
}

plugin_job_template.json

{
    "name": "esreader",
    "parameter": {
        "esClusterName": "",
        "esClusterIP": "",
        "esClusterPort": "",
        "esIndex": "",
        "esType": "",
        "batchSize": ""
    }
}


plugin.json

{
    "name": "esreader",
    "class": "com.alibaba.datax.plugin.reader.esreader.ESReader",
    "description": {
        "useScene": "only for developer test.",
        "mechanism": "use datax framework to transport elastic search data to channel.",
        "warn": "Never use it in your real job."
    },
    "developer": "wulin"
}

3. Run python bin/datax.py -r esreader -w hdfswriter to generate a job template, save it as job/es_to_hdfs.json, and fill in the relevant values.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "esreader", 
                    "parameter": {
                        "batchSize": "1000", 
                        "esClusterIP": "192.168.0.114", 
                        "esClusterName": "elasticsearch", 
                        "esClusterPort": "9300", 
                        "esIndex": "data", 
                        "esType": "t1"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [{"name":"data","type":"string"}], 
                        "defaultFS": "hdfs://192.168.0.114:9000", 
			"compress": "gzip",
                        "fieldDelimiter": ",", 
                        "fileName": "esdata", 
                        "fileType": "text", 
                        "path": "/user/data/es", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

4. Run python bin/datax.py job/es_to_hdfs.json to execute the job.