1. 程式人生 > >hadoop程式設計(2)-準備程式設計和本地測試環境

hadoop程式設計(2)-準備程式設計和本地測試環境

作為開發人員,我們可以暫時忽略叢集等部署環境,首要關注開發環境。本文介紹一種可在IDE上執行\除錯MapReduce程式的方法,方便程式設計師儘快開始大資料MapReduce程式設計。

maven依賴

按規範新建maven專案,下面是我的pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>4.0.0</modelVersion> <groupId>org.lanqiao</groupId> <artifactId>bigData</artifactId> <version>1.0</version> <packaging>jar</packaging> <properties> <!--logger--> <slf4j-api.version>1.7.25</slf4j-api.version
>
<logback.version>1.2.3</logback.version> <java.version>1.8</java.version> <!--hadoop-core--> <hadoop-core.version>1.2.1</hadoop-core.version> <hadoop.version>2.6.5</hadoop.version> <junit.version>4.12</junit.version
>
</properties> <dependencies> <!--logger--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j-api.version}</version> </dependency> <!-- Hadoop main client artifact --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!-- Unit test artifacts --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.6.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.mrunit</groupId> <artifactId>mrunit</artifactId> <version>1.1.0</version> <classifier>hadoop2</classifier> <scope>test</scope> </dependency> <!-- Hadoop test artifact for running mini clusters --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> <version>${hadoop.version}</version> <scope>test</scope> </dependency> </dependencies> <build> <!--打包資原始檔--> <resources> <resource> <directory>src/main/java</directory> <excludes> <exclude>**/*.java</exclude> </excludes> </resource> <resource> <directory>src/main/resources</directory> </resource> </resources> <plugins> <!--編譯外掛--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <!--打包外掛--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <mainClass>org.lanqiao.mr.WordCount</mainClass> <!--<addClasspath>true</addClasspath>--> <!--<classpathPrefix>lib/</classpathPrefix>--> </manifest> </archive> <classesDirectory> </classesDirectory> </configuration> </plugin> </plugins> </build> </project>

For building MapReduce jobs, you only need to have the hadoop-client dependency, which contains all the Hadoop client-side classes needed to
interact with HDFS and MapReduce. For running unit tests, we use junit, and for writing MapReduce tests, we use mrunit. The hadoop-minicluster library contains the “mini-” clusters that are useful for testing with Hadoop clusters running in a single JVM.

WordCount程式

TokenizerMapper

package org.lanqiao.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.stream.Stream;

/**
 *  Mapper程式繼承Mapper類;
 *  泛型四個型別引數分別是map函式的輸入鍵、輸入值、輸出鍵、輸出值的型別
 */
public  class TokenizerMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  //hadoop重新定義了一些區別於Java原生的資料型別,IntWritable是int的替代
  private final  IntWritable one = new IntWritable(1);
  //Text是String的替代
  private Text word = new Text();

  /*這裡實現資料準備的邏輯
  * 預設採用文字檔案輸入格式,框架將<行偏移量,改行文字>作為此函式的引數*/
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    //對value(一行文字)按字元分割,使用Java8流式處理,對每個單詞用context寫入<單詞,1>鍵值對
    //map運算完成後會有一個排序合併的過程,即鍵相同的所有value會合併成一個集合
    //所有mapper的整體輸出為<單詞,[1,1,1,1...]>
    Stream.of(value.toString().split("\\s|\\.|,|=")).forEach((e) -> {
      try {
        word.set(e);
        context.write(word, one);
      } catch (IOException e1) {
        e1.printStackTrace();
      } catch (InterruptedException e1) {
        e1.printStackTrace();
      }
    });
  }
}

IntSumReducer

package org.lanqiao.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Reducer程式繼承Reducer類;
 * */
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  private static Logger logger = LoggerFactory.getLogger(IntSumReducer.class);

  /**實現資料聚合的邏輯
   * @param key mapper的輸出鍵
   * @param values mapper輸出鍵關聯的資料集和*/
  public void reduce(Text key, Iterable<IntWritable> values,
                     Context context) throws IOException, InterruptedException {
    logger.debug("正在統計單詞" + key.toString());
    int sum = 0;
    //迭代values,把每個value(本例中每個value都是1)累加到sum
    for (Iterator<IntWritable> iter = values.iterator();iter.hasNext();){
      sum+=iter.next().get();
    }
    logger.debug("單詞個數為:" + sum);
    //hadoop只認IntWritable,不認int,這裡將sum包裝為IntWritable
    //一個Reducer例項,只處理一個key,將結果寫入Context
    context.write(key, new IntWritable(sum));
  }
}

編寫MapReduce的單元測試

開發階段,我們並不想啟動hdfs和叢集,不想提交任務到雲,而只是想測試下演算法邏輯,這就需要用到單元測試了。

MRUnit

MRUnit是一款由Couldera公司開發的專門針對Hadoop中編寫MapReduce單元測試的框架。

它可以用於0.18.x版本中的經典org.apache.hadoop.mapred.*的模型,也能在0.20.x版本org.apache.hadoop.mapreduce.*的新模型中使用。

官方的介紹如下:

MRUnit is a unit test library designed to facilitate easy integration between your MapReduce development process and standard development and testing tools such as JUnit. MRUnit contains mock objects that behave like classes you interact with during MapReduce execution (e.g., InputSplit and OutputCollector) as well as test harness “drivers” that test your program’s correctness while maintaining compliance with the MapReduce semantics. Mapper and Reducer implementations can be tested individually, as well as together to form a full MapReduce job.

MRUnit安裝

<dependency>
  <groupId>org.apache.mrunit</groupId>
  <artifactId>mrunit</artifactId>
  <version>1.1.0</version>
  <classifier>hadoop2</classifier>
  <scope>test</scope>
</dependency>

單測的程式碼

package org.lanqiao.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;

public class WordCountTest {

  MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
  ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
  MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

  @Before
  public void setUp() {
    final TokenizerMapper mapper = new TokenizerMapper();
    final IntSumReducer reducer = new IntSumReducer();
    mapDriver = MapDriver.newMapDriver(mapper);
    reduceDriver = ReduceDriver.newReduceDriver(reducer);
    mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
  }

  @Test
  public void testMapper() throws IOException {
    mapDriver
        .withInput(new LongWritable(0), new Text("zhangsan lisi zhangsan"))
        .withOutput(new Text("zhangsan"), new IntWritable(1))
        .withOutput(new Text("lisi"), new IntWritable(1))
        .withOutput(new Text("zhangsan"), new IntWritable(1))
        .runTest();
  }

  @Test
  public void testReducer() throws IOException {
    reduceDriver
        .withInput(new Text("zhangsan"), Arrays.asList(new IntWritable(1), new IntWritable(1)))
        .withOutput(new Text("zhangsan"), new IntWritable(2))
        .runTest();
  }

  @Test
  public void testMapperReducer() throws IOException {
    mapReduceDriver
        .withInput(new LongWritable(0), new Text("zhangsan lisi zhangsan"))
        // .withInput(new LongWritable(1), new Text("hello  zhangsan"))
        .withOutput(new Text("lisi"), new IntWritable(1))
        .withOutput(new Text("zhangsan"), new IntWritable(2))
        .runTest();
  }
}

這裡我們貼出了針對WordCount的全部測試程式碼。
通過Junit的方式呼叫執行就可以了。這樣就可以實現了在本地沒有叢集環境的方式下快速方便的進行MR功能測試。
這些程式碼都比較簡單,withInput用於傳遞輸入記錄,withOutput設定期望的輸出。如果期望的輸出和真實的輸出不一致,測試用例將失敗。

侷限

通過閱讀MRUnit的原始碼我們會發現:

  1. 不支援MapReduce框架中的分割槽和排序操作:從Map輸出的值經過shuffle處理後直接就匯入Reduce中了。
  2. 不支援Streaming實現的MapReduce操作。

雖然MRUnit有這些侷限,但是足以完成大多數的需求。

參考資料

小資料集,提交本地任務

WordCountDriver

package org.lanqiao.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountDriver extends Configured implements Tool {
  private static Logger logger = LoggerFactory.getLogger(WordCountDriver.class);

  public static void main(String[] args) throws Exception {
    final WordCountDriver driver = new WordCountDriver();
    Configuration conf = new Configuration();
    // 配置檔案
    conf.addResource("hadoop-local.xml");
    driver.setConf(conf);

    //輸入資料的位置,可替換成小資料集樣本
    Path in=new Path("src/main/resources/log4j.properties");
    //輸出資料的位置
    Path out = new Path("output");
    //刪除輸出目錄,因為hadoop不會覆蓋已有的目錄,如果目錄存在會報錯
    FileSystem fs = FileSystem.get(conf);
    fs.delete(out, true);

    //執行任務
    int exitCode = ToolRunner.run(driver, new String[]{in.toString(),out.toString()});
    System.exit(exitCode);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    // 處理main的引數
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      ToolRunner.printGenericCommandUsage(System.err);
      System.exit(-1);
    }

    String jobName = "word count";
    //job的各種設定
    Job job = Job.getInstance(conf, jobName);//new Job(conf, "word count");
    job.setJarByClass(getClass());

    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));

    return job.waitForCompletion(true) ? 0 : 1;
  }
}

hadoop-local.xml

<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>file:///</value>
  </property>
  <property>
    <name>mapreduce.framework.name</name>
    <value>local</value>
  </property>
</configuration>

說明

main函式中用Configuration來控制任務該如何執行,在程式碼中寫定了資料的輸入和輸出路徑,最終使用ToolRunner來提交了任務。而Tool介面的run方法主要用於構建Job例項。

由於任務在本地提交,輸入和輸出路徑都是在本地檔案系統中,可以用肉眼檢視輸出路徑下的結果資料。

Driver的單元測試

我們可以很輕鬆地把main方法改寫為一個測試方法:

package org.lanqiao.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class WordCountDriverTest {
  @Test
  public void run() throws Exception {
    final WordCountDriver driver = new WordCountDriver();
    Configuration conf = new Configuration();
    // 配置檔案
    conf.addResource("hadoop-local.xml");
    driver.setConf(conf);

    //輸入資料的位置,可替換成小資料集樣本
    Path in = new Path("src/main/resources/log4j.properties");
    //輸出資料的位置
    Path out = new Path("output");
    //刪除輸出目錄,因為hadoop不會覆蓋已有的目錄,如果目錄存在會報錯
    FileSystem fs = FileSystem.get(conf);
    fs.delete(out, true);

    //執行任務
    int exitCode = ToolRunner.run(driver, new String[]{in.toString(), out.toString()});

    // 這裡使用斷言
    assertThat(exitCode).isEqualTo(0);

    //checkOutput(conf, output); //這裡可以檢查輸出檔案是否符合預期
  }

}

小資料集,提交任務到叢集(偽分佈)

上文,啟動偽分散式叢集的hdfs。

修改Driver的單測程式碼

// 其餘程式碼不必改動
    // 配置檔案
    // conf.addResource("hadoop-local.xml");
    conf.addResource("hadoop-localhost.xml");//本地偽分佈叢集連線資訊
    driver.setConf(conf);

    //輸入資料的位置,可替換成小資料集樣本
    // Path in = new Path("src/main/resources/log4j.properties");
    Path in = new Path("input/log4j.properties");// hdfs檔案路徑
    //輸出資料的位置
    Path out = new Path("output");
//其餘程式碼省略……

我們只需把配置檔案替換成hadoop-localhost.xml,然後注意資料集的位置是hdfs檔案系統上的路徑,而且我們應事先把資料上傳。

hadoop-localhost.xml的內容如下:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000/</value>
  </property>
</configuration>

這必須和我們在上一章中搭建偽分散式環境時用到的配置一樣(地址和埠)。

總結

有很多初學者,一開始就被複雜的hadoop環境迷惑,事實上我們幾乎無需任何環境就能開發MapReduce程式。
引入hadoop-client的API後就可以編寫MapReduce程式,然後利用mrunit-API就可以對Mapper和Reducer進行單元測試。
進一步我們可以用Local Job Runner來分析本地小資料集樣本,並檢視分析結果。
再進一步,我們可以把任務提交給一個叢集。不過本文示例是從開發角度出發的。在生產環節下,沒有IDE,也不會把引數寫死,我們需要將Job打包部署,這些內容待叢集部分再深入研討。
目前,我們能開發MapReduce程式就足夠了。

相關推薦

hadoop程式設計(2)-準備程式設計本地測試環境

作為開發人員,我們可以暫時忽略叢集等部署環境,首要關注開發環境。本文介紹一種可在IDE上執行\除錯MapReduce程式的方法,方便程式設計師儘快開始大資料MapReduce程式設計。 maven依賴 按規範新建maven專案,下面是我的pom:

linux基礎之shell程式設計(2)-條件判斷,算數運算,測試

bash中如果實現條件判斷? 條件測試型別 整數測試 字元測試 檔案測試 條件測試的表示式 有三種 [ expression ] --方括號與表示式之間一定要有一個空格 [[ expression ]] test exp

程式設計中UTC時間本地時間(LocalTime)的區別

 什麼是UTC時間,UTC時間和本地時間(LocalTime)的區別 世界協調時間(Universal Time Coordinated,UTC),GPS 系統中有兩種時間區分,一為UTC,另一為LT(地方時)兩者的區別為時區不同,UTC就是0時區的時間,(LocalTime

使用python腳本代碼本地測試環境部署啟動 V0.1

svn python #!/usr/bin/evn python#encoding=utf-8#author:[email protected]/* */import sysimport osimport shutilimport commandsimport timeimport sub

hadoop 2.7.7 安裝(測試環境部署) hadoop2.x部署

hadoop 2.7.7 安裝(測試環境部署) hadoop2.x部署 系統環境(censtos 6.5 ): 172.16.57.97 namenodeyw 172.16.57.98 datanodeyw1 172.16.57.238 datanodeyw2

https,https的本地測試環境搭建,asp.net結合https的代碼實現,http網站轉換成https網站之後遇到的問題

基本 解密 req with 網址 orm forms 訪問 art 一:什麽是https SSL(Security Socket Layer)全稱是加密套接字協議層,它位於HTTP協議層和TCP協議層之間,用於建立用戶與服務器之間的加密通信,確保所傳遞信息的安全性

MapReduce兩種執行環境介紹:本地測試環境,服務器環境

拷貝 本地測試 servle 第一個 host lang hdf ces ati 本地測試環境(windows):1、在windows下配置hadoop的環境變量2、拷貝debug工具(winutils.exe)到hadoop目錄中的bin目錄,註意winutils.exe

https及https的本地測試環境搭建

asp.net結合https的程式碼實現http網站轉換成https網站,以及之後遇到的問題等。 一:什麼是https SSL(Security Socket Layer)全稱是加密套接字協議層,它位於HTTP協議層和TCP協議層之間,用於建立使用者與伺服器之間的加密通訊,確保所傳遞資訊的安

Robot Framework - 建立本地測試環境

1.2 extern 功能 file crypto edi 源碼 com create 註意:本文內容是以“在Window7系統中安裝本地RobotFrmamework自動化測試環境”為例。 Robot Framework簡介 HomePage:http://robotfr

Elasticsearch 7.x:1、圖解Windows本地測試環境搭建

1.1 JDK配置 (1)下載Windows版本的JDK8,並安裝。 (2)配置JAVA_HOME環境變數 (3)Win+R開啟執行視窗,輸入cmd,開啟命令列視窗 (4)輸入java -version檢視JDK版本 1.2 Windows版本下載 (1)登入官網,分別

MapReduce的兩種執行環境本地測試環境,伺服器環境

本地測試環境(windows):1、在windows下配置hadoop的環境變數2、拷貝debug工具(winutils.exe)到hadoop目錄中的bin目錄,注意winutils.exe的版本要

搭建可除錯的微信公眾平臺本地測試環境

背景: 最近在籌建一個協同開發的開源專案NWechat,專案開始前,有幾樣事情要準備。 1)專案管理系統的選定; 2)原始碼版本控制系統的選定; 3)開發環境的搭建。 4)團隊的建設。 開發環境搭建便是專案啟動前,要做的幾件事情之一。 一、問題是這樣的

基於RESTful風格的controller層(SpringMVC)的測試:MockMVC(SpringBootJUnit4測試環境

個人程式碼 首先附上個人測試過的程式碼: /** * Description * * @author Amethyst * @date 2017/5/2 15:28 //SpringRunner繼承自:SpringJUnit4ClassR

微信開發 網頁應用 本地測試環境的搭建(多圖)

剛剛接觸微信公眾號開發,完全是一個小白。今天搭建本地測試環境真的是弄了整整一下午,一直到剛剛,才弄好。話不多說,下面分享一下整個搭建過程; 1.下載微信web開發者工具 安裝並登入,登陸前要確保已經登陸的賬號有微信公眾號的測試許可權 2.將本地tomcat伺服器埠改成80

微信公眾號本地測試環境搭建(附帶內網穿透工具使用)

前言: 一、現在越來越多的web專案都整合到微信公眾號中,針對小型專案有諸多好處:一是可以免去專門編寫一個用處不大的app,縮短開發週期;二是可以獲取更多的潛在使用者;三是後期維護簡單。 二、在開發階段一般不會將專案直接放到線上伺服器,並且開發

https的本地測試環境搭建 http網站轉換成https網站之後的問題

  http://www.admin5.com/article/20150525/600227.shtml 一:什麼是https   SSL(Security Socket Layer)全稱是加密套接字協議層,它位於HTTP協議層和TCP協議層之間,用於建立使用者與伺

kafka本地測試環境搭建

需求 由於共有云的kafka叢集只對測試機(阡陌機器等)開放,本地是無法訪問的,所以為了開發方便搭建一套kafka的測試環境是有必要的 軟體 kafka_2.11-0.10.0.1 步驟 根據開發環境建立好相配置檔案,開發啟動

利用Docker Compose快速搭建本地測試環境

前言 Compose是一個定義和執行多個Docker應用的工具,用一個YAML(dockder-compose.yml)檔案就能配置我們的應用。然後用一個簡單命令就能啟動所有的服務。Compose編排Docker服務的優勢是在單機測試場景,因為Compose的安裝簡單,開箱即用,yaml的定義也複用了Doc

Hadoop 0.20.2+Ubuntu13.04配置WordCount測試

password trac 讓我 說明 core jvm -m launchpad 1.7 事實上這篇博客寫的有些晚了。之前做過一些總結後來學校的事給忘了,這幾天想又一次拿來玩玩發現有的東西記不住了。翻博客發現居然沒有。好吧,所以趕緊寫一份留著自己用吧。這東西網上有非常

Unity程式設計入門2 地形系統遊戲物件

預設體(prefab) 將某個遊戲物件以檔案的形式儲存起來 作用:用來批量管理遊戲物件 1)修改預設體的縮放,會對所有的遊戲物件都產生影響 2)給一個預設體新增元件,那麼所有的遊戲物件都會被加上這個元件 Select:快速找到這個遊戲物件對應的預設體檔案 Revert:將遊戲物件