
Installing Spark // os.path.abspath and os.path.join in Python, and the normal-distribution PPF

Commands:
vim ~/.bashrc
source ~/.bashrc
ps aux | grep spark
pkill -f "spark"
sudo chown -R sc:sc  spark-2.3.1-bin-hadoop2.7/
sudo mv /home/sc/Downloads/spark-2.3.1-bin-hadoop2.7 /opt/
echo $SPARK_HOME    # check where Spark is installed
http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
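The ~/.bashrc entries this setup typically needs (a sketch, assuming Spark was moved to /opt as in the mv command above):

export SPARK_HOME=/opt/spark-2.3.1-bin-hadoop2.7
export PATH=$SPARK_HOME/bin:$PATH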

None""在spark sql裡都是為null 
Commonly used functions in Python's functools module
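A minimal sketch of a few of those commonly used functools helpers (partial is also what the deploy script at the end of these notes uses to bind cls_name for Controller().run_single):

from functools import lru_cache, partial, reduce


def power(base, exp):
    return base ** exp


square = partial(power, exp=2)                    # pre-fill the exp argument
print(square(5))                                  # 25

print(reduce(lambda x, y: x + y, [1, 2, 3, 4]))   # fold a sequence: 10


@lru_cache(maxsize=None)                          # memoize a pure function
def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)


print(fib(30))                                    # 832040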
/home/sc/PycharmProjects/sc/model-feature-engine/biz/sub/dag.py

################
import os

a = os.path.abspath(os.path.join(__file__, os.pardir, os.pardir))
b = os.pardir
c = os.path.join(__file__)
d1 = os.path.join(__file__, os.pardir)
d2 = os.path.join(__file__, os.pardir, os.pardir)
e1 = os.path.abspath(os.path.join(__file__))
e2 = os.path.abspath(os.path.join(__file__, os.pardir))
print(a)
print(b)
print(c)
print(d1)
print(d2)
print(e1)
print(e2)

Output:

/home/sc/PycharmProjects/sc
..
/home/sc/PycharmProjects/sc/model-feature-engine/temp4.py
/home/sc/PycharmProjects/sc/model-feature-engine/temp4.py/..
/home/sc/PycharmProjects/sc/model-feature-engine/temp4.py/../..
/home/sc/PycharmProjects/sc/model-feature-engine/temp4.py
/home/sc/PycharmProjects/sc/model-feature-engine
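Note that os.path.join never resolves "..": joining onto __file__ just appends path components (hence the .../temp4.py/.. results for d1 and d2), and only os.path.abspath normalizes the path. That is why e2 is the directory containing the file and a is that directory's parent. The same two values are often spelled with os.path.dirname:

import os

# equivalent to a = os.path.abspath(os.path.join(__file__, os.pardir, os.pardir))
a2 = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# equivalent to e2 = os.path.abspath(os.path.join(__file__, os.pardir))
e2b = os.path.dirname(os.path.abspath(__file__))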
Converting a DataFrame to an RDD:
RDD.map(func)
RDD.map(lambda x: func(x))
What is the difference between the two forms above? Are the resulting data structures different? (Open question from the original notes: "piplineRDD has no map".)

Numeric RDDs and key-value (pair) RDDs: an ordinary RDD is turned into a pair RDD by calling map with a function that returns (key, value) tuples.

SchemaRDD: reading data and running queries both return a SchemaRDD. A SchemaRDD is similar to a table in a traditional database; internally, a SchemaRDD is an RDD made up of Row objects, carrying schema information that describes the data type of each column. A Row object is just a wrapper around an array of basic types (such as integers and strings). A SchemaRDD is still an RDD.
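A small sketch of the pair-RDD point above (assuming an existing SparkContext sc): map turns a plain RDD into an RDD of (key, value) tuples, which then supports key-based operations such as reduceByKey.

lines = sc.parallelize(["a,1", "b,2", "a,3"])
pairs = lines.map(lambda line: (line.split(",")[0], int(line.split(",")[1])))
print(pairs.reduceByKey(lambda x, y: x + y).collect())   # sums per key, e.g. [('a', 4), ('b', 2)]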
/home/sc/PycharmProjects/risk-model/etl_ljt_script/pysparkCsvUtils.py
self.session.sparkContext.textFile

filename = 'judgedoc_litigant.csv'
csvspath = readpath + filename
sqlContext = SQLContext(sparkContext=sc)
df = spark.sparkContext.textFile(csvspath)
print(type(df))
# df = sqlContext.read.text(csvspath)
# df.show()
# the file is comma-separated
# dfrdd = df.rdd.map(lambda le: le.split(","))
dfrdd = df.map(mapper)
# Infer the schema, and register the DataFrame as a table.

When is the type pyspark.rdd.RDD (and when PipelinedRDD)?

sqlContext = SQLContext(sparkContext=sc)
df = spark.sparkContext.textFile(hdfspath)
print(type(df))      # <class 'pyspark.rdd.RDD'>
dfrdd = df.map(mapper)
print(type(dfrdd))   # <class 'pyspark.rdd.PipelinedRDD'>

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import *


def testjudgedoc(spark, sc):
    """3) court judgement documents (judgedoc_litigant.csv)"""
    filename = 'judgedoc_litigant.csv'
    csvspath = readpath + filename
    sqlContext = SQLContext(sparkContext=sc)
    df = spark.sparkContext.textFile(csvspath)


if __name__ == '__main__':
    spark = SparkSession.builder.master(sparkpath) \
        .appName("SC_ETL_ljt_spark").getOrCreate()
    sc = spark.sparkContext
    sc.addPyFile('pysparkCsvUtils.py')
    sc.addPyFile('caseReasonCode.py')
    sc.addPyFile('case_reason_reflection.py')
    sc.addPyFile('case_reason_map.py')
    # sc.addPyFile('parser.py')
    # sc.addPyFile('judgedoc.model.bin')
    sc.addPyFile('parse_util.py')
    sc.addPyFile('models.py')
    sc.addPyFile('parse_money.py')
    sc.addPyFile('map_litigant_type.py')
    # basic_datasource_example(spark)
    # testzhixing(spark, sc)
    # testRead(spark, sc, filename='company_patent.csv')
    # testjudgedoc(spark, sc)
    testcourtannouncement(spark, sc)
    # testcourtnotice(spark, sc)
    # testshixin(spark, sc)
    # testpublish(spark, sc)
    spark.stop()

Mapping a revocation probability onto a score:

import math


def prob_to_score(revoke_prob):
    if revoke_prob == 1.0:
        score = 67
    elif revoke_prob == 0.0:
        score = -133
    else:
        score = -6.78 + 14.13 * math.log(float(revoke_prob) / float(1.0 - revoke_prob))
        score = float(score + 133) / 2
    return round(score, 2)

In[12]: k1 = -2.78+14.13*math.log(100)
In[13]: k1
Out[13]: 62.291054728011744
In[14]: k1 = -2.78+14.13*math.log(10000)
In[15]: k1
Out[15]: 127.36210945602349
In[16]: k1 = -2.78+14.13*math.log(10000)
In[17]: import scipy.stats as st
In[18]: hyRankXPosition = st.norm.ppf(1, 0.3, 0.1)
In[19]: hyRankXPosition
Out[19]: inf
In[20]: st.norm.ppf(0.5, 0.3, 0.1)
Out[20]: 0.3
In[21]: st.norm.ppf(0.1, 0.3, 0.1)
Out[21]: 0.17184484344553994
In[22]: st.norm.ppf(0.8, 0.3, 0.1)
Out[22]: 0.38416212335729144
In[23]: st.norm.ppf(0.9, 0.3, 0.1)
Out[23]: 0.42815515655446
In[24]: st.norm.ppf(0.9999, 0.3, 0.1)
Out[24]: 0.6719016485455709
In[25]: st.norm.ppf(0.999999999, 0.3, 0.1)
Out[25]: 0.8997807019601636
In[26]: st.norm.ppf(0.9999999999999, 0.3, 0.1)
Out[26]: 1.0348754540300042
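norm.ppf is the percent-point function, i.e. the inverse of the normal CDF: norm.ppf(q, loc, scale) returns the x at which the CDF reaches probability q, which is why ppf(1, ...) is inf and ppf(0.5, 0.3, 0.1) is exactly the mean 0.3. A quick round-trip check (a small sketch using scipy.stats):

import scipy.stats as st

x = st.norm.ppf(0.8, loc=0.3, scale=0.1)     # the quantile where the CDF reaches 0.8
print(x)                                     # ~0.3842, matching the session above
print(st.norm.cdf(x, loc=0.3, scale=0.1))    # round-trips back to ~0.8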
WORK_DIR = os.path.abspath(os.path.join(__file__, os.pardir, os.pardir))
_, WORK_PROJ = os.path.split(WORK_DIR)
WORK_ZIP = os.path.join(WORK_DIR, "%s.zip" % WORK_PROJ)

/home/sc/PycharmProjects/sc/model-feature-engine/dag.gv
/home/sc/PycharmProjects/sc/model-feature-engine/biz/spark_session_utils/spark_session_utils.py
HDFS_RISK_MODEL_NOT_DAILY_MID_SAIC + "/share_as_fr.csv"

    def bfs(self):
        source_set = set([link["source"] for link in self.Links])
        target_set = set([link["target"] for link in self.Links])
        root = source_set - target_set
        # also add the tables that do not need a scheduled run but already exist
        not_need_to_run = set([link["target"] for link in self.Links if not link["need_to_run"]])
        not_need_to_run_and_exist_set = set([node["id"] for node in self.Nodes
                                             if node.get("status") and node["id"] in not_need_to_run])
        root = root.union(not_need_to_run_and_exist_set)
        step_limit = 10000
        pre_set = root
        for i in range(step_limit + 1):
            # find the next tables that can currently be computed from the tables that already exist
            links_as_pre_source = [link for link in self.Links
                                   if link["source"] in pre_set and not link.get("step")]
            tmp_target_to_add_pre = set()
            for link_as_pre_source in links_as_pre_source:
                tmp_source_set = set([link["source"] for link in self.Links
                                      if link["target"] == link_as_pre_source["target"]])
                # if all dependency tables of this target already exist, the link can run
                if len(tmp_source_set - pre_set) == 0:
                    link_as_pre_source["step"] = i
                    tmp_target_to_add_pre.add(link_as_pre_source["target"])
            # only merge into pre_set after the whole current step has been evaluated
            pre_set = pre_set.union(tmp_target_to_add_pre)
        to_left_set = target_set - pre_set
        to_left_link = [link for link in self.Links if link["target"] in to_left_set]
        to_run_links = [link for link in self.Links if link["need_to_run"]]
        to_run_links = sorted(to_run_links, key=lambda _: _.get("step"), reverse=False)
        return to_left_link, to_run_links

Improved version:

    def bfs(self):
        source_set = set([link["source"] for link in self.Links])
        target_set = set([link["target"] for link in self.Links])
        root = source_set - target_set
        # also add the tables that do not need a scheduled run but already exist
        # not_need_to_run_target = set([link["target"] for link in self.Links if not link["need_to_run"]])
        # not_need_to_run_source = set([link["source"] for link in self.Links if not link["need_to_run"]])
        # not_need_to_run_and_exist_set = set([node["id"] for node in self.Nodes
        #                                      if node.get("status") and node["id"] in not_need_to_run_target])
        # root = root.union(not_need_to_run_and_exist_set)
        # root = root.union(not_need_to_run_source).union(not_need_to_run_source)
        step_limit = 10000
        pre_set = root
        for i in range(1, step_limit + 1):
            # find the next tables that can currently be computed from the tables that already exist
            links_as_pre_source = [link for link in self.Links
                                   if link["source"] in pre_set and not link.get("step")]
            tmp_target_to_add_pre = set()
            for link_as_pre_source in links_as_pre_source:
                tmp_source_set = set([link["source"] for link in self.Links
                                      if link["cls_name"] == link_as_pre_source["cls_name"]])
                # if all dependency tables of this link's task class already exist, the link can run
                if len(tmp_source_set - pre_set) == 0:
                    link_as_pre_source["step"] = i
                    tmp_target_to_add_pre.add(link_as_pre_source["target"])
            # only merge into pre_set after the whole current step has been evaluated
            pre_set = pre_set.union(tmp_target_to_add_pre)
        to_left_set = target_set - pre_set
        to_left_link = [link for link in self.Links if link["target"] in to_left_set]
        to_run_links = [link for link in self.Links if link["need_to_run"]]
        to_run_links = sorted(to_run_links, key=lambda _: _.get("step"), reverse=False)
        # de-duplicate by cls_name, keeping the earliest-step link per task class
        to_run_links_dif = []
        to_run_cls_name_set = set()
        for a_t in to_run_links:
            if a_t["cls_name"] not in to_run_cls_name_set:
                to_run_links_dif.append(a_t)
                to_run_cls_name_set.add(a_t["cls_name"])
        return to_left_link, to_run_links, to_run_links_dif
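To make the layering idea in bfs() concrete, here is a self-contained toy (the link dicts and table names are made up): a link gets a step number only once every table its target depends on is already in the set of existing tables, and newly produced targets are merged into that set only after the whole step has been scanned.

# toy dependency graph: raw_a + raw_b -> mid_ab -> final
links = [
    {"source": "raw_a", "target": "mid_ab"},
    {"source": "raw_b", "target": "mid_ab"},
    {"source": "mid_ab", "target": "final"},
]
sources = {lk["source"] for lk in links}
targets = {lk["target"] for lk in links}
existing = sources - targets                 # root tables: produced by nobody
for step in range(1, 10):
    ready = set()
    for link in links:
        if "step" in link:
            continue
        deps = {lk["source"] for lk in links if lk["target"] == link["target"]}
        if deps <= existing:                 # all dependencies already exist
            link["step"] = step
            ready.add(link["target"])
    if not ready:
        break
    existing |= ready                        # only merge after the whole step
print([(lk["target"], lk["step"]) for lk in links])
# [('mid_ab', 1), ('mid_ab', 1), ('final', 2)]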
#!/usr/bin/env python
# encoding: utf-8
from conf.conf import SPARK_MASTER_URL, SPARK_TASK_NAME, WORK_ZIP
from pyspark.sql import SparkSession
from biz.sub.dag import SparkTask
import abc


class SparkSessionUtils(SparkTask):
    session = SparkSession.builder \
        .master(SPARK_MASTER_URL) \
        .appName(SPARK_TASK_NAME) \
        .getOrCreate()
    session.conf.set("spark.driver.maxResultSize", "4g")
    session.conf.set("spark.sql.broadcastTimeout", 1200)
    session.conf.set("spark.sql.crossJoin.enabled", "true")
    # session.sparkContext.addPyFile(WORK_ZIP)

    # def add_zip_py(self):
    #     self.session.sparkContext.addPyFile(WORK_ZIP)

    @abc.abstractmethod
    def run_task(self):
        raise NotImplementedError

    def _run_task(self):
        # self.add_zip_py()
        self.run_task()
        self.session.stop()

Now (the session is built lazily inside _run_task instead of at class-definition time):

from conf.conf import SPARK_MASTER_URL, SPARK_TASK_NAME, WORK_ZIP
from pyspark.sql import SparkSession
from biz.sub.dag import SparkTask
import abc


class SparkSessionUtils(SparkTask):
    session = None

    def __build_session(self):
        session = SparkSession.builder \
            .master(SPARK_MASTER_URL) \
            .appName(SPARK_TASK_NAME) \
            .getOrCreate()
        session.conf.set("spark.driver.maxResultSize", "4g")
        session.conf.set("spark.sql.broadcastTimeout", 1200)
        session.conf.set("spark.sql.crossJoin.enabled", "true")
        self.session = session
        return self.session

    # session.sparkContext.addPyFile(WORK_ZIP)
    # def add_zip_py(self):
    #     self.session.sparkContext.addPyFile(WORK_ZIP)

    @abc.abstractmethod
    def run_task(self):
        raise NotImplementedError

    def _run_task(self):
        self.__build_session()
        # self.add_zip_py()
        self.run_task()
        self.session.stop()
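A hypothetical subclass, only to illustrate how the pattern above is meant to be used (the class name, table names and HDFS path are made up; get_spark_task feeds the Controller's DAG, run_task holds the actual job, and _run_task is the wrapper that builds the session, runs the task and stops the session):

from biz.spark_session_utils.spark_session_utils import SparkSessionUtils


class ShareAsFrTask(SparkSessionUtils):           # hypothetical task class
    def get_spark_task(self):
        # these lists become the nodes/edges the Controller adds to the TaskDag
        return {"depend_tables": ["saic/share.csv"],
                "result_tables": ["mid/share_as_fr.csv"]}

    def run_task(self):
        # self.session exists here because _run_task builds it first
        df = self.session.read.csv("hdfs:///some/path/share.csv", header=True)
        df.show()


ShareAsFrTask()._run_task()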
#!/usr/bin/env python
# encoding: utf-8
from conf.all_task_conf import ALL_SPARK_CLASS_TASK
from conf.conf import HDFS_RISK_MODEL_AUTO_RAW
from controller.oslo_utils.importutils import import_class
from biz.sub.dag import TaskDag
from fabric_utils.fabric_utils import FabricHdfsUtils, FabricDbUtils
from scpy.logger import get_logger
from biz.load_raw_data.sub.load_data_to_hdfs import LoadRawData
import json

logger = get_logger(__file__)


class Controller(object):
    """
    Control layer. Responsible for checking which of the tables used by the
    spark task classes exist and which do not, building the computation DAG,
    and scheduling the computation.
    """

    def __init__(self):
        self.task_dag = TaskDag()
        self.cls_map = {}
        self._task_run_serial = []
        self.fabric_hdfs_utils = FabricHdfsUtils()
        for cls_dict in ALL_SPARK_CLASS_TASK:
            cls_str = cls_dict.get("cls_name")
            this_cls = import_class(cls_str)
            self.cls_map[cls_str] = this_cls
            a_node_dag = getattr(this_cls(), "get_spark_task")()
            depend_tables = a_node_dag["depend_tables"]
            result_tables = a_node_dag["result_tables"]
            # build the DAG: add nodes
            self.task_dag.add_nodes(depend_tables + result_tables)
            # build the DAG: add edges
            self.task_dag.add_dag(cls_dict, depend_tables, result_tables)

    def plot(self):
        self.analyse()
        self.task_dag.plot(view=True)

    def analyse(self):
        # check which tables already exist and which do not
        self.task_dag.set_table_info(self.fabric_hdfs_utils.hdfs_exits)
        # run the BFS
        to_left_link, self._task_run_serial = self.task_dag.bfs()
        to_left_tables = [_["target"] for _ in to_left_link]
        logger.info("to_left_tables:\n" + json.dumps(to_left_tables, ensure_ascii=False))

    def run_all(self):
        self.analyse()
        for task_dict in self._task_run_serial:
            cls_name = task_dict.get("cls_name")
            if task_dict.get("need_to_run"):
                task = self.cls_map[cls_name]
                logger.info("task class %s starts" % cls_name)
                getattr(task(), "run_task")()
                logger.info("task class %s done" % cls_name)

    def run_single(self, cls_name):
        # self.analyse()
        task = self.cls_map[cls_name]
        getattr(task(), "run_task")()

    def load_not_exit(self):
        pass

    @staticmethod
    def reload_all_daily_hdfs():
        fabric_hdfs_utils = FabricHdfsUtils()
        if fabric_hdfs_utils.hdfs_exits(HDFS_RISK_MODEL_AUTO_RAW):
            fabric_hdfs_utils.hdfs_rmr(HDFS_RISK_MODEL_AUTO_RAW)
        fabric_hdfs_utils.hdfs_mkdir(HDFS_RISK_MODEL_AUTO_RAW)
        LoadRawData().put_all_daily()

    @staticmethod
    def export_raw_data():
        FabricDbUtils().export_all_raw_data_by_sh()

    def collect(self):
        """Collect."""
        pass

    def save_all(self):
        """
        Save all data.
        :return:
        """
        pass

Now (analyse keeps both lists returned by the improved bfs, the full edge list and the de-duplicated task list, and logs the run order):

from conf.all_task_conf import ALL_SPARK_CLASS_TASK
from conf.conf import HDFS_RISK_MODEL_AUTO_RAW
from controller.oslo_utils.importutils import import_class
from biz.sub.dag import TaskDag
from fabric_utils.fabric_utils import FabricHdfsUtils, FabricDbUtils
from scpy.logger import get_logger
from biz.load_raw_data.sub.load_data_to_hdfs import LoadRawData
import json

logger = get_logger(__file__)


class Controller(object):
    """
    Control layer. Responsible for checking which of the tables used by the
    spark task classes exist and which do not, building the computation DAG,
    and scheduling the computation.
    """

    def __init__(self):
        self.task_dag = TaskDag()
        self.cls_map = {}
        self._task_run_serial = []
        self._task_run_serial_edg = []
        self.fabric_hdfs_utils = FabricHdfsUtils()
        for cls_dict in ALL_SPARK_CLASS_TASK:
            cls_str = cls_dict.get("cls_name")
            this_cls = import_class(cls_str)
            self.cls_map[cls_str] = this_cls
            a_node_dag = getattr(this_cls(), "get_spark_task")()
            depend_tables = a_node_dag["depend_tables"]
            result_tables = a_node_dag["result_tables"]
            # build the DAG: add nodes
            self.task_dag.add_nodes(depend_tables + result_tables)
            # build the DAG: add edges
            self.task_dag.add_dag(cls_dict, depend_tables, result_tables)

    def plot(self):
        self.analyse()
        self.task_dag.plot(view=True)

    def analyse(self):
        # check which tables already exist and which do not
        self.task_dag.set_table_info(self.fabric_hdfs_utils.hdfs_exits)
        # run the BFS
        to_left_link, self._task_run_serial_edg, self._task_run_serial = self.task_dag.bfs()
        to_left_tables = [_["target"] for _ in to_left_link]
        logger.info("to_left_tables:\n" + json.dumps(to_left_tables, ensure_ascii=False, indent=4))
        logger.info("_task_run_serial:\n" + json.dumps(self._task_run_serial, ensure_ascii=False, indent=4))

    def run_all(self):
        self.analyse()
        for task_dict in self._task_run_serial:
            cls_name = task_dict.get("cls_name")
            if task_dict.get("need_to_run"):
                task = self.cls_map[cls_name]
                logger.info("task class %s starts" % cls_name)
                getattr(task(), "run_task")()
                logger.info("task class %s done" % cls_name)

    def run_single(self, cls_name):
        # self.analyse()
        task = self.cls_map[cls_name]
        getattr(task(), "run_task")()

    def load_not_exit(self):
        pass

    @staticmethod
    def reload_all_daily_hdfs():
        fabric_hdfs_utils = FabricHdfsUtils()
        if fabric_hdfs_utils.hdfs_exits(HDFS_RISK_MODEL_AUTO_RAW):
            fabric_hdfs_utils.hdfs_rmr(HDFS_RISK_MODEL_AUTO_RAW)
        fabric_hdfs_utils.hdfs_mkdir(HDFS_RISK_MODEL_AUTO_RAW)
        LoadRawData().put_all_daily()

    @staticmethod
    def export_raw_data():
        FabricDbUtils().export_all_raw_data_by_sh()

    def collect(self):
        """Collect."""
        pass

    def save_all(self):
        """
        Save all data.
        :return:
        """
        pass

/home/sc/PycharmProjects/sc/model-feature-engine/conf/all_task_conf.py
/home/sc/PycharmProjects/sc/model-feature-engine/biz/feature/network/feature_extract_network_all_link_judgedoc_cnt.py

h(t) = h_0(t) * exp(β_1*x_1 + β_2*x_2 + ... + β_p*x_p)    (the Cox proportional-hazards form)

from fabric_utils.fabric_utils import Deploy
import argparse
from conf.conf import WORK_DIR
import os


def build_run_task_fs(model="all", cls_name="", task_name="run.py"):
    task_fs_str = """
from controller.main_controller import Controller
from functools import partial


def run_model(model="all", cls_name=None):
    if model == "all":
        run = Controller().run_all
    elif model == "single" and cls_name and isinstance(cls_name, str):
        run = partial(Controller().run_single, cls_name=cls_name)
    else:
        raise Exception()
    return run


run_model(model="%s", cls_name="%s")()
""" % (model, cls_name)
    with open(os.path.join(WORK_DIR, task_name), "w") as fp:
        fp.write(task_fs_str)


if __name__ == '__main__':
    task_name = "run.py"
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('-a', help='action: set_env or deploy_submit (which action to run)',
                        default='deploy_submit', type=str, choices=["set_env", "deploy_submit"])
    parser.add_argument('-m',
                        help='model: all or single (how to submit; "all" runs every task, '
                             '"single" runs one class and requires -cls)',
                        default='all', choices=["all", "single"])
    parser.add_argument('-cls', help='class name to run in single mode', default="")
    parser.add_argument('--help', action='help')
    args = parser.parse_args()
    build_run_task_fs(args.m, args.cls, task_name)
    if args.a == "deploy_submit":
        Deploy().deploy()
        Deploy().run_submit_task(task_name)
    elif args.a == "set_env":
        Deploy().deploy()
        Deploy().setup_py_env()
    else:
        raise Exception("please run python deploy.py --help to get help")
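Typical invocations of the deploy script above (assuming it is saved as deploy.py):

python deploy.py -a set_env                    # deploy the code and set up the Python environment
python deploy.py -a deploy_submit -m all       # deploy, then submit every task
python deploy.py -a deploy_submit -m single -cls <cls_name from ALL_SPARK_CLASS_TASK>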