1. 程式人生 > >基於PySpark的網路服務異常檢測系統 (四) Mysql與SparkSQL對接同步資料 kmeans演算法計算預測異常

基於PySpark的網路服務異常檢測系統 (四) Mysql與SparkSQL對接同步資料 kmeans演算法計算預測異常

def get_current_timestamp(): 2 """ 3 獲取當前時間戳 4 :return: 5 """ 6 return int(time.time()) * 1000 7 8 9 def convert_datetime_to_timestamp(dtime): 10 """ 11 把datetime轉換為時間戳 12 :param datetime: 13 :return: 14 """ 15 timestamp = time.mktime(dtime.timetuple())
16 return int(timestamp) * 1000 17 18 19 def get_cache_cat_data(start_time, end_time, force=False): 20 """ 21 獲取指定時間段的cat資料 22 :param start_time: 23 :param end_time: 24 :return: 25 """ 26 key = 'GET_CAT_RES_DATA_{0}_TO_{1}'.format( 27 start_time, end_time 28 )
29 content = cache.get(key) 30 if force or not content: 31 content = get_cat_res_data(start_time, end_time) 32 if content: 33 cache.set(key, content, timeout=CACHE_TIMEOUT_DEFAULT) 34 35 return content 36 37 38 def add_normal_cat_data(data): 39 """ 40 構建資料model 用yield每次返回1000條資料
41 :param data 42 :return: 43 """ 44 tmp_cat_normal_models = [] 45 46 for cat_data in data: 47 response_time = cat_data.get('response_time') 48 request_count = cat_data.get('request_count') or 1 49 fail_count = cat_data.get('fail_count') or 1 50 cat_data['id'] = str(uuid4()) 51 if response_time < 1.2 and (fail_count / request_count) < 0.2: 52 cat_obj = CatNormalResource( 53 **cat_data 54 ) 55 tmp_cat_normal_models.append(cat_obj) 56 57 if len(tmp_cat_normal_models) >= 1000: 58 yield tmp_cat_normal_models 59 tmp_cat_normal_models = [] 60 61 yield tmp_cat_normal_models 62 63 64 @celery_app.task 65 def insert_normal_cat_data(data): 66 """ 67 使用非同步,每次用bulk 批量插入 1000條資料 68 :param data: 69 :return: 70 """ 71 try: 72 for i in add_normal_cat_data(data): 73 CatNormalResource.objects.bulk_create(i) 74 except Exception as e: 75 print(e) 76 raise RsError('插入資料庫失敗') 77 78 79 def insert_normal_cat_job(): 80 """ 81 定時匯入前一天的正常資料 82 :return: 83 """ 84 logger.info('insert_normal_cat_job ....') 85 dt_time = datetime.datetime.now() + datetime.timedelta(days=-1) 86 start_time = convert_datetime_to_timestamp(dt_time) 87 end_time = get_current_timestamp() 88 data = get_cache_cat_data(start_time, end_time) 89 insert_normal_cat_data.delay(data)