PySpark Machine Learning Tools (NLP and Recommender Systems) 2: Data Processing, Part 2

User-Defined Functions (UDFs)
UDFs are widely used in data processing to transform DataFrames. There are two types of UDF in PySpark: conventional UDFs and Pandas UDFs. Pandas UDFs are far more powerful in terms of speed and processing time.
- Conventional Python function
```
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import StringType
>>> def price_range(brand):
...     prices = {"Samsung": 'High Price', "Apple": 'High Price', "MI": 'Mid Price'}
...     return prices.get(brand, "Low Price")
...
>>> brand_udf = udf(price_range, StringType())
>>> df.withColumn('price_range', brand_udf(df['mobile'])).show(10, False)
+-------+---+----------+------+-------+-----------+
|ratings|age|experience|family|mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3      |32 |9.0       |3     |Vivo   |Low Price  |
|3      |27 |13.0      |3     |Apple  |High Price |
|4      |22 |2.5       |0     |Samsung|High Price |
|4      |37 |16.5      |4     |Apple  |High Price |
|5      |27 |9.0       |1     |MI     |Mid Price  |
|4      |27 |9.0       |0     |Oppo   |Low Price  |
|5      |37 |23.0      |5     |Vivo   |Low Price  |
|5      |37 |23.0      |5     |Samsung|High Price |
|3      |22 |2.5       |0     |Apple  |High Price |
|3      |27 |6.0       |0     |MI     |Mid Price  |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows
```
- Lambda function
```
>>> age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
>>> df.withColumn("age_group", age_udf(df.age)).show(10, False)
+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|mobile |age_group|
+-------+---+----------+------+-------+---------+
|3      |32 |9.0       |3     |Vivo   |senior   |
|3      |27 |13.0      |3     |Apple  |young    |
|4      |22 |2.5       |0     |Samsung|young    |
|4      |37 |16.5      |4     |Apple  |senior   |
|5      |27 |9.0       |1     |MI     |young    |
|4      |27 |9.0       |0     |Oppo   |young    |
|5      |37 |23.0      |5     |Vivo   |senior   |
|5      |37 |23.0      |5     |Samsung|senior   |
|3      |22 |2.5       |0     |Apple  |young    |
|3      |27 |6.0       |0     |MI     |young    |
+-------+---+----------+------+-------+---------+
only showing top 10 rows
```

- Pandas UDF (vectorized UDF)
There are two types of Pandas UDF: Scalar and GroupedMap (a GroupedMap sketch follows the Scalar examples below).
Using a Pandas UDF is very similar to using a basic UDF. We first import pandas_udf from PySpark and then apply it to whichever column needs to be transformed.
```
>>> from pyspark.sql.functions import pandas_udf
>>> def remaining_yrs(age):
...     return (100 - age)
...
>>> from pyspark.sql.types import IntegerType
>>> length_udf = pandas_udf(remaining_yrs, IntegerType())
>>> df.withColumn("yrs_left", length_udf(df['age'])).show(10, False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+--------+
|ratings|age|experience|family|mobile |yrs_left|
+-------+---+----------+------+-------+--------+
|3      |32 |9.0       |3     |Vivo   |68      |
|3      |27 |13.0      |3     |Apple  |73      |
|4      |22 |2.5       |0     |Samsung|78      |
|4      |37 |16.5      |4     |Apple  |63      |
|5      |27 |9.0       |1     |MI     |73      |
|4      |27 |9.0       |0     |Oppo   |73      |
|5      |37 |23.0      |5     |Vivo   |63      |
|5      |37 |23.0      |5     |Samsung|63      |
|3      |22 |2.5       |0     |Apple  |78      |
|3      |27 |6.0       |0     |MI     |73      |
+-------+---+----------+------+-------+--------+
only showing top 10 rows
```
- Pandas UDF (multiple columns)
```
>>> from pyspark.sql.types import DoubleType
>>> def prod(rating, exp):
...     return rating * exp
...
>>> prod_udf = pandas_udf(prod, DoubleType())
>>> df.withColumn("product", prod_udf(df['ratings'], df['experience'])).show(10, False)
/opt/anaconda3/lib/python3.6/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------+---+----------+------+-------+-------+
|ratings|age|experience|family|mobile |product|
+-------+---+----------+------+-------+-------+
|3      |32 |9.0       |3     |Vivo   |27.0   |
|3      |27 |13.0      |3     |Apple  |39.0   |
|4      |22 |2.5       |0     |Samsung|10.0   |
|4      |37 |16.5      |4     |Apple  |66.0   |
|5      |27 |9.0       |1     |MI     |45.0   |
|4      |27 |9.0       |0     |Oppo   |36.0   |
|5      |37 |23.0      |5     |Vivo   |115.0  |
|5      |37 |23.0      |5     |Samsung|115.0  |
|3      |22 |2.5       |0     |Apple  |7.5    |
|3      |27 |6.0       |0     |MI     |18.0   |
+-------+---+----------+------+-------+-------+
only showing top 10 rows
```
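The GroupedMap flavor mentioned earlier works on whole groups rather than single values: Spark splits the DataFrame by a key, hands each group to the function as a pandas DataFrame, and expects a pandas DataFrame back that matches a declared schema. A minimal sketch against the same df; the function name centre_experience and the output column exp_minus_mean are illustrative, not from the original:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

# The returned pandas DataFrame must match this schema string.
@pandas_udf("mobile string, experience double, exp_minus_mean double",
            PandasUDFType.GROUPED_MAP)
def centre_experience(pdf):
    # pdf is a pandas DataFrame holding every row of one 'mobile' group
    exp = pdf['experience']
    out = pdf[['mobile', 'experience']].copy()
    out['exp_minus_mean'] = exp - exp.mean()
    return out

# Each brand's rows are centred around that brand's mean experience.
df.groupby('mobile').apply(centre_experience).show(5)
```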
Removing duplicate values
```
>>> df.count()
33
>>> df = df.dropDuplicates()
>>> df.count()
26
```
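By default, dropDuplicates compares entire rows. If only some columns should decide what counts as a duplicate, the subset argument restricts the comparison. A small sketch; the column choice here is illustrative:

```python
# Rows sharing the same age and experience are treated as duplicates;
# Spark keeps one arbitrary row per (age, experience) combination.
df_dedup = df.dropDuplicates(subset=['age', 'experience'])
```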
Dropping a column
```
>>> df_new = df.drop('mobile')
>>> df_new.show()
+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      3| 32|       9.0|     3|
|      4| 22|       2.5|     0|
|      5| 27|       6.0|     0|
|      4| 22|       6.0|     1|
|      3| 27|       6.0|     0|
|      2| 32|      16.5|     2|
|      4| 27|       9.0|     0|
|      2| 27|       9.0|     2|
|      3| 37|      16.5|     5|
|      4| 27|       6.0|     1|
|      5| 37|      23.0|     5|
|      2| 27|       6.0|     2|
|      4| 37|       6.0|     0|
|      5| 37|      23.0|     5|
|      4| 37|       9.0|     2|
|      5| 37|      13.0|     1|
|      5| 27|       2.5|     0|
|      3| 42|      23.0|     5|
|      5| 22|       2.5|     0|
|      1| 37|      23.0|     5|
+-------+---+----------+------+
only showing top 20 rows
```
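drop also accepts several column names at once and, like all DataFrame operations, returns a new DataFrame rather than modifying df in place. A small sketch; the name df_slim and the column choice are illustrative:

```python
# Remove two columns in a single call; df itself is left untouched.
df_slim = df.drop('mobile', 'family')
```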
Writing data
- CSV
If we want to save it as a single file in the original CSV format, we can use the coalesce function in Spark.
```
>>> write_uri = '/home/andrew/test.csv'
>>> df.coalesce(1).write.format("csv").option("header", "true").save(write_uri)
```
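Two details worth knowing: coalesce(1) funnels all partitions into one (so the whole dataset must fit on a single executor), and save fails if the target path already exists unless a write mode is set. A hedged variant of the call above:

```python
# "overwrite" replaces an existing path; Spark still writes a directory
# containing one part-*.csv file rather than a bare test.csv file.
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save(write_uri)
```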
- Parquet
If the dataset is large and involves a lot of columns, we can choose to compress it and convert it into the Parquet file format. Parquet reduces the overall size of the data and optimizes performance when processing it, because only the required subset of columns needs to be read instead of the entire data.
We can easily convert and save the DataFrame in Parquet format, as sketched below.
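A minimal sketch of saving the DataFrame as Parquet; parquet_uri is a hypothetical path, and any writable directory works:

```python
# Hypothetical output directory for the Parquet files.
parquet_uri = '/home/andrew/df_parquet'

# Spark writes a directory of compressed, columnar part files.
df.write.format('parquet').save(parquet_uri)
```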
Note: the complete dataset and code can be consulted in this book's GitHub repository, and they run best on Spark 2.3 and above.
