
Implementing Classic Convolutional Neural Networks (LeNet, AlexNet, VGG, GoogLeNet, ResNet) in MXNet


  A convolutional neural network (Convolutional Neural Network, CNN) is a feed-forward neural network whose artificial neurons respond to units within a local receptive field. CNNs perform extremely well on large-scale image processing.

  The post 詳解卷積神經網絡(CNN) already describes convolutional neural networks in detail. Here, in order to learn the MXNet library, I implement the classic networks myself, both to reinforce what I have learned and to lay a foundation for later use. The reference material comes from the Gluon community tutorials.
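All of the snippets below assume the usual MXNet/Gluon imports plus a local utils helper module (its relevant functions are listed at the end of this post). The exact import list is my own assumption and may need adjusting to your environment:

from time import time

import numpy as np
import mxnet as mx
from mxnet import nd, image, init, gluon, autograd
from mxnet.gluon import nn

import utils  # local helper module, see the end of this post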

1. A simple LeNet implementation

  

def LeNet():
    """
    An early convolutional neural network.
    :return: the LeNet model
    """
    net = nn.Sequential()
    with net.name_scope():
        net.add(
            nn.Conv2D(channels=20, kernel_size=5, activation='relu'),
            nn.MaxPool2D(pool_size=2, strides=2),
            nn.Conv2D(channels=50, kernel_size=3, activation='relu'),
            nn.MaxPool2D(pool_size=2, strides=2),
            nn.Flatten(),
            nn.Dense(128, activation="relu"),
            nn.Dense(10)
        )
    return net
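A quick smoke test (my own sketch, not part of the original post): push a dummy batch of 28×28 gray-scale images through the network and check the output shape.

net = LeNet()
net.initialize()
x = nd.random.uniform(shape=(4, 1, 28, 28))  # a batch of 4 single-channel images
print(net(x).shape)                          # expected: (4, 10)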

2. AlexNet:

  As image datasets grew and hardware improved, deeper and more complex network models came into use. A representative example is AlexNet. Compared with the relatively small LeNet, AlexNet contains 8 learned layers: five convolutional layers, two fully connected hidden layers, and an output layer.

def AlexNet():
    """
    An extension of LeNet, made possible by larger datasets and better hardware.
    :return: the AlexNet model
    """
    net = nn.Sequential()
    with net.name_scope():
        net.add(
            # stage 1
            nn.Conv2D(channels=96, kernel_size=11, strides=4, activation='relu'),
            nn.MaxPool2D(pool_size=3, strides=2),
            # stage 2
            nn.Conv2D(channels=256, kernel_size=5, padding=2, activation='relu'),
            nn.MaxPool2D(pool_size=3, strides=2),
            # stage 3
            nn.Conv2D(channels=384, kernel_size=3, padding=1, activation='relu'),
            nn.Conv2D(channels=384, kernel_size=3, padding=1, activation='relu'),
            nn.Conv2D(channels=256, kernel_size=3, padding=1, activation='relu'),
            nn.MaxPool2D(pool_size=3, strides=2),
            # stage 4
            nn.Flatten(),
            nn.Dense(4096, activation="relu"),
            nn.Dropout(.5),
            # stage 5
            nn.Dense(4096, activation="relu"),
            nn.Dropout(.5),
            # stage 6
            nn.Dense(10)
        )
    return net
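A hedged sketch for inspecting how each layer transforms its input; the shapes assume a 224×224 single-channel input, matching the resize=224 used in the experiment at the end of this post.

net = AlexNet()
net.initialize()
x = nd.random.uniform(shape=(1, 1, 224, 224))
for layer in net:
    x = layer(x)
    print(layer.name, 'output shape:', x.shape)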

3. VGGNet:

  When a network has many layers, stacking them one by one by hand becomes tedious. VGG exploits the convenience of the programming language, using functions and loops to replicate the large amount of repeated structure in the network, so such networks can be constructed very compactly. VGG was the first deep network to use this kind of structure.

def VGGNet(architecture):
    """
    By using functions and loops, networks with an arbitrary number of layers can be created quickly.
    :return: the VGG model
    """
    def vgg_block(num_convs, channals):
        """
        Define the basic building block of the network: several convolutional layers followed by one pooling layer.
        A key idea of VGG is to use many convolutional layers with relatively small kernels (3×3) followed by
        a pooling layer, and then to repeat this module several times. So we first define such a block:
        :param num_convs: number of convolutional layers
        :param channals: number of output channels
        :return:
        """
        net = nn.Sequential()
        for _ in range(num_convs):
            net.add(nn.Conv2D(channels=channals, kernel_size=3, padding=1, activation='relu'))
        net.add(nn.MaxPool2D(pool_size=2, strides=2))
        return net

    def vgg_stack(architecture):
        """
        Define the full convolutional part of the network, driven by the architecture parameter.
        :param architecture: tuples of (num_convs, channels), one per block
        :return:
        """
        net = nn.Sequential()
        for (num_convs, channals) in architecture:
            net.add(vgg_block(num_convs, channals))
        return net

    # After the convolutional part, two fully connected layers are used, followed by the output layer.
    net = nn.Sequential()
    with net.name_scope():
        net.add(
            vgg_stack(architecture),
            nn.Flatten(),
            nn.Dense(4096, activation='relu'),
            nn.Dropout(0.5),
            nn.Dense(4096, activation='relu'),
            nn.Dropout(0.5),
            nn.Dense(10)
        )
    return net
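For reference, the classic VGG-11 configuration (1+1+2+2+2 convolutional layers with 64, 128, 256, 512 and 512 channels) can be written as the tuple below; the experiment at the end of this post uses a slightly deeper variant.

architecture = ((1, 64), (1, 128), (2, 256), (2, 512), (2, 512))
net = VGGNet(architecture)  # VGG-11: 8 convolutional layers + 3 dense layers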

4. NiNNet:

  Note that a convolutional neural network generally splits into two parts: one consisting mainly of convolutional layers and one consisting mainly of fully connected layers. In AlexNet we saw how to deepen and widen both the convolutional part and the fully connected part to obtain a deep network. Another natural idea is to chain several convolutional blocks and fully connected blocks to build a deep network.

  The difficulty is that the input and output of a convolution are 4D arrays, while a fully connected layer works on 2D arrays. Moreover, as mentioned in the CNN post, flattening the 4D array into 2D for a fully connected layer gives that layer far too many parameters. NiN solves both problems by applying the fully connected operation only across channels and sharing the weights across pixel positions; in other words, it uses convolutions with a 1×1 kernel.
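A small sketch (mine, not from the original post) showing that a 1×1 convolution only mixes channels and leaves the spatial dimensions untouched, which is exactly the per-pixel fully connected behaviour described above.

conv1x1 = nn.Conv2D(channels=10, kernel_size=1)
conv1x1.initialize()
x = nd.random.uniform(shape=(2, 3, 8, 8))  # batch=2, 3 channels, 8x8 pixels
print(conv1x1(x).shape)                    # (2, 10, 8, 8): channels 3 -> 10, height/width unchanged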


def NiNNet():
    """
    Built by chaining several convolutional blocks that end in 1×1 convolutions.
    :return: the NiN model
    """

    def mlpconv(channels, kernel_size, padding, strides=1, max_pooling=True):
        """
        Build one mlpconv block: a normal convolutional layer followed by two kernel-size-1
        convolutional layers, which act like fully connected layers across channels.
        :param channels: number of output channels
        :param kernel_size: kernel size of the first convolution
        :param padding: padding of the first convolution
        :param strides: strides of the first convolution
        :param max_pooling: whether to append a max pooling layer
        :return:
        """
        net = nn.Sequential()
        net.add(
            nn.Conv2D(channels=channels, kernel_size=kernel_size, strides=strides, padding=padding, activation='relu'),
            nn.Conv2D(channels=channels, kernel_size=1, padding=0, strides=1, activation='relu'),
            nn.Conv2D(channels=channels, kernel_size=1, padding=0, strides=1, activation='relu'))
        if max_pooling:
            net.add(nn.MaxPool2D(pool_size=3, strides=2))
        return net

    """
    除了使用了1×1卷積外,NiN在最後不是使用全連接,而是使用通道數為輸出類別個數的mlpconv,外接一個平均池化層來將每個通道裏的數值平均成一個標量。
    """
    net = nn.Sequential()
    with net.name_scope():
        net.add(
            mlpconv(96, 11, 0, strides=4),
            mlpconv(256, 5, 2),
            mlpconv(384, 3, 1),
            nn.Dropout(0.5),
            # the target has 10 classes
            mlpconv(10, 3, 1, max_pooling=False),
            # the input here is batch_size x 10 x 5 x 5; AvgPool2D turns it into batch_size x 10 x 1 x 1.
            # Global average pooling avoids having to work out the right pool_size by hand.
            nn.GlobalAvgPool2D(),
            # reshape to batch_size x 10
            nn.Flatten()
        )
    return net

5. GoogLeNet:

  In 2014, Google introduced an even more complex network model, GoogLeNet, which combines layers both in series and in parallel.


  The network contains several blocks in which four convolutional paths run in parallel. Such a block is usually called an Inception block; it is a substantial improvement on the Network-in-Network idea. These more structured Inception blocks let GoogLeNet use more channels and more layers while keeping the computational cost and model size within a reasonable range.
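Because the four paths are concatenated along the channel dimension, the output channel count of an Inception block is simply the sum of the channels produced by the four paths. For the first block of stage 3 in the code below (my arithmetic, following the parameters used there):

# Inception(64, 96, 128, 16, 32, 32):
#   path 1 (1x1 conv)          -> 64 channels
#   path 2 (1x1 then 3x3 conv) -> 128 channels
#   path 3 (1x1 then 5x5 conv) -> 32 channels
#   path 4 (3x3 pool then 1x1) -> 32 channels
print(64 + 128 + 32 + 32)  # 256 channels after concatenation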

def GoogLeNet(num_class):
    """
    GoogLeNet introduces the structured Inception block so that more channels and more layers
    can be used while keeping the computational cost and model size within a reasonable range.
    :return: the GoogLeNet model
    """

    class GoogleNet(nn.Block):
        """
        Build the deep network by chaining Inception blocks.
        """
        def __init__(self, num_classes, verbose=False, **kwargs):
            super(GoogleNet, self).__init__(**kwargs)
            self.verbose = verbose
            # add name_scope on the outermost Sequential
            with self.name_scope():
                # block 1
                b1 = nn.Sequential()
                b1.add(
                    nn.Conv2D(64, kernel_size=7, strides=2,
                              padding=3, activation='relu'),
                    nn.MaxPool2D(pool_size=3, strides=2)
                )
                # block 2
                b2 = nn.Sequential()
                b2.add(
                    nn.Conv2D(64, kernel_size=1),
                    nn.Conv2D(192, kernel_size=3, padding=1),
                    nn.MaxPool2D(pool_size=3, strides=2)
                )

                # block 3
                b3 = nn.Sequential()
                b3.add(
                    Inception(64, 96, 128, 16, 32, 32),
                    Inception(128, 128, 192, 32, 96, 64),
                    nn.MaxPool2D(pool_size=3, strides=2)
                )

                # block 4
                b4 = nn.Sequential()
                b4.add(
                    Inception(192, 96, 208, 16, 48, 64),
                    Inception(160, 112, 224, 24, 64, 64),
                    Inception(128, 128, 256, 24, 64, 64),
                    Inception(112, 144, 288, 32, 64, 64),
                    Inception(256, 160, 320, 32, 128, 128),
                    nn.MaxPool2D(pool_size=3, strides=2)
                )

                # block 5
                b5 = nn.Sequential()
                b5.add(
                    Inception(256, 160, 320, 32, 128, 128),
                    Inception(384, 192, 384, 48, 128, 128),
                    nn.AvgPool2D(pool_size=2)
                )
                # block 6
                b6 = nn.Sequential()
                b6.add(
                    nn.Flatten(),
                    nn.Dense(num_classes)
                )
                # chain blocks together
                self.net = nn.Sequential()
                self.net.add(b1, b2, b3, b4, b5, b6)

        def forward(self, x):
            out = x
            for i, b in enumerate(self.net):
                out = b(out)
                if self.verbose:
                    print('Block %d output: %s' % (i + 1, out.shape))
            return out

    class Inception(nn.Block):
        """
        The parallel unit of the network: four paths whose outputs are concatenated along the channel axis.
        """
        def __init__(self, n1_1, n2_1, n2_3, n3_1, n3_5, n4_1, **kwargs):
            super(Inception, self).__init__(**kwargs)

            # path 1: a single 1×1 convolution
            self.p1_convs_1 = nn.Conv2D(n1_1, kernel_size=1, activation='relu')
            # path 2: 1×1 convolution followed by a 3×3 convolution
            self.p2_convs_1 = nn.Conv2D(n2_1, kernel_size=1, activation='relu')
            self.p2_convs_3 = nn.Conv2D(n2_3, kernel_size=3, padding=1, activation='relu')
            # path 3: 1×1 convolution followed by a 5×5 convolution
            self.p3_convs_1 = nn.Conv2D(n3_1, kernel_size=1, activation='relu')
            self.p3_convs_5 = nn.Conv2D(n3_5, kernel_size=5, padding=2, activation='relu')
            # path 4: 3×3 max pooling followed by a 1×1 convolution
            self.p4_pool_3 = nn.MaxPool2D(pool_size=3, padding=1, strides=1)
            self.p4_convs_1 = nn.Conv2D(n4_1, kernel_size=1, activation='relu')

        def forward(self, x):
            p1 = self.p1_convs_1(x)
            p2 = self.p2_convs_3(self.p2_convs_1(x))
            p3 = self.p3_convs_5(self.p3_convs_1(x))
            p4 = self.p4_convs_1(self.p4_pool_3(x))
            return nd.concat(p1, p2, p3, p4, dim=1)

    net = GoogleNet(num_class)
    return net
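A quick sanity check (my sketch): with a 224×224 single-channel input, the chained blocks should reduce the input down to a vector of 10 class scores.

net = GoogLeNet(10)
net.initialize()
x = nd.random.uniform(shape=(4, 1, 224, 224))
print(net(x).shape)  # expected: (4, 10)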

6. ResNet:

  ResNet effectively addresses the difficulty of training deep convolutional networks. During back-propagation the gradients usually become smaller and smaller, so the weight updates also shrink. Layers far from the loss function therefore train slowly, and the problem gets worse as more layers are added. Two approaches were commonly tried before:

  1. Layer-wise training: first train the layers close to the data, then gradually add the later layers. This is cumbersome and the results are not particularly good.
  2. Make the layers wider (more output channels) rather than deeper to increase model capacity. But wider models are usually not as effective as deeper ones.

  ResNet solves the problem of gradients shrinking as they are propagated back through the layers by adding connections that skip across layers. The idea had been proposed before, but ResNet was the first to really make it work well.


  The input of the bottom layer is not only fed to the middle layer; it is also added to the output of the middle layer before entering the top layer. During back-propagation the gradient at the top layer can therefore skip the middle layer and reach the bottom layer directly, which prevents the bottom layer's gradient from becoming too small.
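In symbols (my own notation, not from the original post): if a residual block computes y = x + F(x), then by the chain rule

    ∂L/∂x = ∂L/∂y · (1 + ∂F/∂x) = ∂L/∂y + ∂L/∂y · ∂F/∂x,

so even when ∂F/∂x becomes small, the first term still carries the gradient from the top layer straight down to the bottom layer.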

  Why is it called a residual network? The structure above can be decomposed into the sum of two networks, one with a single layer and one with two layers, sharing the bottom layer.


  During training, the single-layer network is simpler and therefore easier to train. Whatever this small network fails to fit, i.e. the residual, is captured by the two-layer network. Intuitively, even as the network gets deeper, the skip connections still allow the lower layers to be trained properly, so training does not become harder.

  ResNet follows VGG in using 3×3 convolutions throughout, but adds batch normalization layers between the convolutions and the activations to speed up training. Each skip connection spans two convolutional layers, so we define a corresponding residual block (Residual). Note that when the number of input channels differs from the number of output channels (same_shape=False), an extra 1×1 convolution is used to change the number of channels, together with strides=2 to halve the height and width.

def ResNet(num_classes):
    """
    Deep residual network: it adds connections that skip across layers to solve the problem of
    gradients shrinking as they are propagated back through many layers. The idea had been
    proposed before, but ResNet was the first to really make it work well.
    :return: the ResNet model
    """

    class Residual(nn.Block):
        """
        The residual block. ResNet follows VGG in using 3×3 convolutions throughout, but adds batch
        normalization between the convolutions and the activations to speed up training. Each skip
        connection spans two convolutional layers. When the number of input channels differs from the
        number of output channels (same_shape=False), an extra 1×1 convolution changes the number of
        channels, and strides=2 halves the height and width.
        """
        def __init__(self, channels, same_shape=True, **kwargs):
            super(Residual, self).__init__(**kwargs)
            self.same_shape = same_shape
            strides = 1 if same_shape else 2
            self.conv1 = nn.Conv2D(channels, kernel_size=3, padding=1, strides=strides)
            self.bn1 = nn.BatchNorm()
            self.conv2 = nn.Conv2D(channels, kernel_size=3, padding=1)
            self.bn2 = nn.BatchNorm()
            if not same_shape:
                self.conv3 = nn.Conv2D(channels, kernel_size=1, strides=strides)

        def forward(self, x):
            out = nd.relu(self.bn1(self.conv1(x)))
            out = self.bn2(self.conv2(out))
            if not self.same_shape:
                x = self.conv3(x)
            return nd.relu(out + x)

    class ResNet(nn.Block):
        """
        Just as the body of GoogLeNet chains Inception blocks, the body of ResNet chains Residual blocks.
        Note that no pooling layers are used to shrink the feature maps; instead the channel-changing
        Residual blocks use convolutions with strides=2.
        """
        def __init__(self, num_classes, verbose=False, **kwargs):
            super(ResNet, self).__init__(**kwargs)
            self.verbose = verbose
            # add name_scope on the outermost Sequential
            with self.name_scope():
                # block 1
                b1 = nn.Conv2D(64, kernel_size=7, strides=2)
                # block 2
                b2 = nn.Sequential()
                b2.add(
                    nn.MaxPool2D(pool_size=3, strides=2),
                    Residual(64),
                    Residual(64)
                )
                # block 3
                b3 = nn.Sequential()
                b3.add(
                    Residual(128, same_shape=False),
                    Residual(128)
                )
                # block 4
                b4 = nn.Sequential()
                b4.add(
                    Residual(256, same_shape=False),
                    Residual(256)
                )
                # block 5
                b5 = nn.Sequential()
                b5.add(
                    Residual(512, same_shape=False),
                    Residual(512)
                )
                # block 6
                b6 = nn.Sequential()
                b6.add(
                    nn.AvgPool2D(pool_size=3),
                    nn.Dense(num_classes)
                )
                # chain all blocks together
                self.net = nn.Sequential()
                self.net.add(b1, b2, b3, b4, b5, b6)

        def forward(self, x):
            out = x
            for i, b in enumerate(self.net):
                out = b(out)
                if self.verbose:
                    print('Block %d output: %s' % (i + 1, out.shape))
            return out
    net = ResNet(num_classes)
    return net

We use the following test routine to try out these classic convolutional neural networks:

def do_exp():
    # initialization
    ctx = utils.try_gpu()

    # load the data
    # batch_size = 256
    train_data, test_data = utils.load_data_fashion_mnist(batch_size=64, resize=224)

    # net = LeNet()
    # net = AlexNet()

    # architecture = ((2, 64), (2, 128), (2, 256), (2, 512), (2, 512))
    # net = VGGNet(architecture)

    # net = NiNNet()
    # net = GoogLeNet(10)
    net = ResNet(10)
    net.initialize(ctx=ctx, init=init.Xavier())
    print('initialize weight on', ctx)

    # training
    loss = gluon.loss.SoftmaxCrossEntropyLoss()
    trainer = gluon.Trainer(net.collect_params(), 'sgd', {'learning_rate': 0.01})
    utils.train(train_data, test_data, net, loss, trainer, ctx, num_epochs=1)


if __name__ == '__main__':
    do_exp()

The other helper functions used (collected in the utils module) are:

class DataLoader(object):
    """Similar to gluon.data.DataLoader, but might be faster.

    The main difference is that this data loader tries to read more examples each
    time. The limits are: 1) all examples in the dataset must have the same shape, and
    2) the data transformer needs to process multiple examples at a time.
    """
    def __init__(self, dataset, batch_size, shuffle, transform=None):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.transform = transform

    def __iter__(self):
        data = self.dataset[:]
        X = data[0]
        y = nd.array(data[1])
        n = X.shape[0]
        # shuffle the order
        if self.shuffle:
            idx = np.arange(n)
            np.random.shuffle(idx)
            X = nd.array(X.asnumpy()[idx])
            y = nd.array(y.asnumpy()[idx])

        for i in range(n//self.batch_size):
            if self.transform is not None:
                yield self.transform(X[i*self.batch_size:(i+1)*self.batch_size], 
                                     y[i*self.batch_size:(i+1)*self.batch_size])
            else:
                yield (X[i*self.batch_size:(i+1)*self.batch_size],
                       y[i*self.batch_size:(i+1)*self.batch_size])

    def __len__(self):
        return len(self.dataset)//self.batch_size
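A minimal usage sketch (mine): wrap one of the gluon vision datasets and pull a single mini-batch; without a transform the images come back in batch x height x width x channel layout.

dataset = gluon.data.vision.FashionMNIST(train=False, transform=None)
loader = DataLoader(dataset, batch_size=32, shuffle=False)
for X, y in loader:
    print(X.shape, y.shape)  # (32, 28, 28, 1) and (32,)
    break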


def load_data_fashion_mnist(batch_size, resize=None, root="~/.mxnet/datasets/fashion-mnist"):
    """Download the Fashion-MNIST dataset and then load it into memory."""
    def transform_mnist(data, label):
        # Transform a batch of examples.
        if resize:
            n = data.shape[0]
            new_data = nd.zeros((n, resize, resize, data.shape[3]))
            for i in range(n):
                new_data[i] = image.imresize(data[i], resize, resize)
            data = new_data
        # change data from batch x height x width x channel to batch x channel x height x width
        return nd.transpose(data.astype('float32'), (0, 3, 1, 2))/255, label.astype('float32')

    mnist_train = gluon.data.vision.FashionMNIST(root=root, train=True, transform=None)
    mnist_test = gluon.data.vision.FashionMNIST(root=root, train=False, transform=None)
    # Transform later to avoid memory explosion. 
    train_data = DataLoader(mnist_train, batch_size, shuffle=True, transform=transform_mnist)
    test_data = DataLoader(mnist_test, batch_size, shuffle=False, transform=transform_mnist)
    return train_data, test_data


def try_gpu():
    """If GPU is available, return mx.gpu(0); else return mx.cpu()"""
    try:
        ctx = mx.gpu()
        _ = nd.array([0], ctx=ctx)
    except mx.base.MXNetError:
        ctx = mx.cpu()
    return ctx

def _get_batch(batch, ctx):
    """return data and label on ctx"""
    if isinstance(batch, mx.io.DataBatch):
        data = batch.data[0]
        label = batch.label[0]
    else:
        data, label = batch
    return (gluon.utils.split_and_load(data, ctx),
            gluon.utils.split_and_load(label, ctx),
            data.shape[0])

def train(train_data, test_data, net, loss, trainer, ctx, num_epochs, print_batches=None):
    """Train a network"""
    print("Start training on ", ctx)
    if isinstance(ctx, mx.Context):
        ctx = [ctx]
    for epoch in range(num_epochs):
        train_loss, train_acc, n, m = 0.0, 0.0, 0.0, 0.0
        if isinstance(train_data, mx.io.MXDataIter):
            train_data.reset()
        start = time()
        for i, batch in enumerate(train_data):
            data, label, batch_size = _get_batch(batch, ctx)
            losses = []
            with autograd.record():
                outputs = [net(X) for X in data]
                losses = [loss(yhat, y) for yhat, y in zip(outputs, label)]
            for l in losses:
                l.backward()
            train_acc += sum([(yhat.argmax(axis=1)==y).sum().asscalar()
                              for yhat, y in zip(outputs, label)])
            train_loss += sum([l.sum().asscalar() for l in losses])
            trainer.step(batch_size)
            n += batch_size
            m += sum([y.size for y in label])
            if print_batches and (i+1) % print_batches == 0:
                print("Batch %d. Loss: %f, Train acc %f" % (
                    n, train_loss/n, train_acc/m
                ))

        test_acc = evaluate_accuracy(test_data, net, ctx)
        print("Epoch %d. Loss: %.3f, Train acc %.2f, Test acc %.2f, Time %.1f sec" % (
            epoch, train_loss/n, train_acc/m, test_acc, time() - start
        ))
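The train function also calls evaluate_accuracy, which the post does not show. A minimal reconstruction consistent with the helpers above (my sketch, not the author's original code) could look like this:

def evaluate_accuracy(data_iterator, net, ctx=[mx.cpu()]):
    """Return the classification accuracy of net on data_iterator."""
    if isinstance(ctx, mx.Context):
        ctx = [ctx]
    if isinstance(data_iterator, mx.io.MXDataIter):
        data_iterator.reset()
    acc, n = nd.array([0.]), 0
    for batch in data_iterator:
        data, label, _ = _get_batch(batch, ctx)
        for X, y in zip(data, label):
            # accumulate the number of correct predictions on the CPU
            acc += (net(X).argmax(axis=1) == y.astype('float32')).sum().copyto(mx.cpu())
            n += y.size
        acc.wait_to_read()
    return acc.asscalar() / n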
