spark+phoenix 通過jdbc讀取表中的資料
阿新 • • 發佈:2019-01-29
廢話不說,直接上程式碼,解決燃眉之急
新增maven配置 |
<dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-core</artifactId> <version>${phoenix.version}</version> </dependency> <dependency> <groupId>org.apache.phoenix</groupId> <artifactId> |
spark讀取phoenix中的資料 |
package com.cctsoft.spark.offline; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; /** * Created with IntelliJ IDEA. * User: Kevin Liu * CreateDate: 2018/6/8 10:32 * Description: 讀取phoenix表中資料 */ public class FaceCrashImsiJob { public static void main(String[] args) throws AnalysisException { // $example on:init_session$ SparkSession spark = SparkSession .builder() .appName("Java Spark SQL basic example") .config("spark.some.config.option", "some-value") .master("local") .getOrCreate(); // $example off:init_session$ runBasicDataFrameExample(spark); spark.stop(); } private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException { /** * 獲取4G資料 * String tableName= "(select * from LTE_DATA where to_char(cap_time) >= '"+imsiDataStartTime+"' and to_char(cap_time) < '"+imsiDataEndTime+"') as LTE_DATA_FILTER"; */ String imsiTableName = "(select * from LTE_DATA where to_char(cap_time) >= '"+imsiDataStartTime+"' and to_char(cap_time) < '"+imsiDataEndTime+"') as LTE_DATA_FILTER"; logger.info("imsiTableName:"+imsiTableName); Dataset<Row> df = spark.read().format("jdbc") .option("driver","org.apache.phoenix.jdbc.PhoenixDriver") .option("url","jdbc:phoenix:"+zookeeper+":2181") .option("dbtable",imsiTableName) .load(); df.registerTempTable("lte_data_tmp"); Dataset<Row> lteDataAll = spark.sql("select lte_dev_code,cap_time,imsi from lte_data_tmp order by cap_time desc"); lteDataAll.show(); } } |
通過spark寫入資料到phoenix |
package com.cctsoft.spark.offline import org.apache.spark.api.java.JavaRDD import org.apache.spark.sql.Row /** * Created with IntelliJ IDEA. * User: Kevin Liu * CreateDate: 2018/6/15 12:32 * Description: 寫入phoenix表資料 */ object TestMain { def main(args: Array[String]): Unit |