1. 程式人生 > >MapReduce入門(二)合併小檔案

MapReduce入門(二)合併小檔案

hadoop為什麼要合併小檔案?

        小檔案是指檔案size小於HDFS上block大小的檔案。這樣的檔案會給hadoop的擴充套件性和效能帶來嚴重問題。首先,在HDFS中,任何block,檔案或者目錄在記憶體中均以物件的形式儲存,每個物件約佔150byte,如果有1000 0000個小檔案,每個檔案佔用一個block,則namenode大約需要2G空間。如果儲存1億個檔案,則namenode需要20G空間(見參考資料[1][4][5])。這樣namenode記憶體容量嚴重製約了叢集的擴充套件。 其次,訪問大量小檔案速度遠遠小於訪問幾個大檔案。HDFS最初是為流式訪問大檔案開發的,如果訪問大量小檔案,需要不斷的從一個datanode跳到另一個datanode,嚴重影響效能。最後,處理大量小檔案速度遠遠小於處理同等大小的大檔案的速度。每一個小檔案要佔用一個slot,而task啟動將耗費大量時間甚至大部分時間都耗費在啟動task和釋放task上。

一、建立MergeSmallFileJob 類:用於實現合併小檔案的任務(2M一下屬於小檔案) 

package cn.itxiaobai;

import com.google.common.io.Resources;
import com.utils.CDUPUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;

/**
 * 合併小檔案的任務(2M一下屬於小檔案)
 */
public class MergeSmallFileJob {

    public static class MergeSmallFileMapper extends Mapper<LongWritable,Text,Text,Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
           //將檔名作為key,內容作為value輸出
           //1.獲取檔名
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String fileName = inputSplit.getPath().getName();
            //列印檔名以及與之對應的內容
            context.write(new Text(fileName),value);
        }
    }

    public static class MergeSmallFileReduce extends Reducer<Text,Text,Text,Text>{
        /**
         *
         * @param key:檔名
         * @param values:一個檔案的所有內容
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //將迭代器中的內容拼接
            Iterator<Text> iterator = values.iterator();
            //使用StringBuffer
            StringBuffer stringBuffer = new StringBuffer();
            while (iterator.hasNext()){
                stringBuffer.append(iterator.next()).append(",");
            }
            //列印
            context.write(key,new Text(stringBuffer.toString()));
        }
    }

    public static class MyJob{
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration coreSiteConf = new Configuration();
            coreSiteConf.addResource(Resources.getResource("core-site-local.xml"));
            //設定一個任務
            Job job = Job.getInstance(coreSiteConf, "my small merge big file");
            //設定job的執行類
            job.setJarByClass(MyJob.class);

            //設定Map和Reduce處理類
            job.setMapperClass(MergeSmallFileMapper.class);
            job.setReducerClass(MergeSmallFileReduce.class);

            //map輸出型別
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            //設定job/reduce輸出型別
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileSystem fileSystem = FileSystem.get(coreSiteConf);
            //listFiles:可以迭代便利檔案
            RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus fileStatus = listFiles.next();
                Path filesPath = fileStatus.getPath();
                if (!fileStatus.isDirectory()) {
                    //判斷大小 及格式
                    if (fileStatus.getLen() < 2 * 1014 * 1024 && filesPath.getName().contains(".txt")) {
                        //檔案輸入路徑
                        FileInputFormat.addInputPath(job,filesPath);
                    }
                }
            }

            //刪除存在目錄
            CDUPUtils.deleteFileName("/mymergeout");

            FileOutputFormat.setOutputPath(job, new Path("/mymergeout"));
            //執行任務
            boolean flag = job.waitForCompletion(true);
            if (flag){
                System.out.println("檔案讀取內容如下:");
                CDUPUtils.readContent("/mymergeout/part-r-00000");
            }else {
                System.out.println("檔案載入失敗....");
            }

        }
    }
}
二、裡面用到自己寫的工具類CDUPUtils :用於刪除已存在目錄以及閱讀檔案內容
package com.utils;

import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.util.ArrayList;

public class CDUPUtils {
    //刪除已經存在在hdfs上面的檔案檔案
    public static void deleteFileName(String path) throws IOException {
        //將要刪除的檔案
        Path fileName = new Path(path);
        Configuration entries = new Configuration();
        //解析core-site-master2.xml檔案
        entries.addResource(Resources.getResource("core-site-local.xml"));
        //coreSiteConf.set(,);--在這裡可以新增配置檔案
        FileSystem fileSystem = FileSystem.get(entries);
        if (fileSystem.exists(fileName)){
            System.out.println(fileName+"已經存在,正在刪除它...");
            boolean flag = fileSystem.delete(fileName, true);
            if (flag){
                System.out.println(fileName+"刪除成功");
            }else {
                System.out.println(fileName+"刪除失敗!");
                return;
            }
        }
        //關閉資源
        fileSystem.close();
    }

    //讀取檔案內容
    public static void readContent(String path) throws IOException {
        //將要讀取的檔案路徑
        Path fileName = new Path(path);
        ArrayList<String> returnValue = new ArrayList<String>();
        Configuration configuration = new Configuration();
        configuration.addResource(Resources.getResource("core-site-local.xml"));
        //獲取客戶端系統檔案
        FileSystem fileSystem = FileSystem.get(configuration);
        //open開啟檔案--獲取檔案的輸入流用於讀取資料
        FSDataInputStream inputStream = fileSystem.open(fileName);
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
        //一行一行的讀取資料
        LineNumberReader lineNumberReader = new LineNumberReader(inputStreamReader);
        //定義一個字串變數用於接收每一行的資料
        String str = null;
        //判斷何時沒有資料
        while ((str=lineNumberReader.readLine())!=null){
            returnValue.add(str);
        }
        //列印資料到控制檯
        System.out.println("MapReduce演算法操作的檔案內容如下:");
        for (String read :
                returnValue) {
            System.out.println(read);
        }
        //關閉資源
        lineNumberReader.close();
        inputStream.close();
        inputStreamReader.close();
    }
}

 配置檔案:cort-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://master2:9000</value>
    </property>
    <property>
        <name>fs.hdfs.impl</name>
        <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
    </property>
</configuration>

pom中新增的依賴 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhiyou100</groupId>
    <artifactId>mrdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <org.apache.hadoop.version>2.7.5</org.apache.hadoop.version>
    </properties>

    <!--分散式計算-->
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>${org.apache.hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
                <version>${org.apache.hadoop.version}</version>
            </dependency>

        <!--分散式儲存-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${org.apache.hadoop.version}</version>
        </dependency>

        <!--資料庫驅動-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
        </dependencies>
</project>

在本地直接執行(右擊Run)測試