1. 程式人生 > >ES-Hadoop學習之ES和HDFS資料交換

ES-Hadoop學習之ES和HDFS資料交換

ES作為強大的搜尋引擎,HDFS是分散式檔案系統。ES可以將自身的Document匯入到HDFS中用作備份,ES也可以將儲存在HDFS上的結構化檔案匯入為ES的中的Document。而ES-Hadoop正是這兩者之間的一個connector

1,將資料從ES匯出到HDFS

1.1,資料準備,在ES中建立Index和Type,並建立document。在我的例子中,Index是mydata,type是person,建立了兩條如下圖所示的document


1.2 在專案中引入ES-Hadoop庫

<dependency>
  <groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId> <version>5.5.2</version> </dependency>

值得注意的是,上面的dependency只會引入ES-Hadoop相關的Jar包,和Hadoop相關的包,例如hadoop-common, hadoop-hdfs等等,依然還需要新增依賴。

1.3,建立從ES到Hadoop的資料遷移的Mapper類

package com.wjm.es_hadoop.example1;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; import org.elasticsearch.hadoop.mr.LinkedMapWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; class E2HMapper01 extends Mapper<Text, LinkedMapWritable, Text, LinkedMapWritable> { private static final Logger LOG
= LoggerFactory.getLogger(E2HMapper01.class); @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override protected void map(Text key, LinkedMapWritable value, Context context) throws IOException, InterruptedException { LOG.info("key {} value {}", key, value); context.write(key, value); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } }

這個Mapper非常簡單,它並沒有對從ES獲取的資料進行任何的處理,只是寫到了context中。map方法中,引數key的值,就是ES中document的id的值,引數value是一個LinkedMapWritable,它包含的就是一個document的內容。只是在這個mapper中,我們沒有處理document,而是直接輸出。

1.4,建立從ES到Hadoop的資料遷移的Job類

package com.wjm.es_hadoop.example1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class E2HJob01 {
    private static Logger LOG = LoggerFactory.getLogger(E2HJob01.class);
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
//ElasticSearch節點
conf.set("es.nodes", "192.168.8.194:9200");
//ElaticSearch Index/Type
conf.set("es.resource", "mydata/person/");
            if (args.length != 1) {
                LOG.error("error : " + args.length);
System.exit(2);
}
            Job job = Job.getInstance(conf, "JOBE2H01");
job.setJarByClass(E2HJob01.class);
job.setInputFormatClass(EsInputFormat.class);
job.setMapperClass(E2HMapper01.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LinkedMapWritable.class);
FileOutputFormat.setOutputPath(job, new Path(args[0]));
System.out.println(job.waitForCompletion(true));
} catch (Exception e) {
            LOG.error(e.getMessage(), e);
}
    }
}

這個Job有兩點需要注意一下:

1,它沒有reducer,因為就是資料的透傳,不需要reduce過程。

2,InputFormatClass被設定為EsInputFormat,正是這個類,負責將從ES讀出的資料,轉換成mapper的輸入引數(Text,LinkedMapWritable)

1.5,打包執行

以下面的命令來啟動MapReduce任務:

hadoop jar es-hadoop-1.0-SNAPSHOT.jar com.wjm.es_hadoop.example1.E2HJob01 hdfs://bigdata-191:8020/wangjinming

執行完這個命令之後,看到/wangjinming目錄下面產生了檔案



檢視其中一個檔案,會發現資料被分為兩列,第一列為id,第二列為document的內容


另外,在執行hadoop jar命令的時候,需要把es-hadoop的jar包放到hadoop jar能訪問到的classpath下面。我查了一些方法都沒成功,最後使用了一個笨方法,用hadoop classpath方法檢視hadoop的classpath有哪些,然後將es-hadoop相關的jar包copy到其中一個目錄下。

2,將資料從HDFS中匯入到ES中。

2.1,資料準備。建立下面的這樣一個檔案並put到hdfs檔案系統中(我放在hdfs://bigdata-191:8020/input/perosn)

{"id":"3", "name":"jerry", "age":"23", "info":"hello hadoop"}
{"id":"4", "name":"russell", "age":"15", "info":"hello elasticsearch"}

2.2,Mapper編寫

package com.wjm.es_hadoop.example1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
class H2EMapper01 extends Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
}

    @Override
public void run(Context context) throws IOException, InterruptedException {
        super.run(context);
}

    @Override
protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(NullWritable.get(), value);
}

    @Override
protected void cleanup(Context context) throws IOException,InterruptedException {
        super.cleanup(context);
}

}

這個Mapper也很簡單,只是把從HDFS中讀取到的資料透傳給ES。因為Mapper的input是一個HDFS檔案,所以,mapper的入參跟其他從hdfs多資料的mapper沒有任何區別。寫入到context的是,入參的key值是沒有意義的,所以忽略掉,直接把Text型別的value寫入到context就可以了。

2.3,編寫job

package com.wjm.es_hadoop.example1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class H2EJob01 {
    private static Logger LOG = LoggerFactory.getLogger(H2EJob01.class);
    public static void main(String args[]) {
        try {
            Configuration conf = new Configuration();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
//ElasticSearch節點
conf.set("es.nodes", "192.168.8.194:9200");
//ElaticSearch Index/Type
conf.set("es.resource", "mydata/person/");
//Hadoop上的資料格式為JSON,可以直接匯入
conf.set("es.input.json", "yes");
conf.set("es.mapping.id", "id");
            if (args.length != 1) {
                LOG.error("error : " + args.length);
System.exit(2);
}
            Job job = Job.getInstance(conf, "51JOBH2E");
job.setJarByClass(H2EJob01.class);
job.setMapperClass(H2EMapper01.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(EsOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
System.out.println(job.waitForCompletion(true));
} catch (Exception e) {
            LOG.error(e.getMessage(), e);
}
    }

}

這個Job有幾個需要注意的地方

es.input.json引數設定為true告訴ES-Hadoop,mapper輸出的結果是一個json格式的Text。

es.mapping.id引數指定json物件中那種field對應的值為es中document的id

OutputFormatClass被設定為EsOutputFormat,正是這個類負責將MapReduce的輸出結果(一個json格式的Text)轉換為ES的ID和document的內容

2.4,執行命令

hadoop jar es-hadoop-1.0-SNAPSHOT.jar com.wjm.es_hadoop.example1.H2EJob01 hdfs://bigdata-191:8020/input/person

命令成功執行之後,可以通過ES的命令看到資料已經在ES中建立了相應的document