
Python Basics (6) --- Python crawler, Python with HBase, Python WordCount, Spark data analysis and chart generation

I. Python Crawler
---------------------------------------------------
    1. Test
        
        # -*- encoding=utf-8 -*-

        import urllib.request

        # open the resource at the url
        resp = urllib.request.urlopen("http://focus.tianya.cn/")
        # read the content, which returns a byte array
        mybytes = resp.read()
        # decode the bytes into a string
        mystr = mybytes.decode("utf-8")
        # close the resource
        resp.close()
        # print the page
        print(mystr)

        # import the regular expression module
        import re

        ptn = r'<a\s+href="([^"]*?)"'        # non-greedy match of the href value
        res = re.finditer(ptn, mystr)
        for r in res:
            addr = r.group(1)
            print(addr)
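
    Regular expressions are brittle on real-world HTML; the standard library's html.parser can pull out the same href values more robustly. A minimal sketch (not part of the original notes) of that alternative:

        from html.parser import HTMLParser

        class LinkParser(HTMLParser):
            def __init__(self):
                super().__init__()
                self.links = []

            def handle_starttag(self, tag, attrs):
                # collect the href attribute of every <a> tag
                if tag == "a":
                    for name, value in attrs:
                        if name == "href" and value:
                            self.links.append(value)

        parser = LinkParser()
        parser.feed(mystr)
        for addr in parser.links:
            print(addr)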


    2. Recursive crawl and save
        
        # -*- encoding=utf-8 -*-

        import urllib.request
        import os
        import re

        def fileExists(url, localpath):
            # flatten the url into a legal file name and check whether it is already on disk
            path = url
            path = path.replace(":", "_")
            path = path.replace("/", "$")
            path = path.replace("?", "$")
            path = localpath + "/" + path
            return os.path.exists(path)

        # download a page
        def download(url):
            # flatten the url into a legal local file name
            path = url
            path = path.replace(":", "_")
            path = path.replace("/", "$")
            path = path.replace("?", "$")
            path = "d:/py/data/" + path

            # fetch the page
            resp = urllib.request.urlopen(url)
            pageBytes = resp.read()
            resp.close()

            # save the file to disk if it has not been downloaded yet
            if not os.path.exists(path):
                f = open(path, "wb")
                f.write(pageBytes)
                f.close()

            try:
                # decode the page content
                pageStr = pageBytes.decode("utf-8")
                # extract the href addresses
                pattern = r'<a[^>]*href="([^"]*?)"'
                res = re.finditer(pattern, pageStr)
                for r in res:
                    addr = r.group(1)
                    print(addr)
                    if addr.startswith("//"):
                        addr = "http:" + addr        # protocol-relative url

                    # only follow http links that have not been downloaded yet
                    if addr.startswith("http://") and not fileExists(addr, "d:/py/data"):
                        download(addr)

            except Exception as e:
                # print(url + " : not text")
                # print(Exception)
                print(e)
                # print(pageBytes.decode("gbk", errors='ignore'))
                return

        download("http://www.jd.com")


II. Writing data into HBase from Python
--------------------------------------------------------------
    1. Start the HBase cluster
        a. Start ZooKeeper

        b. Start the Hadoop cluster

        c. Start the HBase cluster
            If the clocks are out of sync:
            $>su root
            $>xcall.sh "ntpdate asia.pool.ntp.org"

    2. Start the HBase thrift server on s100 so third-party applications can talk to it
        $> hbase-daemon.sh start thrift2

    3. Check the web UI
        http://s100:9095/           // web UI port
        $> netstat -anop | grep 9090   // 9090 is the RPC port

    4. Download the thrift compiler for Windows; it needs no installation and is just a standalone tool.
        thrift-0.10.0.exe

    5. Download and install the thrift Python module.
        5.1) Download the file
            thrift-0.10.0.tar.gz

        5.2) Extract the archive

        5.3) Enter the directory and install
            cmd>cd thrift-0.10.0\lib\py
            cmd>python setup.py install

            ...
            Using c:\users\administrator\appdata\local\programs\python\python37\lib\site-packages\six-1.11.0-py3.7.egg
            Finished processing dependencies for thrift==0.10.0

    6. Verify that the module can be imported from a .py file
        from thrift import Thrift
        from thrift.transport import TSocket
        from thrift.transport import TTransport
        from thrift.protocol import TBinaryProtocol

    7. Locate the hbase.thrift file and compile it into Python bindings with the following command
        [hbase.thrift ships with the HBase distribution; download it from the web if you cannot find it]
        cmd> thrift-0.10.0.exe -o ./out -gen py hbase.thrift

    8. Copy the generated folder into the IDEA/Python module
        a. Create a new Python package named mythrift under the module
        b. Copy the hbase folder from the generated py output into mythrift (a quick import check is sketched below)
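
        A minimal sketch of that import check, assuming the layout <module>/mythrift/hbase/ described above:

            from mythrift.hbase import THBaseService
            from mythrift.hbase.ttypes import TColumn, TColumnValue, TGet, TPut, TScan

            print(THBaseService.Client)   # should print the generated client class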

    9. Operate on an HBase table from Python
        # -*- encoding=utf-8 -*-

        # standard library imports
        import os

        # thrift Python modules
        from thrift import Thrift
        from thrift.transport import TSocket
        from thrift.transport import TTransport
        from thrift.protocol import TBinaryProtocol

        # the hbase Python modules generated from hbase.thrift
        from mythrift.hbase import THBaseService
        from mythrift.hbase.ttypes import *
        from mythrift.hbase.ttypes import TResult

        # create a socket connection to s100:9090
        transport = TSocket.TSocket('s100', 9090)
        transport = TTransport.TBufferedTransport(transport)
        protocol = TBinaryProtocol.TBinaryProtocol(transport)
        client = THBaseService.Client(protocol)

        # open the transport
        transport.open()

        # # put operation
        # table = b'ns1:t1'
        # row = b'row2'
        # v1 = TColumnValue(b'f1', b'id', b'101')
        # v2 = TColumnValue(b'f1', b'name', b'tomas')
        # v3 = TColumnValue(b'f1', b'age', b'12')
        # vals = [v1, v2, v3]
        # put = TPut(row, vals)
        # client.put(table, put)
        # print("okkkk!!")
        # transport.close()

        # # get operation
        # table = b'ns1:t1'
        # rowkey=b"row2"
        # col_id = TColumn(b"f1",b"id")
        # col_name = TColumn(b"f1",b"name")
        # col_age = TColumn(b"f1",b"age")
        #
        # cols = [col_id,col_name,col_age]
        # get = TGet(rowkey,cols)
        # res = client.get(table,get)
        # print(res.columnValues)
        # print(bytes.decode(res.columnValues[0].qualifier))
        # print(bytes.decode(res.columnValues[0].family))
        # print(res.columnValues[0].timestamp)
        # print(bytes.decode(res.columnValues[0].value))

        # # delete operation
        # table = b'ns1:t1'
        # rowkey = b"row2"
        # col_id = TColumn(b"f1", b"id")
        # col_name = TColumn(b"f1", b"name")
        # col_age = TColumn(b"f1", b"age")
        # cols = [col_id, col_name, col_age]
        #
        # # build the delete object
        # delete = TDelete(rowkey,cols)
        # res = client.deleteSingle(table, delete)
        # transport.close()
        # print("ok")

        # scan over a row range
        table = b'call:calllogs'
        startRow = b'34,13520401111,20180114152647,0,13269364444,406'
        stopRow = b'90,15032295555,20180922165903,0,15778421111,298'
        dur = TColumn(b"f1", b"callDuration")
        time = TColumn(b"f1", b"callTime")
        caller = TColumn(b"f1", b"caller")
        callee = TColumn(b"f1", b"callee")
        cols = [dur, time,caller,callee]

        scan = TScan(startRow=startRow,stopRow=stopRow,columns=cols)
        r = client.getScannerResults(table, scan, 100)
        for x in r:
            print("============")
            print(bytes.decode(x.columnValues[0].qualifier))
            print(bytes.decode(x.columnValues[0].family))
            print(x.columnValues[0].timestamp)
            print(bytes.decode(x.columnValues[0].value))

        # full-table scan
        table = b'call:calllogs'
        # startRow = b'34,13520401111,20180114152647,0,13269364444,406'
        # stopRow = b'90,15032295555,20180922165903,0,15778421111,298'
        dur = TColumn(b"f1", b"callDuration")
        time = TColumn(b"f1", b"callTime")
        caller = TColumn(b"f1", b"caller")
        callee = TColumn(b"f1", b"callee")
        cols = [dur, time,caller,callee]

        scan = TScan(columns=cols)
        r = client.getScannerResults(table, scan, 100)
        print(len(r))
        for x in r:
            print("============")
            print(bytes.decode(x.columnValues[0].qualifier))
            print(bytes.decode(x.columnValues[0].family))
            print(x.columnValues[0].timestamp)
            print(bytes.decode(x.columnValues[0].value))

        # close the transport when finished
        transport.close()
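
        The prints above only look at the first cell of each row. A small helper (a sketch, not part of the original notes) dumps every cell of a TResult returned by the scans:

        def dump_row(result):
            # result is a TResult; columnValues is a list of TColumnValue objects
            for cv in result.columnValues:
                print(bytes.decode(cv.family),
                      bytes.decode(cv.qualifier),
                      cv.timestamp,
                      bytes.decode(cv.value))

        # usage inside the scan loops above:  dump_row(x)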


III. WordCount with Python in the Spark shell
-----------------------------------------------------------------
    1. Local mode [can be written in Scala or Python]
        a. Remove the core-site.xml | hdfs-site.xml | hive-site.xml files from spark/conf
            [so that Spark does not try to integrate with Hive]

        b. Enter the pyspark shell
            $> cd /soft/spark/bin
            $> ./pyspark --master local[*]
            >>> arr = [1,2,3,4]
            >>> rdd = sc.parallelize(arr)
            >>> rdd.map(lambda e : (e,1))       # Python lambda expression

    2. WordCount demo
        >>> arr = ["tom","tom1","tom1","tom3"]
        >>> rdd1 = sc.parallelize(arr)
        >>> rdd1.collect()
        ['tom', 'tom1', 'tom1', 'tom3']
        >>> rdd1.map(lambda e : (e,1))
        PythonRDD[4] at RDD at PythonRDD.scala:48
        >>> rdd2 = rdd1.map(lambda e : (e,1))
        >>> rdd2.collect()
        [('tom', 1), ('tom1', 1), ('tom1', 1), ('tom3', 1)]
        >>> rdd3 = rdd2.reduceByKey()
        Traceback (most recent call last):
          File "<stdin>", line 1, in <module>
        TypeError: reduceByKey() takes at least 2 arguments (1 given)
        >>> rdd3 = rdd2.reduceByKey(lambda a,b : a + b)
        >>> rdd3.collect()
        [('tom1', 2), ('tom3', 1), ('tom', 1)]
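
    The same word count can also be packaged as a standalone script and run with spark-submit. A minimal sketch (the file name and input path are placeholders):

        # wordcount.py
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.appName("wordcount").getOrCreate()
        sc = spark.sparkContext

        lines = sc.textFile("file:///tmp/words.txt")            # placeholder input path
        counts = (lines.flatMap(lambda line: line.split())      # split each line into words
                       .map(lambda w: (w, 1))                   # pair each word with 1
                       .reduceByKey(lambda a, b: a + b))        # sum the counts per word
        for word, cnt in counts.collect():
            print(word, cnt)

        spark.stop()

        # run with:  spark-submit --master local[*] wordcount.py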


IV. Crawler revision -- store the crawled pages in HBase
---------------------------------------------------------
    1. Encode and decode with base64
        import base64

        url = b"http://tianya.cn"
        b = base64.encodebytes(url)
        print(b)
        bb = base64.decodebytes(b)
        print(bb)
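
        Note that base64.encodebytes appends a trailing newline (and wraps long input every 76 characters), so the rowkeys built from it below will contain a '\n' byte. If that is undesirable, base64.urlsafe_b64encode produces a single-line key. A quick comparison (not part of the original notes):

        import base64

        url = b"http://tianya.cn"
        print(base64.encodebytes(url))        # b'aHR0cDovL3RpYW55YS5jbg==\n'  (trailing newline)
        print(base64.urlsafe_b64encode(url))  # b'aHR0cDovL3RpYW55YS5jbg=='    (no newline)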

    2. Create the HBase table: pages
        hbase> create 'ns1:pages','f1'

    3. Write pageDao.py to handle the CRUD operations on the HBase table
        
        # -*- encoding=utf-8 -*-

        # standard library imports
        import os
        import base64

        # thrift Python modules
        from thrift import Thrift
        from thrift.transport import TSocket
        from thrift.transport import TTransport
        from thrift.protocol import TBinaryProtocol

        # the hbase Python modules generated from hbase.thrift
        from mythrift.hbase import THBaseService
        from mythrift.hbase.ttypes import *
        from mythrift.hbase.ttypes import TResult

        # create a socket connection to s100:9090
        transport = TSocket.TSocket('s100', 9090)
        transport = TTransport.TBufferedTransport(transport)
        protocol = TBinaryProtocol.TBinaryProtocol(transport)
        client = THBaseService.Client(protocol)

        # save a page
        def savePage(url, page):
            transport.open()
            # base64-encode the url into bytes and use it as the rowkey
            urlBase64Bytes = base64.encodebytes(url.encode("utf-8"))

            # put operation
            table = b'ns1:pages'
            rowkey = urlBase64Bytes
            v1 = TColumnValue(b'f1', b'page', page)
            vals = [v1]
            put = TPut(rowkey, vals)
            client.put(table, put)
            transport.close()

        # check whether a page already exists
        def exists(url):
            transport.open()
            # base64-encode the url into bytes and use it as the rowkey
            urlBase64Bytes = base64.encodebytes(url.encode("utf-8"))
            print(urlBase64Bytes)

            table = b'ns1:pages'
            rowkey = urlBase64Bytes
            col_page = TColumn(b"f1",b"page")

            cols = [col_page]
            get = TGet(rowkey,cols)
            res = client.get(table, get)
            transport.close()
            return res.row is not None


    4. Write the crawler program
       
        # -*- encoding=utf-8 -*-

        import urllib.request
        import os
        import re
        import pageDao

        # download a page
        def download(url):
            # fetch the page
            resp = urllib.request.urlopen(url)
            pageBytes = resp.read()
            resp.close()

            # store the page in hbase if it has not been stored yet
            if not pageDao.exists(url):
                pageDao.savePage(url, pageBytes)

            try:
                # decode the page content
                pageStr = pageBytes.decode("utf-8")
                # extract the href addresses
                pattern = r'<a[^>]*href="([^"]*?)"'
                res = re.finditer(pattern, pageStr)
                for r in res:
                    addr = r.group(1)
                    print(addr)
                    if addr.startswith("//"):
                        addr = "http:" + addr        # protocol-relative url

                    # only follow http links that are not the page itself and are not stored yet
                    if addr.startswith("http://") and url != addr and not pageDao.exists(addr):
                        download(addr)

            except Exception as e:
                print(e)
                print(pageBytes.decode("gbk", errors='ignore'))
                return

        download("http://jd.com")



V. Spark data analysis with Python and chart generation
-----------------------------------------------------------------
    1. Install the Python modules with pip on Windows
        1. numpy
            cmd> pip install -i https://pypi.tuna.tsinghua.edu.cn/simple numpy

        2. scipy
            cmd> pip install -i https://pypi.tuna.tsinghua.edu.cn/simple scipy

        3. matplotlib
            cmd> pip install -i https://pypi.tuna.tsinghua.edu.cn/simple matplotlib
            cmd> python -m pip install -U pip setuptools
            cmd> python -m pip install matplotlib

    2. Install the numpy, scipy, pandas and matplotlib modules on Ubuntu
        a. numpy
            Installation:
            If Python is not yet installed on your Ubuntu machine, install it first,
            then run the following in a terminal:
            $> sudo apt-get update
            $> sudo apt-get install python-numpy
            $> sudo apt-get install python3-numpy
            For Python 3, replace python-numpy with python3-numpy as shown above.

        b.scipy
            $> sudo apt-get update
            $> sudo apt-get install python-scipy

        c.pandas
            $> sudo apt-get update
            $> sudo apt-get install python-pandas

        d.matplotlib
            $> sudo apt-get update
            $> sudo apt-get install python-matplotlib

        e.scikit-learn
            $> sudo apt-get update
            $> sudo apt-get install python-sklearn

    3. Enter the Python Spark shell
        cmd> ./pyspark --master local[*]        (Windows)
        $> ./pyspark --master local[*]          (Linux)

    4. Paste the following code to create the data frame
        from pyspark.sql import Row
        import matplotlib.pyplot as plt
        import numpy as np
        import pylab as P
        plt.rcdefaults()
        dataDir ="file:///D://share//python//ml-data//ml-1m//users.dat"
        dataDir ="file:///mnt/hgfs/share/python/ml-data/ml-1m/users.dat"
        lines = sc.textFile(dataDir)
        splitLines = lines.map(lambda l: l.split("::"))
        usersRDD = splitLines.map(lambda p: Row(id=p[0],gender=p[1],age=int(p[2]), occupation=p[3], zipcode=p[4]))
        usersDF = spark.createDataFrame(usersRDD)
        usersDF.createOrReplaceTempView("users")
        usersDF.show()

    5. Generate the charts

        # histogram
        ageDF = spark.sql("SELECT age FROM users")
        ageList = ageDF.rdd.map(lambda p: p.age).collect()
        ageDF.describe().show()
        plt.hist(ageList)
        plt.title("Age distribution of the users\n")
        plt.xlabel("Age")
        plt.ylabel("Number of users")
        plt.show(block=False)

        # density plot
        from scipy.stats import gaussian_kde
        density = gaussian_kde(ageList)
        xAxisValues = np.linspace(0,100,1000)
        density.covariance_factor = lambda : .5
        density._compute_covariance()
        plt.title("Age density plot of the users\n")
        plt.xlabel("Age")
        plt.ylabel("Density")
        plt.plot(xAxisValues, density(xAxisValues))
        plt.show(block=False)

        # subplots: histogram and box plot side by side
        plt.subplot(121)
        plt.hist(ageList)
        plt.title("Age distribution of the users\n")
        plt.xlabel("Age")
        plt.ylabel("Number of users")
        plt.subplot(122)
        plt.title("Summary of distribution\n")
        plt.xlabel("Age")
        plt.boxplot(ageList, vert=False)
        plt.show(block=False)

        # bar chart
        occ10 = spark.sql("SELECT occupation, count(occupation) as usercount FROM users GROUP BY occupation ORDER BY usercount DESC LIMIT 10")
        occ10.show()

        occTuple = occ10.rdd.map(lambda p:(p.occupation,p.usercount)).collect()
        occList, countList = zip(*occTuple)
        occList

        y_pos = np.arange(len(occList))
        plt.barh(y_pos, countList, align='center', alpha=0.4)
        plt.yticks(y_pos, occList)
        plt.xlabel('Number of users')
        plt.title('Top 10 user types\n')
        plt.gcf().subplots_adjust(left=0.15)
        plt.show(block=False)


        # stacked bar chart
        occGender = spark.sql("SELECT occupation, gender FROM users")
        occGender.show()

        occCrossTab = occGender.stat.crosstab("occupation","gender")
        occupationsCrossTuple = occCrossTab.rdd.map(lambda p:(p.occupation_gender,p.M, p.F)).collect()
        occList, mList, fList = zip(*occupationsCrossTuple)
        N = len(occList)
        ind = np.arange(N)
        width = 0.75
        p1 = plt.bar(ind, mList, width, color='r')
        p2 = plt.bar(ind, fList, width, color='y', bottom=mList)
        plt.ylabel('Count')
        plt.title('Gender distribution by occupation\n')
        plt.xticks(ind + width/2., occList, rotation=90)
        plt.legend((p1[0], p2[0]), ('Male', 'Female'))
        plt.gcf().subplots_adjust(bottom=0.25)
        plt.show(block=False)

        # pie chart
        occupationsBottom10 = spark.sql("SELECT occupation,count(occupation) as usercount FROM users GROUP BY occupation ORDER BY usercount LIMIT 10")
        occupationsBottom10Tuple = occupationsBottom10.rdd.map(lambda p:(p.occupation,p.usercount)).collect()
        occupationsBottom10List, countBottom10List =zip(*occupationsBottom10Tuple)
        explode = (0, 0.3, 0.2, 0.15,0.1,0,0,0,0,0.1)
        plt.pie(countBottom10List, explode=explode,labels=occupationsBottom10List, autopct='%1.1f%%', shadow=True,startangle=90)
        plt.title('Bottom 10 user types\n')
        plt.show(block=False)
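
        When these snippets are run from a script rather than the pyspark shell, plt.show(block=False) may return before the window is visible. Writing the figures to files is a simple alternative (a sketch; the file name is a placeholder):

        # save the current figure to a PNG instead of (or in addition to) showing it
        plt.savefig("bottom10_pie.png", dpi=150, bbox_inches="tight")
        plt.clf()    # clear the figure before drawing the next chart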