Python基礎(六)--- Python爬蟲,Python整合Hbase,PythonWordCount,Spark資料分析生成分析圖表
阿新 • • 發佈:2018-11-19
一、Python爬蟲 --------------------------------------------------- 1.測試
# -*- encoding=utf-8 -*-
import re
import urllib.request

# Non-greedy href pattern.  The original used Java-style character-class
# intersection ("[\u0000-\uffff&&^\"]"), which Python's re module does not
# support -- in Python that class would even let '"' match.  '[^"]*?' is the
# correct Python equivalent: any run of characters up to the closing quote.
_HREF_PATTERN = re.compile(r'<a\s*href="([^"]*?)"')


def extract_links(html):
    """Return every href target found in *html*, in document order."""
    return [m.group(1) for m in _HREF_PATTERN.finditer(html)]


def main():
    """Fetch the page, print its source, then print each extracted link."""
    # Context manager guarantees the HTTP response is closed even on error
    # (the original called resp.close() by hand).
    with urllib.request.urlopen("http://focus.tianya.cn/") as resp:
        page = resp.read().decode("utf-8")
    print(page)
    for addr in extract_links(page):
        print(addr)


if __name__ == "__main__":
    main()
2.遞迴爬並儲存
# -*- encoding=utf-8 -*- import urllib.request import os; import re; def fileExists(url,localpath): path = url ; path = path.replace(":", "_"); path = path.replace("/", "$"); path = path.replace("?", "$"); path = localpath + "/" + path; return os.path.exists(path) ; #下載網頁方法 def download(url): #處理處理問題 path = url ; path = path.replace(":","_"); path = path.replace("/","$"); path = path.replace("?","$"); path = "d:/py/data/" + path; #判斷當前的網頁是否已經下載 resp = urllib.request.urlopen(url) pageBytes = resp.read() resp.close if not os.path.exists(path): #儲存檔案到磁碟 f = open(path,"wb"); f.write(pageBytes) ; f.close(); try: #解析網頁的內容 pageStr = pageBytes.decode("utf-8"); #解析href地址 pattern = u'<a[\u0000-\uffff&&^[href]]*href="([\u0000-\uffff&&^"]*?)"' res = re.finditer(pattern, pageStr) for r in res: addr = r.group(1); print(addr) if addr.startswith("//"): addr = addr.replace("//","http://"); #判斷網頁中是否包含自己的地址 if (addr.startswith("http://") and not fileExists(addr,"d:/py/data")): download(addr) ; except Exception as e: #print(url + " : 不是文字") ; #print(Exception) print(e) # print(pageBytes.decode("gbk", errors='ignore')); return ; download("http://www.jd.com");
二、Python協同hbase實現資料的寫入 -------------------------------------------------------------- 1.啟動hbase叢集 a.啟動zk b.啟動hadoop叢集 c.啟動hbase叢集 如果時鐘不同步。 $>su root $>xcall.sh "ntpdate asia.pool.ntp.org" 2.s100上啟動hbase的thriftserver伺服器,滿足和第三方應用通訊 $> hbase-daemon.sh start thrift2 3.檢視webui http://s100:9095/ //webui埠 $> netstat -anop | grep 9090 //9090 rpc埠 4.下載windows下thrift的編譯器,不需要安裝,僅僅是個工具。 thrift-0.10.0.exe 5.下載並安裝thrift的python模組. 5.1)下載檔案 thrift-0.10.0.tar.gz 5.2)tar開檔案 5.3)進入目錄 cmd>cd thrift-0.10.0\lib\py cmd>setup.py install ... Using c:\users\administrator\appdata\local\programs\python\python37\lib\site-packages\six-1.11.0-py3.7.egg Finished processing dependencies for thrift==0.10.0 6.測試在py檔案中是否能夠匯入 from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol 7.找到hbase.thrift檔案進行編譯,產生python檔案,使用以下命令進行編譯 [hbase.thrift檔案位於hbase安裝包下,找不到就去網上下載] cmd> thrift-0.10.0.exe -o ./out -gen py hbase.thrift 8.將生成的資料夾拷貝到idea/python模組下 a.在模組下新建一個pythonpackage, 叫mythrift b.將生成的py資料夾下的hbase資料夾拷貝到mythrift下 9.使用python操作hbase的表
# -*- encoding=utf-8 -*-
# Import native modules
import os
# Import the thrift python modules
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
# Import the hbase python modules generated from hbase.thrift by the
# thrift compiler (copied into the local mythrift package)
from mythrift.hbase import THBaseService
from mythrift.hbase.ttypes import *
from mythrift.hbase.ttypes import TResult
# Open a socket connection to the HBase thrift2 server at s100:9090
transport = TSocket.TSocket('s100', 9090)
# Buffered transport wraps the raw socket to batch small reads/writes
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = THBaseService.Client(protocol)
# Open the transport; the connection stays open for the requests below
transport.open()
# --- Reference examples: single-row operations with the same client ---------
# # put
# table = b'ns1:t1'
# row = b'row2'
# v1 = TColumnValue(b'f1', b'id', b'101')
# v2 = TColumnValue(b'f1', b'name', b'tomas')
# v3 = TColumnValue(b'f1', b'age', b'12')
# vals = [v1, v2, v3]
# put = TPut(row, vals)
# client.put(table, put)
# print("okkkk!!")
# transport.close()
# # get
# table = b'ns1:t1'
# rowkey = b"row2"
# col_id = TColumn(b"f1", b"id")
# col_name = TColumn(b"f1", b"name")
# col_age = TColumn(b"f1", b"age")
# cols = [col_id, col_name, col_age]
# get = TGet(rowkey, cols)
# res = client.get(table, get)
# print(res.columnValues)
# print(bytes.decode(res.columnValues[0].qualifier))
# print(bytes.decode(res.columnValues[0].family))
# print(res.columnValues[0].timestamp)
# print(bytes.decode(res.columnValues[0].value))
# # delete
# table = b'ns1:t1'
# rowkey = b"row2"
# col_id = TColumn(b"f1", b"id")
# col_name = TColumn(b"f1", b"name")
# col_age = TColumn(b"f1", b"age")
# cols = [col_id, col_name, col_age]
# delete = TDelete(rowkey, cols)
# res = client.deleteSingle(table, delete)
# transport.close()
# print("ok")


def _print_first_cells(results):
    """Print qualifier/family/timestamp/value of the first cell of each row."""
    for row in results:
        print("============")
        first = row.columnValues[0]
        print(bytes.decode(first.qualifier))
        print(bytes.decode(first.family))
        print(first.timestamp)
        print(bytes.decode(first.value))


# Columns requested by both scans below.  (The original bound one of these
# to the name 'time', shadowing the stdlib module name.)
table = b'call:calllogs'
scan_cols = [
    TColumn(b"f1", b"callDuration"),
    TColumn(b"f1", b"callTime"),
    TColumn(b"f1", b"caller"),
    TColumn(b"f1", b"callee"),
]

# Range scan over [startRow, stopRow), at most 100 rows.
startRow = b'34,13520401111,20180114152647,0,13269364444,406'
stopRow = b'90,15032295555,20180922165903,0,15778421111,298'
scan = TScan(startRow=startRow, stopRow=stopRow, columns=scan_cols)
_print_first_cells(client.getScannerResults(table, scan, 100))

# Full-table scan (no start/stop row), at most 100 rows.
scan = TScan(columns=scan_cols)
results = client.getScannerResults(table, scan, 100)
print(len(results))
_print_first_cells(results)

# Release the connection once all scans are done (the original never
# closed the transport on this code path).
transport.close()
三、SparkShell使用Python進行WordCount ----------------------------------------------------------------- 1.本地模式[可以使用scala和python編寫] a.移除spark/conf/core-site.xml | hdfs-site.xml | hive-site.xml檔案 [這樣spark就不會去整合hive了] b.進入pyspark shell $> cd /soft/spark/bin $> ./pyspark --master local[*] >>> arr = [1,2,3,4] >>> rdd = sc.parallelize(arr); >>> rdd.map(lambda e : (e,1)) #python的lambda表示式 2.WordCountDemo >>> arr = ["tom","tom1","tom1","tom3"] >>> rdd1 = sc.parallelize(arr) >>> rdd1.collect() ['tom', 'tom1', 'tom1', 'tom3'] >>> rdd1.map(lambda e : (e,1)) PythonRDD[4] at RDD at PythonRDD.scala:48 >>> rdd2 = rdd1.map(lambda e : (e,1)) >>> rdd2.collect() [('tom', 1), ('tom1', 1), ('tom1', 1), ('tom3', 1)] >>> rdd3 = rdd2.reduceByKey() Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: reduceByKey() takes at least 2 arguments (1 given) >>> rdd3 = rdd2.reduceByKey(lambda a,b : a + b) >>> rdd3.collect() [('tom1', 2), ('tom3', 1), ('tom', 1)] 四、爬蟲程式更改 -- 將爬到的網頁儲存到hbase中 --------------------------------------------------------- 1.使用base64進行編解碼 import base64; url = b"http://tianya.cn"; b = base64.encodebytes(url); print(b) bb = base64.decodebytes(b) print(bb) 2.建立hbase表:pages hbase> create 'ns1:pages','f1' 3.編寫pageDao.py,專門處理hbase表的crud
# -*- encoding=utf-8 -*-
# Import native modules
import os
import base64
# Import the thrift python modules
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
# Import the hbase python modules generated from hbase.thrift by the
# thrift compiler (copied into the local mythrift package)
from mythrift.hbase import THBaseService
from mythrift.hbase.ttypes import *
from mythrift.hbase.ttypes import TResult
# Create the socket connection to the HBase thrift2 server at s100:9090.
# NOTE: the transport is opened/closed inside savePage()/exists() per call.
transport = TSocket.TSocket('s100', 9090)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = THBaseService.Client(protocol)
# Persist one crawled page into the ns1:pages HBase table.
def savePage(url, page):
    """Store *page* (raw bytes) under the base64-encoded *url* as row key."""
    transport.open()
    # The base64-encoded URL doubles as a reversible, byte-safe row key.
    row_key = base64.encodebytes(url.encode("utf-8"))
    cell = TColumnValue(b'f1', b'page', page)
    client.put(b'ns1:pages', TPut(row_key, [cell]))
    transport.close()
# Check whether a page for the given url is already stored in HBase.
def exists(url):
    """Return True if ns1:pages already holds a row for *url*."""
    transport.open()
    # Row keys are the base64-encoded URL -- same scheme as savePage.
    row_key = base64.encodebytes(url.encode("utf-8"))
    print(row_key)
    page_col = TColumn(b"f1", b"page")
    result = client.get(b'ns1:pages', TGet(row_key, [page_col]))
    transport.close()
    # A missing row comes back as a TResult whose row field is None.
    return result.row is not None
4.編寫爬蟲程式
# -*- encoding=utf-8 -*-
import urllib.request
import os
import re
import pageDao
# Download one page and recursively crawl the pages it links to.
def download(url):
    """Fetch *url*, store it via pageDao, and recursively follow its links.

    Pages that cannot be decoded as UTF-8 are stored but not parsed for
    further links (the except branch dumps them as GBK for inspection).
    """
    # The original wrote "resp.close" without parentheses, so the response
    # was never actually closed; the context manager fixes the leak.
    with urllib.request.urlopen(url) as resp:
        pageBytes = resp.read()
    # Only persist pages we have not stored in HBase yet.
    if not pageDao.exists(url):
        pageDao.savePage(url, pageBytes)
    try:
        pageStr = pageBytes.decode("utf-8")
        # The original pattern used Java-style "[...&&^...]" class
        # intersection, which Python's re module does not support;
        # '[^>]*' / '[^"]*?' are the intended "anything but" classes.
        pattern = r'<a[^>]*href="([^"]*?)"'
        for r in re.finditer(pattern, pageStr):
            addr = r.group(1)
            print(addr)
            if addr.startswith("//"):
                # Protocol-relative link: prefix the scheme only (the
                # original replace("//", "http://") rewrote EVERY "//").
                addr = "http:" + addr
            # Skip self-links and pages already stored.
            if addr.startswith("http://") and url != addr and not pageDao.exists(addr):
                download(addr)
    except Exception as e:
        print(e)
        print(pageBytes.decode("gbk", errors='ignore'))


if __name__ == "__main__":
    download("http://jd.com")
五、使用python實現spark的資料分析,生成分析圖表 ----------------------------------------------------------------- 1.Win上安裝pip安裝python的模組 1.numpy cmd> pip install -i https://pypi.tuna.tsinghua.edu.cn/simple numpy 2.scipy cmd> pip install -i https://pypi.tuna.tsinghua.edu.cn/simple scipy 3.matplotpy cmd> pip install -i https://pypi.tuna.tsinghua.edu.cn/simple matplotlib cmd> python -m pip install -U pip setuptools cmd> python -m pip install matplotlib 2.ubuntu下安裝numpy,scipy,pandas,matplotlib模組 a.numpy 安裝: 如果你的ubuntu中沒有安裝python,請首先安裝python 在終端輸入以下命令: $> sudo apt-get update $> sudo apt-get install python-numpy $> sudo apt-get install python3-numpy 如果是python3,則將上面的python-numpy換成python3-numpy即可 b.scipy $> sudo apt-get update $> sudo apt-get install python-scipy c.pandas $> sudo apt-get update $> sudo apt-get install python-pandas d.matplotlib $> sudo apt-get update $> sudo apt-get install python-matplotlib e.scikit-learn $> sudo apt-get update $> sudo apt-get install python-sklearn 3.進入PythonSparkShell cmd> ./pyspark --master local[*] $> ./pyspark --master local[*] 4.貼上下面程式碼,建立資料框 from pyspark.sql import Row import matplotlib.pyplot as plt import numpy as np import pylab as P plt.rcdefaults() dataDir ="file:///D://share//python//ml-data//ml-1m//users.dat" dataDir ="file:///mnt/hgfs/share/python/ml-data/ml-1m/users.dat" lines = sc.textFile(dataDir) splitLines = lines.map(lambda l: l.split("::")) usersRDD = splitLines.map(lambda p: Row(id=p[0],gender=p[1],age=int(p[2]), occupation=p[3], zipcode=p[4])) usersDF = spark.createDataFrame(usersRDD) usersDF.createOrReplaceTempView("users") usersDF.show() 5.生成圖表
# Histogram of the users' ages.
ageDF = spark.sql("SELECT age FROM users")
# Collect the age column to the driver as a plain Python list for matplotlib.
ageList = ageDF.rdd.map(lambda p: p.age).collect()
# Print count/mean/stddev/min/max of the age column.
ageDF.describe().show()
plt.hist(ageList)
plt.title("Age distribution of the users\n")
plt.xlabel("Age")
plt.ylabel("Number of users")
# block=False keeps the interactive shell usable while the window is shown.
plt.show(block=False)
# Kernel-density (KDE) plot of the same age data.
from scipy.stats import gaussian_kde
density = gaussian_kde(ageList)
# Evaluate the estimated density on a fine grid over ages 0..100.
xAxisValues = np.linspace(0,100,1000)
# NOTE(review): overriding covariance_factor and then calling the private
# _compute_covariance() pins the KDE bandwidth to 0.5; gaussian_kde's public
# bw_method argument is the supported way to do this -- confirm before changing.
density.covariance_factor = lambda : .5
density._compute_covariance()
plt.title("Age density plot of the users\n")
plt.xlabel("Age")
plt.ylabel("Density")
plt.plot(xAxisValues, density(xAxisValues))
plt.show(block=False)
# Two side-by-side subplots over the same age data.
plt.subplot(121)  # 1 row x 2 cols, left pane: histogram
plt.hist(ageList)
plt.title("Age distribution of the users\n")
plt.xlabel("Age")
plt.ylabel("Number of users")
plt.subplot(122)  # right pane: box-plot summary
plt.title("Summary of distribution\n")
plt.xlabel("Age")
plt.boxplot(ageList, vert=False)  # vert=False draws the box plot horizontally
plt.show(block=False)
# Horizontal bar chart: the 10 most common occupations by user count.
occ10 = spark.sql("SELECT occupation, count(occupation) as usercount FROM users GROUP BY occupation ORDER BY usercount DESC LIMIT 10")
occ10.show()
# Collect as (occupation, count) tuples and split into two parallel lists.
occTuple = occ10.rdd.map(lambda p:(p.occupation,p.usercount)).collect()
occList, countList = zip(*occTuple)
# Echoes the occupation tuple in the interactive shell.
occList
y_pos = np.arange(len(occList))  # one bar position per occupation
plt.barh(y_pos, countList, align='center', alpha=0.4)
plt.yticks(y_pos, occList)
plt.xlabel('Number of users')
plt.title('Top 10 user types\n')
# Widen the left margin so long occupation labels are not clipped.
plt.gcf().subplots_adjust(left=0.15)
plt.show(block=False)
# Stacked bar chart: gender split within each occupation.
occGender = spark.sql("SELECT occupation, gender FROM users")
occGender.show()
# Cross-tab: one row per occupation, with M and F counts as columns.
occCrossTab = occGender.stat.crosstab("occupation","gender")
occupationsCrossTuple = occCrossTab.rdd.map(lambda p:(p.occupation_gender,p.M, p.F)).collect()
occList, mList, fList = zip(*occupationsCrossTuple)
N = len(occList)
ind = np.arange(N)  # x position of each occupation's bar
width = 0.75
p1 = plt.bar(ind, mList, width, color='r')
# bottom=mList stacks the female counts on top of the male counts.
p2 = plt.bar(ind, fList, width, color='y', bottom=mList)
plt.ylabel('Count')
plt.title('Gender distribution by occupation\n')
# Rotate the occupation labels so they fit under the bars.
plt.xticks(ind + width/2., occList, rotation=90)
plt.legend((p1[0], p2[0]), ('Male', 'Female'))
# Enlarge the bottom margin to make room for the rotated labels.
plt.gcf().subplots_adjust(bottom=0.25)
plt.show(block=False)
# Pie chart: the 10 least common occupations.
occupationsBottom10 = spark.sql("SELECT occupation,count(occupation) as usercount FROM users GROUP BY occupation ORDER BY usercount LIMIT 10")
occupationsBottom10Tuple = occupationsBottom10.rdd.map(lambda p:(p.occupation,p.usercount)).collect()
occupationsBottom10List, countBottom10List =zip(*occupationsBottom10Tuple)
# Offset a few wedges from the centre for emphasis (one value per wedge).
explode = (0, 0.3, 0.2, 0.15,0.1,0,0,0,0,0.1)
plt.pie(countBottom10List, explode=explode,labels=occupationsBottom10List, autopct='%1.1f%%', shadow=True,startangle=90)
plt.title('Bottom 10 user types\n')
plt.show(block=False)