1. 程式人生 > >MapReduce中,從HDFS讀取資料計算後寫入HBase

MapReduce中,從HDFS讀取資料計算後寫入HBase

基於上個例子。做一下簡單的改造。

在原本的例子中,從HDFS中讀取資料計算之後再寫會HDFS裡,現在講Reducer類改造一下,把計算後的資料。寫入到HBase當中,寫完之後我們會使用HBase的命令查詢一下寫入資料。

開啟原有的Reducer類,程式碼如下:

import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapreduce.Reducer

class TempReducer : Reducer<Text, IntWritable, Text, IntWritable>() {
    override fun reduce(key: Text?, values: Iterable<IntWritable>?, context: Context?) {
        var maxValue : Int = Int.MIN_VALUE
        var sb : StringBuffer = StringBuffer()
        //取values的最大值
        if (values != null) {
            for (value in values) {
                maxValue = Math.max(maxValue, value.get())
                sb.append(value).append(", ")
            }
        }
        print("Before Reduce:" + key + ", " + sb.toString())
        if (context != null) {
            context.write(key, IntWritable(maxValue))
        }
        print("After Reduce:" + key + ", " + maxValue)
    }
}

現在將程式碼改成如下:

import org.apache.hadoop.hbase.mapreduce.TableReducer
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

//把原本繼承Reducer改成繼承TableReducer
class TempReducer : TableReducer<Text, IntWritable, Text>() {
    override fun reduce(key: Text?, values: Iterable<IntWritable>?, context: Context?) {
        var maxValue : Int = Int.MIN_VALUE
        var sb : StringBuffer = StringBuffer()
        //取values的最大值
        if (values != null) {
            for (value in values) {
                maxValue = Math.max(maxValue, value.get())
                sb.append(value).append(", ")
            }
        }
        print("Before Reduce:" + key + ", " + sb.toString())
        if (context != null) {
            val put = Put(Bytes.toBytes(key.toString()))
            //一個數據存進一行,列族為content,列為count,列值為數目
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
                    Bytes.toBytes(maxValue.toString()))
            context.write(key, put)
        }
        print("After Reduce:" + key + ", " + maxValue)
    }
}

將繼承類改變一下,使Reducer類繼承TableReducer,對應的KV也跟著修改下。然後原本write的地方跟著例項程式碼改變一下。

接著改變Main類,原有的Main類程式碼如下:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

fun resource(name: String) = TempMain::class.java.getResource(name)

object TempMain{
    fun run (input : String?, outPut : String?) {
        var hadoopConfig : Configuration = Configuration()

        hadoopConfig.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem::class.java.name)
        hadoopConfig.set("fs.file.impl",
                org.apache.hadoop.fs.LocalFileSystem::class.java.name)
        //Hadoop的配置檔案
        hadoopConfig.addResource(resource("/hdfs-conf/core-site.xml"))
        hadoopConfig.addResource(resource("/hdfs-conf/hdfs-site.xml"))

        var job : Job = Job(hadoopConfig)

        //如果需要打成jar執行,需要下面這句
        job.setJarByClass(TempMain::class.java)

        //job執行作業時輸入和輸出檔案的路徑
        FileInputFormat.addInputPath(job, Path(input))
        FileOutputFormat.setOutputPath(job, Path(outPut))

        //指定自定義的Mapper和Reducer作為兩個階段的任務處理類
        job.mapperClass = TempMapper::class.java
        job.reducerClass = TempReducer::class.java

        //設定最後輸出結果的Key和Value的型別
        job.outputKeyClass = Text::class.java
        job.outputValueClass = IntWritable::class.java

        //執行job,直到完成
        job.waitForCompletion(true)
        print("Finished")
    }
}

fun main(args: Array<String>) {
    //輸入路徑 這邊的IP是Hadoop的Master地址
    val dst = "hdfs://172.16.134.251:9000/test/input.txt"
    //輸出路徑
    val dstOut = "hdfs://172.16.134.251:9000/test/output3"
    TempMain.run(dst, dstOut)
}

其中。改後的所有程式碼如下:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import java.io.IOException


fun resource(name: String) = TempMain::class.java.getResource(name)

object TempMain{
    fun run (input : String?, outPut : String?) {
        var hadoopConfig : Configuration = Configuration()

        hadoopConfig.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem::class.java.name)
        hadoopConfig.set("fs.file.impl",
                org.apache.hadoop.fs.LocalFileSystem::class.java.name)

        hadoopConfig.addResource(resource("/hdfs-conf/core-site.xml"))
        hadoopConfig.addResource(resource("/hdfs-conf/hdfs-site.xml"))

        val tableName = "TempResult"
        hadoopConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)

        val conf = HBaseConfiguration.create()
        TempMain.createHBaseTable(tableName)
        var job : Job = Job(hadoopConfig, "TempResult table ")

        //如果需要打成jar執行,需要下面這句
        job.setJarByClass(TempMain::class.java)
        job.setNumReduceTasks(3);

        //指定自定義的Mapper和Reducer作為兩個階段的任務處理類
        job.mapperClass = TempMapper::class.java
        job.reducerClass = TempReducer::class.java
        //指定輸出的鍵值型別
        job.mapOutputKeyClass = Text::class.java
        job.mapOutputValueClass = IntWritable::class.java
        //指定格式化輸入的型別
        job.inputFormatClass = TextInputFormat::class.java
        //指定格式化出入的型別
        job.outputFormatClass = TableOutputFormat::class.java
        //設定最後輸出結果的Key和Value的型別
//        job.outputKeyClass = Text::class.java
//        job.outputValueClass = IntWritable::class.java

        //job執行作業時輸入和輸出檔案的路徑
        FileInputFormat.addInputPath(job, Path(input))
        //結果已寫入HBase中,註釋掉一下程式碼
//        FileOutputFormat.setOutputPath(job, Path(outPut))

        //執行job,直到完成
        job.waitForCompletion(true)
        print("Finished")
    }

    @Throws(IOException::class)
    fun createHBaseTable(tableName: String) {
        val htd = HTableDescriptor(tableName)
        val col = HColumnDescriptor("content")
        htd.addFamily(col)
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "172.16.134.251,172.16.134.250,172.16.134.249,172.16.134.252")
        val admin = HBaseAdmin(conf)
        if (admin.tableExists(tableName)) {
            println("該表已存在!請重新新建其他表...")
            admin.disableTable(tableName)
            admin.deleteTable(tableName)
        }
        println("create new table:" + tableName)
        admin.createTable(htd)
    }
}

fun main(args: Array<String>) {
    //輸入路徑
    val dst = "hdfs://172.16.134.251:9000/test/input.txt"
    //輸出路徑
    val dstOut = "hdfs://172.16.134.251:9000/test/output4"
    TempMain.run(dst, dstOut)
}

同樣根據之前的例子打包,然後傳到Hadoop伺服器,解壓後執行程式,一下為日誌:

[[email protected] bin]# ./mapdemo
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/runfile/mapdemo-1.0-SNAPSHOT/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/runfile/mapdemo-1.0-SNAPSHOT/lib/mapdemo-1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
create new table:T_Temp_Result
Before Mapper:0, 2014010114After Mapper:2014, 14
Before Mapper:11, 2014010216After Mapper:2014, 16
Before Mapper:22, 2014010317After Mapper:2014, 17
Before Mapper:33, 2014010410After Mapper:2014, 10
Before Mapper:44, 2014010506After Mapper:2014, 6
Before Mapper:55, 2012010609After Mapper:2012, 9
Before Mapper:66, 2012010732After Mapper:2012, 32
Before Mapper:77, 2012010812After Mapper:2012, 12
Before Mapper:88, 2012010919After Mapper:2012, 19
Before Mapper:99, 2012011023After Mapper:2012, 23
Before Mapper:110, 2001010116After Mapper:2001, 16
Before Mapper:121, 2001010212After Mapper:2001, 12
Before Mapper:132, 2001010310After Mapper:2001, 10
Before Mapper:143, 2001010411After Mapper:2001, 11
Before Mapper:154, 2001010529After Mapper:2001, 29
Before Mapper:165, 2013010619After Mapper:2013, 19
Before Mapper:176, 2013010722After Mapper:2013, 22
Before Mapper:187, 2013010812After Mapper:2013, 12
Before Mapper:198, 2013010929After Mapper:2013, 29
Before Mapper:209, 2013011023After Mapper:2013, 23
Before Mapper:220, 2008010105After Mapper:2008, 5
Before Mapper:231, 2008010216After Mapper:2008, 16
Before Mapper:242, 2008010337After Mapper:2008, 37
Before Mapper:253, 2008010414After Mapper:2008, 14
Before Mapper:264, 2008010516After Mapper:2008, 16
Before Mapper:275, 2007010619After Mapper:2007, 19
Before Mapper:286, 2007010712After Mapper:2007, 12
Before Mapper:297, 2007010812After Mapper:2007, 12
Before Mapper:308, 2007010999After Mapper:2007, 99
Before Mapper:319, 2007011023After Mapper:2007, 23
Before Mapper:330, 2010010114After Mapper:2010, 14
Before Mapper:341, 2010010216After Mapper:2010, 16
Before Mapper:352, 2010010317After Mapper:2010, 17
Before Mapper:363, 2010010410After Mapper:2010, 10
Before Mapper:374, 2010010506After Mapper:2010, 6
Before Mapper:385, 2015010649After Mapper:2015, 49
Before Mapper:396, 2015010722After Mapper:2015, 22
Before Mapper:407, 2015010812After Mapper:2015, 12
Before Mapper:418, 2015010999After Mapper:2015, 99
Before Mapper:429, 2015011023After Mapper:2015, 23
Before Reduce:2012, 19, 12, 32, 9, 23, 
After Reduce:2012, 32
Before Reduce:2015, 49, 22, 12, 99, 23, 
After Reduce:2015, 99
Before Reduce:2001, 29, 11, 10, 12, 16, 
After Reduce:2001, 29
Before Reduce:2007, 19, 12, 99, 23, 12, 
After Reduce:2007, 99
Before Reduce:2010, 14, 16, 17, 10, 6, 
After Reduce:2010, 17
Before Reduce:2013, 23, 29, 12, 22, 19, 
After Reduce:2013, 29
Before Reduce:2008, 5, 16, 37, 14, 16, 
After Reduce:2008, 37
Before Reduce:2014, 6, 10, 17, 16, 14, 
After Reduce:2014, 17
Finished

在伺服器中。使用以下命令開啟HBase的命令列:

hbase shell

使用命令:

scan TempResult

可以檢視到我們原本寫在hdfs中的結果已經存入到了HBase當中了。

以上

相關推薦

MapReduceHDFS讀取資料計算寫入HBase

基於上個例子。做一下簡單的改造。在原本的例子中,從HDFS中讀取資料計算之後再寫會HDFS裡,現在講Reducer類改造一下,把計算後的資料。寫入到HBase當中,寫完之後我們會使用HBase的命令查詢一下寫入資料。開啟原有的Reducer類,程式碼如下:import org

sparkmysql讀取資料(redis/mongdb/hbase等類似換成各自RDD即可)

package com.ws.jdbc import java.sql.DriverManager import org.apache.spark.rdd.JdbcRDD import org.apache.spark.{SparkConf, SparkCont

Java呼叫R 資料庫讀取資料用R根據讀取資料生成圖片儲存

Java呼叫R 以及用R生成的圖片儲存 使用 png()可以保證圖片質量 有人說繪製的圖片上有中文會出現亂碼,其實設定下字型就好了 family=‘STXihei’ Java呼叫R 以及用R生成的圖片儲存 Rserve的配置和啟動: ①安裝Rserve包。

HBase建表高階屬性hbase應用案例看行鍵設計HBasemapreduce結合Hbase讀取資料、分析寫入hdfshdfs讀取資料寫入Hbase協處理器和二級索引

1. Hbase高階應用 1.1建表高階屬性 下面幾個shell 命令在hbase操作中可以起到很到的作用,且主要體現在建表的過程中,看下面幾個create 屬性 1、 BLOOMFILTER 預設是NONE 是否使用布隆過慮及使用何種方式 布隆

第一篇 部落格:java資料庫讀取資料寫入到excel表格

  今天,組長分配了查詢資料庫記錄,並把這些記錄寫入到excel表格中,以前沒有嘗試過,借鑑了一些別人的程式碼,最終實現了功能,寫一篇部落格,總結一下這個過程。1.準備需要用到的jar包    1.jxl.jar     2.mysql-connector-java-5.1.

FlumeKafka讀取資料寫入Hdfs

需求:kafka有五個主題  topic topic-app-startuptopic topic-app-errortopic topic-app-eventtopic topic-app-usagetopic topic-app-pageflume讀取Kafka 5個主題

SQL與eclipse的連線資料庫讀取資料將二維陣列資料匯入表

示例: import java.util.List; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; impor

資料庫讀取原始資料插入新表對處理原始資料的原則總結

在讀取原始資料的時候會有可能屬性名的名字與要建立的表的名字不符,這個時候就要為讀取到的資料重新命名屬性名。 如果資料中存在中文,還要宣告資料庫的編碼。 在原始表中可能會有重複資料,需要事先將重複資料進行刪除,然後再做其他處理。 在設定主鍵的時候會發現有些資料的主鍵相同,但是其他屬性值不同,需要對已經插入

簡單實現將介面返回的資料寫入文字文字讀取引數實現介面引數自動化

import requestsimport jsonimport datetimeimport re# 需要測試的環境api_host = "192.168.10.XX:XXXX"#發貨寶登入 15023621999headers_null = { 'Content-Type' : 'applicati

oracle db某一種資料字典pct miss的計算公式

本文為原創文章,轉載請註明出處: 本文提到的資料字典,以dc_histogram_defs為例子來說明dc_histogram_defs的計算公式。 dc_histogram_defs這個依據字面的猜測,應該是直方圖的definitions 下圖中使用的db,是orac

flume本地讀取資料錄入到hdfs檔案系統

配置檔案 agent.sources = origin   agent.channels = memorychannel   agent.sinks = target      agent.sources.origin.type = TAILDIR agent.source

STM32操作訪問flash包括寫入資料到flash和flash讀取資料

STM32中儲存區分為:隨機存取儲存器RAM和只讀儲存器ROM。 其中: RAM為常說的記憶體,比如手機的2G記憶體4G記憶體等,就是程式跑起來的時候所佔用的儲存空間,特點是掉電資料丟失。 ROM為常說的硬碟,比如手機的64G和128G等,可以簡單的理解為

用c# 採用BIT-map 方式一堆資料找出不重複的

採用的是bit-map演算法,關於什麼是bit-map,百度一下; 直接上一段程式碼,用C#改寫的! 如下:  class Program     {         private static int[] flags = new int[1000000];    

Java控制檯讀取資料輸入流System.in的幾種方法效率比較

這是一道PTA上關於複雜度演算法的題目——最大子列和問題,程式簡單。 我分別用 1)構造Scanner物件:Scanner in = new Scanner(System.in); 2)構造BufferedReader物件:BufferedReader br = new B

shellredis讀取資料

這裡介紹在shell中,一種讀取redis值方式,程式碼如下: #!/usr/bin/env bash redis-cli -h 127.0.0.1 -p 6379 -a 123456 -n 1 k

springMVC+webUploader後臺讀取資料回顯圖片,超過三張不再新增

<script type="text/javascript">$(function(){var data = ${photoList};if(data){for(var i in data){$('#fileList').append('<li id="K

resource的raw資料獲取檔案並讀取資料(資原始檔只能讀不能寫)

轉載:http://blog.sina.com.cn/s/blog_4d25c9870100qpax.html 一、 從resource中的raw資料夾中獲取檔案並讀取資料(資原始檔只能讀不能寫) String res = ""; try{ InputStre

Tensorflow使用tfrecord佇列方式讀取資料

標準TensorFlow格式       有一種儲存記錄的方法可以允許你講任意的資料轉換為TensorFlow所支援的格式, 這種方法可以使TensorFlow的資料集更容易與網路應用架構相匹配。這種建議的方法就是使用TFRecords檔案,TFRecords檔案包含了tf.

將控制臺輸入的資料存到文檔並按照存入資料的年齡進行排序

file 實現 冒泡 控制 eno rabl trace gen 是否 package LX10_11; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File;

tensorflow--檔案讀取資料

讀取資料-csv tensorflow讀取資料流程 構造檔案佇列 讀取佇列內容 reader = tf.TextLineReader() 解析成一個樣本資料 example, label = tf.decode_csv(value, record_defaults