1. 程式人生 > >module05-1-基於RabbitMQ rpc實現的主機管理

module05-1-基於RabbitMQ rpc實現的主機管理

not exit net 目錄 min .py 取值 event 機器

需求



題目:rpc命令端

需求:

可以異步的執行多個命令
對多臺機器

>>:run "df -h" --hosts 192.168.3.55 10.4.3.4
task id: 45334
>>: check_task 45334
>>:

實現需求



  1. 實現全部需求

  2.會緩存已建立過的連接,減少短時間內連接相同主機時再次建立連接的開銷

  3.定時清理緩存的連接

目錄結構


rabbitmq_server
    ├ bin   # 執行文件目錄
    |   └ rabbitmq_server.py     # 執行程序接口 
    ├ conf  # 配置文件目錄
    |   └ setting.py           # 配置文件。目前主要保存用以連接RabbitMQ服務器的遠程用戶權限
    └ core  # 程序核心代碼位置
        └ main.py           # 主交互邏輯
 

rabbitmq_client
    ├ bin   # 執行文件目錄
    |   └ rabbitmq_client.py     # 執行程序 
    ├ conf  # 配置文件目錄
    |   └ setting.py           # 配置文件。目前主要保存用以連接RabbitMQ服務器的遠程用戶權限,以及緩存連接的保存時間
    └ core  # 程序核心代碼位置
        └ main.py           # 主邏輯交互程序

代碼


rabbitmq_server 技術分享
1 import os,sys
2 
3 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
4 sys.path.insert(0,BasePath)
5 
6 from core import main
7 main.main()
rabbitmq_server.py 技術分享
 1 #! /usr/bin/env python3
 2 # -*- coding:utf-8 -*-
 3 # Author:Jailly
 4 
 5 import os,sys,pika,subprocess,locale,threading
6 7 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 8 sys.path.insert(0,BasePath) 9 10 from conf import setting 11 12 sys_encode = locale.getdefaultlocale()[1] 13 username = setting.username 14 password = setting.password 15 16 credentials = pika.PlainCredentials(username,password)
17 18 19 def cb(ch,method,properties,body): 20 command = body.decode(utf-8) 21 print(Received:,command) 22 23 res = subprocess.Popen(command,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) 24 out,err = res.communicate() 25 res_con = err.decode(sys_encode) if err else out.decode(sys_encode) 26 27 ch.basic_publish( 28 exchange = ‘‘, 29 routing_key = properties.reply_to, 30 properties = pika.BasicProperties( 31 correlation_id = properties.correlation_id 32 ), 33 body = res_con 34 ) 35 print(send:,res_con) 36 37 38 39 def main(): 40 try: 41 conn = pika.BlockingConnection(pika.ConnectionParameters(127.0.0.1, 5672, /, credentials)) 42 except Exception as e: 43 print(e) 44 else: 45 46 try: 47 ch = conn.channel() 48 ch.queue_declare(queue=rpc_queue) 49 ch.queue_purge(queue=rpc_queue) 50 51 ch.basic_consume( 52 cb, 53 queue=rpc_queue 54 ) 55 56 ch.start_consuming() 57 58 except KeyboardInterrupt: 59 conn.close() 60 print(Server closed) 61 except Exception as e: 62 conn.close() 63 print(Server down because of,e) 64 65 if __name__ == __main__: 66 main()
main.py 技術分享
1 #! /usr/bin/env python3
2 # -*- coding:utf-8 -*-
3 # Author:Jailly
4 
5 username = jailly
6 password = 123456
setting.py rabbitmq_client 技術分享
1 import os,sys
2 
3 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
4 sys.path.insert(0,BasePath)
5 
6 from core import main
7 main.main()
rabbitmq_client 技術分享
  1 #! /usr/bin/env python3
  2 # -*- coding:utf-8 -*-
  3 # Author:Jailly
  4 
  5 import re
  6 import time
  7 import random
  8 import threading
  9 import sys
 10 import os
 11 import pika
 12 
 13 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 14 sys.path.insert(0,BasePath)
 15 
 16 from conf import setting
 17 
 18 username = setting.username
 19 password = setting.password
 20 connection_timeout = setting.connection_timeout
 21 
 22 run_p = re.compile(‘‘‘
 23     ^\s*run\s+  # run
 24     (?P<quote>\‘)?(?P<d_quote>\‘\‘)?  # 單引號 或 雙引號 的前一半
 25     (?P<command>.+)  # command
 26     (?(quote)\‘|)(?(d_quote)\‘\‘|)  # 單引號 或 雙引號 的後一半
 27     \s+--host\s+
 28     (?P<ips>
 29         (((([01]?\d?\d)|2[0-4]\d|25[0-5])\.){3}(([01]?\d?\d)|2[0-4]\d|25[0-5])\s+)*  # ip
 30         ((([01]?\d?\d)|2[0-4]\d|25[0-5])\.){3}(([01]?\d?\d)|2[0-4]\d|25[0-5])\s*  # ip
 31     )$  
 32     ‘‘‘,re.X)  # 匹配run指令的re模式對象
 33 check_p = re.compile(r^\s*check_task\s+(?P<task_id>\d+)\s*$)  # 匹配check_task指令的re模式對象
 34 exit_p = re.compile(r^\s*exit\s*$)  # 匹配exit指令的re模式對象
 35 
 36 task_ids = list(range(65535))  #task_id 的取值範圍
 37 
 38 # tasks是一個保存任務的字典。key為task_id,value是該id對應的RPCClient()對象們組成的列表。創建該變量的目的是為映射 task_id 與其對應的
 39 # RPCClient()對象,方便獲取該 task_id 所對應的結果(該結果即RPCClient()對象的response屬性)
 40 tasks = {}
 41 
 42 # conns是一個保存連接的字典,用以緩存已建立過的連接。key 為ip,value 是一個列表[rc,last_start_time,task_id_list],rc是該ip對應的
 43 # RPCClient()對象,last_start_time是最近一次連接的開始時間,task_id_list 是使用該連接(或該RPCClient()對象)的task_id 所構成的列表。
 44 # 創建該變量的目的是為了緩存連接,當在超時時間(默認5分鐘)內再次請求同一個主機時,不用再次創建連接,節省建立連接的開銷,同時也為定期清理連接保留了
 45 # 連接記錄
 46 conns = {}
 47 
 48 lock = threading.RLock()
 49 
 50 class RPCClient(object):
 51     def __init__(self,host,port,vhost,credentials,conn=None):
 52 
 53         self.host = host
 54         self.conn = conn if conn else pika.BlockingConnection(pika.ConnectionParameters(host,port,vhost,credentials))
 55         self.ch = self.conn.channel()
 56         self.ch.queue_declare(queue=rpc_queue)
 57 
 58         result = self.ch.queue_declare(exclusive=True)
 59         self.callback_queue = result.method.queue
 60 
 61         self.ch.basic_consume(
 62             self.cb,
 63             queue = self.callback_queue
 64         )
 65 
 66 
 67     def cb(self,ch,method,properties,body):
 68         if self.corr_id == properties.correlation_id:
 69             self.response = body
 70 
 71 
 72     def call(self,command,task_id):
 73         self.corr_id = str(task_id)
 74         self.response = None
 75 
 76         self.ch.basic_publish(
 77             exchange = ‘‘,
 78             routing_key = rpc_queue,
 79             properties = pika.BasicProperties(
 80                 reply_to = self.callback_queue,
 81                 correlation_id = self.corr_id
 82             ),
 83             body = command
 84         )
 85 
 86         while self.response is None:
 87             self.conn.process_data_events()
 88 
 89         self.response = \033[1;32m[--- %s ---]\033[0m\n%s\n% (self.host,self.response.decode(utf-8))
 90 
 91 
 92 def create_connection(conns,ips,rcs,task_id):
 93     ‘‘‘
 94     創建連接,並將生成的 RPCClient()對象 加入 rcs 列表
 95     :param conns: 對應 main.py 中的全局變量 conns
 96     :param ips: 本次任務對應的遠程主機的 ip 列表
 97     :param rcs: 對應 main.py 中的 main_interactive() 中的 rcs 變量:該task_id 對應的 RPCClient()對象們 所組成的列表
 98     :param task_id: 本次任務的task_id
 99     :return:
100     ‘‘‘
101 
102     credentials = pika.PlainCredentials(username,password)
103     for ip in ips:
104         if ip in conns:
105             rc = RPCClient(ip,5672,/,credentials,conn=conns[ip][0])
106             rcs.append(rc)
107             conns[ip][1] = time.time()      # 重置“最新連接的開始時間”
108             conns[ip][2].append(task_id)    # 添加跟該連接相關的task_id
109         else:
110             rc = RPCClient(ip,5672,/,credentials)
111             rcs.append(rc)
112             conns[ip] = [rc.conn,time.time(),[task_id,]]
113 
114 
115 def get_result(tasks,task_id):
116     ‘‘‘
117     根據task_id 獲取 RabbitMQ 服務器的返回結果
118     :param tasks: 任務列表,對應同名全局變量
119     :param task_id: 任務id
120     :return:
121     ‘‘‘
122 
123     rcs = tasks[task_id]
124     outcome = ‘‘
125     for rc in rcs:
126         if rc.response is not None:
127             outcome += rc.response
128         else:
129             print(Task %s is handling\nIf the time for handling was too long,plz check the server health or your network conditions%task_id)
130             return False
131     else:
132         print(outcome)
133         print(\033[1;34mTask done,task_id %s has been cleaned up\033[0m%task_id )
134         return True
135 
136 
137 def handle(command,ips,task_id,conns):
138     ‘‘‘
139     建立連接,並處理指令。相當於整合了creat_connection() 和 get_result()
140     :param command: 待處理的指令
141     :param ips: 待連接的主機ip列表
142     :param task_id: 任務id
143     :param conns: 保存連接的列表,對應同名全局變量
144     :return:
145     ‘‘‘
146 
147     # 第一步:建立連接
148     rcs = []  # 與本次 task_id 對應的 RPCClient() 對象們組成的列表
149     try:
150         create_connection(conns, ips, rcs, task_id)
151     except Exception as e:
152         print(\033[1;31mCan not connect with specified host,plz check server health or make sure your network patency\033[m)
153     else:
154         tasks[task_id] = rcs
155 
156         # 第二步:處理指令
157         for rc in rcs:
158             tc = threading.Thread(target=rc.call, args=(command, task_id))
159             tc.setDaemon(True)
160             tc.start()
161 
162 
163 def main_interactive():
164     ‘‘‘
165     主交互邏輯
166     :return:
167     ‘‘‘
168 
169     while 1:
170         cmd = input(>> ).strip()
171 
172         m = run_p.search(cmd)
173         if m:
174             command = m.group(command)
175             ips = m.group(ips).split()
176 
177             task_id = random.choice(task_ids)
178             task_ids.remove(task_id)
179             print(task_id:, task_id)
180 
181             t = threading.Thread(target=handle,args=(command,ips,task_id,conns))
182             t.setDaemon(True)
183             t.start()
184 
185         else:
186             m = check_p.search(cmd)
187             if m:
188                 task_id = m.group(task_id)
189                 if task_id.isdigit():
190                     task_id = int(task_id)
191                     if task_id in tasks:
192                         if get_result(tasks,task_id):
193                             # 取得結果後,釋放task_id,刪除對應的RPCClient() 對象
194                             task_ids.append(task_id)
195                             del tasks[task_id]
196                     else:
197                         print(‘‘‘\033[1;31mTask_id not found.It may because: 
198 1. task_id does not exist
199 2. connection with specified host has not been created.For this,please try again later
200 3. connection with specified host has been cleaned up because of timeout\033[0m‘‘‘)
201                 else:
202                     print(\033[1;31mTask_id must be integer\033[0m)
203 
204             else:
205                 m = exit_p.search(cmd)
206                 if m:
207                     for rc in tasks.values():
208                         rc.conn.close()
209                     exit()
210                 else:
211                     if cmd:
212                         print(\033[1;31mCommand \‘%s\‘ not found\033[0m%cmd.split()[0])
213                     else:
214                         print(Command can not be None)
215 
216 
217 def clean_conns():
218     ‘‘‘
219     超時連接清理。
220     每5分鐘(默認值,可在setting中設置)檢查一次,某連接最近一次建立/聲明距現在5分鐘以上,則從conns列表中刪除,同時刪除其對應的task_id和RPCClient()對象
221     註意 : 如果有正在執行的任務,會因為清除對應的RPCClient()對象,而無法取得結果!!!
222     :return:
223     ‘‘‘
224 
225     while 1:
226         time.sleep(connection_timeout)
227         lock.acquire()
228         for ip in conns.copy():  # 不能在本字典叠代時刪除字典元素,可以通過其軟拷貝間接刪除之
229             if time.time() - conns[ip][1] > connection_timeout:  # 判斷是否超時
230                 for id in conns[ip][2]:
231 
232                     # 清除與該連接相關的所有任務記錄
233                     if id in tasks:
234                         del tasks[id]
235 
236                 del conns[ip]  # 清除連接記錄
237         lock.release()
238 
239 
240 def main():
241     t = threading.Thread(target=clean_conns)
242     t.setDaemon(True)
243     t.start()
244 
245     main_interactive()
246 
247 
248 if __name__ == __main__:
249     main()
main.py 技術分享
1 #! /usr/bin/env python3
2 # -*- coding:utf-8 -*-
3 # Author:Jailly
4 
5 username = jailly
6 password = 123456
7 connection_timeout = 300
setting.py

module05-1-基於RabbitMQ rpc實現的主機管理