5.大資料學習之旅——hadoop-HDFS
NameNode
檢視edits檔案:
hdfs oev -i edits_0000000000000000022-0000000000000000023 -o edits.xml
檢視fsimage檔案:
hdfs oiv -i fsimage_0000000000000000024 -o fsimage.xml -p XML
fsimage_0000000000000000023.md5 — 用於校驗的
VERSION
Namenode:clusterID=CID-e7c4b16d-dc36-4cca-ac67-3d28aac9dfeb
Datanode:clusterID=CID-e7c4b16d-dc36-4cca-ac67-3d28aac9dfeb — 用於進
行標記的。
SecondaryNameNode
合併過程
- 將edits和fsimage檔案通過網路拷貝到Secondarynamenode上
- 在namenode產生一個edits.new記錄合併期間的操作
- 拷貝完成之後,fsimage就會將其中的資料存到secondarynamenode的內
存中 - 將edits的操作更新到secondarynamenode的記憶體中
- 更新完成之後,將記憶體中的資料寫到fsimage.ckpt檔案中
- 通過網路將fsimage.ckpt拷貝到namenode中
- 將fsimage.ckpt重新命名為fsimage,並且將edits.new也重新命名為edits
合併過程
SecondaryNameNode只負責進行資料的合併,不是Namenode的熱備,但是
也能起到一定的備份作用,會產生資料的丟失
Datanode
儲存資料,並且是以資料塊的形式來儲存。
datanode儲存namenode對應的clusterID以確定當前的datanode歸哪一個
namenode管理
datanode每隔一段時間(3s)會主動向namenode傳送心跳資訊(節點狀態,節
點資料)
如果namenode超過了10min沒有收到datanode的心跳,則認為這個datanode
產生lost,那麼namenode就會將這個datanode上的資料copy到其他節點上
HDFS的操作流程
讀取資料
- 客戶端發起RPC請求訪問Namenode
- namenode會查詢元資料,找到這個檔案的儲存位置對應的資料塊的信
息。 - namenode將檔案對應的資料塊的節點地址的全部或者部分放入一個佇列
中然後返回 - client收到這個資料塊對應的節點地址
- client會從佇列中取出第一個資料塊對應的節點地址,會從這些節點地址
中選擇一個最近的節點進行讀取 - 將Block讀取之後,對Block進行checksum的驗證,如果驗證失敗,說明數
據塊產生損壞,那麼client會向namenode傳送資訊說明該節點上的資料
塊損壞,然後從其他節點中再次讀取這個資料塊 - 驗證成功,則從佇列中取出下一個Block的地址,然後繼續讀取
- 當把這一次的檔案塊全部讀完之後,client會向namenode要下一批block
的地址 - 當把檔案全部讀取完成之後,從client會向namenode傳送一個讀取完畢
的訊號,namenode就會關閉對應的檔案
寫流程
- client傳送RPC請求給namenode
- namenode接收到請求之後,對請求進行驗證,例如這個請求中的檔案是
否存在,再例如許可權驗證 - 如果驗證通過,namenode確定檔案的大小以及分塊的數量,確定對應的
節點(會去找磁碟空間相對空閒的節點來使用),將節點地址放入佇列
中返回 - 客戶端收到地址之後,從佇列中依次取出節點地址,然後資料塊依次放
入對應的節點地址上 - 客戶端在寫完之後就會向namenode傳送寫完資料的訊號,namenode會
給客戶端返回一個關閉檔案的訊號 - datanode之間將會通過管道進行自動的備份,保證複本數量
刪除流程
- Client發起RPC請求到namenode
- namenode收到請求之後,會將這個操作記錄到edits中,然後將資料從內
存中刪掉,給客戶端返回一個刪除成功的訊號 - 客戶端收到訊號之後認為資料已經刪除,實際上資料依然存在datanode上
- 當datanode向namenode傳送心跳訊息(節點狀態,節點資料)的時候,
namenode就會檢查這個datanode中的節點資料,發現datanode中的節點
資料在namenode中的元資料中沒有記錄,namenode就會做出響應,就
會命令對應的datanode刪除指定的資料
hdfs的操作指令
hadoop fs -put a.txt /a.txt - 上傳檔案
hadoop fs -mkdir /hadoopnode01 - 建立目錄
hadoop fs -rm /hadoop-2.7.1_64bit.tar.gz - 刪除檔案
hadoop fs -rmdir /hadoopnode01 - 刪除目錄
hadoop fs -rmr /a - 遞迴刪除
hadoop fs -get /a.txt /home - 下載
hadoop fs -ls / - 檢視
hadoop fs -lsr / - 遞迴檢視
hadoop fs -cat /a.txt - 檢視內容
hadoop fs -tail /a.txt - 產看檔案的最後1000個位元組
hadoop fs -mv /a/a.txt /a/b.txt - 移動或者重新命名
hadoop fs -touchz /demo.txt - 建立空檔案
hadoop fs -getmerge /a demo.txt - 合併下載
Hadoop外掛的使用:
-
將hadoopbin_for_hadoop2.7.1.zip解壓
-
複製hadoop-eclipse-plugin-2.7.1.jar 2.
-
找到eclipse的安裝目錄,將外掛複製到eclipse安裝目錄下的子目錄plugins
中 -
重啟eclipse
-
在eclipse中指定hadoop的安裝目錄
-
Window -show view
-
需要新增環境變數:HADOOP_USER_NAME=使用者名稱
HDFS的API操作
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class HDFSDemo {
@Test
public void get() throws IOException, URISyntaxException {
// 建立連線
// uri - 連線地址
// conf - 配置
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.60.132:9000"), conf);
// 獲取讀取檔案的流
InputStream in = fs.open(new Path("/a/a.txt"));
FileOutputStream out = new FileOutputStream("a.txt");
// 讀取資料
byte[] bs = new byte[1024];
int len = -1;
while ((len = in.read(bs)) != -1) {
out.write(bs, 0, len);
}
out.close();
}
@Test
public void put() throws IOException, URISyntaxException, InterruptedException {
// 建立連線
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.60.132:9000"), conf, "root");
OutputStream out = fs.create(new Path("/a/c.txt"));
FileInputStream in = new FileInputStream("c.txt");
// byte[] bs = new byte[1024];
// int len = -1;
// while ((len = in.read(bs)) != -1) {
// out.write(bs, 0, len);
// }
//
// in.close();
IOUtils.copyBytes(in, out, conf);
}
}
MapReduce
概述:
是hadoop中的分散式的計算框架
MapReduce意味著在計算過程中實際分為兩大步:Map過程和Reduce過
程。
案例:統計檔案中每一個單詞出現的次數
WordCountMapper
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// 案例:統計每一個單詞出現的次數
// KEYIN - 這一行的偏移量 ---
// VALUEIN - 讀取到這一行的資料
// KEYOUT - 輸出的鍵的型別 --- 這一行中的每一個單詞
// VALUEOUT - 輸出的值的型別 --- 表示這一行中這個單詞出現的次數
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable >{
@Override
// key -- 這一行的偏移量
// value --- 這一行的資料
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
System.err.println(value);
// 獲取到一行資料
String str = value.toString();
// 以空格為單位進行切分
String[] arr = str.split(" ");
for (String s : arr) {
context.write(new Text(s), new LongWritable(1));
}
}
}
WordCountReducer
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// 案例:統計每一個單詞出現的次數
// KEYIN --- 單詞
// VALUEIN --- 次數
// KEYOUT --- 單詞
// VALUEOUT
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable > {
@Override
protected void reduce(Text key, Iterable<LongWritable> arg1,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sum = 0;
// 計算總的次數
for (LongWritable count : arg1) {
sum += count.get();
}
// 將單詞以及對應的總次數寫出
context.write(key, new LongWritable(sum));
}
}
WorldCountDriver
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WorldCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 需要給當前的job指定執行的入口類
job.setJarByClass(WorldCountDriver.class);
// 指定要執行的mapper
job.setMapperClass(WordCountMapper.class);
// 指定mapper執行完成之後的結果型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 指定要執行的reducer
job.setReducerClass(WordCountReducer.class);
// 指定reducer執行完成之後的結果型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 指定讀取的檔案
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.60.132:9000/mr/words.txt"));
// 指定寫出的路徑
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.60.132:9000/result"));
job.waitForCompletion(true);
}
}