1. 程式人生 > >spark sql 將資料匯入到redis 裡面

spark sql 將資料匯入到redis 裡面

#coding=utf-8
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql import Row
import sys
from decimal import *
from rediscluster import StrictRedisCluster
reload(sys)
sys.setdefaultencoding( "utf-8" )

sql_dict={
    "0":"""SELECT
	b.equip_id,
	a.`code`,
	a.equip_type_id,
	b.threshold,
	b.type_index_id,
	b.`condition`,
	a.status
FROM
	t_equip_type_index a,
	t_equip_threshold b
WHERE
	a.id = b.type_index_id
and a.equip_type_id = b.equip_type_id
AND b.`condition` = '0'
AND a.`status` = '101001'
AND b.`status` = '101001'
GROUP BY
	b.equip_id,a.code,threshold
 """,
    "1":"""SELECT
	b.equip_id,
	a.`code`,
	a.equip_type_id,
	min(b.threshold) as threshold,
	b.type_index_id,
	b.`condition`,
	a.status
FROM
	t_equip_type_index a,
	t_equip_threshold b
WHERE
	a.id = b.type_index_id
and a.equip_type_id = b.equip_type_id
AND b.`condition` = '1'
AND a.`status` = '101001'
AND b.`status` = '101001'
GROUP BY
	b.equip_id,a.code
 """,
        "2":"""SELECT
	b.equip_id,
	a.`code`,
	a.equip_type_id,
	min(b.threshold) as threshold,
	b.type_index_id,
	b.`condition`,
	a.status
FROM
	t_equip_type_index a,
	t_equip_threshold b
WHERE
	a.id = b.type_index_id
and a.equip_type_id = b.equip_type_id
AND b.`condition` = '2'
AND a.`status` = '101001'
AND b.`status` = '101001'
GROUP BY
	b.equip_id,a.code
 """,
            "3":"""SELECT
	b.equip_id,
	a.`code`,
	a.equip_type_id,
	max(b.threshold) as threshold,
	b.type_index_id,
	b.`condition`,
	a.status
FROM
	t_equip_type_index a,
	t_equip_threshold b
WHERE
	a.id = b.type_index_id
and a.equip_type_id = b.equip_type_id
AND b.`condition` = '3'
AND a.`status` = '101001'
AND b.`status` = '101001'
GROUP BY
	b.equip_id,a.code
 """,
    "4":"""SELECT
	b.equip_id,
	a.`code`,
	a.equip_type_id,
	max(b.threshold) as threshold ,
	b.type_index_id,
	b.`condition`,
	a.status
FROM
	t_equip_type_index a,
	t_equip_threshold b
WHERE
	a.id = b.type_index_id
and a.equip_type_id = b.equip_type_id
AND b.`condition` = '4'
AND a.`status` = '101001'
AND b.`status` = '101001'
GROUP BY
	b.equip_id,a.code
 """
}

def redis_cluster(key,value):
    redis_nodes =  [{'host':'172.16.11.136','port':6379},
                    {'host':'172.16.11.136','port':6380},
                    {'host':'172.16.11.137','port':6379},
                    {'host':'172.16.11.137','port':6380},
                    {'host':'172.16.11.138','port':6379},
                    {'host':'172.16.11.138','port':6380}
                   ]
    try:
        redisconn = StrictRedisCluster(startup_nodes=redis_nodes)
    except Exception,e:
        print( "Connect Error!")
        sys.exit(1)
    redisconn.set(str(key),str(value),ex=86400)

MYSQL_CONF = {
    'host': '172.16.11.108',
    'user': 'iot',
    'password': '
[email protected]
#1234', 'db': 'test', 'port': 3306 } jdbc_url = 'jdbc:mysql://{0}:{1}/{2}'.format( MYSQL_CONF['host'], MYSQL_CONF['port'], MYSQL_CONF['db'] ) def jdbc_dataset(spark,contain_key,contain_sql): sql=contain_sql jdbcDF = spark.read \ .format("jdbc") \ .option("url", jdbc_url) \ .option("dbtable", "("+sql+") tmp") \ .option("user", MYSQL_CONF['user']) \ .option("password", MYSQL_CONF['password'])\ .option("driver", 'com.mysql.jdbc.Driver')\ .load() dict_value={} for item in jdbcDF.collect(): equip_id=item['equip_id'] code=item['code'] equip_type_id=item['equip_type_id'] threshold=item['threshold'] type_index_id=item['type_index_id'] condition=item['condition'] status=item['status'] key="iotwjj"+"_"+str(equip_id)+"_"+str(code)+"_"+str(condition) if key not in dict_value.keys(): dict_value[key]={} dict_value[key]["equip_type_id"]=equip_type_id dict_value[key]["type_index_id"]=type_index_id dict_value[key]["threshold"]=[str(threshold.quantize(Decimal('0.0000')))] else: if str(threshold.quantize(Decimal('0.0000'))) not in dict_value[key]["threshold"]: dict_value[key]["threshold"].append(str(threshold.quantize(Decimal('0.0000')))) for itemkey in dict_value.keys(): redis_cluster(itemkey,dict_value[itemkey]) if __name__ == "__main__": spark = SparkSession.builder.appName("Python Spark SQL data source example").getOrCreate() #spark.set('spark.cores.max',3) #spark.set('spark.network.timeout',10000000) #spark.set('spark.executor.heartbeatInterval',10000000) #spark.set('spark.memory.fraction',0.75) #spark.set('spark.storage.memoryFraction',0.45) for dict_sql_key in sql_dict: key=dict_sql_key sql=sql_dict.get(dict_sql_key) jdbc_dataset(spark,key,sql) spark.stop() #####提交方式 /usr/local/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --jars /usr/local/iot/pyspark/mysql-connector-java-5.1.41.jar pyspark_sql_mysql_redis.py #####注意叢集需要安裝pyredis https://files.pythonhosted.org/packages/f1/dd/4bb27bb3e3d03a01b0afd4a4ba13a4677b0f2d6552ff2841ac56591bfb29/redis-py-cluster-1.3.5.tar.gz ####### https://files.pythonhosted.org/packages/3b/f6/7a76333cf0b9251ecf49efff635015171843d9b977e4ffcf59f9c4428052/redis-2.10.6-py2.py3-none-any.whl

相關推薦

spark sql 資料匯入redis 裡面

#coding=utf-8 from __future__ import print_function from pyspark.sql import SparkSession from pyspark.sql import Row import sys from decim

Spark SQL資料寫入Mysql表的一些坑

轉自:https://blog.csdn.net/dai451954706/article/details/52840011/  最近,在使用Spark SQL分析一些資料,要求將分析之後的結果資料存入到相應的MySQL表中。     但是將資料處理完了之後,存

通過管道傳輸快速MySQL的資料匯入Redis(自己做過測試)

通過管道傳輸快速將MySQL的資料匯入Redis 通過管道傳輸pipe將MySQL資料批量匯入Redis       自Redis 2.6以上版本起,Redis支援快速大批量匯入資料,即官網的Redis Mass Insertion,即

Spark SQLrdd轉換為資料集-以程式設計方式指定模式(Programmatically Specifying the Schema)

一:解釋 官網:https://spark.apache.org/docs/latest/sql-getting-started.html 這種場景是生活中的常態 When case classes cannot be defined ahead of time (for example

通過spark sql hdfs上文件導入到mongodb

通過 str nts mongod modify 運行 end tar and 功能:通過spark sql 將hdfs 中文件導入到mongdo 所需jar包有:mongo-spark-connector_2.11-2.1.2.jar、mongo-java-driver-

使用POI資料匯入匯出資料庫。

POI將資料匯入匯出資料庫。(採用ssm框架) 1.前臺列表的展示 <script type="text/javascript"> //進行資料的全選 function selectAll(){ var ids=document.get

java後端伺服器讀取excel資料匯入資料庫

使用的是easypoi,官網文件:http://easypoi.mydoc.io/ 1 /** 2 * 匯入Excel檔案 3 */ 4 @PostMapping("/importTeacher") 5 @ResponseBody 6 publi

使用PL SQL資料匯出為Excel格式檔案

使用PL SQL將資料匯出為Excel格式檔案有兩種方法,第一種是先將查詢結果匯出為CSV檔案,然後再轉為Excel檔案;第二種是選中要匯出的查詢結果,右鍵,選擇複製到xls,即可。 兩種方法各有優勢: 第一種方法適用於匯出資料量特別大,如超過140多萬行資料,因為excel表格有最大行數限

Shell_mysql命令以及資料匯入Mysql資料庫

連線MYSQL資料庫 mysql -h${db_ip} -u${db_user} -p${db_pawd} -P${db_port} -D${db_name} -s -e "${sql}"   db_ip:主機地址   db_user :資料庫使用者名稱   db_pwd:密碼   db

oracle pl/sql 資料寫入Csv檔案 且以附件的形式傳送郵件

內容介紹 這篇文章將介紹,oracle 中如何將資料庫中查找出來的資料寫入csv 檔案,且將這個csv 檔案 ,用郵件以附件的形式傳送出去。如果你也想實現這個功能,請參考以下程式碼。如果想用sqlplus 方式實現,請參考我的另外一篇文章:sqlplus spool 生成csv檔案,且用

Revit二次開發_資料匯入Excel

有時需要將Revit模型的一些資訊提取到Excel中進行處理或者是作為記錄進行儲存,但也許是因為Revit的資料結構相對複雜,並不支援直接將資料匯出Excel,所以平時通過二次開發將資訊輸出到Excel中。 常使用的輸出方法有三個,分別是com元件;NPOI庫;Epplus

如何使用Spark快速資料寫入Elasticsearch

如何使用Spark快速將資料寫入Elasticsearch 說到資料寫入Elasticsearch,最先想到的肯定是Logstash。Logstash因為其簡單上手、可擴充套件、可伸縮等優點被廣大使用者接受。但是尺有所短,寸有所長,Logstash肯定也有它無法適用的應用場景,比如:

Spark SQL資料處理並寫入Elasticsearch

1 # coding: utf-8 2 import sys 3 import os 4 5 pre_current_dir = os.path.dirname(os.getcwd()) 6 sys.path.append(pre_current_dir) 7 from pyspark.sq

oracle通過load data 資料匯入表中通過儲存過程進行批量處理

說明:雖然沒圖,但文字表述很清楚,自己做過的專案留著備用(這只是初版,比較繁瑣,但很明確) 準備工作做完之後,後期可直接使用。如後期excel資料有變更,只需改動對應的部分即可,不涉及改動的可直接使用。 實際操作步驟 依照excel資料模版格式準備好建表語句,將中間過渡

從mysql資料匯入hive

[[email protected] ~]$ sqoop import --connect jdbc:mysql://Hadoop48/toplists --verbose -m 1 --username root --hive-overwrite --direct --table award --

flume資料匯入到hbase中

安裝flume: [[email protected] ~]$ tar -zxvf flume-ng-1.6.0-cdh5.5.2.tar.gz 修改 flume-env.sh 配置檔案,主要是JAVA_HOME變數設定 [[email protect

模板word中的特定欄位替換(資料匯入word中)

一、 將模板word中的特定欄位替換(將資料匯入word中) 所用jar包 一、 將模板word中的特定欄位替換(將資料匯入word中) 所用jar包 開發程式碼 /** * @Title createContract * @description 生成合

Oracle資料匯入excel

開啟oracle-->登陸-->檔案-->新建-->sql視窗編寫sql語句,查詢出想要匯出來的數值然後在查詢出來資料的地方,不要選中資料-->右鍵點選複製到excel然後

資料存入redis進行讀取資料

<?php header("content-type:text/html;charset=utf8"); $page=isset($_GET['page'])?$_GET['page']:1;//判斷有沒有頁數,如果沒有頁數的話從一開始 $

使用POI操作Excel資料匯入mysql

原本的工程:POI-Excel.zip(下載)【需要自己註冊了一個碼雲帳號才可以下載】 利用POI將excel資料匯入資料庫目前還是比較流行,但是表格某些欄位需要進行特殊處理,比如日期,excel的日期在java裡面讀出來是一個數字(number)並非日期格式的字串,等等。1