1. 程式人生 > >Spark-數據源及機器學習算法部署

Spark-數據源及機器學習算法部署

gist user roc 加載 機器學習 mongo country 第三方 string

1、數據源讀取

使用的時候,需要加載驅動 --jars 或者添加到classpath中 或scaddjar

Spark對Oracle數據庫讀取,代碼如下:

conf = SparkConf().setAppName(string_test)
sc = SparkContext(conf=conf)
ctx = SQLContext(sc)
sqltext = "(select dbms_lob.substr(title,500) as title,id,content,country,languages,time as publishDate,source,subject,source_url from news t where id <= 24) news"
news =ctx.read \
.format("jdbc") \
.option("url", "jdbc:oracle:thin:username/[email protected]//ip:port/sid") \
.option("dbtable", sql) \
.option("user", "user") \
.option("password", "password") \
.option("driver", "oracle.jdbc.driver.OracleDriver") \
.load()

news.registerTempTable("news")

Spark 對Mongo讀數據

ctx = SQLContext(sc)
mongourl = "mongodb://username:[email protected]:port"
mongoDB = "dbname"
mongoCollection = "collectionName"
mongoRows = ctx.read.format("com.mongodb.spark.sql").options(uri=mongourl,database=mongoDB, collection=mongoCollection).load()
mongoResultRdd = mongoRows.rdd

2、機器學習算法轉換

機器學習算法有兩類不能直接添加到spark中:

1) 包中含有復雜依賴關系的,如scipy、numpy等,scipy.special.beta函數在spark中不可以使用的。

2) 包不是.py結尾的,而是有第三方編譯包的,不可以添加到spark中

解決辦法:

在spark改寫的代碼中使用到上述相關的程序,闊以用subprocess調用python程序,以進行數據處理,然後得到程序返回結果。如下:

test= subprocess.getoutput("python /home/pytest.py \""+content.replace("\‘","’")+"\"")
re= test[test.index("::")+2:len(test)].replace(" ","")

Spark-數據源及機器學習算法部署