1. 程式人生 > >(資料科學學習手札76)基於Python的拐點檢測——以新冠肺炎疫情資料為例

(資料科學學習手札76)基於Python的拐點檢測——以新冠肺炎疫情資料為例

本文對應程式碼、資料及文獻資料已上傳至我的Github倉庫https://github.com/CNFeffery/DataScienceStudyNotes,對程式碼不感興趣的朋友可以直接跳至2.2 探索新冠肺炎疫情資料檢視疫情拐點分析結果

1 簡介

  拐點檢測(Knee point detection),指的是在具有上升或下降趨勢的曲線中,在某一點之後整體趨勢明顯發生變化,這樣的點就稱為拐點(如圖1所示,在藍色標記出的點之後曲線陡然上升):

圖1

  本文就將針對Python中用於拐點檢測的第三方包kneed進行介紹,並以新型冠狀肺炎資料為例,找出各指標數學意義上的拐點。

2 基於kneed的拐點檢測

2.1 kneed基礎

  許多演算法都需要利用肘部法則來確定某些關鍵引數,如K-means中聚類個數kDBSCAN中的搜尋半徑eps等,在面對需要確定所謂肘部,即拐點時,人為通過觀察來確定位置的方式不嚴謹,需要一套有數學原理支撐的檢測方法,Jeannie Albrecht等人在Finding a “Kneedle” in a Haystack: Detecting Knee Points in System Behavior(你可以在文章開頭的Github倉庫中找到)中從曲率的思想出發,針對離散型資料,結合離線、線上的不同應用場景以及Angle-basedMenger Curvature

EWMA等演算法,提出了一套拐點檢測方法,kneed就是對這篇論文所提出演算法的實現。
  使用pip install kneed完成安裝之後,下面我們來了解其主要用法:

2.1.1 KneeLocator

  KneeLocatorkneed中用於檢測拐點的模組,其主要引數如下:

x:待檢測資料對應的橫軸資料序列,如時間點、日期等

y:待檢測資料序列,在x條件下對應的值,如x為星期一,對應的y為降水量

S:float型,預設為1,敏感度引數,越小對應拐點被檢測出得越快

curve:str型,指明曲線之上區域是凸集還是凹集,concave代表凹,convex代表凸

direction:str型,指明曲線初始趨勢是增還是減,increasing

表示增,decreasing表示減
online:bool型,用於設定線上/離線識別模式,True表示線上,False表示離線;線上模式下會沿著x軸從右向左識別出每一個區域性拐點,並在其中選擇最優的拐點;離線模式下會返回從右向左檢測到的第一個區域性拐點

  KneeLocator在傳入引數例項化完成計算後,可返回的我們主要關注的屬性如下:

knee及elbow:返回檢測到的最優拐點對應的x

knee_y及elbow_y:返回檢測到的最優拐點對應的y

all_elbows及all_knees:返回檢測到的所有區域性拐點對應的x

all_elbows_y及all_knees_y:返回檢測到的所有區域性拐點對應的y

  curvedirection引數非常重要,用它們組合出想要識別出的拐點模式,以正弦函式為例,在oonline設定為True時,分別在curve='concave'+direction='increasing'curve='concave'+direction='decreasing'curve='convex'+direction='increasing'curve='convex'+direction='decreasing'引數組合下對同一段餘弦曲線進行拐點計算:

import matplotlib.pyplot as plt
from matplotlib import style
import numpy as np
from kneed import KneeLocator

style.use('seaborn-whitegrid')

x = np.arange(1, 3, 0.01)*np.pi
y = np.cos(x)

# 計算各種引數組合下的拐點
kneedle_cov_inc = KneeLocator(x, 
                      y, 
                      curve='convex', 
                      direction='increasing',
                      online=True)

kneedle_cov_dec = KneeLocator(x, 
                      y, 
                      curve='convex', 
                      direction='decreasing',
                      online=True)

kneedle_con_inc = KneeLocator(x, 
                      y, 
                      curve='concave', 
                      direction='increasing',
                      online=True)

kneedle_con_dec = KneeLocator(x, 
                      y, 
                      curve='concave', 
                      direction='decreasing',
                      online=True)


fig, axe = plt.subplots(2, 2, figsize=[12, 12])

axe[0, 0].plot(x, y, 'k--')
axe[0, 0].annotate(s='Knee Point', xy=(kneedle_cov_inc.knee+0.2, kneedle_cov_inc.knee_y), fontsize=10)
axe[0, 0].scatter(x=kneedle_cov_inc.knee, y=kneedle_cov_inc.knee_y, c='b', s=200, marker='^', alpha=1)
axe[0, 0].set_title('convex+increasing')
axe[0, 0].fill_between(np.arange(1, 1.5, 0.01)*np.pi, np.cos(np.arange(1, 1.5, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe[0, 0].set_ylim(-1, 1)

axe[0, 1].plot(x, y, 'k--')
axe[0, 1].annotate(s='Knee Point', xy=(kneedle_cov_dec.knee+0.2, kneedle_cov_dec.knee_y), fontsize=10)
axe[0, 1].scatter(x=kneedle_cov_dec.knee, y=kneedle_cov_dec.knee_y, c='b', s=200, marker='^', alpha=1)
axe[0, 1].fill_between(np.arange(2.5, 3, 0.01)*np.pi, np.cos(np.arange(2.5, 3, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe[0, 1].set_title('convex+decreasing')
axe[0, 1].set_ylim(-1, 1)

axe[1, 0].plot(x, y, 'k--')
axe[1, 0].annotate(s='Knee Point', xy=(kneedle_con_inc.knee+0.2, kneedle_con_inc.knee_y), fontsize=10)
axe[1, 0].scatter(x=kneedle_con_inc.knee, y=kneedle_con_inc.knee_y, c='b', s=200, marker='^', alpha=1)
axe[1, 0].fill_between(np.arange(1.5, 2, 0.01)*np.pi, np.cos(np.arange(1.5, 2, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe[1, 0].set_title('concave+increasing')
axe[1, 0].set_ylim(-1, 1)

axe[1, 1].plot(x, y, 'k--')
axe[1, 1].annotate(s='Knee Point', xy=(kneedle_con_dec.knee+0.2, kneedle_con_dec.knee_y), fontsize=10)
axe[1, 1].scatter(x=kneedle_con_dec.knee, y=kneedle_con_dec.knee_y, c='b', s=200, marker='^', alpha=1)
axe[1, 1].fill_between(np.arange(2, 2.5, 0.01)*np.pi, np.cos(np.arange(2, 2.5, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe[1, 1].set_title('concave+decreasing')
axe[1, 1].set_ylim(-1, 1)

# 匯出影象
plt.savefig('圖2.png', dpi=300)

  圖中紅色區域分別對應符合引數條件的搜尋區域,藍色三角形為每種引數組合下由kneed檢測到的最優拐點:

圖2

  下面我們擴大餘弦函式中x的範圍,繪製出提取到的所有區域性拐點:

x = np.arange(0, 6, 0.01)*np.pi
y = np.cos(x)

# 計算convex+increasing引數組合下的拐點
kneedle = KneeLocator(x, 
                      y, 
                      curve='convex', 
                      direction='increasing',
                      online=True)

fig, axe = plt.subplots(figsize=[8, 4])

axe.plot(x, y, 'k--')
axe.annotate(s='Knee Point', xy=(kneedle.knee+0.2, kneedle.knee_y), fontsize=10)
axe.set_title('convex+increasing')
axe.fill_between(np.arange(1, 1.5, 0.01)*np.pi, np.cos(np.arange(1, 1.5, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe.fill_between(np.arange(3, 3.5, 0.01)*np.pi, np.cos(np.arange(3, 3.5, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe.fill_between(np.arange(5, 5.5, 0.01)*np.pi, np.cos(np.arange(5, 5.5, 0.01)*np.pi), 1, alpha=0.5, color='red')
axe.scatter(x=list(kneedle.all_knees), y=np.cos(list(kneedle.all_knees)), c='b', s=200, marker='^', alpha=1)
axe.set_ylim(-1, 1)

# 匯出影象
plt.savefig('圖3.png', dpi=300)

  得到的結果如圖3所示,其中注意,在使用kneed檢測拐點時,落在最左或最右的拐點是無效拐點:

圖3

2.2 探索新冠肺炎疫情資料

  接下來我們嘗試將上文介紹的kneed應用到新冠肺炎資料上,來探究各個指標數學意義上的拐點是否已經出現。使用到的原始資料來自https://github.com/BlankerL/DXY-COVID-19-Data ,這個Github倉庫以丁香園資料為資料來源,實時同步更新粒度到城市級別的疫情發展資料,你可以在本文開頭提到的我的Github倉庫對應本文路徑下找到下文使用到的資料,更新時間為2020-02-18 22:55:07,下面開始我們的分析。

  首先我們讀入DXYArea.csv檔案,並檢視其資訊,為了後面方便處理我們在讀入時將updateTime列提前解析為時間格式:

import pandas as pd

raw = pd.read_csv('DXYArea.csv', parse_dates=['updateTime'])
raw.info()
圖4

  檢視其第一行資訊:
  

圖5

  可以看到,原始資料中包含了省、市資訊,以及對應省及市的最新累計確診人數、累計疑似人數、累計治癒人數和累計死亡人數資訊,我們的目的是檢測全國範圍內,累計確診人數、日新增確診人數、治癒率、死亡率隨時間(單位:天)變化下的曲線,是否已經出現數學意義上的拐點(由於武漢市資料變化的複雜性和特殊性,下面的分析只圍繞除武漢市之外的其他地區進行),首先我們對所有市取每天最晚一次更新的資料作為當天正式的記錄值:

# 抽取updateTime列中的年、月、日資訊分別儲存到新列中
raw['year'], raw['month'], raw['day'] = list(zip(*raw['updateTime'].apply(lambda d: (d.year, d.month, d.day))))

# 得到每天每個市最晚一次更新的疫情資料
temp = raw.sort_values(['provinceName', 'cityName', 'year', 'month', 'day', 'updateTime'], 
                ascending=False, 
                ignore_index=True).groupby(['provinceName', 'cityName', 'year', 'month', 'day']) \
                                  .agg({'province_confirmedCount': 'first',
                                        'province_curedCount': 'first',
                                        'province_deadCount': 'first',
                                        'city_confirmedCount': 'first',
                                        'city_curedCount': 'first',
                                        'city_deadCount': 'first'}) \
                                  .reset_index(drop=False)

# 檢視前5行
temp.head()
圖6

  有了上面處理好的資料,接下來我們針對全國(除武漢市外)的相關指標的拐點進行分析。

  首先我們來對截止到今天(2020-2-18)我們關心的指標進行計算並做一個基本的視覺化:

# 計算各指標時序結果
# 全國(除武漢市外)累計確診人數
nationwide_confirmed_count = temp[temp['cityName'] != '武漢'].groupby(['year', 'month', 'day']) \
                                                             .agg({'city_confirmedCount': 'sum'}) \
                                                             .reset_index(drop=False)

# 全國(除武漢市外)累計治癒人數
nationwide_cured_count = temp[temp['cityName'] != '武漢'].groupby(['year', 'month', 'day']) \
                                                         .agg({'city_curedCount': 'sum'}) \
                                                         .reset_index(drop=False)

# 全國(除武漢市外)累計死亡人數
nationwide_dead_count = temp[temp['cityName'] != '武漢'].groupby(['year', 'month', 'day']) \
                                                         .agg({'city_deadCount': 'sum'}) \
                                                         .reset_index(drop=False)

# 全國(除武漢市外)每日新增確診人數,即為nationwide_confirmed_count的一階差分
nationwide_confirmed_inc_count = nationwide_confirmed_count['city_confirmedCount'].diff()[1:]

# 全國(除武漢市外)治癒率
nationwide_cured_ratio = nationwide_cured_count['city_curedCount'] / nationwide_confirmed_count['city_confirmedCount']

# 全國(除武漢市外)死亡率
nationwide_died_ratio = nationwide_dead_count['city_deadCount'] / nationwide_confirmed_count['city_confirmedCount']

# 繪圖

#解決中文顯示問題
plt.rcParams['font.sans-serif'] = ['KaiTi']
plt.rcParams['axes.unicode_minus'] = False

fig, axes = plt.subplots(3, 2, figsize=[12, 18])

axes[0, 0].plot(nationwide_confirmed_count.index, nationwide_confirmed_count['city_confirmedCount'], 'k--')
axes[0, 0].set_title('累計確診人數', fontsize=20)
axes[0, 0].set_xticks(nationwide_confirmed_count.index)
axes[0, 0].set_xticklabels([f"{nationwide_confirmed_count.loc[i, 'month']}-{nationwide_confirmed_count.loc[i, 'day']}"
                            for i in nationwide_confirmed_count.index], rotation=60)

axes[0, 1].plot(nationwide_cured_count.index, nationwide_cured_count['city_curedCount'], 'k--')
axes[0, 1].set_title('累計治癒人數', fontsize=20)
axes[0, 1].set_xticks(nationwide_cured_count.index)
axes[0, 1].set_xticklabels([f"{nationwide_cured_count.loc[i, 'month']}-{nationwide_cured_count.loc[i, 'day']}"
                            for i in nationwide_cured_count.index], rotation=60)

axes[1, 0].plot(nationwide_dead_count.index, nationwide_dead_count['city_deadCount'], 'k--')
axes[1, 0].set_title('累計死亡人數', fontsize=20)
axes[1, 0].set_xticks(nationwide_dead_count.index)
axes[1, 0].set_xticklabels([f"{nationwide_dead_count.loc[i, 'month']}-{nationwide_dead_count.loc[i, 'day']}"
                            for i in nationwide_dead_count.index], rotation=60)

axes[1, 1].plot(nationwide_confirmed_inc_count.index, nationwide_confirmed_inc_count, 'k--')
axes[1, 1].set_title('每日新增確診人數', fontsize=20)
axes[1, 1].set_xticks(nationwide_confirmed_inc_count.index)
axes[1, 1].set_xticklabels([f"{nationwide_confirmed_count.loc[i, 'month']}-{nationwide_confirmed_count.loc[i, 'day']}"
                            for i in nationwide_confirmed_inc_count.index], rotation=60)

axes[2, 0].plot(nationwide_cured_ratio.index, nationwide_cured_ratio, 'k--')
axes[2, 0].set_title('治癒率', fontsize=20)
axes[2, 0].set_xticks(nationwide_cured_ratio.index)
axes[2, 0].set_xticklabels([f"{nationwide_cured_count.loc[i, 'month']}-{nationwide_cured_count.loc[i, 'day']}"
                            for i in nationwide_cured_ratio.index], rotation=60)

axes[2, 1].plot(nationwide_died_ratio.index, nationwide_died_ratio, 'k--')
axes[2, 1].set_title('死亡率', fontsize=20)
axes[2, 1].set_xticks(nationwide_died_ratio.index)
axes[2, 1].set_xticklabels([f"{nationwide_dead_count.loc[i, 'month']}-{nationwide_dead_count.loc[i, 'day']}"
                            for i in nationwide_died_ratio.index], rotation=60)

fig.suptitle('全國範圍(除武漢外)', fontsize=30)

# 匯出影象
plt.savefig('圖7.png', dpi=300)
圖7

  接著就到了檢測拐點的時候了,為了簡化程式碼,我們先編寫自定義函式,用於從KneeLocatorcurvedirection引數的全部組合中搜索合法的拐點輸出值及對應拐點的趨勢變化型別,若無則返回None:

def knee_point_search(x, y):
    
    # 轉為list以支援負號索引
    x, y = x.tolist(), y.tolist()
    output_knees = []
    for curve in ['convex', 'concave']:
        for direction in ['increasing', 'decreasing']:
            model = KneeLocator(x=x, y=y, curve=curve, direction=direction, online=False)
            if model.knee != x[0] and model.knee != x[-1]:
                output_knees.append((model.knee, model.knee_y, curve, direction))
    
    if output_knees.__len__() != 0:
        print('發現拐點!')
        return output_knees
    else:
        print('未發現拐點!')

  下面我們對每個指標進行拐點搜尋,先來看看累計確診數,經過程式的搜尋,並未發現有效拐點:

圖8

  接著檢測累計治癒數,發現了有效拐點:

圖9

  在曲線圖上標記出拐點:

knee_info = knee_point_search(x=nationwide_cured_count.index, 
                  y=nationwide_cured_count['city_curedCount'])
fig, axe = plt.subplots(figsize=[8, 6])
axe.plot(nationwide_cured_count.index, nationwide_cured_count['city_curedCount'], 'k--')
axe.set_title('累計治癒人數', fontsize=20)
axe.set_xticks(nationwide_cured_count.index)
axe.set_xticklabels([f"{nationwide_cured_count.loc[i, 'month']}-{nationwide_cured_count.loc[i, 'day']}"
                            for i in nationwide_cured_count.index], rotation=60)

for point in knee_info:
    axe.scatter(x=point[0], y=point[1], c='b', s=200, marker='^')
    axe.annotate(s=f'{point[2]} {point[3]}', xy=(point[0]+1, point[1]), fontsize=14)

# 匯出影象
plt.savefig('圖10.png', dpi=300)
圖10

  結合其convex+increasing的特點,可以說明從2月5日開始,累計治癒人數有了明顯的加速上升趨勢。

  再來看看累計死亡人數:
  

圖11

  繪製出其拐點:

圖12

  同樣在2月5日開始,累計死亡人數跟累計治癒人數同步,有了較為明顯的加速上升趨勢。
  對於日新增確診數則找到了兩個拐點,雖然這個指標在變化趨勢上看波動較為明顯,但結合其引數資訊還是可以推斷出其在第一個拐點處增速放緩,在第二個拐點出加速下降,說明全國除武漢之外的地區抗疫工作已經有了明顯的成果:

圖13

  治癒率和死亡率同樣出現了拐點,其中治癒率出現加速上升的拐點,伴隨著廣大醫療工作者的辛勤付出,更好的療法加速了治癒率的上升:

圖14

  死亡率雖然最新一次的拐點代表著加速上升,但通過比較其與治癒率的變化幅度比較可以看出,死亡率的絕對增長量十分微弱:

圖15

  通過上面的分析,可以看出在這場針對新冠肺炎的特殊戰役中,到目前為止,除武漢外其他地區已取得階段性的進步,但仍然需要付出更大的努力來鞏固來之不易的改變,相信只要大家都能從自己做起,不給病毒留可趁之機,更加明顯的勝利拐點一定會出現。