1. 程式人生 > >spark從mongodb匯入資料到hive

spark從mongodb匯入資料到hive

1、首先新增mongo-spark依賴,官網地址 https://docs.mongodb.com/spark-connector/

       <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>

2、程式碼

object Mongo2Hive {

  def MongodbToHive(args: Array[String], sc: SparkContext): Unit = {

    val hiveContext = new HiveContext(sc)

    val Array(schemaFilePath, queryCondition): Array[String] = args

    val schemaFile = sc.textFile(schemaFilePath).collect()

    val Array(schemaStruct, requireFields, tempTableName, sql, mongodbConfig) = schemaFile

    //MongoDB Config
    val json = JSON.parseObject(mongodbConfig)

    //mongodb host
    val hostList = json.getString("host")
    val dataBase = json.getString("database")
    val collection = json.getString("collection")

    import com.mongodb.spark._
    import com.mongodb.spark.config._

//    val df = MongoSpark.load(hiveContext, ReadConfig(Map("uri" -> s"mongodb://$hostList/$dataBase.$collection", "partitioner" -> "MongoSplitVectorPartitioner", "partitionKey" -> "updateTime")))
    val rdd = sc.loadFromMongoDB(ReadConfig(Map("uri" -> s"mongodb://$hostList/$dataBase.$collection", "partitioner" -> "MongoSplitVectorPartitioner", "partitionKey" -> "updateTime"))) // Uses the ReadConfig
    //查詢新增可以在配置檔案中配置
    val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { readNum : { $eq: 35} } }")))

    //Require Column
    val requireFieldArrays = requireFields.split(",")

    val schema = DataType.fromJson(schemaStruct).asInstanceOf[StructType]
    val rowRdd = aggregatedRdd.toDF(schema).rdd

    // DataFrame Temp Table
    hiveContext.createDataFrame(rowRdd, schema).registerTempTable(tempTableName)

    //HiveContext 將臨時表匯入到Hive   Hive SQL
    hiveContext.sql("set hive.exec.dynamic.partition=true;")
    hiveContext.sql("set hive.exec.dynamic.partition.mode=nostrick;")
    hiveContext.sql(sql)
    sc.stop()
  }

3、資料載入配置檔案格式

{"type":"struct","fields":[{"name":"_id","type":"string","nullable":true,"metadata":{}},{"name":"addTime","type":"long","nullable":true,"metadata":{}},{"name":"articleUrl","type":"string","nullable":true,"metadata":{}},{"name":"author","type":"string","nullable":true,"metadata":{}},{"name":"position","type":"long","nullable":true,"metadata":{}},{"name":"content","type":"string","nullable":true,"metadata":{}},{"name":"coverPlan","type":"string","nullable":true,"metadata":{}},{"name":"dynamics","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"addTime","type":"long","nullable":true,"metadata":{}},{"name":"readNum","type":"long","nullable":true,"metadata":{}},{"name":"likeNum","type":"long","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}},{"name":"readNum","type":"long","nullable":true,"metadata":{}},{"name":"isOriginal","type":"long","nullable":true,"metadata":{}},{"name":"likeNum","type":"long","nullable":true,"metadata":{}},{"name":"pmid","type":"string","nullable":true,"metadata":{}},{"name":"postTime","type":"long","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}},{"name":"hotToken","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"score","type":"string","nullable":true,"metadata":{}},{"name":"token","type":"string","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}},{"name":"updateTime","type":"long","nullable":true,"metadata":{}}]}
pmid,articleUrl,readNum,likeNum,isOriginal,position,postTime,updateTime,hotToken,title,coverPlan,content
hiveTempTable
insert overwrite table gaia.wxArticleTest partition(time_date) select pmid,articleUrl,readNum,likeNum,isOriginal,position,postTime,updateTime,hotToken,title,coverPlan,content,from_unixtime(substring(updateTime,0,10),'yyyyMMddHH') time_date from hiveTempTable where readNum is not null and isOriginal is not null
{"host":"127.0.0.1:27017","database":"test","collection":"wxArticle"}


相關推薦

sparkmongodb匯入資料到hive

1、首先新增mongo-spark依賴,官網地址 https://docs.mongodb.com/spark-connector/ <dependency> <groupId>org.mongodb.spar

MongoDB抽取資料匯入mysql

# -*- coding: utf-8 -*- from pymongo import MongoClient import io import traceback import sys reload(sys) sys.setdefaultencoding('u

Oracle 使用SQL Loader 外部匯入資料

在專案中經常會有一些基礎資料需要從Excel或其他檔案中匯入。大部分的格式都是樹結構。如果是這樣,我們對資料稍加整理,即可使用Oracle的資料匯入工具SQL Loader匯入我們所需要的資料到指定的表中。SQL Loader的詳細用法,可自己查詢相關詳細的文件,這裡只做簡單的使用介紹。 1

MongoDB匯入資料資料夾(包括bson和json檔案)報錯

MongoDB匯入資料報錯 很多部落格都說在linux下 使用 mongorestore -d db_name 資料夾目錄 就可以匯入資料夾中的內容 記錄一個傻瓜錯誤: mongorestore是一個獨立可執行程式 這個命令不能放在mongo shell裡執行 應該

sparkmysql讀取資料(redis/mongdb/hbase等類似,換成各自RDD即可)

package com.ws.jdbc import java.sql.DriverManager import org.apache.spark.rdd.JdbcRDD import org.apache.spark.{SparkConf, SparkCont

學習筆記:0開始學習大資料-28. solr儲存資料在hdfs並mysql匯入資料

環境 centos7  hadoop2.6.0  solr-7.5.0 一、建立hdfs為儲存的core 1.在hdfs建立索引資料目錄 [[email protected] bin]# hadoop fs -mkdir /user/solr/ [[email&

Spark---WC---Spark外部讀取資料之textFile

測試資料 hello spark hello hadoop csdn hadoop csdn csdn hello world 結果 (spark,1) (hadoop,2) (csdn,3) (hello,3) (world,1) import org.

一個小例子集合xlrd,matplotlib,numpy,scipy使用方法(Excel匯入資料

最近因為一篇論文的原因,要從Excel中取得部分資料平作圖,但是得到的圖都是點陣圖,不是太好插入到論文中,因此,決定使用Python畫圖來解決此問題(不使用MATLAB的原因在於它畫出的圖是在是不好看呀) 首先使用的庫是xlrd庫,此庫的作用是從讀取Exc

解決sqoopmysql匯入hive表的多分割槽問題

參考:http://blog.csdn.net/liweiwei71/article/details/23434189 對於分割槽表 drop table track_log; create table track_log ( id                    

Solr資料庫匯入資料

一. 資料匯入(DataImportHandler-DIH) DIH 是solr 提供的一種針對資料庫、xml/HTTP、富文字物件匯入到solr 索引庫的工具包。這裡只針對資料庫做介紹。 A、準備以下jar包 apache-solr-dataimporthandl

C++Excel匯入資料

前三步和”C++將資料匯出到Excel “一樣。 第四步中程式碼不一樣,如下: int pathleng; char charpath[200]; GetCurrentDirectory(512,(LPSTR)charpath);

MongoDB匯入資料到HDFS方法3

1.背景 公司希望使用MongoDB作為後端業務資料庫,使用Hadoop平臺作為資料平臺。最開始是先把資料從MongoDB匯出來,然後傳到HDFS,然後用Hive/MR處理。我感覺這也太麻煩了,現在不可能沒有人想到這個問題,於是就搜了一下,結果真找到一個Mo

sqoop使用,hive向oracle匯入資料

author: lf sqoop export 使用說明 --export-dir 和 (--table和--call中的一個)是必須的。 指定了將要填充的表(或將要呼叫的儲存過程),以及hdfs包含資源資料的目錄 --columns 預設將查詢出表中所有的欄位。通過

mysql將資料匯入hive

[[email protected] ~]$ sqoop import --connect jdbc:mysql://Hadoop48/toplists --verbose -m 1 --username root --hive-overwrite --direct --table award --

sqooporacle資料庫抽取資料,匯入hive

環境: hadoop-2.7.5 sqoop-1.4.7 zookeeper-3.4.10 hive-2.3.3 (使用mysql配置元資料庫) jdk1.8.0_151 oracle 11.2.0.3.0 經過一番baidu,總算初步成功,現在記錄一下中間過程. 1.拷貝hive/conf/

hive Excel中匯入資料

拿到Excel表後將資料保留,其他的亂七八糟都刪掉,然後另存為txt格式的文字,用nodepad++將文字轉換為UTF-8編碼,此處命名為cityprovince.txt 將cityprovince.txt傳入操作的Linux環境中 hive建表,注

程式碼 | Spark讀取mongoDB資料寫入Hive普通表和分割槽表

版本: spark 2.2.0  hive 1.1.0  scala 2.11.8  hadoop-2.6.0-cdh5.7.0  jdk 1.8  MongoDB 3.6.4 一 原始資料及Hive表  MongoDB資

MongoDB資料匯入到HDFS上的Hive中記錄

需求 公司以前的舊資料存放在伺服器上面的MongoDB上,現在要使用這些資料進行大資料分析處理,那麼就出現了MongoDB的資料匯入到HDFS上的Hive資料表中的需求.現在寫下該部落格Mark一下! 實現步驟 1.下載jar檔案: 2.

scoop匯入資料mysql到hive

mysql裡的表格式: desc track_log18;CREATE TABLE `track_log18` (   `id` varchar(1000) DEFAULT NULL,   `url` varchar(5000) DEFAULT NULL,   `refer

Navicat匯出oracle建表語句,應用sqoop將oracle中的表資料匯入hive

                            ORACLE庫的操作首先選擇navicat作為這個檔案的工具,作為資料庫表結構的匯入和匯出工具。  匯出的時候 會連結構和資料都匯出來,會形成一個sql檔案  也可以只倒檢視或者表都可以的。但是以匯出的這個檔案匯入的時候