1. 程式人生 > >探索 Pexpect,第 1 部分:剖析 Pexpect

探索 Pexpect,第 1 部分:剖析 Pexpect

概述

Pexpect 是 Don Libes 的 Expect 語言的一個 Python 實現,是一個用來啟動子程式,並使用正則表示式對程式輸出做出特定響應,以此實現與其自動互動的 Python 模組。 Pexpect 的使用範圍很廣,可以用來實現與 ssh、ftp 、telnet 等程式的自動互動;可以用來自動複製軟體安裝包並在不同機器自動安裝;還可以用來實現軟體測試中與命令列互動的自動化。

下載

Pexpect 可以從 SourceForge 網站下載。 本文介紹的示例使用的是 2.3 版本,如不說明測試環境,預設執行作業系統為 fedora 9 並使用 Python 2.5 。

安裝

1

2

3

4

download pexpect-2.3.tar.gz

tar zxvf pexpect-2.3.tar.g

cd pexpect-2.3

python setup.py install  (do this as root)

依賴

  • Python 版本 2.4 或者 2.5
  • pty module ,pty 是任何 Posix 系統標準庫的一部分

由於其依賴 pty module ,所以 Pexpect 還不能在 Windows 的標準 python 環境中執行,如果想在 Windows 平臺使用,可以使用在 Windows 中執行 Cygwin 做為替代方案。

遵循 MIT 許可證

Pexpect 提供的 run() 函式:

清單 1. run() 的定義

1

2

run(command,timeout=-1,withexitstatus=False,events=None,\

extra_args=None,logfile=None, cwd=None, env=None)

函式 run 可以用來執行命令,其作用與 Python os 模組中 system() 函式相似。run() 是通過 Pexpect 類實現的。

如果命令的路徑沒有完全給出,則 run 會使用 which 命令嘗試搜尋命令的路徑 。

清單 2. 使用 run()執行 svn 命令

1

2

from pexpect import *

run ("svn ci -m 'automatic commit' my_file.py")

與 os.system() 不同的是,使用 run() 可以方便地同時獲得命令的輸出結果與命令的退出狀態 。

清單 3. run() 的返回值

1

2

from pexpect import *

(command_output, exitstatus) = run ('ls -l /bin', withexitstatus=1)

command_out 中儲存的就是 /bin 目錄下的內容。

Pexpect 提供的 spawn() 類:

使用 Pexpect 啟動子程式

清單 4. spawn 的建構函式

1

2

3

class spawn:

def __init__(self,command,args=[],timeout=30,maxread=2000,\

searchwindowsize=None, logfile=None, cwd=None, env=None)

spawn是Pexpect模組主要的類,用以實現啟動子程式,它有豐富的方法與子程式互動從而實現使用者對子程式的控制。它主要使用 pty.fork() 生成子程序,並呼叫 exec() 系列函式執行 command 引數的內容。

可以這樣使用:

清單 5. spawn() 使用示例

1

2

3

child = pexpect.spawn ('/usr/bin/ftp') #執行ftp客戶端命令

child = pexpect.spawn ('/usr/bin/ssh [email protected]') #使用ssh登入目標機器

child = pexpect.spawn ('ls -latr /tmp') #顯示 /tmp目錄內容

當子程式需要引數時,還可以使用一個引數的列表:

清單 6. 引數列表示例

1

2

3

child = pexpect.spawn ('/usr/bin/ftp', [])

child = pexpect.spawn ('/usr/bin/ssh', ['[email protected]'])

child = pexpect.spawn ('ls', ['-latr', '/tmp'])

在建構函式中,maxread 屬性指定了 Pexpect 物件試圖從 tty 一次讀取的最大位元組數,它的預設值是2000位元組 。

由於需要實現不斷匹配子程式輸出, searchwindowsize 指定了從輸入緩衝區中進行模式匹配的位置,預設從開始匹配。

logfile 引數指定了 Pexpect 產生的日誌的記錄位置。

例如:

清單 7. 記錄日誌

1

2

3

child = pexpect.spawn('some_command')

fout = file('mylog.txt','w')

child.logfile = fout

還可以將日誌指向標準輸出:

清單 8. 將日誌指向標準輸出

1

2

child = pexpect.spawn('some_command')

child.logfile = sys.stdout

如果不需要記錄向子程式輸入的日誌,只記錄子程式的輸出,可以使用:

清單 9. 記錄輸出日誌

1

2

child = pexpect.spawn('some_command')

child.logfile_send = sys.stdout

使用 Pexpect 控制子程式

為了控制子程式,等待子程式產生特定輸出,做出特定的響應,可以使用 expect 方法。

清單 10. expect() 定義

1

expect(self, pattern, timeout=-1, searchwindowsize=None)

在引數中: pattern 可以是正則表示式, pexpect.EOF , pexpect.TIMEOUT ,或者由這些元素組成的列表。需要注意的是,當 pattern 的型別是一個列表時,且子程式輸出結果中不止一個被匹配成功,則匹配返回的結果是緩衝區中最先出現的那個元素,或者是列表中最左邊的元素。使用 timeout 可以指定等待結果的超時時間 ,該時間以秒為單位。當超過預訂時間時, expect 匹配到pexpect.TIMEOUT。

如果難以估算程式執行的時間,可以使用迴圈使其多次等待直至等待執行結束:

清單 11. 使用迴圈

1

2

3

4

5

6

7

8

while True:

index = child.expect(["suc","fail",pexpect.TIMEOUT])

if index == 0:

break

elif index == 1:

return False

elif index == 2:

pass        #continue to wait

expect() 在執行中可能會丟擲兩種型別的異常分別是 EOF and TIMEOUF,其中 EOF 通常代表子程式的退出, TIMEOUT 代表在等待目標正則表示式中出現了超時。

清單 12. 使用並捕獲異常

1

2

3

4

5

6

7

8

9

10

try:

index = pexpect (['good', 'bad'])

if index == 0:

do_something()

elif index == 1:

do_something_else()

except EOF:

do_some_other_thing()

except TIMEOUT:

do_something_completely_different()

此時可以將這兩種異常放入expect等待的目標列表中:

清單 13. 避免異常

1

2

3

4

5

6

7

8

9

index = p.expect (['good', 'bad', pexpect.EOF, pexpect.TIMEOUT])

if index == 0:

do_something()

elif index == 1:

do_something_else()

elif index == 2:

do_some_other_thing()

elif index == 3:

do_something_completely_different()

expect 不斷從讀入緩衝區中匹配目標正則表示式,當匹配結束時 pexpect 的 before 成員中儲存了緩衝區中匹配成功處之前的內容, pexpect 的 after 成員儲存的是緩衝區中與目標正則表示式相匹配的內容。

清單 14. 列印 before 成員的內容

1

2

3

child = pexpect.spawn('/bin/ls /')

child.expect (pexpect.EOF)

print child.before

此時 child.before 儲存的就是在根目錄下執行 ls 命令的結果。

清單 15. send 系列函式

1

2

3

send(self, s)

sendline(self, s='')

sendcontrol(self, char)

這些方法用來向子程式傳送命令,模擬輸入命令的行為。 與 send() 不同的是 sendline() 會額外輸入一個回車符 ,更加適合用來模擬對子程式進行輸入命令的操作。 當需要模擬傳送 “Ctrl+c” 的行為時,還可以使用 sendcontrol() 傳送控制字元。

清單 16. 傳送 ctrl+c

1

child.sendcontrol('c')

由於 send() 系列函式向子程式傳送的命令會在終端顯示,所以也會在子程式的輸入緩衝區中出現,因此不建議使用 expect 匹配最近一次 sendline() 中包含的字元。否則可能會在造成不希望的匹配結果。

清單 17. interact() 定義

1

interact(self, escape_character = chr(29), input_filter = None, output_filter = None)

Pexpect還可以呼叫interact() 讓出控制權,使用者可以繼續當前的會話控制子程式。使用者可以敲入特定的退出字元跳出,其預設值為“^]” 。

下面展示一個使用Pexpect和ftp互動的例項。

清單 18. ftp 互動的例項:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

# This connects to the openbsd ftp site and

# downloads the README file.

import pexpect

child = pexpect.spawn ('ftp ftp.openbsd.org')

child.expect ('Name .*: ')

child.sendline ('anonymous')

child.expect ('Password:')

child.sendline ('[email protected]')

child.expect ('ftp> ')

child.sendline ('cd pub/OpenBSD')

child.expect('ftp> ')

child.sendline ('get README')

child.expect('ftp> ')

child.sendline ('bye')

該程式與 ftp 做互動,登入到 ftp.openbsd.org ,當提述輸入登入名稱和密碼時輸入預設使用者名稱和密碼,當出現 “ftp>” 這一提示符時切換到 pub/OpenBSD 目錄並下載 README 這一檔案。

以下例項是上述方法的綜合應用,用來建立一個到遠端伺服器的 telnet 連線,並返回儲存該連線的 pexpect 物件。

清單 19. 登入函式:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

import re,sys,os

from pexpect import *

def telnet_login(server,user, passwd,shell_prompt= “#|->”): 

""" 

@summary: This logs the user into the given server.

It uses the 'shell_prompt' to try to find the prompt right after login.

When it finds the prompt it immediately tries to reset the prompt to '#UNIQUEPROMPT#'

more easily matched. 

@return: If Login successfully ,It will return a pexpect object   

@raise exception: RuntimeError will be raised when the cmd telnet failed or the user

and passwd do not match 

@attention:1. shell_prompt should not include '$',on some server, after sendline

(passwd)   the pexpect object will read a '$'. 

2.sometimes the server's output before its shell prompt will contain '#' or

'->'  So the caller should kindly assign the shell prompt 

""" 

if not server or not user \ 

or not passwd or not shell_prompt: 

raise RuntimeError, "You entered empty parameter for telnet_login " 

child = pexpect.spawn('telnet %s' % server) 

child.logfile_read = sys.stdout 

index = child.expect (['(?i)login:', '(?i)username', '(?i)Unknown host']) 

if index == 2: 

raise RuntimeError, 'unknown machine_name' + server 

child.sendline (user) 

child.expect ('(?i)password:') 

child.logfile_read = None  # To turn off log

child.sendline (passwd) 

while True: 

index = child.expect([pexpect.TIMEOUT,shell_prompt]) 

child.logfile_read = sys.stdout 

if index == 0: 

if re.search('an invalid login', child.before): 

raise RuntimeError, 'You entered an invalid login name or password.'

elif index == 1: 

break     

child.logfile_read = sys.stdout # To tun on log again

child.sendline(“PS1=#UNIQUEPROMPT#”) 

#This is very crucial to wait for PS1 has been modified successfully 

#child.expect(“#UNIQUEPROMPT#”) 

child.expect("%s.+%s" % (“#UNIQUEPROMPT#”, “#UNIQUEPROMPT#”)) 

return child

Pxssh 類的使用:

Pxssh 做為 pexpect 的派生類可以用來建立一個 ssh 連線,它相比其基類增加了如下方法:

login() 建立到目標機器的ssh連線 ;

losuckgout() 釋放該連線 ;

prompt() 等待提示符,通常用於等待命令執行結束。

下面的示例連線到一個遠端伺服器,執行命令並列印命令執行結果。

該程式首先接受使用者輸入使用者名稱和密碼,login 函式返回一個 pxssh 物件的連結,然後呼叫 sendline() 分別輸入 “uptime”、“ls” 等命令並列印命令輸出結果。

清單 20. pxssh 示例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

import pxssh 

import getpass 

try:

s = pxssh.pxssh()

hostname = raw_input('hostname: ') 

username = raw_input('username: ') 

password = getpass.getpass('password: ') 

s.login (hostname, username, password) 

s.sendline ('uptime')  # run a command 

s.prompt()             # match the prompt 

print s.before         # print everything before the propt. 

s.sendline ('ls -l') 

s.prompt()

print s.before 

s.sendline ('df') 

s.prompt()

print s.before 

s.logout()

except pxssh.ExceptionPxssh, e: 

print "pxssh failed on login."

print str(e)

Pexpect 使用中需要注意的問題:

spawn() 引數的限制

在使用spawn執行命令時應該注意,Pexpect 並不與 shell 的元字元例如重定向符號 > 、>> 、管道 | ,還有萬用字元 * 等做互動,所以當想執行一個帶有管道的命令時必須另外啟動一個 shell ,為了使程式碼清晰,以下示例使用了引數列表例如:

清單 21. 啟動新的 shell 執行命令

1

2

3

shell_cmd = 'ls -l | grep LOG > log_list.txt'

child = pexpect.spawn('/bin/bash', ['-c', shell_cmd])

child.expect(pexpect.EOF)

與執行緒共同工作

Perl 也有 expect 的模組 Expect-1.21,但是 perl 的該模組在某些作業系統例如 fedora 9 或者 AIX 5 中不支援線上程中啟動程式執行以下例項試圖利用多線同時程登入到兩臺機器進行操作,不使用執行緒直接呼叫時 sub1() 函式可以正常工作,但是使用執行緒時在 fedora9 和 AIX 5 中都不能正常執行。

清單 22. perl 使用 expect 由於執行緒和 expect 共同使用導致不能正常工作的程式

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

use threads;

use Expect; 

$timeout = 5; 

my $thr = threads->create(\&sub1(first_server)); 

my $thr2 = threads->create(\&sub1(second_server)); 

sub sub1 

my $exp = new Expect; 

$exp -> raw_pty(1); 

$exp -> spawn ("telnet",$_[0]) or die "cannot access telnet";

$exp -> expect ( $timeout, -re=>'[Ll]ogin:' ); 

$exp -> send ( "user\n"); 

$exp -> expect ( $timeout, -re=>'[Pp]assword:' ); 

$exp -> send ( "password\n" ); 

$exp -> expect ( $timeout, -re=>" #" ); 

$exp -> send ( "date\n" );

$exp -> expect ( $timeout, -re=>'\w\w\w \w\w\w \d{1,2}

\d\d:\d\d:\d\d \w\w\w \d\d\d\d'); 

$localtime=$exp->match();

print "\tThe first server’s time is : $localtime\n"; 

$exp -> soft_close ();

print "This is the main thread!"; 

$thr->join();

$thr2->join();

Pexpect 則沒有這樣的問題,可以使用多執行緒並在執行緒中啟動程式執行。但是在某些作業系統如 fedora9 中不可以線上程之間傳遞 Pexpect 物件。

清單 使用 Pexpect 線上程中啟動控制子程式

請參見例項

應用例項:

在實際系統管理員的任務中,有時需要同時管理多臺機器,這個示例程式被用來自動編譯並安裝新的核心版本,並重啟。它使用多執行緒,每個執行緒都建立一個到遠端機器的 telnet 連線並執行相關命令。 該示例會使用上文中的登入函式

清單 23. 管理多臺機器示例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

import sys,os 

from Login import *

PROMPT = “#UNIQUEPROMPT#”

class RefreshKernelThreadClass(threading.Thread): 

""" The thread to downLoad the kernel and install it on a new server  """

def __init__(self,server_name,user,passwd): 

threading.Thread.__init__(self) 

self.server_name_ = server_name 

self.user_ = user 

self.passwd_ = passwd 

self.result_ = [] # the result information of the thread

def run(self): 

self.setName(self.server_name_) # set the name of thread

try: 

#call the telnet_login to access the server through telnet

child = telnet_login(self.server_name_,self.user_, self.passwd_) 

except RuntimeError,ex: 

info = "telnet to machine %s failed with reason %s" % (self.server_name_, ex)

self.result_.=(False, self.server_name_+info)

return self.result_  

child.sendline(' cd ~/Download/dw_test && \

wget http://www.kernel.org/pub/linux/kernel/v2.6/linux-2.6.28.tar.gz && \

tar zxvf linux-2.6.28.tar.gz && \

cd linux-2.6.28 \

&& make mrproper && make allyesconfig and make -j 4 && make modules && \

make modules install && make install') 

# wail these commands finish

while True:

index = child.expect([PROMPT,pexpect.TIMEOUT,pexpect.EOF]) 

if index == 0:

break

elif index == 1:

pass

elif index ==2 :

self.result_=(False,'Sub process exit abnormally ')

return False

# reboot the server

child.sendline('shutdown -Fr') 

child.expect('\r\n') 

retry_times = 10

while retry_times > 0:  

index_shutdown = child.expect(["Unmounting the file systems",

pexpect.EOF, 

pexpect.TIMEOUT]) 

if index_shutdown == 0 or index_shutdown == 1 : 

break 

elif index_shutdown == 2:

retry_times = retry_times-1

if retry_times == 0: 

self.result_=(False,'Cannot shutdown ') 

return self.result_

def refresh_kernel(linux_server_list,same_user,same_passwd): 

""" 

@summary: The function is used to work on different linux servers to download

the same version linux kernel, conpile them and reboot all these servers 

To keep it simple we use the same user id and password on these servers   

""" 

if not type(linux_server_list) == list:

return (False,"Param %s Error!"%linux_server_list) 

if same_user is None or same_passwd is None or not 

type(same_user)== str or not type(same_passwd) == str: 

return (False,"Param Error!")

thread_list = []

# start threads to execute command on the remote servers

for i in range (len(linux_server_list)): 

thread_list[i] = RefreshKernelThreadClass(linux_server_list[i],

same_user,same_passwd)

thread_list[i].start()

# wait the threads finish

for i in range (len(linux_server_list)): 

thread_list[i].join()

# validate the result

for i in range (len(linux_server_list)): 

if thread_list[0].result_[0] == False: 

return False

else:

return True

if __name__ == "__main__":

refresh_kernel(server_list,"test_user","test_passwd")