1. 程式人生 > >嘗試用 Python 寫了個病毒傳播模擬程式

嘗試用 Python 寫了個病毒傳播模擬程式

病毒擴散模擬程式,用 python 也可以。

概述

事情是這樣的,B 站 UP 主 @ele 實驗室,寫了一個簡單的疫情傳播模擬程式,告訴大家在家待著的重要性,視訊相信大家都看過了,並且 UP 主也放出了原始碼。

因為是 Java 開發的,所以開始我並沒有多加關注。後來看到有人解析程式碼,發現我也能看懂,然後就琢磨用 Python 應該怎麼實現。

Java 版程式淺析

一個人就是 1 個(x, y)座標點,並且每個人有一個狀態。

public class Person extends Point {
    private int state = State.NORMAL;
}

在每一輪的迭代中,遍歷每個人,每個人根據自身的狀態,做出一定的動作,包括:

  • 移動
  • 狀態變化
  • 影響他人

這些動作的具體變更,取決於定義的各種係數。

一輪迭代完成,列印這些點,不同的狀態對應不同的顏色。

繪圖部分直接使用的 Java 繪圖類 Graphics。

Python 版思路

如果我們想用 Python 實現應該怎麼做呢?

如果完全復刻 Java 版本,則每次迭代需遍歷所有人,並計算和他人距離,這就是 N^2 次計算。如果是 1000 個人,就需要迴圈 1 百萬次。這個 Python 的效能肯定捉急。

不過 Python 有 numpy ,可以快速的運算元組。結合 matplotlib 則可以畫出圖形。

import numpy as np
import matplotlib.pyplot as plt

如何模擬人群

為了減少函式之間互相傳參和使用全域性變數,我們也來定義一個類:

class People(object):
    def __init__(self, count=1000, first_infected_count=3):
        self.count = count
        self.first_infected_count = first_infected_count
        self.init()

所有人的座標資料就是 N 行 2 列的陣列,同時伴隨一定的狀態:

    def init(self):
        self._people = np.random.normal(0, 100, (self.count, 2))
        self.reset()

狀態值和計時器也都是陣列,同時每次隨機選取指定數量的人感染:

    def reset(self):
        self._round = 0
        self._status = np.array([0] * self.count)
        self._timer = np.array([0] * self.count)
        self.random_people_state(self.first_infected_count, 1)

這裡關鍵的一點是,輔助陣列的大小和人數保持一致,這樣就能形成一一對應的關係。

狀態發生變化的人才順帶記錄時間:

    def random_people_state(self, num, state=1):
        """隨機挑選人設定狀態
        """
        assert self.count > num
        # TODO:極端情況下會出現無限迴圈
        n = 0
        while n < num:
            i = np.random.randint(0, self.count)
            if self._status[i] == state:
                continue
            else:
                self.set_state(i, state)
                n += 1

    def set_state(self, i, state):
        self._status[i] = state
        # 記錄狀態改變的時間
        self._timer[i] = self._round

通過狀態值,就可以過濾出人群,每個人群都是 people 的切片檢視。這裡 numpy 的功能相當強大,只需要非常簡潔的語法即可實現:

    @property
    def healthy(self):
        return self._people[self._status == 0]

    @property
    def infected(self):
        return self._people[self._status == 1]

按照既定的思路,我們先來定義每輪迭代要做的動作:

    def update(self):
        """每一次迭代更新"""
        self.change_state()
        self.affect()
        self.move()
        self._round += 1
        self.report()

順序和開始分析的略有差異,其實並不是十分重要,調換它們的順序也是可以的。

如何改變狀態

這一步就是更新狀態陣列 self._status 和 計時器陣列 self._timer:

    def change_state(self):
        dt = self._round - self._timer
        # 必須先更新時鐘再更新狀態
        d = np.random.randint(3, 5)
        self._timer[(self._status == 1) & ((dt == d) | (dt > 14))] = self._round
        self._status[(self._status == 1) & ((dt == d) | (dt > 14))] += 1

仍然是通過切片過濾出要更改的目標,然後全部更新。

這裡具體的實現我寫的非常簡單,沒有引入太多的變數:

在一定週期內的 感染者(infected),狀態置為 確診(confirmed)。 我這裡簡單假設了確診者就被醫院收治,所以失去了繼續感染他人的機會(見下面)。如果要搞複雜點,可以引入病床,治癒,死亡等狀態。

如何影響他人

影響別人是整個程式的效能瓶頸,因為需要計算每個人之間的距離。

這裡繼續做了簡化,只處理感染者:

    def infect_possible(self, x=0., safe_distance=3.0):
        """按概率感染接近的健康人
        x 的取值參考正態分佈概率表,x=0 時感染概率是 50%
        """
        for inf in self.infected:
            dm = (self._people - inf) ** 2
            d = dm.sum(axis=1) ** 0.5
            sorted_index = d.argsort()
            for i in sorted_index:
                if d[i] >= safe_distance:
                    break  # 超出範圍,不用管了
                if self._status[i] > 0:
                    continue
                if np.random.normal() > x:
                    continue
                self._status[i] = 1
                # 記錄狀態改變的時間
                self._timer[i] = self._round

可以看到,距離的計算仍然是通過 numpy 的矩陣操作。但是需要對每一個感染者單獨計算,所以如果感染者較多,python 的處理效率感人。

如何移動

_people 是一個座標矩陣,只要生成移動距離矩陣 dt,然後它相加即可。我們可以設定一個可移動的範圍 width,把移動距離控制在一定範圍內。

    def move(self, width=1, x=.0):
        movement = self.random_movement(width=width)
        # 限定特定狀態的人員移動
        switch = self.random_switch(x=x)
        movement[switch == 0] = 0
        self._people = self._people + movement

這裡還需要增加一個控制移動意向的選項,仍然是利用了正態分佈概率。考慮到這種場景有可能會重用,所以特地把這個方法提取了出來,生成一個只包含 0 1 的陣列充當開關。

    def random_switch(self, x=0.):
        """隨機生成開關,0 - 關,1 - 開

        x 大致取值範圍 -1.99 - 1.99;
        對應正態分佈的概率, 取值 0 的時候對應概率是 50%
        :param x: 控制開關比例
        :return:
        """
        normal = np.random.normal(0, 1, self.count)
        switch = np.where(normal < x, 1, 0)
        return switch

輸出結果

有了一切資料和變化之後,接下來最重要的事情自然就是圖形化顯示結果了。直接使用 matplotlib 的散點圖就可以了:

    def report(self):
        plt.cla()
        # plt.grid(False)
        p1 = plt.scatter(self.healthy[:, 0], self.healthy[:, 1], s=1)
        p2 = plt.scatter(self.infected[:, 0], self.infected[:, 1], s=1, c='pink')
        p3 = plt.scatter(self.confirmed[:, 0], self.confirmed[:, 1], s=1, c='red')

        plt.legend([p1, p2, p3], ['healthy', 'infected', 'confirmed'], loc='upper right', scatterpoints=1)
        t = "Round: %s, Healthy: %s, Infected: %s, Confirmed: %s" % \
            (self._round, len(self.healthy), len(self.infected), len(self.confirmed))
        plt.text(-200, 400, t, ha='left', wrap=True)

實際效果

啟動。

if __name__ == '__main__':
    np.random.seed(0)
    plt.figure(figsize=(16, 16), dpi=100)
    plt.ion()
    p = People(5000, 3)
    for i in range(100):
        p.update()
        p.report()
        plt.pause(.1)
    plt.pause(3)

因為這個小 demo 主要是個人用來練手,目前一些引數沒有完全抽出來。有需要的只能直接改原始碼。

後記

從多次實驗的結果,通過調整人員的流動意願,流動距離等因素,是可以得到直觀的結論的。

本人也是初次使用 numpymatplotlib,現學現賣,若有使用不當之處請指正。其中的概率引數設定 基本沒有科學依據,僅供 Python 愛好者參考。

總得來說,用 numpy 來模擬病毒感染情況應該是能行得通的。但是其中的影響因子還需要仔細設計。效能也是需要考量的問題。

原始碼地址

願疫情能早日過去,武漢加油,中國加油