“日常研究”之動態重新整理 respage01 百度熱力圖
因為這個DashBoard自己會一直開發維護下去,原本會演變成一個《試用 vue-admin-template 寫一個自己的dashboard》+ 1、2、3、、N。但是今天還是把標題改掉了,因為其實重點並不是dashboard本身,真正的重點是我日常想去做的業餘研究,順便研究些華麗的前後端技術。
respage01 再說明
Respage01的起因是老婆大人對周邊各種早教和教育培訓機構春筍般出現的現象感到費解,然後向我提了一個問題,“如果我們入股類似的一個小教育培訓機構,會不會有前途?”。這是一個很難回答的問題,因為我們都沒有在培訓教育行業裡待過,但是既然問題已經出現了,自己也覺得這個問題有些意思,便想要做一個日常研究。Respage01會從巨集觀角度去分析並監測教育培訓機構(在金華地區)相關的資料,資料都來自網際網路。
預加功能
上一次已經留下了一個殼子,類似這樣:

但是從程式碼上也看到了,我是把某一天的座標資料硬編碼到了這個頁面中,所以今天就是要來做成動態,剛好結合《部署Django REST Framework服務(Nginx + uWSGI + Django)》,將介面部署到線上。
既然做了介面化,那我們就可以定時做好每天的資料爬取 -> 資料清洗 -> 資料入庫 -> 介面吐出。那麼頁面便可每天獲取到最新的資料。
問題又來了,既然每天有了最新的資料,那最好是有方式可以看到每天的變化。可能在一定的時間跨度內,可以看到一些明顯的變化,所以我打算做一個數據回看功能,做法可能很簡單: 觸發了某個按鈕後,定時重新整理所有的資料,或者做成一個短視訊形式
在一個較短的時間跨度內,有可能很難從熱力圖上看到變化,所以打算加一個折線圖來標識每天的資料。
部署自動爬取和資料清洗併入庫
目前的爬蟲目錄很簡單:
. ├── config.json ├── log │└── train-2018-09-03.log ├── result │└── train-2018-09-03.txt └── spiker.py 複製程式碼
那完成這個事情定時執行類似這樣的指令碼就可以:
python spiker.py | python format.py | python writeToRedis.py && python redisToMysql.py 複製程式碼
雖然有人會建議可以直接在爬取時完成清洗、去重和入庫的動作,但是我還是喜歡用這種流式的方法來處理,這樣更加清晰,功能也更加解耦。而且瞭解管道的同學可以看出來,這其實就是同時完成了爬取、清洗和入庫動作,只不過是每條資料序列完成了這系列動作。這裡的writeToRedis.py是為了利用redis天然的去重功能,redis的讀寫效能也會讓效率更高些。
ofollow,noindex">修改spiker.py
修改就只有兩個:
- 將原先的查詢關鍵字等配置資訊寫到config.json中,方便各管道節點獲取到統一的資訊
- 在原先寫檔案的地方,直接加個print,將資料標準輸出。
""" 查詢關鍵字:移到config.json """ FileKey = 'train' KeyWord = u"早教$培訓" 複製程式碼
## 設定標準輸出的編碼格式 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8') for r in res['results']: file.writelines(str(r).strip() + '\n') # 增加標準輸出 print(str(r).strip(), flush=True) 複製程式碼
新增format.py 過濾無用資訊
首先讀取上一級管道的標準輸出作為輸入,使用fileinput便可實現:
def main(): for line in fileinput.input(): format(line) 複製程式碼
分析一條的資料的結構,只保留感興趣的資料(不用擔心丟失,因為在第一個節點上已經儲存了原始資料),並儘量把json結構拉平成只有一級:
{'name': '英倫藝術培訓', 'lat': 29.109614, 'lng': 119.662018, 'address': '解放東路238號福蓮匯8層', 'province': '浙江省', 'city': '金華市', 'area': '婺城區', 'street_id': '15ca1ce6773a95f7a2a9343c', 'detail': 1, 'uid': '15ca1ce6773a95f7a2a9343c', 'detail_info': {'tag': '教育培訓;培訓機構', 'type': 'education', 'detail_url': 'http://api.map.baidu.com/place/detail?uid=15ca1ce6773a95f7a2a9343c&output=html&source=placeapi_v2', 'overall_rating': '0.0', 'children': []}} 複製程式碼
由於該資料一級比較簡單,所以format也只是做了很小的處理,另外,這樣的好處時,不同的資料結構可以寫不同的format就可以。
# coding: utf-8 import fileinput import io import sys import chardet sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8') def format(line): """ :param line: :return: """ result = {} tmp = eval(line.decode('utf-8')) try: result = { "name": str(tmp["name"]), "lat": tmp["location"]["lat"], "lng": tmp["location"]["lng"], "address": str(tmp["address"]), "tag": str(tmp["detail_info"]["tag"]), } # 部分資料可能缺失欄位 if "detail_url" in tmp["detail_info"]: result["detail_url"] = tmp["detail_info"]["detail_url"] else: result["detail_url"] = "" if "overall_rating" in tmp["detail_info"]: result["rate"] = tmp["detail_info"]["overall_rating"] else: result["rate"] = "0.0" print(str(result).strip(), flush=True) except Exception as e: print(e) pass def main(): try: for line in fileinput.input(mode='rb'): format(line) sys.stderr.close() except Exception as e: print(e) pass if __name__ == '__main__': main() 複製程式碼
如果資料量大,可以用類似的方法來除錯:
cat node1.txt | head -n 1 | python format.py 複製程式碼
其實使用python的set也可以完成去重的事情,程式碼中也可以嘗試這樣的操作。關於去重的方式,在不同場景下有各式的方案,我們這屬於簡單場景,因為資料量不大。
系統安裝redis服務,並配置密碼:
sudo apt-get install redis-server 複製程式碼
在虛擬環境下安裝redis庫:
pip install redis vi /etc/redis/redis.conf # 開啟 requirepass 配置項,並後面跟上密碼 requirepass xxxx 複製程式碼
登入測試:
redis-cli -a xxxx 複製程式碼
redis有String、List、Set、Hash、Sort Hash幾種型別,由於我們只是要做去重,那就用Set結構就可以:
train_2018_09_07(key) -> (資料1,資料2 ... 資料n) 複製程式碼
writeToRedis的簡單實現:
# coding: utf-8 import fileinput import redis import time from tool.tool import tool import io import sys sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8') def connectRedis(): re = redis.Redis( host=tool.getRedisHost(), port=tool.getRedisPort(), password=tool.getRedisPass(), decode_responses=True) return re def main(): today = time.strftime("%Y_%m_%d") setName = tool.getFileKey() + "_" + today try: re = connectRedis() for line in fileinput.input(mode='rb'): re.sadd(setName, line.decode('utf-8').strip()) exit(0) except Exception as e: print(e) exit(-1) if __name__ == '__main__': main() 複製程式碼
使用redis的set還有一個好處就是,可以理由set的差集等功能,快速獲取每天發生變化的資料。這個需求打算後面加上
執行:
python spiker.py | python format.py | python writeToRedis.py 複製程式碼
執行後,原始資料檔案條目:
cat train-2018-09-07.txt | wc -l663 複製程式碼
Redis Set 內的條目數:
127.0.0.1:6379> SCARD train_2018_09_07 (integer) 640 複製程式碼
說明確實還是有重複的資料,原因可能是我們使用了10*10小矩形掃描的方式,不可避免地會有交界處重疊的問題。
關於使用了redis後還是否需要Mysql的討論也有很多,大家可以去參與討論。我個人的考慮是Django上可以更好地支援Mysql來做序列化和反序列化,畢竟Mysql的查詢功能也更加舒適一些。
首先從redis中讀出資料的形式,可以使用迭代器的方式,好處是稍微省記憶體些,但是問題是如果單條資料單獨寫入mysql的話,IO上估計也不太合算,所以我使用pandas的DataFrame寫入mysql的方式來分批寫入,使用pandas的好處是字典資料寫入資料操作比一般的mysql庫要簡潔很多。
虛擬環境下安裝pandas:
pip install pandas sqlalchemy 複製程式碼
# coding: utf-8 import redis from tool.tool import tool import time import pandas as pd from sqlalchemy import create_engine import pymysql def connectRedis(): """ 連線Redis :return: redis connect """ re = redis.Redis( host=tool.getRedisHost(), port=tool.getRedisPort(), password=tool.getRedisPass(), decode_responses=True) return re def connectMysql(): """ 連線mysql資料庫 :return: engine connect """ config = tool.getMysqlConfig() engine = create_engine(str(r"mysql+pymysql://%s:%s@%s/%s?charset=utf8") % (config['User'], config['Pass'], config['Host'], config['Name'])) return engine def redisToMysql(re, en): """ :param re: redis connect :param en: mysql engine connect :return: """ today = time.strftime("%Y_%m_%d") tableName = tool.getFileKey() + '_' + today res = [] index = 0 for item in re.sscan_iter(tool.getFileKey() + '_' + today): tmp = eval(item.encode('utf-8').decode('utf-8')) tmp['time'] = today res.append(tmp) index += 1 if index >= 100: df = pd.DataFrame(res) df.to_sql('respage01', con=en, if_exists='append', index=False,) index = 0 res = [] if index != 0: df = pd.DataFrame(res) df.to_sql(name='respage01', con=en, if_exists='append', index=False) # 新增主鍵 # print("xxxxxxxx") # with en.connect() as con: #con.execute("alter table respage01 add COLUMN id INT NOT NULL AUTO_INCREMENTprimary key first") def main(): re = connectRedis() en = connectMysql() redisToMysql(re, en) if __name__ == '__main__': main() 複製程式碼
為了後面django處理方便,我後面臨時加入了一個自增id作為主鍵。方法可以是:
alter table respage01 add COLUMN id INT NOT NULL AUTO_INCREMENTprimary key first; 複製程式碼
編寫相關apis
我們設計兩個api: * 獲取某個時間段內的所有座標資料 * 獲取某個時間段內每天的數量值
model實現
class Respage01Info(models.Model): """ respage 01 相關的資料 """ time = models.CharField(max_length=100) name = models.CharField(max_length=200) address = models.CharField(max_length=500) detail_url = models.URLField(max_length=500) rate = models.FloatField() lat = models.FloatField() lng = models.FloatField() class Meta: # 指定資料表 db_table = "respage01" 複製程式碼
需要注意的是,我們已經擁有了資料庫,並且表裡已經有了資料,所以在執行migrate的時候,需要指明fake掉該專案的資料遷移:
python manage.py migrate --fake rouboapi 複製程式碼
Serializer實現
由於我們的計數介面是需要使用聚合類查詢功能,簡單說,就是需要返回資料庫欄位以外的欄位給客戶端,所以需要使用serializers的Field方法。
class Respage01Serializer(serializers.HyperlinkedModelSerializer): """ 序列化Respage01相關的資料 """ class Meta: model = Respage01Info fields = ('time', 'lat', 'lng', 'name', 'address', 'detail_url', 'rate') class Respage01CountSerializer(serializers.HyperlinkedModelSerializer): """ 序列化計數資料,用於序列化聚合類查詢的結果 """ time = serializers.StringRelatedField() count = serializers.IntegerField() class Meta: model = Respage01Info fields = ('time', 'count') 複製程式碼
view實現
這裡需要用到django的資料庫查詢相關的知識,我們這裡用到了fiter、values、annotate幾個函式,具體的可以參考官方文件,基本用法還是比較簡單。
class Respage01(APIView): """ 獲取respage01相關的資料 """ authentication_classes = [] permission_classes = [] def rangeTime(self, start_time, end_time): """ 獲取時間區間 :param start_time: :param end_time: :return: """ print("------------") dateList = [datetime.strftime(x, "%Y_%m_%d") for x in list(pd.date_range(start=start_time.replace('_',''), end=end_time.replace('_','')))] return dateList def get(self, request, format=None): req = request.query_params if 'type' not in req or 'start_time' not in req or 'end_time' not in req: return Response({}, status=status.HTTP_400_BAD_REQUEST) if req['type'] == 'location': dateList = self.rangeTime(start_time=req['start_time'], end_time=req['end_time']) queryset = Respage01Info.objects.filter(time__in=dateList) serializer = Respage01Serializer(queryset, many=True) elif req['type'] == 'count': dateList = self.rangeTime(start_time=req['start_time'], end_time=req['end_time']) queryset = Respage01Info.objects.filter(time__in=dateList).values('time').annotate(count=Count('id')) serializer = Respage01CountSerializer(queryset, many=True) return Response(serializer.data, status=status.HTTP_200_OK) 複製程式碼
介面上線後發現的異常
在介面上線後測試過程中,發現介面極其不穩定,查了一下發現mysql會異常地退出,查看了日誌發現是記憶體不足導致。
我的vps是1G記憶體的基礎配置,雖然小,但是不至於這麼緊張。通過top【M】排序後驚奇地發現uwsgi開了10個程序,每個程序佔用了7%左右的記憶體。修改uwsgi ini檔案重啟後故障排除(我們這種小服務,兩個程序足夠了)。
# mysite_uwsgi.ini file [uwsgi] # Django-related settings # the base directory (full path) chdir= /data/django/rouboApi # Django's wsgi file module= rouboinfo.wsgi # the virtualenv (full path) home= /data/django/env3 # process-related settings # master master= true # maximum number of worker processes processes= 2 # the socket (use the full path to be safe socket= /data/django/rouboApi/rouboapi.scok # ... with appropriate permissions - may be needed chmod-socket= 666 # clear environment on exit vacuum= true 複製程式碼
修改respage01頁面
roubo’s dashboard 主要是增加了兩個介面請求,並將v-charts的資料動態化。這裡也簡單加了一個“覆盤”按鈕,定時重新整理資料,可以大概看到一些變化。
<template> <div> <div style="height: 100%"> <button @click="onPlay">覆盤</button> <ve-heatmap :data="chartDataMap" :settings="chartSettingsMap" height="600px"/> </div> <div> <ve-line :data="chartDataChart" :settings="chartSettingsChart"/> </div> </div> </template> 複製程式碼
/** * 獲取某個時間區間的位置資訊 * @param start_time * @param end_time */ getLocations: function(start_time, end_time) { this.rouboapis.getRespage01Info('location', start_time, end_time, { success: (res) => { this.chartDataMap.rows = res }, fail: (err) => { console.log(err) } }) }, /** * 獲取某個時間段的統計資料 * @param start_time * @param end_time */ getCount: function(start_time, end_time) { this.rouboapis.getRespage01Info('count', start_time, end_time, { success: (res) => { this.chartDataChart.rows = res } }) }, /** * 點選覆盤按鈕事件 */ onPlay: function() { const dateList = this.getDateList('2018_09_13', this.today('_')) let index = 0 const timer = setInterval(() => { this.getLocations(dateList[index], dateList[index]) this.getCount('2018_09_13', dateList[index]) index = index + 1 if (index >= dateList.length) { clearInterval(timer) return } }, 5000) } 複製程式碼