1. 程式人生 > >SparkSQL實現查詢Hive表集合中的多個元素匹配

SparkSQL實現查詢Hive表集合中的多個元素匹配

# encoding=utf-8
# _*_ coding:utf-8 _*_
# Writer : byz
# dateTime : 2016-08-3

import sys
# Extend the module search path before `import config` below.
# NOTE(review): presumably config.py lives in this directory -- confirm.
sys.path.append("/home/mysql1/anqu/python/code")
# Python 2 only: reloading sys makes sys.setdefaultencoding available again
# (the interpreter removes it during startup). `reload` is not a builtin on
# Python 3, so this bootstrap ties the script to Python 2.
reload(sys)
import config
# Switch the process-wide default string encoding to UTF-8 so implicit
# str <-> unicode conversions do not fall back to ASCII.
sys.setdefaultencoding('utf8') 

from pyspark import SparkContext, SparkConf
SparkAppName = "Anqu_serchAPP資料處理"
# Runs against a local master; the original author noted
# master = "192.168.40.128:7077" as the alternative setting.
conf = SparkConf().setAppName(SparkAppName).setMaster("local")
sc = SparkContext(conf=conf)

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
from pyspark.sql.functions import *


# Data processing
class sparkDF_searchAPP(object):
    """Select rows of a Hive table whose collection column contains given ids.

    Each query explodes a collection column (``genre`` or ``searchapp``) with
    ``LATERAL VIEW OUTER explode`` and keeps the rows whose exploded element
    equals one of the requested ids. One query is issued per id and the
    per-id results are concatenated with ``unionAll``.
    """

    # Columns selected after the exploded alias in every query.
    _BASE_COLUMNS = "word, priority, searchapp, searchcount, genre, type"

    def __init__(self, database="mysql_anqu_chi"):
        """Bind the module-level HiveContext and switch to *database*."""
        self.sqlContext = sqlContext
        self.sqlContext.sql("use " + database)

    def _select_by_exploded_value(self, column, alias, values, table):
        """Union the rows of *table* whose exploded *column* equals each value.

        Args:
            column: name of the collection column to explode.
            alias: name given to the exploded element; it is also the first
                column of the result.
            values: iterable of integer ids. Falsy entries (``None``, ``0``)
                are skipped.
            table: name of the table to query.

        Returns:
            A DataFrame with columns (alias, word, priority, searchapp,
            searchcount, genre, type), or ``None`` when no id was queried.
        """
        result = None
        for value in values:
            if not value:
                continue
            # %d forces the id to be rendered as an integer literal.
            query = (
                "select %s, %s from %s "
                "LATERAL VIEW OUTER explode(%s) s AS %s where %s = %d "
                % (alias, self._BASE_COLUMNS, table,
                   column, alias, alias, value)
            )
            matched = self.sqlContext.sql(query)
            # Compare against None explicitly instead of relying on the
            # truthiness of a DataFrame object.
            result = matched if result is None else result.unionAll(matched)
        return result

    def in_genre_select_app(self, genres, table="ansearchapp"):
        """Return the rows whose ``genre`` collection contains any id in *genres*.

        Args:
            genres: iterable of integer genre ids; falsy entries are skipped.
            table: table to query (defaults to ``ansearchapp``).

        Returns:
            DataFrame (genres, word, priority, searchapp, searchcount, genre,
            type), or ``None`` if no id was queried.
        """
        return self._select_by_exploded_value("genre", "genres", genres, table)

    def in_searchapp_select_app(self, searchapp, table="ansearchapp"):
        """Return the rows whose ``searchapp`` collection contains any given id.

        Args:
            searchapp: iterable of integer app ids; falsy entries are skipped.
            table: table to query (defaults to ``ansearchapp``).

        Returns:
            DataFrame (app, word, priority, searchapp, searchcount, genre,
            type), or ``None`` if no id was queried.
        """
        return self._select_by_exploded_value(
            "searchapp", "app", searchapp, table)


def main():
    """Run the two sample queries and display their results."""
    # The constructor already issues the "use <database>" statement, so
    # __init__ is not invoked a second time.
    ss = sparkDF_searchAPP()

    # ------------------input genres:
    gen = [6017, 6002]
    result_gen = ss.in_genre_select_app(gen)
    # output: (genres, word, priority, searchapp, searchcount, genre, type)
    # show() prints the table itself and returns None, so it is not printed.
    result_gen.show()

    # ------------------input searchapp:
    app = [1124330238, 1128622678]
    result_app = ss.in_searchapp_select_app(app)
    # output: (app, word, priority, searchapp, searchcount, genre, type)
    aa = result_app.select(result_app['searchapp'])
    print("aa---")
    aa.show()


if __name__ == '__main__':
    main()