1. 程式人生 > >spark三種清理資料的方式:UDF,自定義函式,spark.sql;Python中的zip()與*zip()函式詳解//及python中的*args和**kwargs

spark三種清理資料的方式:UDF,自定義函式,spark.sql;Python中的zip()與*zip()函式詳解//及python中的*args和**kwargs

(1)UDF的方式清理資料

import sys

reload(sys)
sys.setdefaultencoding('utf8')

import re
import json

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType,StructField,StructType
import copy

master_url = 'spark://sc-bd-10:7077'
spark = SparkSession.builder \ .master(master_url) \ .appName("saic_crawler_huangyu") \ .getOrCreate() spark.conf.set("spark.driver.maxResultSize", "4g") spark.conf.set("spark.sql.broadcastTimeout", 1200) spark.conf.set("spark.sql.crossJoin.enabled", "true") ####重點從這看: spark.sparkContext.addPyFile("clean_utils.py"
) from clean_utils import name_clean, parse_date parse_name_clean_udf = udf(name_clean, StringType()) #用udf的方式清洗資料(1),先要“註冊”! def load_basic_info(): basic_info_field_name_list = ["eid", "name", "reg_no", "zczj", "zczjbz", "type", "status", "clrq", "hzrq"] basic_info_schema = StructType([StructField(field_name, StringType(), True
) for field_name in basic_info_field_name_list]) # basic_info_schema = StructType([StructField(field_name, StringType(), True) for field_name in ["eid", "name", "province", "ent_type", "clrq", "hzrq", "zczj", "zczjbz", "status"]]) df_basic_info = spark.read.load("hdfs://sc-bd-10:9000/scdata/huangyu/pre_result/basic_info.csv", format="csv",schema=basic_info_schema, delimiter=',') df_basic_info.createOrReplaceTempView("basic_info") spark.sql("""select eid, name from basic_info """).createOrReplaceTempView("name_eid") def clean_case_shixin(): shixin_schema = StructType([StructField(field_name, StringType(), True) for field_name in ["company_name", "sc_data_id", "publish_date"]]) df_shixin = spark.read.load("hdfs://sc-bd-10:9000/scdata/huangyu/case/shixin_company.csv", format="csv", schema=shixin_schema, delimiter=',') #不帶表頭,自動推斷; df_shixin = df_shixin.withColumn("company_name", parse_name_clean_udf(df_shixin['company_name'])) #用udf的方式清洗資料(2) df_shixin = df_shixin.withColumn("publish_date", parse_date_udf(df_shixin['publish_date'])) df_shixin.createOrReplaceTempView("shixin") df_shixin_diff = spark.sql(""" select max(company_name) as company_name, max(sc_data_id) as sc_data_id, max(publish_date) as publish_date from shixin GROUP BY company_name, sc_data_id """) df_shixin_diff.createOrReplaceTempView("shixin_diff")

方式2,直接匯入函式進行清洗:

    @staticmethod
    def clean_row_name(row, raw_zhixing_info_field):
        row_dict = dict([(k, row[k]) for k in raw_zhixing_info_field])
        row_dict["company_name"] = name_clean(row_dict["company_name"])
        return row_dict

    def rdd_map_reduce_demo(self):
        # 注意 rdd 的時候需要將map reduce中的self 去掉
        clean_row_name = self.clean_row_name
        raw_zhixing_info_field = self.raw_zhixing_info_field
        self.df_zhixing_info_sample = self.df_zhixing_info_sample \
            .rdd \
            .map(lambda row: Row(**clean_row_name(row, raw_zhixing_info_field))) \
            .filter(lambda row: len(row["company_name"]) >= 5) \
            .toDF(self.zhixing_info_schema)

        self.df_zhixing_info_sample.createOrReplaceTempView("zhixing_info_sample")

方式3,用spark.sql清洗;

這裡寫程式碼片
zip()函式的定義

    從引數中的多個迭代器取元素組合成一個新的迭代器;
    返回:
    返回一個zip物件,其內部元素為元組;可以轉化為列表或元組;

    傳入引數:
    元組、列表、字典等迭代器。
    zip()函式的用法

    當zip()函式中只有一個引數時
    zip(iterable)從iterable中依次取一個元組,組成一個元組。

    示例:

    ## zip()函式單個引數
    list1 = [1, 2, 3, 4]
    tuple1 = zip(list1)
    # 列印zip函式的返回型別
    print("zip()函式的返回型別:\n", type(tuple1))
    # 將zip物件轉化為列表
    print("zip物件轉化為列表:\n", list(tuple1))

輸出:

    zip()函式的返回型別:
    <class 'zip'>
    zip物件轉化為列表:
    [(1,), (2,), (3,), (4,)]

    當zip()函式有兩個引數時
        zip(a,b)zip()函式分別從a和b依次各取出一個元素組成元組,再將依次組成的元組組合成一個新的迭代器--新的zip型別資料。
        注意:
            要求a與b的維數相同,當兩者具有相同的行數與列數時,正常組合對應位置元素即可;
            當a與b的行數或列數不同時,取兩者結構中最小的行數和列數,依照最小的行數和列數將對應位置的元素進行組合;這時相當於呼叫itertools.zip_longest(*iterables)函式。
    舉例:
    m = [[1,2,3], [4,5,6], [7,8,9]]
    n = [[2,2,2], [3,3,3], [4,4,4]]
    p = [[2,2,2], [3,3,3,]



*zip函式:
*zip()函式是zip()函式的逆過程,將zip物件變成原先組合前的資料。

    程式碼示例:

    ## *zip()函式
    print('=*'*10 + "*zip()函式" + '=*'*10)
    m = [[1, 2, 3],  [4, 5, 6],  [7, 8, 9]]
    n = [[2, 2, 2],  [3, 3, 3],  [4, 4, 4]]
    print("*zip(m, n)返回:\n", *zip(m, n))
    m2, n2 = zip(*zip(m, n))
    # 若相等,返回True;說明*zip為zip的逆過程
    print(m == list(m2) and n == list(n2))

輸出:

    *zip(m, n)返回:
    ([1, 2, 3], [2, 2, 2]) ([4, 5, 6], [3, 3, 3]) ([7, 8, 9], [4, 4, 4])
    True

(4)python中的*args和**kwargs

先來看一個例子:
複製程式碼

 1 >>> def foo(*args, **kwargs):
 2     print 'args =', args
 3     print 'kwargs = ', kwargs
 4     print '-----------------------'
 5 
 6     
 7 >>> if __name__ == '__main__':
 8     foo(1, 2, 3, 4)
 9     foo(a=1, b=2, c=3)
10     foo(1,2,3,4, a=1, b=2, c=3)
11     foo('a', 1, None, a=1, b='2', c=3)

複製程式碼

其輸出結果如下:
複製程式碼

 1 args = (1, 2, 3, 4)
 2 kwargs =  {}
 3 -----------------------
 4 args = ()
 5 kwargs =  {'a': 1, 'c': 3, 'b': 2}
 6 -----------------------
 7 args = (1, 2, 3, 4)
 8 kwargs =  {'a': 1, 'c': 3, 'b': 2}
 9 -----------------------
10 args = ('a', 1, None)
11 kwargs =  {'a': 1, 'c': 3, 'b': '2'}
12 -----------------------

複製程式碼

  從以上例子可以看出,這兩個是python中的可變引數。*args表示任何多個無名引數,它是一個tuple;**kwargs表示關鍵字引數,它是一個 dict。並且同時使用*args**kwargs時,*args引數列必須要在**kwargs前,像foo(a=1, b='2', c=3, a', 1, None, )這樣呼叫的話,會提示語法錯誤“SyntaxError: non-keyword arg after keyword arg”。如同所示: