
DataX Study Notes: Writer Plugin Development (Continued)

The previous note was based on a fairly old version of DataX. This post switches to the latest DataX source on git and redevelops the Writer plugin that writes data into ElasticSearch.

1. Check out the DataX source (git clone https://github.com/alibaba/DataX.git DataX), import the project, and create a new Maven module named eswriter for the plugin.

2. Under the DataX installation directory, create a plugins/writer/eswriter directory containing plugin_job_template.json, plugin.json, and eswriter-0.0.1-SNAPSHOT.jar, plus a libs subdirectory holding the plugin's dependency jars.
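Assuming the standard DataX plugin directory convention, the resulting layout looks like this:

plugins/writer/eswriter/
    plugin.json
    plugin_job_template.json
    eswriter-0.0.1-SNAPSHOT.jar
    libs/    (elasticsearch client, gson, and the other dependency jars)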

Relevant code:

package com.alibaba.datax.plugin.writer.eswriter;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.google.gson.Gson;

public class ESWriter extends Writer {

	public static class Job extends Writer.Job {
		
		private Configuration originalConfiguration = null;
		
		@Override
		public void init() {
			this.originalConfiguration = super.getPluginJobConf();
		}
		
		@Override
		public void prepare() {
			super.prepare();
		}
		
		@Override
		public void preCheck() {
			super.preCheck();
		}
		
		@Override
		public void preHandler(Configuration jobConfiguration) {
			super.preHandler(jobConfiguration);
		}
		
		@Override
		public void post() {
			super.post();
		}
		
		@Override
		public void postHandler(Configuration jobConfiguration) {
			super.postHandler(jobConfiguration);
		}

		@Override
		public void destroy() {
			
		}

		@Override
		public List<Configuration> split(int mandatoryNumber) {
			List<Configuration> writerSplitConfiguration = new ArrayList<Configuration>();
			for (int i = 0; i < mandatoryNumber; i++) {
				// Hand each task its own copy of the job configuration.
				writerSplitConfiguration.add(this.originalConfiguration.clone());
			}
			return writerSplitConfiguration;
		}
		
	}
	
	public static class Task extends Writer.Task {
		
		private Configuration writerSliceConfiguration = null;
		
		private String esClusterName = null;
		
		private String esClusterIP = null;

		private Integer esClusterPort = null;
		
		private String esIndex = null;
		
		private String esType = null;
		
		private String attributeNameString = null;
		
		private String attributeNameSplit = null;
		
		private String[] attributeNames = null;
		
		private String className = null;
		
		private Gson gson = null;
		
		private TransportClient client = null;
		
		private Integer batchSize = null;
		
		private static final Logger LOG = LoggerFactory.getLogger(Task.class);

		@Override
		public void init() {
			this.writerSliceConfiguration = super.getPluginJobConf();
			this.esClusterName = writerSliceConfiguration.getString(Key.esClusterName);
			this.esClusterIP = writerSliceConfiguration.getString(Key.esClusterIP);
			this.esClusterPort = writerSliceConfiguration.getInt(Key.esClusterPort, 9300);
			this.esIndex = writerSliceConfiguration.getString(Key.esIndex);
			this.esType = writerSliceConfiguration.getString(Key.esType);
			this.attributeNameString = writerSliceConfiguration.getString(Key.attributeNameString);
			this.attributeNameSplit = writerSliceConfiguration.getString(Key.attributeNameSplit, ",");
			// Quote the separator so it is treated as a literal, not a regex.
			this.attributeNames = attributeNameString.split(Pattern.quote(attributeNameSplit));
			this.className = writerSliceConfiguration.getString(Key.className);
			this.batchSize = writerSliceConfiguration.getInt(Key.batchSize, 1000);
			this.gson = new Gson();
		}
		
		@Override
		public void prepare() {
			super.prepare();
			Settings settings = Settings.builder().put("cluster.name", esClusterName)
					.put("client.tansport.sniff", true).build();
			client = TransportClient.builder().settings(settings).build();
			// split() on a host string without commas yields a one-element
			// array, so a single node needs no special case.
			List<EsServerAddress> serverAddress = new ArrayList<EsServerAddress>();
			for (String host : esClusterIP.split(",")) {
				serverAddress.add(new EsServerAddress(host.trim(), esClusterPort));
			}
			for (EsServerAddress address : serverAddress) {
				client.addTransportAddress(new InetSocketTransportAddress(
						new InetSocketAddress(address.getHost(), address.getPort())));
			}
		}
		
		@Override
		public void preCheck() {
			super.preCheck();
		}
		
		@Override
		public void preHandler(Configuration jobConfiguration) {
			super.preHandler(jobConfiguration);
		}
		
		@Override
		public void post() {
			super.post();
		}
		
		@Override
		public void postHandler(Configuration jobConfiguration) {
			super.postHandler(jobConfiguration);
		}

		@Override
		public void destroy() {
			client.close();
		}

		@Override
		public void startWrite(RecordReceiver lineReceiver) {
			List<Record> writerBuffer = new ArrayList<Record>(this.batchSize);
			Record record = null;
			while ((record = lineReceiver.getFromReader()) != null) {
				writerBuffer.add(record);
				if (writerBuffer.size() >= this.batchSize) {
					bulkSaveOrUpdateES(writerBuffer);
					writerBuffer.clear();
				}
			}
			if (!writerBuffer.isEmpty()) {
				bulkSaveOrUpdateES(writerBuffer);
				writerBuffer.clear();
			}
		}
		
		private void bulkSaveOrUpdateES(List<Record> writerBuffer) {
			Record record = null;
			Object object = null;
			Map<String, String> attributeValueMap = null;
			List<ESEntity> entities = new ArrayList<ESEntity>();
			try {
				for (int w = 0, wlen = writerBuffer.size(); w < wlen; w++) {
					record = writerBuffer.get(w);
					if (null == record) continue;
					object = Class.forName(className).newInstance();
					int fieldNum = record.getColumnNumber();
					if (fieldNum > 0) {
						attributeValueMap = new HashMap<String, String>();
						// Guard against a record wider than the configured attribute list.
						for (int i = 0, n = Math.min(fieldNum, attributeNames.length); i < n; i++) {
							attributeValueMap.put(attributeNames[i].toLowerCase(), record.getColumn(i).asString());
						}
						for (Class<?> superClass = object.getClass(); 
								superClass != Object.class; superClass = superClass.getSuperclass()) {
				        	Field[] fields = superClass.getDeclaredFields();
				    		for (int i = 0, len = fields.length; i < len; i++) {
								Field field = fields[i];
								String fieldNameLowerCase = field.getName().toLowerCase();
								if (!attributeValueMap.containsKey(fieldNameLowerCase)) continue;
								String valueString = attributeValueMap.get(fieldNameLowerCase);
								Object value = convertValueByFieldType(field.getType(), valueString);
								if (field.isAccessible()) {
						            field.set(object, value);
						        } else {
						            field.setAccessible(true);
						            field.set(object, value);
						            field.setAccessible(false);
						        }
				    		}
				        }
						entities.add((ESEntity) object);
					}
				}
			} catch (Exception e) {
				LOG.error(e.getMessage(), e);
			}
			bulkSaveOrUpdate(entities, esIndex, esType);
		}
		
		private void bulkSaveOrUpdate(List<ESEntity> entities, String database, String table) {
			if (null == entities || entities.isEmpty()) return;
			BulkRequestBuilder prepareBulk = client.prepareBulk();
			for (ESEntity entity : entities) {
				IndexRequestBuilder irb = client.prepareIndex()
						.setIndex(database).setType(table).setId(entity.get_id());
				entity.remove_id();
				String source = gson.toJson(entity);
				irb.setSource(source);
				prepareBulk.add(irb);
			}
			// Surface bulk failures instead of silently ignoring them.
			BulkResponse response = prepareBulk.execute().actionGet();
			if (response.hasFailures()) {
				LOG.error(response.buildFailureMessage());
			}
		}
		
		private Object convertValueByFieldType(Class<?> type, Object value) {
	    	Object finalValue = value;
	    	if (String.class.isAssignableFrom(type)) {
	    		finalValue = null == value ? "NA" : String.valueOf(value);
			} else if (Boolean.class.isAssignableFrom(type)) {
	    		finalValue = null == value ? Boolean.FALSE : Boolean.parseBoolean(String.valueOf(value));
			} else if (Integer.class.isAssignableFrom(type)) {
				finalValue = null == value ? 0 : Integer.parseInt(String.valueOf(value));
			} else if (Long.class.isAssignableFrom(type)) {
				// Typed default literals matter here: an autoboxed Integer 0
				// would make Field.set() throw IllegalArgumentException on a
				// Long/Float/Double field.
				finalValue = null == value ? 0L : Long.parseLong(String.valueOf(value));
			} else if (Float.class.isAssignableFrom(type)) {
				finalValue = null == value ? 0F : Float.parseFloat(String.valueOf(value));
			} else if (Double.class.isAssignableFrom(type)) {
				finalValue = null == value ? 0D : Double.parseDouble(String.valueOf(value));
			} else if (Date.class.isAssignableFrom(type)) {
				try {
					value = null == value ? DateFormat.TIME.get().format(new Date()) : value;
					finalValue = DateFormat.TIME.get().parse(String.valueOf(value));
				} catch (ParseException e) {
					LOG.error(e.getMessage(), e);
				}
			} 
	    	return finalValue;
	    }
		
	}
	
}
package com.alibaba.datax.plugin.writer.eswriter;

public final class Key {
	
	/*
	 * @name:  esClusterName
	 * @description:  elastic search cluster name
	*/
	public final static String esClusterName = "esClusterName";
	
	/*
	 * @name:  esClusterIP
	 * @description:  elastic search cluster ip
	*/
	public final static String esClusterIP = "esClusterIP";
	
	/*
	 * @name:  esClusterPort
	 * @description:  elastic search cluster port
	*/
	public final static String esClusterPort = "esClusterPort";
	
	/*
	 * @name: esIndex 
	 * @description:  elastic search index
	 */
	public final static String esIndex = "esIndex";
	
	/*
	 * @name: esType
	 * @description:  elastic search type
	 */
	public final static String esType = "esType";
	
	/*
	 * @name: attributeNameString
	 * @description:  attribute name list 
	 */
	public final static String attributeNameString = "attributeNameString";
	
	/*
	 * @name: attributeNameSplit
	 * @description: separator to split attribute name string
	 */
	public final static String attributeNameSplit = "attributeNameSplit";
	
	/*
	 * @name: className
	 * @description: qualified class name 
	 */
	public final static String className = "className";
	
	/*
	 * @name: batchSize
	 * @description: commit to elasticsearch batch size
	 */
	public final static String batchSize = "batchSize";
	
}
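The code above references a few helper classes that this post does not include: EsServerAddress, ESEntity, and the DateFormat holder used in convertValueByFieldType. The sketches below are reconstructions from how they are used, not the original sources; treat the field names and the date pattern as assumptions.

package com.alibaba.datax.plugin.writer.eswriter;

// Hypothetical reconstruction: host/port holder consumed in Task.prepare().
public class EsServerAddress {

	private final String host;
	private final Integer port;

	public EsServerAddress(String host, Integer port) {
		this.host = host;
		this.port = port;
	}

	public String getHost() { return host; }

	public Integer getPort() { return port; }

}

package com.alibaba.datax.plugin.writer.eswriter;

// Hypothetical reconstruction: base class for indexed entities.
// bulkSaveOrUpdate() reads the document id via get_id() and clears it with
// remove_id() so the id does not also appear inside the JSON source (Gson
// skips null fields by default).
public abstract class ESEntity {

	private String _id;

	public String get_id() { return _id; }

	public void set_id(String _id) { this._id = _id; }

	public void remove_id() { this._id = null; }

}

package com.alibaba.datax.plugin.writer.eswriter;

import java.text.SimpleDateFormat;

// Hypothetical reconstruction: per-thread SimpleDateFormat holder, since
// SimpleDateFormat is not thread-safe. The actual pattern is an assumption.
public final class DateFormat {

	public static final ThreadLocal<SimpleDateFormat> TIME = new ThreadLocal<SimpleDateFormat>() {
		@Override
		protected SimpleDateFormat initialValue() {
			return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		}
	};

	private DateFormat() {}

}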

plugin_job_template.json

{
    "name": "eswriter",
    "parameter": {
        "esClusterName": "",
        "esClusterIP": "192.168.0.1,192.168.0.2",
        "esClusterPort": "9300",
        "esIndex": "user",
        "esType": "student",
        "attributeNameString": "id,userid,name,phone",
        "attributeNameSplit": ",",
        "className": "com.alibaba.datax.plugin.writer.eswriter.Student",
        "batchSize": "1000"
    }
}
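The className parameter must point to an entity class on the plugin classpath whose fields match attributeNameString case-insensitively, so the reflection loop in bulkSaveOrUpdateES can populate it. A minimal sketch of the hypothetical Student class, assuming it extends ESEntity and uses String fields throughout:

package com.alibaba.datax.plugin.writer.eswriter;

// Hypothetical example entity: field names mirror attributeNameString
// ("id,userid,name,phone"); values are set via reflection, so no getters
// or setters are strictly required.
public class Student extends ESEntity {

	private String id;
	private String userid;
	private String name;
	private String phone;

}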

plugin.json

{
    "name": "eswriter",
    "class": "com.alibaba.datax.plugin.writer.eswriter.ESWriter",
    "description": {
        "useScene": "only for developer test.",
        "mechanism": "use datax framework to transport data to elasticsearch.",
        "warn": ""
    },
    "developer": "wulin"
}

3. Generate a job/mysql_to_es.json template with python bin/datax.py -r mysqlreader -w eswriter, then fill in the details.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id,userid,name,phone"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://192.168.0.114:3306/student?useUnicode=true&characterEncoding=UTF-8"], 
                                "table": ["info"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "test", 
                        "where": "id < 10"
                    }
                }, 
                "writer": {
                    "name": "eswriter", 
                    "parameter": {
                        "attributeNameSplit": ",", 
                        "attributeNameString": "id,userid,name,phone", 
                        "className": "com.alibaba.datax.plugin.writer.eswriter.Student", 
            		"esClusterName": "elasticsearch",
                	"esClusterIP": "192.168.0.105,192.168.0.108",
                	"esClusterPort": "9300",
                        "esIndex": "user", 
                        "esType": "student",
            		"batchSize": "1000"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "10"
            }
        }
    }
}

4. Run the job with python bin/datax.py job/mysql_to_es.json.