1. 程式人生 > >hadoop:hdfs分佈儲存+ mr分佈計算

hadoop:hdfs分佈儲存+ mr分佈計算

  1. hdfs 和RDBMS區別
  2. mr 和 網格計算,志願計算

1,資料儲存

磁碟儲存 解決分散式問題 硬體需求 系統瓶頸
hdfs 磁碟陣列-叢集 硬體故障,多資料來源的資料準確性 普通機 資料傳輸:硬碟頻寬
RDBMS 單磁碟 專業伺服器 磁碟定址:大量資料更新

2,分析計算

適用場 特點 生態圈 結構特點 資料完整性 可擴充套件性 資料集結構化程度
mr PB級資料:批處理 一寫多讀 yarn整合其他分散式程式,hive,saprk 讀模式 半、非結構化
RDBMS GB級資料:實時檢索,更新 持續更新 寫模式 結構化

3,網格計算,志願計算

特點 適用場景
網格計算 分散節點計算+ 網路共享檔案系統 小規模資料:無網路傳輸瓶頸
網格計算 任務單元化+ 分散計算+ 校驗結果 cup密集型:計算時間>傳輸時間
mr 轉移計算+ 資料本地化 作業週期短(小時計),高速區域網內,高配硬體

4,mr 對比linux:awk流處理

1,awk處理: 年度最高溫度統計

在這裡插入圖片描述

2,mapreduce處理:每年最高溫度統計

idea +maven: 新增依賴

		<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

map方法

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class Map1 extends Mapper<LongWritable, Text, IntWritable,IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //整理的資料輸入:
        //1982,-8
        //1931,-4
        String str = value.toString();
        String[] arr = str.split(",");
        int year=0, tmp=Integer.MIN_VALUE;

        //資料轉換
        try {
             year= Integer.parseInt(arr[0]);
             tmp= Integer.parseInt(arr[1]);
        }catch (Exception e){
            e.printStackTrace();
        }
        //輸出:新資料
        context.write(new IntWritable(year),new IntWritable(tmp));
    }
}

reduce方法

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;

public class Reduce1 extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //輸入資料:1931,【-4,23,4,35,6】
        //聚合資料: 求每組資料中的max(tmp)
        int max=Integer.MIN_VALUE;
        Iterator<IntWritable> it = values.iterator();
        while (it.hasNext()){
            IntWritable next = it.next();
            int tmp = next.get();

            max= (max >tmp) ? max:tmp;
        }
        //輸出: 最高溫度
        context.write(key, new IntWritable(max));
    }
}

app類: 排程組織job

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class App1 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(App1.class);
        job.setJobName("maxTmp");

        //map,reduce
        job.setMapperClass(Map1.class);
        job.setReducerClass(Reduce1.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(3);
        //輸入輸出
        FileInputFormat.addInputPath(job,new Path("/home/wang/txt/tmp.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/home/wang/tmp-out"));

        //提交等待
        job.waitForCompletion(true);
    }
}