1. 程式人生 > >python發送微信

python發送微信

指定 form 配置 text close 當前時間 https method mod

申請企業微信

使用python發送信息到企業微信,同時支持python2與python3環境,需要先申請一個企業微信,然後創建應用,獲取以下三個信息

企業IP、Agentid、Secret

技術分享圖片

網信為創建的應用名稱

技術分享圖片

腳本描述

將以上三個信息替換到腳本中,主要是

class WeiXin(object):部分,其他的輔助性工具類,收集的一些常用腳本可不用關註
#!/usr/bin/env python 
#coding=utf-8
‘‘‘
Created on 2018年2月8日

@author: root
‘‘‘


from datetime import datetime
import sys, os, re, json,socket,time
from subprocess import Popen, PIPE from sys import version_info if version_info.major == 3 and version_info.minor >=3: import urllib.request as urllib2 pyversion = 3 elif version_info.major == 3: pyversion = 3 else: import urllib2 pyversion = 2 try: if version_info.major and version_info.major == 3
: pyversion=3 elif version_info.major and version_info.major == 2: pyversion=2 else: pyversion=2 except Exception as e: pyversion = 2 localpath = os.path.split(os.path.realpath(__file__))[0] class OSCmd(): ‘‘‘ OS Command:直接可調用執行命令的方法,不包括業務邏輯 本腳本為分好層次的項目中抽出出來的方法,歸為3個類,一個是命令類OSCmd,一個是系統檢查類; 為保持代碼統計,命令類OSCmd是項目是調試好的代碼復制過來的,不在此腳本中修改,每次都去項目中取相應的方法 系統檢查邏輯類可以修改
‘‘‘ def __init__(self): ‘‘‘ Constructor ‘‘‘ def exes(self, cmd_shell): ‘‘‘ call shell command ‘‘‘ s = Popen(cmd_shell, shell=True, stdout=PIPE); if pyversion == 3: s = s.encode(encoding="utf-8") return (s.communicate()[0]).strip(\n) def getexesfstval(self, cmd_shell): ‘‘‘ call shell command 比如在通過ps -ef過濾進程號的時候,在eclipse中執行可以返回正確的結果,然後在shell在測試腳本時卻多返回一個數字(比如13742),這裏取第一個數字,舍棄多返回的數字 ‘‘‘ s = Popen(cmd_shell, shell=True, stdout=PIPE); res = (s.communicate()[0]).strip(\n) ress = res.split(\n) return ress[0] def exef(self, filename, args): ‘‘‘ filename : the file is needed to exec as the way like "./filename args" args: list [] for exp: oscmd.exef(/scripts/test/t2.py, [a,b]) ‘‘‘ args.insert(0, ‘‘) if os.path.exists(filename): os.execv(filename, args) else: print(The {0} is not exist.format(filename)) def getLineFromFile(self, targetFile, *param): ‘‘‘ 文件中,返回某行記錄,適合小文件 f:返回首行 l:返回末行 n:返回第n行,n為正整數 默認返回最後一行數據 ‘‘‘ global f try: f = open(targetFile); pnum = len(param); with open(targetFile, r) as f: # 打開文件,適合小文件 lines = f.readlines() # 讀取所有行 first_line = lines[0] # 取第一行 last_line = lines[-1] # 取最後一行 if pnum > 0: if type(param[0]) == type(a) and param[0].lower() == f: return first_line elif type(param[0]) == type(a) and param[0].lower() == l: return last_line else: return lines[int(param[0]) - 1] return last_line finally: f.close(); def timeminustoS(self, t1, t2): t1 = time.localtime(t1) t1 = time.strftime("%Y-%m-%d %H:%M:%S", t1) t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S") # t2=time.localtime(t2) t2 = time.strftime("%Y-%m-%d %H:%M:%S", t2) t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S") return self.total_seconds(t2 - t1) def total_seconds(self, time_delta): ‘‘‘ python 2.6 has not total_seconds method ‘‘‘ return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6 def rmfileFrmNow(self,p_savenum, targetDir, mins, *contain): ‘‘‘ 刪除指定目錄下指定分鐘之前的文件,不刪除目錄,也不遞歸目錄 刪除/backup/rman/backdb目錄下超過30個小時的文件 rmfileFrmNow(/backup/rman/backdb, 30*60) 刪除/backup/rman/backdb目錄下超過30個小時,包含_MEMDB_20字符的文件 rmfileFrmNow(/backup/rman/backdb, 30*60,_MEMDB_20) 刪除/backup/rman/backdb目錄下超過30個小時,同時包含back_full_、_MEMDB_20字符的文件 rmfileFrmNow(/backup/rman/backdb, 30*60,back_full,_MEMDB_20) ‘‘‘ clen = len(contain) defilist = [] if os.path.isdir(targetDir): for fil in os.listdir(targetDir): if clen > 0: for c in contain: if c in str(fil): defilist.append(fil) #排序 defilist = self.get_filist_bytime(defilist) lsz = len(defilist) if lsz > p_savenum: defilist = defilist[0,lsz - p_savenum] if os.path.isdir(targetDir): for fil in os.listdir(targetDir): flag = True if clen > 0: for c in contain: if not c in str(fil): flag = False if flag: fil = os.path.join(targetDir, fil) if os.path.isfile(fil): if self.isBeforeMins(fil, mins): os.remove(fil) def isBeforeMins(self, fil, mins): ‘‘‘ 判斷一個文件的最後修改時間距離當前時間,是否超過指定的分鐘數 ‘‘‘ if os.path.isfile(fil): mtfile = os.path.getmtime(fil) tnow = time.localtime() sec = self.timeminustoS(mtfile, tnow) mms = round(sec / 60) mins = eval(mins) if mms - mins > 0: return True return False def isMorthanSize(self, fil, siz): ‘‘‘ 判斷一個文件是否超過指定的大小,單位為M ‘‘‘ if os.path.isfile(fil): filsiz = os.path.getsize(fil) fsiz = eval(siz)*1024*1024 if filsiz - fsiz > 0: return True return False def rmfilMorthanSize(self, targetDir, siz, *contain): ‘‘‘ 刪除指定目錄下超過指定大小的文件,不刪除目錄,也不遞歸目錄 刪除/backup/rman/backdb目錄下超過2G大小的文件 rmfileFrmNow(/backup/rman/backdb, 2*1024) 刪除/backup/rman/backdb目錄下超過10G大小,包含_MEMDB_20字符的文件 rmfileFrmNow(/backup/rman/backdb, 10*1024,_MEMDB_20) 刪除/backup/rman/backdb目錄下超過3G大小,同時包含back_full_、_MEMDB_20字符的文件 rmfileFrmNow(/backup/rman/backdb, 3*1024,back_full,_MEMDB_20) ‘‘‘ clen = len(contain) if os.path.isdir(targetDir): for fil in os.listdir(targetDir): flag = True if clen > 0: for c in contain: if not c in str(fil): flag = False if flag: fil = os.path.join(targetDir, fil) if os.path.isfile(fil): if self.isMorthanSize(fil, siz): os.remove(fil) def mkdir(self, dr, *mod): # import stat s1 = str(dr) if s1.startswith("~/"): s1 = s1[1:] homedir = os.environ[HOME] s1 = %s%s % (homedir, s1) if not os.path.exists(s1): cmd_shell = mkdir -p %s % (s1) # os.mkdir(dir) 這個命令不識別linux 用戶home目錄“~”符號 self.exes(cmd_shell) p_num = len(mod) chmod = chmod -R 755 %s % (s1) if p_num == 1: chmod = chmod -R %s %s % (mod[0], s1) self.exes(chmod) # os.chmod(dir, mod) else: # os.chmod(dir, stat.S_IRWXU|stat.S_IRGRP|stat.S_IROTH) 該行會拋出異常 TypeError: coercing to Unicode: need string or buffer, builtin_function_or_method found self.exes(chmod) return s1 def mknod(self, filename, *mod): s1 = str(filename) if s1.startswith("~/"): s1 = s1[1:] homedir = os.environ[HOME] s1 = %s%s % (homedir, s1) if not os.path.exists(s1): cmd_shell = touch %s % (s1) self.exes(cmd_shell) p_num = len(mod) chmod = chmod -R 644 %s % (s1) if p_num == 1: chmod = chmod -R %s %s % (mod[0], s1) self.exes(chmod) else: self.exes(chmod) return s1 def get_filist_bytime(self,file_path): dir_list = os.listdir(file_path) if not dir_list: return else: # 註意,這裏使用lambda表達式,將文件按照最後修改時間順序升序排列 # os.path.getmtime() 函數是獲取文件最後修改時間 # os.path.getctime() 函數是獲取文件最後創建時間 dir_list = sorted(dir_list, key=lambda x: os.path.getmtime(os.path.join(file_path, x))) return dir_list def shichaByMin(self, t1, t2): ‘‘‘ 計算以下三種類型之間的時間差 time.time()-浮點型,time.localtime()-struct_time型、datetime.now()-datetime型 ‘‘‘ t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S") t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S") return round(self.total_seconds(t2 - t1) / 60) def total_seconds(self, time_delta): ‘‘‘ python 2.6 has not total_seconds method ‘‘‘ return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6 class Properties: # 腳本默認配置路徑 oscheck_properties = /tmp/.python-eggs/.oscheck.properties file_name = ‘‘ oscmd = OSCmd() def __init__(self, file_name=.oscheck.properties): ‘‘‘ 將配置文件轉化為列表,以列表的讀取方式進行值的替換 ‘‘‘ dr = /tmp/.python-eggs/ if not os.path.exists(dr): # 當目錄以~開頭時該行永為True,但下面的語句會自判斷 self.oscmd.mkdir(dr,777) if not os.path.exists(file_name): # 當目錄以~開頭時該行永為True,但下面的語句會自判斷 file_name = self.oscmd.mknod(file_name,666) # os.mknod(file_name) ~ self.file_name = file_name self.properties = {} try: fopen = open(self.file_name, r) for line in fopen: line = line.strip() if line.find(=) > 0 and not line.startswith(#): strs = line.split(=) self.properties[strs[0].strip()] = strs[1].strip() except Exception as e: raise e else: fopen.close() def has_key(self, key): return key in self.properties def keys(self, key): return self.properties.keys(); def get(self, key, default_value=‘‘): if key in self.properties: return self.properties[key] return default_value def put(self, key, value): self.properties[key] = value self.replace_property(self.file_name, key + =.*, key + = + str(value), True) def putjson(self, key, values): ‘‘‘ 以json的形式存儲prop中的value,可以處理一些特殊字符,比如= ‘‘‘ pj = {key: values} value = json.dumps(pj) self.put(key, value) def getjsonvalue(self, key, default_value=‘‘): ‘‘‘ 以json的形式存儲prop中的value,可以處理一些特殊字符,比如= ‘‘‘ if key in self.properties: value = self.get(key) valuedict = json.loads(value) return valuedict[key] return default_value def toasc(self, ss): asclst = [] for em in bytearray(ss): asclst.append(em) return asclst def asctostr(self, asclst): ss = ‘‘ for em in asclst: ss += str(chr(em)) return ss def putpwd(self, key, value): ‘‘‘ 字符串以ASCII碼存儲 ‘‘‘ asclst = self.toasc(value) self.putjson(key, asclst) def getpwd(self, key): asclst = self.getjsonvalue(key) pwd = self.asctostr(asclst) return pwd def replace_property(self, file_name, from_regex, to_str, append_on_not_exists=True): ‘‘‘ 新寫入數據後,替換文件以永久保存 :param file_name: :param from_regex: :param to_str: :param append_on_not_exists: :return: ‘‘‘ import tempfile tmpfile = tempfile.TemporaryFile() if os.path.exists(file_name): r_open = open(file_name, r) pattern = re.compile(r‘‘ + from_regex) found = None for line in r_open: if pattern.search(line) and not line.strip().startswith(#): found = True line = re.sub(from_regex, to_str, line) if pyversion == 3: line = line.encode(encoding="utf-8") tmpfile.write(line) if not found and append_on_not_exists: to_str = \n + to_str if pyversion == 3: to_str = to_str.encode(encoding="utf-8") tmpfile.write(to_str) r_open.close() tmpfile.seek(0) content = tmpfile.read() if os.path.exists(file_name): os.remove(file_name) w_open = open(file_name, w) if pyversion == 3: content = content.decode(utf-8) w_open.write(content) w_open.close() tmpfile.close() else: print("file %s not found" % (file_name)) def parsePro(file_name=/tmp/.python-eggs/.oscheck.properties): return Properties(file_name) class Log: def __init__(self): ‘‘‘ Constructor ‘‘‘ logfile = os.path.join(localpath, scheck.log) def setlog(self, logfile): self.logfile = logfile def log(self, strs, sp=a): open(self.logfile, sp).write(%s\n % (strs)) def inserttime(self, sp=a): now = datetime.now() strs = now.strftime(%Y-%m-%d %H:%M:%S) oscmd = OSCmd() oscmd.mknod(self.logfile, 666) open(self.logfile, sp).write(%s\n % (strs)) def frmMsg(self,content): ‘‘‘ 格式化信息輸出 :param content: :return: ‘‘‘ curtime = datetime.now().strftime(%Y-%m-%d %H:%M:%S) hname = socket.gethostname() IPADDR = socket.gethostbyname(hname) cnt = "內容:{0} <br/>時間:{1} <br/>信息來自 {2} {3}".format(content,curtime,hname,IPADDR) return cnt class WeiXin(object): ‘‘‘ 發送微信 ‘‘‘ token_url = https://qyapi.weixin.qq.com/cgi-bin/gettoken cropmsg ={ corpid : wwe***2ed*********, corpsecret : Mgyi*****ahx3O-******HkLfg } sendmsg = {} access_token_key="weixin_access_token" time_token_key="weixin_tokenkey_gettime" oscmd = OSCmd() prop = parsePro() log = Log() def __init__(self): ‘‘‘ Constructor ‘‘‘ def formatContent(self,content): cnt=self.log.frmMsg(content) return cnt def setMsg(self,param): arg_num=len(param) content = param[0] content = self.formatContent(content) if pyversion == 2: content = content.decode(utf-8) if arg_num == 1 : touser="@all" agentid="1000009" elif arg_num == 2 : touser=param[1] agentid="1000009" elif arg_num == 3 : touser=param[1] agentid=param[2] self.sendmsg = { "touser":touser, "agentid":agentid, "msgtype": "text", "text":{ "content":content } } def updateToken(self): access_token_response = self.geturl(self.token_url, self.cropmsg) access_token = (json.loads(access_token_response)[access_token]).encode(utf-8) self.prop.put(self.access_token_key, access_token) self.prop.put(self.time_token_key, datetime.now().strftime(%Y-%m-%d %H:%M:%S)) def get_access_token(self): ‘‘‘ 獲取訪問憑證,經過編碼的一串字符串數據 每兩個小時取一次即可 ‘‘‘ if not self.prop.has_key(self.time_token_key): self.updateToken() else: curtime = datetime.now().strftime(%Y-%m-%d %H:%M:%S) oldtime = self.prop.get(self.time_token_key) shicha = self.oscmd.shichaByMin(oldtime,curtime) if shicha > 110: self.updateToken() return self.prop.get(self.access_token_key) def encodeurl(self,dic): ‘‘‘ 將字典轉換為url參數傳值方式,key1=value1&key2=value2 ‘‘‘ data = ‘‘ for k,v in dic.items(): data += %s=%s%s % (k,v,&) return data def geturl(self,url,data): ‘‘‘ data為字典類型的參數, 返回類型<type unicode>,json ‘‘‘ data = self.encodeurl(data) response = urllib2.urlopen(%s?%s % (url,data)) return response.read().decode(utf-8) def posturl(self,url,data,isjson = True): ‘‘‘ 發送json數據 返回類型<type unicode>,json ‘‘‘ if isjson: data = json.dumps(data) #dict if pyversion == 3 : data = data.encode(encoding="utf-8") response = urllib2.urlopen(url,data) return response.read().decode(utf-8) def sampleSend(self,content): self.setMsg(content) # 獲取企業訪問憑證 access_token = self.get_access_token() sendmsg_url = https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s % access_token print (self.posturl(sendmsg_url,self.sendmsg)) def showExample(self): if len(sys.argv) == 4: touser,notuse,content = sys.argv[1:] else: print (error segments, now exit) sys.exit() def send(self,param): ll = Log() arg_num=len(param) if arg_num >=1: self.sampleSend(param) #ll.log(%s%(param)) return ‘‘ def showUsing(self): print (There must be more than 1 params. For example:) print (python sendweixin.py "微信發送信息調試 " ) print (python sendweixin.py "微信發送信息調試 " "微信用戶ID" ) print (註意事項,該腳本尚存在一個問題,首次發送微信時會報access_token missing,第二次無此問題) print (python sendweixin.py "微信發送信息調試 " "微信用戶ID" "企業微信號") print (error segments, now exit) if __name__ == __main__: inp = sys.argv arg_num = len(inp) wxmsg = WeiXin() if arg_num > 1 : res = wxmsg.send(sys.argv[1:]) else: wxmsg.showUsing() sys.exit()

重點為以下兩段代碼,企業與應用的標識

  cropmsg ={
        corpid : wwe***2ed*********,
        corpsecret : Mgyi*****ahx3O-******HkLfg 
        }

如果輸入一個參數,則默認發送給企業微信中可以訪問該應用的所有用戶;第二個參數指定具體的微信號

  if arg_num == 1 :
            touser="@all"
            agentid="1000009"
        elif arg_num == 2 :
            touser=param[1]
            agentid="1000009"
        elif arg_num == 3 :
            touser=param[1]
            agentid=param[2]
            

示例

$ python sendwx.py "陽光、沙灘、海浪、老船長……"
{"errcode":0,"errmsg":"ok","invaliduser":""}

技術分享圖片

python發送微信