1. 程式人生 > >Hadoop——HDFS以及MapReduce的一些總結

Hadoop——HDFS以及MapReduce的一些總結

1、HDFS API簡單操作檔案

package cn.ctgu.hdfs;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;


import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import
org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.junit.Test; /* * 讀取hdfs檔案 * * */ public class TestHDFS { //讀取hdfs檔案 @Test
public void readFile() throws IOException { //註冊url流處理器工廠(hdfs) URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); //獲取hdfs的URL連線,即檔案所在地 URL url=new URL("hdfs://172.25.11.200:8020/user/hadoop/test.txt"); //開啟連線 URLConnection conn=url.openConnection(); //獲取輸入流
InputStream is=conn.getInputStream(); byte[]buf=new byte[is.available()]; is.read(buf); is.close(); String str=new String(buf); System.out.println(str); } /* * 通過hadoop API訪問檔案 * */ @Test public void readFileByAPI() throws IOException { //建立配置檔案 Configuration conf=new Configuration(); //設定配置檔案 conf.set("fs.defaultFS", "hdfs://172.25.11.200:8020/"); //獲取檔案系統的一個控制代碼 FileSystem fs=FileSystem.get(conf); //獲取檔案路徑 Path p=new Path("/user/hadoop/test.txt"); //開啟檔案 FSDataInputStream fis=fs.open(p); byte[]buf=new byte[1024]; int len=-1; ByteArrayOutputStream baos=new ByteArrayOutputStream(); while((len=fis.read(buf))!=-1) { baos.write(buf,0,len); } fis.close(); baos.close(); System.out.println(new String(baos.toByteArray())); } @Test public void readFileByAPI2() throws IOException { Configuration conf=new Configuration(); conf.set("fs.defaultFS", "hdfs://172.25.11.200:8020/"); FileSystem fs=FileSystem.get(conf); Path p=new Path("/user/hadoop/test.txt"); FSDataInputStream fis=fs.open(p); ByteArrayOutputStream baos=new ByteArrayOutputStream(); //將fis中的資料寫入到baos中,緩衝區為1024個位元組 IOUtils.copyBytes(fis,baos,1024); System.out.println(new String(baos.toByteArray())); } /* * * 許可權問題很重要,比如刪除檔案,我們在window下的角色就是Adminastrator * 創建出來的就是Adminastrator的,刪除也因此只能刪除它的 *mkdir,建立目錄 * * * */ @Test public void mkdir() throws IOException { Configuration conf=new Configuration(); conf.set("fs.defaultFS", "hdfs://172.25.11.200:8020/"); FileSystem fs=FileSystem.get(conf); fs.mkdirs(new Path("/user/hadoop/myhadoop")); } /* *建立檔案 * * * */ @Test public void putFile() throws IOException { Configuration conf=new Configuration(); conf.set("fs.defaultFS", "hdfs://172.25.11.200:8020/"); FileSystem fs=FileSystem.get(conf); FSDataOutputStream out=fs.create(new Path("/user/hadoop/myhadoop/a.txt")); out.write("helloworld".getBytes()); out.close(); } /* *remove,刪除檔案 * * * */ @Test public void removeFile() throws IOException { Configuration conf=new Configuration(); conf.set("fs.defaultFS", "hdfs://172.25.11.200:8020/"); FileSystem fs=FileSystem.get(conf); Path p=new Path("/user/hadoop/myhadoop/a.txt"); fs.delete(p,true); } //定製副本數和塊大小 @Test public void testWriter2() throws IOException { Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(conf); //第一個引數是路徑,第二引數是是否重寫配置,第三個引數是寫入檔案的緩衝區大小,第四個引數是副本數量,第五個引數是設定塊大小 //這裡設定為5個位元組,預設最小塊為1M,可以在hdfs-site.xml中配置,我們可以從hadoop的預設配置檔案hdfs-default.xml中找到相應的配置 //檔案太小影響效能,尤其是影響namenode,因為一個檔案namenode需要152個位元組對它進行索引,並且它是儲存在記憶體中的,所以當小檔案太多的話需要耗費巨大記憶體從而導致效能降低 FSDataOutputStream fout=fs.create(new Path("/user/hadoop/hello.txt"), true,1024,(short)2,5); } }

這裡寫圖片描述

2、MapReduce

2.1 MapReduce程式碼剖析

WCMapper.java

package cn.ctgu.Hdfs;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


/**
 * Created by Administrator on 2018/6/9.
 */
public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text keyout=new Text();
        IntWritable valueOut=new IntWritable();
        String[] arr=value.toString().split(" ");
        for(String s:arr){
            keyout.set(s);
            valueOut.set(1);
            context.write(keyout,valueOut);
        }
    }
}

WCReducer.java

package cn.ctgu.Hdfs;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Administrator on 2018/6/9.
 */
public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable>{

    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count=0;
        for(IntWritable iw:values){
            count=count+iw.get();
        }
        context.write(key,new IntWritable(count));
    }
}

WCApp.java

package cn.ctgu.Hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import java.io.IOException;

/**
 * Created by Administrator on 2018/6/9.
 */
public class WCApp {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf=new Configuration();
        //conf.set("fs.defaultFS","file:///");//如果在本地windows上執行就得加上這個
        Job job=Job.getInstance(conf);
        //設定job的各種屬性
        job.setJobName("WCApp");//作業名稱
        job.setJarByClass(WCApp.class);//搜尋類
        job.setInputFormatClass(TextInputFormat.class);//設定輸入格式

        //新增輸入路徑
        FileInputFormat.addInputPath(job,new Path(args[0]));
        //設定輸出路徑
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        job.setMapperClass(WCMapper.class);//mapper類
        job.setReducerClass(WCReducer.class);//reduce類

        job.setNumReduceTasks(1);//reduce個數

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);//
        job.setOutputValueClass(IntWritable.class);

        job.waitForCompletion(true);

    }
}

這裡寫圖片描述

Local模式執行MR流程

1.建立外部Job(mapreduce.Job),設定配置資訊
2.通過jobsubmitter將job.xml + split等檔案寫入臨時目錄
3.通過jobSubmitter提交job給localJobRunner,
4.LocalJobRunner將外部Job 轉換成成內部Job
5.內部Job執行緒,開放分執行緒執行job
6.job執行執行緒分別計算Map和reduce任務資訊並通過執行緒池孵化新執行緒執行MR任務。

在hadoop叢集上執行mrjob

1.匯出jar包,通過執行pom.xml中的package命令

maven 

2.丟到hadoop
3.執行hadoop jar命令

$>hadoop jar HdfsDemo-1.0-SNAPSHOT.jar cn.ctgu.hdfs.mr.WCApp hdfs:/user/hadoop/wc/data hdfs:/user/hadoop/wc/out

這裡寫圖片描述

2.2 Job的檔案split計演算法則

HDFS的寫入是以資料包的形式寫入的
這裡寫圖片描述

切片
hdfs 切片計算方式(壓縮檔案不可切割,在原始碼中通過isSplitable(job,path)判斷)

getFormatMinSplitSize() = 1

    //最小值(>=1)                          1                       0
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

    //最大值(<= Long.Max , mapreduce.input.fileinputformat.split.maxsize=)
    long maxSize = getMaxSplitSize(job);

    //得到block大小
    long blockSize = file.getBlockSize();

    //minSplit maxSplit blockSize
    //Math.max(minSize, Math.min(maxSize, blockSize));
在最小切片、塊大小、最大切片之間取中間值,如果不配置最大、最小切片則取塊大小

LF : Line feed,換行符

private static final byte CR = '\r';
private static final byte LF = '\n';

這裡寫圖片描述
這裡寫圖片描述
3、壓縮檔案
檔案壓縮有兩大好處:減少儲存檔案所需要的磁碟空間,加速資料在網路和磁碟上的傳輸。壓縮分為RECORD,即針對每條記錄進行壓縮,以及BLOCK,即針對一組記錄進行壓縮,這種效率更高,一般推薦這種壓縮。可以使用mapred.output.compression.type屬性來設定壓縮格式。

package cn.ctgu.Hdfs.compress;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

/**
 * Created by Administrator on 2018/6/11.
 *
 * 測試壓縮
 */
public class TestCompress {
    @Test
    public void deflateCompress() throws IOException {
        //deflate編解碼器類
        Class codecClass= DeflateCodec.class;
        //例項物件
        CompressionCodec codec=(CompressionCodec)ReflectionUtils.newInstance(codecClass,new Configuration());
        //建立檔案輸出流
        FileOutputStream fos=new FileOutputStream("F:/comp/1.deflate");
        //得到壓縮流
        CompressionOutputStream zipout=codec.createOutputStream(fos);
        IOUtils.copyBytes(new FileInputStream("F:/徐培成——spark/大資料Spark.docx"),zipout,1024);
        zipout.close();
    }
    @Test
    public void deflateCompress1() throws IOException {
        Class[]zipClass={
                DeflateCodec.class,
                GzipCodec.class,
                BZip2Codec.class,
                SnappyCodec.class,
                Lz4Codec.class
        };
        //壓縮
        for(Class c:zipClass){
            zip(c);
        }
        //解壓縮
        for(Class c:zipClass){
            unzip(c);
        }
    }
    //壓縮測試
    public void zip(Class codecClass) throws IOException {
        long start=System.currentTimeMillis();
        //例項化物件
        CompressionCodec codec= (CompressionCodec) ReflectionUtils.newInstance(codecClass,new Configuration());
        //建立檔案輸出流,得到預設副檔名
        FileOutputStream fos=new FileOutputStream("F:/comp/b."+codec.getDefaultExtension());
        //得到壓縮流
        CompressionOutputStream zipOut=codec.createOutputStream(fos);
        IOUtils.copyBytes(new FileInputStream("F:/徐培成——spark/大資料Spark.docx"),zipOut,1024);
        zipOut.close();
        System.out.println(codecClass.getSimpleName()+":"+(System.currentTimeMillis()-start));
    }
    //解壓縮
    public void unzip(Class codecClass) throws IOException {
        long start=System.currentTimeMillis();
        //例項化物件
        CompressionCodec codec= (CompressionCodec) ReflectionUtils.newInstance(codecClass,new Configuration());
        //建立檔案輸入流
        FileInputStream fis=new FileInputStream("F:/comp/b"+codec.getDefaultExtension());
        //得到壓縮流
        CompressionInputStream zipIn=codec.createInputStream(fis);
        IOUtils.copyBytes(zipIn,new FileOutputStream("F:/comp/b"+codec.getDefaultExtension()+".txt"),1024);
        zipIn.close();
        System.out.println(codecClass.getSimpleName()+":"+(System.currentTimeMillis()-start));
    }
}
1.Windows
        原始檔大小:82.8k
        原始檔型別:txt
        壓縮效能比較
                    |   DeflateCodec    GzipCodec   BZip2Codec  Lz4Codec    SnappyCodec |結論
        ------------|-------------------------------------------------------------------|----------------------
        壓縮時間(ms)|   450             7           196         44          不支援     |Gzip > Lz4 > BZip2 > Deflate
        ------------|-------------------------------------------------------------------|----------------------
        解壓時間(ms)|   444             66          85          33                      |lz4  > gzip > bzip2 > Deflate
        ------------|-------------------------------------------------------------------|----------------------
        佔用空間(k) |   19k             19k         17k         31k         不支援     |Bzip > Deflate = Gzip > Lz4
                    |                                                                   |

    2.CentOS
        原始檔大小:82.8k
        原始檔型別:txt
                    |   DeflateCodec    GzipCodec   BZip2Codec  Lz4Codec    LZO     SnappyCodec |結論
        ------------|---------------------------------------------------------------------------|----------------------
        壓縮時間(ms)|   944             77          261         53          77              |Gzip > Lz4 > BZip2 > Deflate
        ------------|---------------------------------------------------------------------------|----------------------
        解壓時間(ms)|   67              66          106         52          73                  |lz4  > gzip > Deflate> lzo > bzip2 
        ------------|---------------------------------------------------------------------------|----------------------
        佔用空間(k) |   19k             19k         17k         31k         34k                 |Bzip > Deflate = Gzip > Lz4 > lzo

對於SnappyCodec出現java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.
在centos上使用yum安裝snappy壓縮庫檔案
$>sudo yum search snappy               #檢視是否有snappy庫
$>sudo yum install -y snappy.x86_64        #根據centos型號安裝snappy壓縮解壓縮庫

LZO不是Apache下的,所以需要另外新增依賴

1.在pom.xml引入lzo依賴

<dependency>
        <groupId>org.anarres.lzo</groupId>
        <artifactId>lzo-hadoop</artifactId>
        <version>1.0.0</version>
        <scope>compile</scope>
</dependency>

2.在centos上安裝lzo庫

$>sudo yum -y install lzo

3.使用mvn命令下載工件中的所有依賴,進入pom.xml所在目錄,執行cmd:

mvn -DoutputDirectory=./lib -DgroupId=cn.ctgu -DartifactId=HdfsDemo -Dversion=1.0-SNAPSHOT dependency:copy-dependencies

4.在lib下存放依賴所有的第三方jar

5.找出lzo-hadoop.jar + lzo-core.jar複製到centos的hadoop的響應目錄下。

$>cp lzo-hadoop.jar lzo-core.jar /soft/hadoop/shared/hadoop/common/lib

4、遠端除錯

開發環境:windows的Intellij idea+centos的叢集環境
遠端除錯:先將在Intellij idea中編輯好的程式碼打包傳送到centos上,然後通過Intellij idea的遠端除錯機制,在同樣的程式碼上打斷點對centos上的執行程式碼進行除錯,和本機執行程式碼除錯機制並無差別,只是程式碼實質執行在centos上,錯誤訊息也顯示在centos上,windows的Intellji idea只是起個打斷點除錯的作用,兩者如此結合使得除錯變得更加的便捷。

1.設定伺服器java vm的-agentlib:jdwp選項,在centos上進行如下設定:

set JAVA_OPTS=%JAVA_OPTS% -agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=n
下面的是上面的簡化,一般用下面的這種方式
export HADOOP_CLIENT_OPTS=-agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=y

2.在server啟動java程式

hadoop jar HdfsDemo.jar cn.ctgu.hdfs.mr.compress.TestCompress

3.server會暫掛在8888.

Listening ...

4.客戶端通過遠端除錯連線到遠端主機的8888.

Run-->Edit Configurations-->Remotes

Intellij idea遠端除錯設定

這裡寫圖片描述

當連線成功之後,centos會一直處於監聽狀態
這裡寫圖片描述

當Intellij idea上顯示連線成功,就可以進行相應的除錯動作

在pom.xml中引入新的外掛(maven-antrun-plugin),實現檔案的複製.

<build>
            <plugins>
                ...
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-antrun-plugin</artifactId>
                    <version>1.8</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>run</goal>
                            </goals>
                            <configuration>
                                <tasks>
                                    <echo>---------開始複製jar包到共享目錄下----------</echo>
                                    <delete file="D:\downloads\bigdata\data\HdfsDemo-1.0-SNAPSHOT.jar"></delete>
                                    <copy file="target/HdfsDemo-1.0-SNAPSHOT.jar" toFile="D:\downloads\bigdata\data\HdfsDemo.jar">
                                    </copy>