1. 程式人生 > >5.大資料學習之旅——hadoop-HDFS

5.大資料學習之旅——hadoop-HDFS

NameNode

檢視edits檔案:
hdfs oev -i edits_0000000000000000022-0000000000000000023 -o edits.xml
檢視fsimage檔案:
hdfs oiv -i fsimage_0000000000000000024 -o fsimage.xml -p XML
fsimage_0000000000000000023.md5 — 用於校驗的
VERSION
Namenode:clusterID=CID-e7c4b16d-dc36-4cca-ac67-3d28aac9dfeb
Datanode:clusterID=CID-e7c4b16d-dc36-4cca-ac67-3d28aac9dfeb — 用於進
行標記的。

SecondaryNameNode

在這裡插入圖片描述

合併過程

  1. 將edits和fsimage檔案通過網路拷貝到Secondarynamenode上
  2. 在namenode產生一個edits.new記錄合併期間的操作
  3. 拷貝完成之後,fsimage就會將其中的資料存到secondarynamenode的內
    存中
  4. 將edits的操作更新到secondarynamenode的記憶體中
  5. 更新完成之後,將記憶體中的資料寫到fsimage.ckpt檔案中
  6. 通過網路將fsimage.ckpt拷貝到namenode中
  7. 將fsimage.ckpt重新命名為fsimage,並且將edits.new也重新命名為edits
    合併過程
    SecondaryNameNode只負責進行資料的合併,不是Namenode的熱備,但是
    也能起到一定的備份作用,會產生資料的丟失

Datanode

儲存資料,並且是以資料塊的形式來儲存。
datanode儲存namenode對應的clusterID以確定當前的datanode歸哪一個
namenode管理
datanode每隔一段時間(3s)會主動向namenode傳送心跳資訊(節點狀態,節
點資料)
如果namenode超過了10min沒有收到datanode的心跳,則認為這個datanode
產生lost,那麼namenode就會將這個datanode上的資料copy到其他節點上

HDFS的操作流程

讀取資料

  1. 客戶端發起RPC請求訪問Namenode
  2. namenode會查詢元資料,找到這個檔案的儲存位置對應的資料塊的信
    息。
  3. namenode將檔案對應的資料塊的節點地址的全部或者部分放入一個佇列
    中然後返回
  4. client收到這個資料塊對應的節點地址
  5. client會從佇列中取出第一個資料塊對應的節點地址,會從這些節點地址
    中選擇一個最近的節點進行讀取
  6. 將Block讀取之後,對Block進行checksum的驗證,如果驗證失敗,說明數
    據塊產生損壞,那麼client會向namenode傳送資訊說明該節點上的資料
    塊損壞,然後從其他節點中再次讀取這個資料塊
  7. 驗證成功,則從佇列中取出下一個Block的地址,然後繼續讀取
  8. 當把這一次的檔案塊全部讀完之後,client會向namenode要下一批block
    的地址
  9. 當把檔案全部讀取完成之後,從client會向namenode傳送一個讀取完畢
    的訊號,namenode就會關閉對應的檔案
    在這裡插入圖片描述

寫流程

  1. client傳送RPC請求給namenode
  2. namenode接收到請求之後,對請求進行驗證,例如這個請求中的檔案是
    否存在,再例如許可權驗證
  3. 如果驗證通過,namenode確定檔案的大小以及分塊的數量,確定對應的
    節點(會去找磁碟空間相對空閒的節點來使用),將節點地址放入佇列
    中返回
  4. 客戶端收到地址之後,從佇列中依次取出節點地址,然後資料塊依次放
    入對應的節點地址上
  5. 客戶端在寫完之後就會向namenode傳送寫完資料的訊號,namenode會
    給客戶端返回一個關閉檔案的訊號
  6. datanode之間將會通過管道進行自動的備份,保證複本數量
    在這裡插入圖片描述

刪除流程

  1. Client發起RPC請求到namenode
  2. namenode收到請求之後,會將這個操作記錄到edits中,然後將資料從內
    存中刪掉,給客戶端返回一個刪除成功的訊號
  3. 客戶端收到訊號之後認為資料已經刪除,實際上資料依然存在datanode上
  4. 當datanode向namenode傳送心跳訊息(節點狀態,節點資料)的時候,
    namenode就會檢查這個datanode中的節點資料,發現datanode中的節點
    資料在namenode中的元資料中沒有記錄,namenode就會做出響應,就
    會命令對應的datanode刪除指定的資料

hdfs的操作指令

hadoop fs -put a.txt /a.txt - 上傳檔案
hadoop fs -mkdir /hadoopnode01 - 建立目錄
hadoop fs -rm /hadoop-2.7.1_64bit.tar.gz - 刪除檔案
hadoop fs -rmdir /hadoopnode01 - 刪除目錄
hadoop fs -rmr /a - 遞迴刪除
hadoop fs -get /a.txt /home - 下載
hadoop fs -ls / - 檢視
hadoop fs -lsr / - 遞迴檢視
hadoop fs -cat /a.txt - 檢視內容
hadoop fs -tail /a.txt - 產看檔案的最後1000個位元組
hadoop fs -mv /a/a.txt /a/b.txt - 移動或者重新命名
hadoop fs -touchz /demo.txt - 建立空檔案
hadoop fs -getmerge /a demo.txt - 合併下載

Hadoop外掛的使用:

  1. 將hadoopbin_for_hadoop2.7.1.zip解壓

  2. 複製hadoop-eclipse-plugin-2.7.1.jar 2.

  3. 找到eclipse的安裝目錄,將外掛複製到eclipse安裝目錄下的子目錄plugins

  4. 重啟eclipse

  5. 在eclipse中指定hadoop的安裝目錄
    在這裡插入圖片描述

  6. Window -show view
    在這裡插入圖片描述

  7. 需要新增環境變數:HADOOP_USER_NAME=使用者名稱

HDFS的API操作

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class HDFSDemo {

	@Test
	public void get() throws IOException, URISyntaxException {

		// 建立連線
		// uri - 連線地址
		// conf - 配置
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.60.132:9000"), conf);

		// 獲取讀取檔案的流
		InputStream in = fs.open(new Path("/a/a.txt"));

		FileOutputStream out = new FileOutputStream("a.txt");

		// 讀取資料
		byte[] bs = new byte[1024];
		int len = -1;
		while ((len = in.read(bs)) != -1) {
			out.write(bs, 0, len);
		}

		out.close();
	}

	@Test
	public void put() throws IOException, URISyntaxException, InterruptedException {

		// 建立連線
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.60.132:9000"), conf, "root");

		OutputStream out = fs.create(new Path("/a/c.txt"));

		FileInputStream in = new FileInputStream("c.txt");

		// byte[] bs = new byte[1024];
		// int len = -1;
		// while ((len = in.read(bs)) != -1) {
		// out.write(bs, 0, len);
		// }
		//
		// in.close();
		IOUtils.copyBytes(in, out, conf);
	}

}

MapReduce


概述:

是hadoop中的分散式的計算框架
MapReduce意味著在計算過程中實際分為兩大步:Map過程和Reduce過
程。
在這裡插入圖片描述
案例:統計檔案中每一個單詞出現的次數
WordCountMapper

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// 案例:統計每一個單詞出現的次數
// KEYIN - 這一行的偏移量 --- 
// VALUEIN - 讀取到這一行的資料
// KEYOUT - 輸出的鍵的型別 --- 這一行中的每一個單詞
// VALUEOUT - 輸出的值的型別 --- 表示這一行中這個單詞出現的次數
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable >{

	@Override
	// key -- 這一行的偏移量
	// value --- 這一行的資料
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
			throws IOException, InterruptedException {
		
		System.err.println(value);
		// 獲取到一行資料
		String str = value.toString();
		// 以空格為單位進行切分
		String[] arr = str.split(" ");
		
		for (String s : arr) {
			context.write(new Text(s), new LongWritable(1));
		}
	}
	
}

WordCountReducer

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// 案例:統計每一個單詞出現的次數
// KEYIN --- 單詞
// VALUEIN --- 次數
// KEYOUT --- 單詞
// VALUEOUT
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable > {
	
	@Override
	protected void reduce(Text key, Iterable<LongWritable> arg1,
			Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
	
		long sum = 0;
		// 計算總的次數
		for (LongWritable count : arg1) {
			sum += count.get();
		}
		
		// 將單詞以及對應的總次數寫出
		context.write(key, new LongWritable(sum));
		
	}

}

WorldCountDriver

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WorldCountDriver {
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		// 需要給當前的job指定執行的入口類
		job.setJarByClass(WorldCountDriver.class);
		
		// 指定要執行的mapper
		job.setMapperClass(WordCountMapper.class);
		
		// 指定mapper執行完成之後的結果型別
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		// 指定要執行的reducer
		job.setReducerClass(WordCountReducer.class);
		
		// 指定reducer執行完成之後的結果型別
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		// 指定讀取的檔案
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.60.132:9000/mr/words.txt"));
		// 指定寫出的路徑
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.60.132:9000/result"));
		
		job.waitForCompletion(true);
	}

}

上一篇 5.大資料學習之旅——hadoop-簡介及偽分散式安裝