1. 程式人生 > >阿里雲 iot平臺 打通記錄!!

阿里雲 iot平臺 打通記錄!!

  • 環境:python2.7    tornado 5.0

 

記錄一:規則引擎

進入iot平臺的規則引擎

按照文件進行建立

進入規則詳情介面

  裡面有處理資料,轉發資料,轉發錯誤資料

1、處理資料:點選編輯,出現彈窗

欄位:可以寫一些函式:https://help.aliyun.com/document_detail/30555.html?spm=a2c4g.11186623.2.19.54b5782bhdgxu8

  也可以寫裝置屬性上報的欄位:https://help.aliyun.com/document_detail/73736.html?spm=a2c4g.11186623.2.13.333e79afmHPMOY

 

2、資料轉發:在「選擇操作」裡選擇 MNS,其餘欄位依照在 MNS 中的設定填寫

3、轉發錯誤資料:和資料轉發一樣

mns  阿里雲訊息服務

1、按照官方文件建立主題

2、因為我要改成tornado的網站,由於我的實力有限,花了大力氣終於搞定了

  記一些我差點繞不過去的坑:

  第一:取得請求內容(request body)要用

  self.request.body   

  第二:需要非同步處理(注意這不是多執行緒),加上以下裝飾器後要自行呼叫 self.finish()

  @tornado.web.asynchronous

   第三:M2Crypto安裝    百度吧!!

  第四:簽名驗證:雖然有點偷懶但還是搞定了  

def authenticate(self):
    """Verify the MNS signature carried by the current request.

    The string to sign is rebuilt from the request as::

        POST\n<Content-Md5>\n<Content-Type>\n<Date>\n<x-mns-* headers>\n/notifications

    where absent standard headers contribute an empty line and the
    ``X-Mns-*`` headers are lower-cased, formatted ``name:value`` and sorted.
    The ``Authorization`` header holds the base64 signature and
    ``X-Mns-Signing-Cert-Url`` holds the base64-encoded URL of the signing
    certificate.

    Returns:
        True only when the signature verifies. False when a required header
        is missing or not valid base64, or when verification does not
        report success.
    """
    headers = self.request.headers

    # Canonicalized MNS headers: lower-cased name, one "name:value" per line.
    service_str = "\n".join(sorted(
        "%s:%s" % (k.lower(), v)
        for k, v in headers.items()
        if k.startswith("X-Mns-")
    ))
    sign_header_list = [
        headers.get(key, "") for key in ("Content-Md5", "Content-Type", "Date")
    ]
    # The signed resource path is hard-coded to the route this handler serves.
    str2sign = "POST\n%s\n%s\n%s" % (
        "\n".join(sign_header_list), service_str, '/notifications')

    try:
        signature = base64.b64decode(headers['Authorization'])
        cert_url = base64.b64decode(headers['X-Mns-Signing-Cert-Url'])
    except (KeyError, TypeError, ValueError):
        # Missing header or undecodable base64: treat as unauthenticated
        # instead of letting the request fail with an unhandled exception.
        return False

    # NOTE(review): cert_url comes from an untrusted request header; anyone
    # can point it at their own certificate. Restrict it to the certificate
    # host published by Aliyun MNS before relying on this check.
    # NOTE: urlopen is a blocking call.
    cert_str = urllib2.urlopen(cert_url).read()

    pubkey = M2Crypto.X509.load_cert_string(cert_str).get_pubkey()
    pubkey.reset_context(md='sha1')
    pubkey.verify_init()
    pubkey.verify_update(str2sign)
    # verify_final reports success as 1; a negative error code is truthy and
    # must not be mistaken for a valid signature.
    return pubkey.verify_final(signature) == 1

剩下的就基本和官方的案例差不多了

記得建立訂閱時指定好網址和埠
完整的程式碼
import tornado.web
import base64
import urllib2
import M2Crypto
import xml.dom.minidom
import socket

class test(tornado.web.RequestHandler):
    """Tornado handler that receives Aliyun MNS HTTP push notifications.

    ``post`` authenticates the request signature, decodes the body into a
    ``NotifyMessage`` and answers 201 on success, 403 when authentication
    fails and 400 when the body cannot be decoded.
    """

    # Body format selector passed to validateBody; only the value "XML"
    # triggers XML parsing, anything else stores the raw body as the message.
    msg_type = "json"

    @tornado.web.asynchronous
    def post(self):
        """Handle one pushed notification and finish the response."""
        self.req_body = self.request.body
        self.msg = NotifyMessage()

        if not self.authenticate():
            print("403")
            self.set_status(403)
        elif not self.validateBody(self.req_body, self.msg, self.msg_type):
            print('123')
            self.set_status(400)
        else:
            # Set the real HTTP status instead of writing a fake status line
            # into the response body.
            self.set_status(201)
            self.write('201')
        # The asynchronous decorator disables auto-finish, so every path
        # must end the response explicitly.
        self.finish()

    def authenticate(self):
        """Verify the MNS signature carried by the current request.

        The string to sign is ``POST``, the Content-Md5, Content-Type and
        Date header values (empty when absent), the sorted lower-cased
        ``X-Mns-*`` headers as ``name:value`` lines, and the path
        ``/notifications``, joined by newlines.

        Returns:
            True only when the signature verifies. False when a required
            header is missing or not valid base64, or when verification does
            not report success.
        """
        headers = self.request.headers

        service_str = "\n".join(sorted(
            "%s:%s" % (k.lower(), v)
            for k, v in headers.items()
            if k.startswith("X-Mns-")
        ))
        sign_header_list = [
            headers.get(key, "")
            for key in ("Content-Md5", "Content-Type", "Date")
        ]
        # The signed resource path is hard-coded to this handler's route.
        str2sign = "POST\n%s\n%s\n%s" % (
            "\n".join(sign_header_list), service_str, '/notifications')

        try:
            signature = base64.b64decode(headers['Authorization'])
            cert_url = base64.b64decode(headers['X-Mns-Signing-Cert-Url'])
        except (KeyError, TypeError, ValueError):
            # Missing header or undecodable base64: unauthenticated.
            return False

        # NOTE(review): cert_url comes from an untrusted request header;
        # restrict it to the certificate host published by Aliyun MNS before
        # relying on this check.
        # NOTE: urlopen is a blocking call and stalls the IOLoop meanwhile.
        cert_str = urllib2.urlopen(cert_url).read()

        pubkey = M2Crypto.X509.load_cert_string(cert_str).get_pubkey()
        pubkey.reset_context(md='sha1')
        pubkey.verify_init()
        pubkey.verify_update(str2sign)
        # verify_final reports success as 1; a negative error code is truthy
        # and must not be mistaken for a valid signature.
        return pubkey.verify_final(signature) == 1

    def validateBody(self, data, msg, type):
        """Decode *data* into *msg* according to *type*.

        Args:
            data: Raw request body.
            msg: NotifyMessage instance to populate.
            type: "XML" to parse *data* as an XML notification; any other
                value stores *data* unchanged in ``msg.message``.

        Returns:
            True when *msg* was populated, False when XML decoding failed.
        """
        if type == "XML":
            return self.xml_decode(data, msg)
        msg.message = data
        print(msg.message)
        return True

    def xml_decode(self, data, msg):
        """Parse an XML notification in *data* and fill the fields of *msg*.

        Returns:
            True on success; False when *data* is empty, is not well-formed
            XML, has no ``Notification`` element, or lacks a required field.
        """
        if not data:
            print("Data is \"\".")
            return False
        try:
            # NOTE(review): minidom does not guard against malicious XML
            # (e.g. entity expansion); this runs on a network-supplied body.
            dom = xml.dom.minidom.parseString(data)
        except Exception as e:
            print("Parse string fail, exception:%s" % e)
            return False

        node_list = dom.getElementsByTagName("Notification")
        if not node_list:
            # No exception object exists on this path, so none is formatted.
            print("Get node of \"Notification\" fail.")
            return False

        data_dic = {}
        for node in node_list[0].childNodes:
            # Only non-empty elements carry a field; comments, CDATA and
            # empty elements have no firstChild to serialize.
            if node.nodeType == node.ELEMENT_NODE and node.firstChild is not None:
                data_dic[node.nodeName] = node.firstChild.toxml().encode('utf-8')

        key_list = ["TopicOwner", "TopicName", "Subscriber", "SubscriptionName",
                    "MessageId", "MessageMD5", "Message", "PublishTime"]
        for key in key_list:
            if key not in data_dic:
                print("Check item fail. Need \"%s\"." % key)
                return False

        msg.topic_owner = data_dic["TopicOwner"]
        msg.topic_name = data_dic["TopicName"]
        msg.subscriber = data_dic["Subscriber"]
        msg.subscription_name = data_dic["SubscriptionName"]
        msg.message_id = data_dic["MessageId"]
        msg.message_md5 = data_dic["MessageMD5"]
        msg.message = data_dic["Message"]
        msg.publish_time = data_dic["PublishTime"]
        print(msg)
        return True



class NotifyMessage(object):
    """Plain container for the fields of one MNS push notification.

    All text fields default to the empty string and ``publish_time`` to -1
    until a decoder fills them in.
    """

    def __init__(self):
        self.topic_owner = ""
        self.topic_name = ""
        self.subscriber = ""
        self.subscription_name = ""
        self.message_id = ""
        self.message_md5 = ""
        self.message = ""
        self.publish_time = -1

    def __str__(self):
        """Return one ``Name: value`` line per field, names padded to 30 columns."""
        # A sequence of pairs (not a dict) keeps the line order fixed on
        # every Python version.
        msg_info = (
            ("TopicOwner", self.topic_owner),
            ("TopicName", self.topic_name),
            ("Subscriber", self.subscriber),
            ("SubscriptionName", self.subscription_name),
            ("MessageId", self.message_id),
            ("MessageMD5", self.message_md5),
            ("Message", self.message),
            ("PublishTime", self.publish_time),
        )
        return "\n".join("%s: %s" % (k.ljust(30), v) for k, v in msg_info)
# Extra keyword settings forwarded to the Tornado application (none yet).
setting = {}
application = tornado.web.Application([
    (r"/notifications", test),
], **setting)

if __name__ == "__main__":
    # Imported explicitly here; the file-level imports only name tornado.web.
    import tornado.ioloop

    port = 8080
    try:
        ip_addr = socket.gethostbyname(socket.gethostname())
    except socket.error:
        # The address is only used for the startup message, so an
        # unresolvable hostname must not stop the server from starting.
        ip_addr = "127.0.0.1"
    addr_info = "Start Endpoint! Address: %s%s:%s" % ("http://", ip_addr, port)
    print(addr_info)
    application.listen(port)
    tornado.ioloop.IOLoop.current().start()