
spark + phoenix: reading table data over JDBC

No padding here: straight to the code, to solve the problem at hand.

Add the Maven dependencies
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>${phoenix.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>${phoenix.version}</version>
</dependency>
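Both artifacts reference a ${phoenix.version} property that the post never defines. A sketch of the corresponding <properties> entry; the version shown is an assumption and should be replaced with the Phoenix release actually deployed on your cluster:

<properties>
    <!-- Assumed version; match the Phoenix/HBase release on your cluster -->
    <phoenix.version>4.14.0-HBase-1.4</phoenix.version>
</properties>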
Reading data from Phoenix with Spark
package com.cctsoft.spark.offline;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created with IntelliJ IDEA.
 * User: Kevin Liu
 * CreateDate: 2018/6/8 10:32
 * Description: read data from a Phoenix table
 */
public class FaceCrashImsiJob {
    private static final Logger logger = LoggerFactory.getLogger(FaceCrashImsiJob.class);
    public static void main(String[] args) throws AnalysisException {
        SparkSession spark = SparkSession
                .builder()
                .appName("FaceCrashImsiJob")
                .master("local")
                .getOrCreate();

        runBasicDataFrameExample(spark);

        spark.stop();
    }


    private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException {
        // Example values; in the original these come from job configuration.
        String zookeeper = "abigdataclient1";
        String imsiDataStartTime = "2018-06-08 00:00:00";
        String imsiDataEndTime = "2018-06-08 23:59:59";

        // Fetch the 4G (LTE) records captured in the given time window.
        // Phoenix accepts a parenthesized subquery with an alias in the JDBC dbtable option.
        String imsiTableName = "(select * from LTE_DATA where to_char(cap_time) >= '"
                + imsiDataStartTime + "' and to_char(cap_time) < '" + imsiDataEndTime
                + "') as LTE_DATA_FILTER";
        logger.info("imsiTableName:" + imsiTableName);

        Dataset<Row> df = spark.read().format("jdbc")
                .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
                .option("url", "jdbc:phoenix:" + zookeeper + ":2181")
                .option("dbtable", imsiTableName)
                .load();

        // registerTempTable is deprecated in Spark 2.x; use createOrReplaceTempView instead.
        df.createOrReplaceTempView("lte_data_tmp");
        Dataset<Row> lteDataAll = spark.sql(
                "select lte_dev_code,cap_time,imsi from lte_data_tmp order by cap_time desc");
        lteDataAll.show();
    }
}
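Since the pom above already pulls in phoenix-spark, the same read can also go through the Phoenix connector instead of the generic JDBC source, which lets Phoenix split the scan across HBase regions. A minimal sketch, reusing the SparkSession from the example above; the table name and ZooKeeper quorum are carried over from this post and may differ on your cluster:

// Sketch: read LTE_DATA via the phoenix-spark DataSource (same Dataset/Row imports as above).
Dataset<Row> lteData = spark.read()
        .format("org.apache.phoenix.spark")
        .option("table", "LTE_DATA")
        .option("zkUrl", "abigdataclient1:2181")
        .load();
lteData.createOrReplaceTempView("lte_data_tmp");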
Writing data to Phoenix with Spark
package com.cctsoft.spark.offline
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.Row

/**
  * Created with IntelliJ IDEA.
  * User: Kevin Liu
  * CreateDate: 2018/6/15 12:32
  * Description: write data into a Phoenix table
  */
object TestMain {
  def main(args: Array[String]): Unit = {
    // Placeholder invocation from the original post; a runnable usage sketch follows below.
    batchSaveFaceImsi(null)
  }

  def batchSaveFaceImsi(imsiRdd: JavaRDD[Row]): Unit = {
    import org.apache.phoenix.spark._
    // Build (id, device_mac, imsi) tuples, keying each row as "mac:imsi",
    // and drop rows whose IMSI is null or empty.
    val rdd = imsiRdd.rdd.map(x => {
      (x.get(0).toString + ":" + x.get(1).toString, x.get(0).toString, x.get(1).toString)
    }).filter(f => f._3 != null && f._3.nonEmpty)
    rdd.foreach(println)
    rdd.saveToPhoenix(
      "RESIDENT_TMP",
      Seq("ID", "DEVICE_MAC", "IMSI"),
      // The phoenix-spark docs pass the bare ZooKeeper quorum here,
      // not a "jdbc:phoenix:" JDBC URL as the original post did.
      zkUrl = Some("abigdataclient1:2181")
    )
  }
}
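TestMain expects its caller to hand in the JavaRDD[Row]. As a minimal usage sketch, assuming the RESIDENT_TMP table already exists in Phoenix with ID, DEVICE_MAC and IMSI columns; the driver object, column names, and sample values below are illustrative, not part of the original post:

import org.apache.spark.sql.SparkSession

// Hypothetical driver: builds a two-column DataFrame and feeds its rows to batchSaveFaceImsi.
object TestMainDriver {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PhoenixWriteExample")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // (device_mac, imsi) sample rows.
    val rows = Seq(("AA:BB:CC:DD:EE:FF", "460001234567890"))
      .toDF("device_mac", "imsi")

    TestMain.batchSaveFaceImsi(rows.toJavaRDD)
    spark.stop()
  }
}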