1. 程式人生 > >使Apache Spark和Mysql作資料分析

使Apache Spark和Mysql作資料分析

使用用spart-shell讀取MySQL表中的資料

步驟1: 執行spark-shell命令,進入spark-shell命令列,執行命令如下:

[email protected]:~/run/spark/bin$ ./spark-shell --master spark://ubuntu1:7077 --jars /home/bigdata/run/spark/mysql-connector-java-5.1.30-bin.jar

執行結果如下:
[email protected]:~/run/spark/bin$ ./spark-shell --master spark://ubuntu1:7077 --jars /home/bigdata/run/spark/mysql-connector-java-5.1.30-bin.jar
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/08 01:40:28 WARN spark.SparkConf: The configuration key 'spark.history.updateInterval' has been deprecated as of Spark 1.3 and may be removed in the future. Please use the new key 'spark.history.fs.update.interval' instead.
17/05/08 01:40:46 WARN spark.SparkConf: The configuration key 'spark.history.updateInterval' has been deprecated as of Spark 1.3 and may be removed in the future. Please use the new key 'spark.history.fs.update.interval' instead.
17/05/08 01:40:46 WARN spark.SparkConf: The configuration key 'spark.history.updateInterval' has been deprecated as of Spark 1.3 and may be removed in the future. Please use the new key 'spark.history.fs.update.interval' instead.
17/05/08 01:40:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/08 01:40:57 WARN spark.SparkConf: The configuration key 'spark.history.updateInterval' has been deprecated as of Spark 1.3 and may be removed in the future. Please use the new key 'spark.history.fs.update.interval' instead.
17/05/08 01:41:01 WARN DataNucleus.General: Plugin (Bundle) "org.datanucleus.api.jdo" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/home/bigdata/run/spark/jars/datanucleus-api-jdo-3.2.6.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/home/bigdata/run/spark-2.1.0-bin-hadoop2.7/jars/datanucleus-api-jdo-3.2.6.jar."
17/05/08 01:41:01 WARN DataNucleus.General: Plugin (Bundle) "org.datanucleus.store.rdbms" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/home/bigdata/run/spark-2.1.0-bin-hadoop2.7/jars/datanucleus-rdbms-3.2.9.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/home/bigdata/run/spark/jars/datanucleus-rdbms-3.2.9.jar."
17/05/08 01:41:01 WARN DataNucleus.General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/home/bigdata/run/spark-2.1.0-bin-hadoop2.7/jars/datanucleus-core-3.2.10.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/home/bigdata/run/spark/jars/datanucleus-core-3.2.10.jar."
17/05/08 01:41:10 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://10.3.19.171:4040
Spark context available as 'sc' (master = spark://ubuntu1:7077, app id = app-20170508014050-0004).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_25)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

步驟2: 建立變數sqlContext

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = [email protected]

步驟3:從Mysql中載入資料
scala> val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://127.0.0.1:3306/mydatabase").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "mytable").option("user", "myname").option("password", "mypassword").load()
dataframe_mysql: org.apache.spark.sql.DataFrame = [id: string, grouptype: int ... 16 more fields]

步驟4:顯示dataframe中的資料
scala> dataframe_mysql.show
+---+---------+-------+---------+------+------+---+--------------------+---+-----------+-----+----+--------------------+--------------------+-----+------+----------+---+
| id|grouptype|groupid|loginname|  name|   pwd|sex|            birthday|tel|mobilephone|email|isOk|       lastLoginTime|             addtime|intro|credit|experience|img|
+---+---------+-------+---------+------+------+---+--------------------+---+-----------+-----+----+--------------------+--------------------+-----+------+----------+---+
|  1|        1|      1|    admin| admin| admin|  1|2016-05-05 14:51:...|  1|          1|    1|   1|2016-05-10 14:52:...|2016-05-08 14:52:...|    1|     1|         1|  1|
|  2|        2|      2|   wanghb|wanghb|wanghb|  2|2016-05-10 14:56:...|  2|          2|    2|   2|2016-05-11 14:57:...|2016-05-10 14:57:...|    2|     2|        22|  2|
+---+---------+-------+---------+------+------+---+--------------------+---+-----------+-----+----+--------------------+--------------------+-----+------+----------+---+

步驟5:為了後續查詢,將dataframe中的資料註冊為一個臨時表

scala> dataframe_mysql.registerTempTable("tmp_tablename")
warning: there was one deprecation warning; re-run with -deprecation for details

步驟6:現在可以從臨時表"tmp_tablename"中查詢資料
scala> dataframe_mysql.sqlContext.sql("select * from tmp_tablename").collect.foreach(println)
[1,1,1,admin,admin,admin,1,2016-05-05 14:51:58.0,1,1,1,1,2016-05-10 14:52:07.0,2016-05-08 14:52:12.0,1,1,1,1]
[2,2,2,wanghb,wanghb,wanghb,2,2016-05-10 14:56:58.0,2,2,2,2,2016-05-11 14:57:05.0,2016-05-10 14:57:08.0,2,2,22,2]


通過Spark將資料寫入MySQL