Hadoop——HDFS以及MapReduce的一些總結
1、HDFS API簡單操作檔案
package cn.ctgu.hdfs;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
/*
* 讀取hdfs檔案
*
* */
public class TestHDFS {
//讀取hdfs檔案
@Test
public void readFile() throws IOException {
//註冊url流處理器工廠(hdfs)
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
//獲取hdfs的URL連線,即檔案所在地
URL url=new URL("hdfs://172.25.11.200:8020/user/hadoop/test.txt");
//開啟連線
URLConnection conn=url.openConnection();
//獲取輸入流
InputStream is=conn.getInputStream();
byte[]buf=new byte[is.available()];
is.read(buf);
is.close();
String str=new String(buf);
System.out.println(str);
}
/*
* 通過hadoop API訪問檔案
* */
@Test
public void readFileByAPI() throws IOException {
//建立配置檔案
Configuration conf=new Configuration();
//設定配置檔案
conf.set("fs.defaultFS", "hdfs://172.25.11.200:8020/");
//獲取檔案系統的一個控制代碼
FileSystem fs=FileSystem.get(conf);
//獲取檔案路徑
Path p=new Path("/user/hadoop/test.txt");
//開啟檔案
FSDataInputStream fis=fs.open(p);
byte[]buf=new byte[1024];
int len=-1;
ByteArrayOutputStream baos=new ByteArrayOutputStream();
while((len=fis.read(buf))!=-1) {
baos.write(buf,0,len);
}
fis.close();
baos.close();
System.out.println(new String(baos.toByteArray()));
}
@Test
public void readFileByAPI2() throws IOException {
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://172.25.11.200:8020/");
FileSystem fs=FileSystem.get(conf);
Path p=new Path("/user/hadoop/test.txt");
FSDataInputStream fis=fs.open(p);
ByteArrayOutputStream baos=new ByteArrayOutputStream();
//將fis中的資料寫入到baos中,緩衝區為1024個位元組
IOUtils.copyBytes(fis,baos,1024);
System.out.println(new String(baos.toByteArray()));
}
/*
*
* 許可權問題很重要,比如刪除檔案,我們在window下的角色就是Adminastrator
* 創建出來的就是Adminastrator的,刪除也因此只能刪除它的
*mkdir,建立目錄
*
*
* */
@Test
public void mkdir() throws IOException {
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://172.25.11.200:8020/");
FileSystem fs=FileSystem.get(conf);
fs.mkdirs(new Path("/user/hadoop/myhadoop"));
}
/*
*建立檔案
*
*
* */
@Test
public void putFile() throws IOException {
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://172.25.11.200:8020/");
FileSystem fs=FileSystem.get(conf);
FSDataOutputStream out=fs.create(new Path("/user/hadoop/myhadoop/a.txt"));
out.write("helloworld".getBytes());
out.close();
}
/*
*remove,刪除檔案
*
*
* */
@Test
public void removeFile() throws IOException {
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://172.25.11.200:8020/");
FileSystem fs=FileSystem.get(conf);
Path p=new Path("/user/hadoop/myhadoop/a.txt");
fs.delete(p,true);
}
//定製副本數和塊大小
@Test
public void testWriter2() throws IOException {
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(conf);
//第一個引數是路徑,第二引數是是否重寫配置,第三個引數是寫入檔案的緩衝區大小,第四個引數是副本數量,第五個引數是設定塊大小
//這裡設定為5個位元組,預設最小塊為1M,可以在hdfs-site.xml中配置,我們可以從hadoop的預設配置檔案hdfs-default.xml中找到相應的配置
//檔案太小影響效能,尤其是影響namenode,因為一個檔案namenode需要152個位元組對它進行索引,並且它是儲存在記憶體中的,所以當小檔案太多的話需要耗費巨大記憶體從而導致效能降低
FSDataOutputStream fout=fs.create(new Path("/user/hadoop/hello.txt"),
true,1024,(short)2,5);
}
}
2、MapReduce
2.1 MapReduce程式碼剖析
WCMapper.java
package cn.ctgu.Hdfs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Created by Administrator on 2018/6/9.
*/
public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text keyout=new Text();
IntWritable valueOut=new IntWritable();
String[] arr=value.toString().split(" ");
for(String s:arr){
keyout.set(s);
valueOut.set(1);
context.write(keyout,valueOut);
}
}
}
WCReducer.java
package cn.ctgu.Hdfs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Created by Administrator on 2018/6/9.
*/
public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count=0;
for(IntWritable iw:values){
count=count+iw.get();
}
context.write(key,new IntWritable(count));
}
}
WCApp.java
package cn.ctgu.Hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* Created by Administrator on 2018/6/9.
*/
public class WCApp {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
//conf.set("fs.defaultFS","file:///");//如果在本地windows上執行就得加上這個
Job job=Job.getInstance(conf);
//設定job的各種屬性
job.setJobName("WCApp");//作業名稱
job.setJarByClass(WCApp.class);//搜尋類
job.setInputFormatClass(TextInputFormat.class);//設定輸入格式
//新增輸入路徑
FileInputFormat.addInputPath(job,new Path(args[0]));
//設定輸出路徑
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setMapperClass(WCMapper.class);//mapper類
job.setReducerClass(WCReducer.class);//reduce類
job.setNumReduceTasks(1);//reduce個數
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);//
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
}
}
Local模式執行MR流程
1.建立外部Job(mapreduce.Job),設定配置資訊
2.通過jobsubmitter將job.xml + split等檔案寫入臨時目錄
3.通過jobSubmitter提交job給localJobRunner,
4.LocalJobRunner將外部Job 轉換成成內部Job
5.內部Job執行緒,開放分執行緒執行job
6.job執行執行緒分別計算Map和reduce任務資訊並通過執行緒池孵化新執行緒執行MR任務。
在hadoop叢集上執行mrjob
1.匯出jar包,通過執行pom.xml中的package命令
maven
2.丟到hadoop
3.執行hadoop jar命令
$>hadoop jar HdfsDemo-1.0-SNAPSHOT.jar cn.ctgu.hdfs.mr.WCApp hdfs:/user/hadoop/wc/data hdfs:/user/hadoop/wc/out
2.2 Job的檔案split計演算法則
HDFS的寫入是以資料包的形式寫入的
切片
hdfs 切片計算方式(壓縮檔案不可切割,在原始碼中通過isSplitable(job,path)判斷)
getFormatMinSplitSize() = 1
//最小值(>=1) 1 0
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//最大值(<= Long.Max , mapreduce.input.fileinputformat.split.maxsize=)
long maxSize = getMaxSplitSize(job);
//得到block大小
long blockSize = file.getBlockSize();
//minSplit maxSplit blockSize
//Math.max(minSize, Math.min(maxSize, blockSize));
在最小切片、塊大小、最大切片之間取中間值,如果不配置最大、最小切片則取塊大小
LF : Line feed,換行符
private static final byte CR = '\r';
private static final byte LF = '\n';
3、壓縮檔案
檔案壓縮有兩大好處:減少儲存檔案所需要的磁碟空間,加速資料在網路和磁碟上的傳輸。壓縮分為RECORD,即針對每條記錄進行壓縮,以及BLOCK,即針對一組記錄進行壓縮,這種效率更高,一般推薦這種壓縮。可以使用mapred.output.compression.type屬性來設定壓縮格式。
package cn.ctgu.Hdfs.compress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
/**
* Created by Administrator on 2018/6/11.
*
* 測試壓縮
*/
public class TestCompress {
@Test
public void deflateCompress() throws IOException {
//deflate編解碼器類
Class codecClass= DeflateCodec.class;
//例項物件
CompressionCodec codec=(CompressionCodec)ReflectionUtils.newInstance(codecClass,new Configuration());
//建立檔案輸出流
FileOutputStream fos=new FileOutputStream("F:/comp/1.deflate");
//得到壓縮流
CompressionOutputStream zipout=codec.createOutputStream(fos);
IOUtils.copyBytes(new FileInputStream("F:/徐培成——spark/大資料Spark.docx"),zipout,1024);
zipout.close();
}
@Test
public void deflateCompress1() throws IOException {
Class[]zipClass={
DeflateCodec.class,
GzipCodec.class,
BZip2Codec.class,
SnappyCodec.class,
Lz4Codec.class
};
//壓縮
for(Class c:zipClass){
zip(c);
}
//解壓縮
for(Class c:zipClass){
unzip(c);
}
}
//壓縮測試
public void zip(Class codecClass) throws IOException {
long start=System.currentTimeMillis();
//例項化物件
CompressionCodec codec= (CompressionCodec) ReflectionUtils.newInstance(codecClass,new Configuration());
//建立檔案輸出流,得到預設副檔名
FileOutputStream fos=new FileOutputStream("F:/comp/b."+codec.getDefaultExtension());
//得到壓縮流
CompressionOutputStream zipOut=codec.createOutputStream(fos);
IOUtils.copyBytes(new FileInputStream("F:/徐培成——spark/大資料Spark.docx"),zipOut,1024);
zipOut.close();
System.out.println(codecClass.getSimpleName()+":"+(System.currentTimeMillis()-start));
}
//解壓縮
public void unzip(Class codecClass) throws IOException {
long start=System.currentTimeMillis();
//例項化物件
CompressionCodec codec= (CompressionCodec) ReflectionUtils.newInstance(codecClass,new Configuration());
//建立檔案輸入流
FileInputStream fis=new FileInputStream("F:/comp/b"+codec.getDefaultExtension());
//得到壓縮流
CompressionInputStream zipIn=codec.createInputStream(fis);
IOUtils.copyBytes(zipIn,new FileOutputStream("F:/comp/b"+codec.getDefaultExtension()+".txt"),1024);
zipIn.close();
System.out.println(codecClass.getSimpleName()+":"+(System.currentTimeMillis()-start));
}
}
1.Windows
原始檔大小:82.8k
原始檔型別:txt
壓縮效能比較
| DeflateCodec GzipCodec BZip2Codec Lz4Codec SnappyCodec |結論
------------|-------------------------------------------------------------------|----------------------
壓縮時間(ms)| 450 7 196 44 不支援 |Gzip > Lz4 > BZip2 > Deflate
------------|-------------------------------------------------------------------|----------------------
解壓時間(ms)| 444 66 85 33 |lz4 > gzip > bzip2 > Deflate
------------|-------------------------------------------------------------------|----------------------
佔用空間(k) | 19k 19k 17k 31k 不支援 |Bzip > Deflate = Gzip > Lz4
| |
2.CentOS
原始檔大小:82.8k
原始檔型別:txt
| DeflateCodec GzipCodec BZip2Codec Lz4Codec LZO SnappyCodec |結論
------------|---------------------------------------------------------------------------|----------------------
壓縮時間(ms)| 944 77 261 53 77 |Gzip > Lz4 > BZip2 > Deflate
------------|---------------------------------------------------------------------------|----------------------
解壓時間(ms)| 67 66 106 52 73 |lz4 > gzip > Deflate> lzo > bzip2
------------|---------------------------------------------------------------------------|----------------------
佔用空間(k) | 19k 19k 17k 31k 34k |Bzip > Deflate = Gzip > Lz4 > lzo
對於SnappyCodec出現java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.
在centos上使用yum安裝snappy壓縮庫檔案
$>sudo yum search snappy #檢視是否有snappy庫
$>sudo yum install -y snappy.x86_64 #根據centos型號安裝snappy壓縮解壓縮庫
LZO不是Apache下的,所以需要另外新增依賴
1.在pom.xml引入lzo依賴
<dependency>
<groupId>org.anarres.lzo</groupId>
<artifactId>lzo-hadoop</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
2.在centos上安裝lzo庫
$>sudo yum -y install lzo
3.使用mvn命令下載工件中的所有依賴,進入pom.xml所在目錄,執行cmd:
mvn -DoutputDirectory=./lib -DgroupId=cn.ctgu -DartifactId=HdfsDemo -Dversion=1.0-SNAPSHOT dependency:copy-dependencies
4.在lib下存放依賴所有的第三方jar
5.找出lzo-hadoop.jar + lzo-core.jar複製到centos的hadoop的響應目錄下。
$>cp lzo-hadoop.jar lzo-core.jar /soft/hadoop/shared/hadoop/common/lib
4、遠端除錯
開發環境:windows的Intellij idea+centos的叢集環境
遠端除錯:先將在Intellij idea中編輯好的程式碼打包傳送到centos上,然後通過Intellij idea的遠端除錯機制,在同樣的程式碼上打斷點對centos上的執行程式碼進行除錯,和本機執行程式碼除錯機制並無差別,只是程式碼實質執行在centos上,錯誤訊息也顯示在centos上,windows的Intellji idea只是起個打斷點除錯的作用,兩者如此結合使得除錯變得更加的便捷。
1.設定伺服器java vm的-agentlib:jdwp選項,在centos上進行如下設定:
set JAVA_OPTS=%JAVA_OPTS% -agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=n
下面的是上面的簡化,一般用下面的這種方式
export HADOOP_CLIENT_OPTS=-agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=y
2.在server啟動java程式
hadoop jar HdfsDemo.jar cn.ctgu.hdfs.mr.compress.TestCompress
3.server會暫掛在8888.
Listening ...
4.客戶端通過遠端除錯連線到遠端主機的8888.
Run-->Edit Configurations-->Remotes
Intellij idea遠端除錯設定
當連線成功之後,centos會一直處於監聽狀態
當Intellij idea上顯示連線成功,就可以進行相應的除錯動作
在pom.xml中引入新的外掛(maven-antrun-plugin),實現檔案的複製.
<build>
<plugins>
...
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.8</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<echo>---------開始複製jar包到共享目錄下----------</echo>
<delete file="D:\downloads\bigdata\data\HdfsDemo-1.0-SNAPSHOT.jar"></delete>
<copy file="target/HdfsDemo-1.0-SNAPSHOT.jar" toFile="D:\downloads\bigdata\data\HdfsDemo.jar">
</copy>