1. 程式人生 > >Python爬蟲學習筆記——Python基礎

Python爬蟲學習筆記——Python基礎

Python爬蟲學習筆記——Python基礎

1 IO程式設計

1.1 檔案讀寫

Python內建了讀寫檔案的函式,語法為:
open(name[, mode[, buffering]])

# Open the file (the default mode is read-only text) and close it again so
# the handle is not leaked.
f = open(r'C:\text\myTextFile.txt')
f.close()

# Read the file; the with-statement closes it automatically.
with open(r'C:\text\myTextFile.txt', 'r') as fileReader:
    # Read the whole content in one call.
    print(fileReader.read())
    # read() left the position at end-of-file; rewind so that the
    # line-by-line pass below actually sees the content.
    fileReader.seek(0)
    # Read the content one line at a time.
    for line in fileReader.readlines():
        print(line.strip())

# Write to the file ('w' truncates any existing content).
with open(r'C:\text\myTextFile.txt', 'w') as fileReader:
    fileReader.write('myTextFile')

1.2 操作檔案和目錄

import os
import shutil
# NOTE: filepath, file, filename, old and new are placeholders that the
# reader must define before running any of these calls.

# Get the current working directory of the running Python script.
os.getcwd()
# List the names of all files and directories in the given directory.
os.listdir('.')
# Delete a file.
os.remove(filepath)
# Remove a chain of empty directories.
os.removedirs(r'D:\python')
# Check whether the given path is a file.
os.path.isfile(filepath)
# Check whether the given path is a directory.
os.path.isdir(filepath)
# Check whether the path is absolute.
os.path.isabs(filepath)
# Check whether the path exists.
os.path.exists(r'D:\python')
# Split a path into its directory name and file name.
os.path.split(filepath)
# Split off the file extension.
os.path.splitext(filepath)
# Get the directory part of a path.
os.path.dirname(filepath)
# Get the file-name part of a path.
os.path.basename(filepath)
# Read and set environment variables.
os.getenv('PATH')
os.putenv('MY_VAR', 'value')
# Line terminator used by the current platform.
os.linesep  # '\r\n' in Windows, '\n' in Linux
# Name of the platform in use.
os.name
# Rename a file or directory.
os.rename(old, new)
# Create nested directories.
os.makedirs(r'C:\python\test')
# Create a single directory.
os.mkdir('test')
# Get file attributes.
os.stat(file)
# Change file permission bits (timestamps are changed with os.utime).
os.chmod(file, 0o644)
# Get the file size.
os.path.getsize(filename)
# Copy a directory tree.
shutil.copytree('olddir', 'newdir')
# Copy a file.
shutil.copyfile('oldfile', 'newfile')
# Move a file.
shutil.move('oldpos', 'newpos')
# Delete directories.
os.rmdir('dir')  # can only delete an empty directory
shutil.rmtree('dir')  # can also delete a non-empty directory

1.3 序列化操作(把記憶體的變數變成可儲存或傳輸的過程)

try:
    import cPickle as pickle  # Python 2: C implementation
except ImportError:
    import pickle  # Python 3: cPickle no longer exists

# Sample object to serialize.
d = dict(url='index.html', title='home page', content='home page')
# dumps() returns the serialized data instead of writing it to a file.
pickle.dumps(d)
# dump() writes the serialized data to a file object; the file is opened in
# binary mode and the with-statement closes it even if pickling fails.
with open(r'D:\dump.txt', 'wb') as f:
    pickle.dump(d, f)

# Deserialize.
# NOTE(review): only unpickle data from a trusted source; loading a pickle
# can execute arbitrary code.
with open(r'D:\dump.txt', 'rb') as f:
    d = pickle.load(f)
# In an interactive session this bare expression echoes the restored dict.
d

2 程序和執行緒

程序是程式在計算機上的執行活動,當執行一個程式時,就啟動一個程序。在Windows系統中,程序被細化為執行緒,作為可以獨立執行的單位。多程序,也就是說同一個系統中允許多個程序處於執行狀態,也稱為多工。在單CPU裡實現多程序,需要使用併發技術。

2.1 多程序

使用os模組的fork方法實現多程序,fork方法呼叫一次,返回兩次,作業系統將當前父程序複製出一份子程序,父程序返回子程序的ID,而子程序永遠返回0。

import os
if __name__ == '__main__':
    # Demonstrates os.fork(): the call returns twice, once in the parent
    # (with the child's pid) and once in the child (with 0).
    print('current process %s start ... ' % (os.getpid()))
    try:
        pid = os.fork()
    except OSError:
        # os.fork() reports failure by raising OSError; it never returns a
        # negative pid, so a `pid < 0` test would be unreachable.
        print('error in fork')
    else:
        if pid == 0:
            print('I am child process %s and my parent process is %s' % (os.getpid(), os.getppid()))
        else:
            print('I %s created a child process %s.' % (os.getpid(), pid))

使用multiprocessing模組實現多程序

import os
from multiprocessing import Process
def run_proc(name):
    """Print a line identifying this child by *name* and by its process id."""
    current_pid = os.getpid()
    message = 'child process %s (%s) running...' % (name, current_pid)
    print(message)
if __name__ == '__main__':
    # Start five child processes and wait for every one of them.
    print('parent process %s' % os.getpid())
    children = []
    for i in range(5):
        p = Process(target=run_proc, args=(str(i), ))
        print('Process will start.')
        p.start()
        children.append(p)
    # Join all children, not just the last one started; otherwise
    # 'Process end.' can be printed while earlier children are still running.
    for p in children:
        p.join()
    print('Process end.')

使用multiprocessing模組的Pool類來建立多程序

from multiprocessing import Pool
import os, time, random
def run_task(name):
    """Announce task *name*, sleep for a random time below 3 seconds, then announce its end."""
    worker_pid = os.getpid()
    print('Task %s (pid=%s) is running...' % (name, worker_pid))
    delay = random.random() * 3
    time.sleep(delay)
    print('Task %s end.' % name)
if __name__ == '__main__':
    # Run five tasks on a pool of three worker processes.
    parent_pid = os.getpid()
    print('current process %s' % parent_pid)
    p = Pool(processes=3)
    task_ids = range(5)
    for task_id in task_ids:
        # Submit without blocking; the pool schedules the task on a free worker.
        p.apply_async(run_task, args=(task_id, ))
    print('waiting for all subprocesses done...')
    # close() stops new submissions and must be called before join().
    p.close()
    p.join()
    print('all subprocesses done.')

程序間通訊,使用Queue方式完成程序間通訊。

from multiprocessing import Process, Queue
import os, time, random

# Code executed by the writer processes.
def proc_write(q, urls):
    """Put every entry of *urls* on queue *q*, pausing a random time after each."""
    writer_pid = os.getpid()
    print('Process(%s) is writing...' % writer_pid)
    for entry in urls:
        q.put(entry)
        print('Put %s to queue...' % entry)
        pause = random.random()
        time.sleep(pause)
# Code executed by the reader process.
def proc_read(q):
    """Print every item taken from queue *q*.

    The loop never ends on its own, so the caller has to terminate the
    process running this function.
    """
    reader_pid = os.getpid()
    print('Process(%s) is reading...' % reader_pid)
    while 1:
        # Block until an item is available.
        received = q.get(True)
        print('Get %s from queue.' % received)

if __name__ == '__main__':
    # The parent creates the Queue and hands it to every child process.
    q = Queue()
    first_batch = ['url_1', 'url_2', 'url_3']
    second_batch = ['url_4', 'url_5', 'url_6']
    proc_writer1 = Process(target=proc_write, args=(q, first_batch))
    proc_writer2 = Process(target=proc_write, args=(q, second_batch))
    proc_reader = Process(target=proc_read, args=(q, ))
    # Start the writer children first ...
    for writer in (proc_writer1, proc_writer2):
        writer.start()
    # ... then the reader child.
    proc_reader.start()
    # Wait for both writers to finish.
    for writer in (proc_writer1, proc_writer2):
        writer.join()
    # proc_read loops forever, so it cannot be joined; terminate it instead.
    proc_reader.terminate()

使用Pipe方式完成程序間通訊

import multiprocessing
import random
import time, os

def proc_send(pipe, urls):
    """Send every entry of *urls* through *pipe*, pausing a random time after each."""
    for url in urls:
        print('Process(%s) send: %s' % (os.getpid(), url))
        pipe.send(url)
        time.sleep(random.random())


def proc_recv(pipe):
    """Print every object received from *pipe* until a ``None`` sentinel arrives.

    Returning on the sentinel lets the process running this function exit, so
    that the parent can join it.
    """
    while True:
        url = pipe.recv()
        if url is None:
            # End-of-stream marker: stop receiving.
            break
        print('Process(%s) rev:%s' % (os.getpid(), url))
        time.sleep(random.random())


if __name__ == '__main__':
    # Pipe() returns the two connection ends of the channel.
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc_send, args=(pipe[0], ['url_' + str(i) for i in range(10)]))
    p2 = multiprocessing.Process(target=proc_recv, args=(pipe[1], ))
    p1.start()
    p2.start()
    p1.join()
    # The sender has finished, so queue the sentinel behind its messages;
    # without it the receiver would block in recv() forever and the join
    # below would never return.
    pipe[0].send(None)
    p2.join()

2.2 多執行緒

用threading模組建立多執行緒,第一種方式,把一個函式傳入並建立Thread例項,然後呼叫start方法開始執行。

import random
import time, threading
# Code executed by each new thread.
def thread_run(urls):
    """Announce the start, print each entry of *urls* with a random pause, announce the end."""
    worker_name = threading.current_thread().name
    print('Current %s is running...' % worker_name)
    for target in urls:
        print('%s ---->>> %s' % (worker_name, target))
        pause = random.random()
        time.sleep(pause)
    print('%s ended.' % worker_name)
# Run thread_run in two named threads and wait for both to finish.
launcher_name = threading.current_thread().name
print('%s is running...' % launcher_name)
t1 = threading.Thread(target=thread_run, name='Thread_1', args=(['url_1', 'url_2', 'url_3'],))
t2 = threading.Thread(target=thread_run, name='Thread_2', args=(['url_4', 'url_5', 'url_6'],))
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()
print('%s ended.' % threading.current_thread().name)

第二種方式直接從threading.Thread繼承並建立執行緒類,然後重寫__init__方法和run方法。

import random
import threading
import time
class myThread(threading.Thread):
    """Thread subclass that prints each of its URLs, pausing a random time after each."""

    def __init__(self, name, urls):
        """Store *urls* and pass *name* on to threading.Thread."""
        threading.Thread.__init__(self, name=name)
        self.urls = urls

    def run(self):
        """Announce the start, print every stored URL, then announce the end."""
        running_as = threading.current_thread().name
        print('Current %s is running...' % running_as)
        for target in self.urls:
            print('%s ---->>> %s' % (running_as, target))
            pause = random.random()
            time.sleep(pause)
        print('%s ended.' % running_as)
# Run two myThread instances and wait for both to finish.
launcher_name = threading.current_thread().name
print('%s is running...' % launcher_name)
t1 = myThread(name='Thread_1', urls=['url_1', 'url_2', 'url_3'])
t2 = myThread(name='Thread_2', urls=['url_4', 'url_5', 'url_6'])
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()
print('%s ended.' % threading.current_thread().name)

執行緒同步

import threading
# Lock guarding every access to the shared counter below.
mylock = threading.RLock()
# Counter shared by all myThread instances.
num = 0


class myThread(threading.Thread):
    """Thread that increments the shared counter ``num`` until it reaches 4."""

    def __init__(self, name):
        threading.Thread.__init__(self, name=name)

    def run(self):
        """Increment ``num`` under the lock, one step per iteration, until it is >= 4."""
        global num
        while True:
            # The with-statement releases the lock even if the body raises.
            with mylock:
                print('%s locked, Number: %d' % (threading.current_thread().name, num))
                finished = num >= 4
                if not finished:
                    num += 1
                # Remember the value seen under the lock so the message
                # below does not read the counter unprotected.
                seen = num
            # Report the release only after the lock has really been released.
            print('%s released, Number: %d' % (threading.current_thread().name, seen))
            if finished:
                break
if __name__ == '__main__':
    # Two threads compete for the shared counter.
    thread1 = myThread('Thread_1')
    thread2 = myThread('Thread_2')
    for worker in (thread1, thread2):
        worker.start()

2.3 協程

協程,是一種輕量級執行緒,gevent是一個基於協程的Python網路函式庫,比較完善地提供了協程的支援。

from gevent import monkey;monkey.patch_all()
import gevent
import urllib2

def run_task(url):
    """Fetch *url* and print how many bytes were received.

    Any failure is printed instead of raised, so one bad URL does not stop
    the other downloads.
    """
    from contextlib import closing

    print('Visit --> %s' % url)
    try:
        # closing() guarantees the response is closed after reading; the
        # original never closed it.
        with closing(urllib2.urlopen(url)) as response:
            data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception as e:
        # Deliberate best-effort: report the error and carry on.
        print(e)

if __name__ == '__main__':
    # Spawn one greenlet per URL, then wait for all of them.
    urls = ['https://github.com/', 'https://www.python.org/', 'https://www.cnblogs.com/']
    greenlets = []
    for target in urls:
        greenlets.append(gevent.spawn(run_task, target))
    gevent.joinall(greenlets)

使用gevent中的pool物件,對動態數量的greenlet進行併發管理

from gevent import monkey
monkey.patch_all()
import urllib2
from gevent.pool import Pool


def run_task(url):
    """Fetch *url*, print how many bytes were received and return a status string.

    Any failure is printed instead of raised; the status string is returned
    either way.
    """
    from contextlib import closing

    print('Visit --> %s' % url)
    try:
        # closing() guarantees the response is closed after reading; the
        # original never closed it.
        with closing(urllib2.urlopen(url)) as response:
            data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception as e:
        # Deliberate best-effort: report the error and carry on.
        print(e)
    return 'rul:%s --->finish'% url
  
if __name__ == '__main__':
    # Map run_task over the URLs with a pool of size 2, then print the results.
    urls = ['https://github.com/', 'https://www.python.org/', 'http://www.cnblogs.com/']
    pool = Pool(2)
    results = pool.map(run_task, urls)
    print(results)

2.4 分散式程序

分散式程序指的是將Process程序分佈到多臺機器上。
首先建立服務程序:

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module to ``queue``
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# Number of tasks; also used as the capacity of the task and result queues.
task_number = 10
task_queue = Queue.Queue(task_number)
result_queue = Queue.Queue(task_number)


def get_task():
    """Return the module-level task queue."""
    return task_queue


def get_result():
    """Return the module-level result queue."""
    return result_queue

# Manager subclass on which the queue accessors get registered.
class QueueManager(BaseManager):
    """Empty BaseManager subclass; it adds no behaviour of its own."""
def win_run():
    """Start the manager server, publish ten tasks and print ten results.

    The task and result queues are exposed on 127.0.0.1:8001. Each result is
    awaited for up to 10 seconds; any error is reported and the manager is
    shut down in every case.
    """
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)

    # Bind the port and set the authentication key. The key is a byte string
    # because Python 3 rejects a str authkey; on Python 2 b'...' is a plain str.
    manager = QueueManager(address=('127.0.0.1', 8001), authkey=b'enterprise')

    # Start the manager's server process.
    manager.start()
    try:
        # Obtain the task queue and the result queue from the manager.
        task = manager.get_task_queue()
        result = manager.get_result_queue()

        # Add the tasks.
        for url in ['ImageUrl_' + str(i) for i in range(10)]:
            print('Put task %s ...' % url)
            task.put(url)
        print('try get result')
        for i in range(10):
            print('result is %s ' % result.get(timeout=10))
    except Exception as exc:
        # The original handler called the misspelled `pritn`, which raised
        # NameError and hid the real failure; report the cause instead.
        print('Manager error: %r' % (exc,))
    finally:
        # Must shut down, or there will be an error.
        manager.shutdown()
if __name__ == '__main__':
    # Script entry point: freeze_support() is called first (it only has an
    # effect in a frozen Windows executable), then the manager server runs.
    freeze_support()
    win_run()

其次,建立任務程序:

import time
from multiprocessing.managers import BaseManager


# Manager subclass on which the remote queue accessors get registered.
class QueueManager(BaseManager):
    """Empty BaseManager subclass; it adds no behaviour of its own."""
  
# Register only the accessor names; no callable is supplied on this side.
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# Connect to the server.
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)

# Port and authentication key must be the same as on the task manager. The
# key is a byte string because Python 3 rejects a str authkey; on Python 2
# b'...' is a plain str.
m = QueueManager(address=(server_addr, 8001), authkey=b'enterprise')

m.connect()

# Obtain the shared queue objects.
task = m.get_task_queue()
result = m.get_result_queue()

#Access task from the queue and write into the result queue
while(not task.empty()):
    image_url = task.get(True,