
Adversarial Neural Network Learning (6): Generating Different Faces with BEGAN (TensorFlow Implementation)

I. Background

BEGAN, the Boundary Equilibrium GAN, was proposed by David Berthelot et al. [1] in March 2017. A traditional GAN trains the discriminator to judge whether the distribution of generated images matches that of real images. BEGAN replaces this direct density estimation: the authors argue that if the distributions of the auto-encoder reconstruction errors on real and generated images are close, the image distributions themselves can be regarded as close. They also improve the network architecture and obtain quite good experimental results.

This post uses BEGAN to generate different faces from the CelebA dataset [2]. The code is adapted from [3] with some simplifications, with the aim of implementing the whole pipeline in as little code as possible (the dataset and the implementation are described in detail below).

[1] Paper:

https://arxiv.org/abs/1703.10717

[2] CelebA dataset: http://mmlab.ie.cuhk.edu.hk/projects/CelebA.html

[3] Main reference code: https://github.com/JorgeCeja/began-tensorflow

II. How BEGAN works

There are already quite a few write-ups of BEGAN online, so I will not go into great detail here; one good walkthrough is:

[4] A BEGAN walkthrough (in Chinese)

In the BEGAN paper [1], the authors state their contributions very clearly:

In this paper, we make the following contributions: 

• A GAN with a simple yet robust architecture, standard training procedure with fast and stable convergence. (a more robust GAN with faster, more stable convergence)

• An equilibrium concept that balances the power of the discriminator against the generator. (an equilibrium concept that balances the discriminator against the generator)

• A new way to control the trade-off between image diversity and visual quality. (control over the trade-off between image diversity and visual quality)

• An approximate measure of convergence. To our knowledge the only other published measure is from Wasserstein GAN [1] (WGAN), which will be discussed in the next section. (an approximate measure of convergence, inspired by WGAN; note that the [1] inside this quotation refers to the WGAN paper in the original bibliography, not to reference [1] of this post)

The paper also describes the BEGAN model structure: the discriminator is an auto-encoder, a loss derived from the Wasserstein distance is used to match the auto-encoder loss distributions, and an equilibrium term is added to balance the discriminator and the generator:

We use an auto-encoder as a discriminator as was first proposed in EBGAN [21]. While typical GANs try to match data distributions directly, our method aims to match auto-encoder loss distributions using a loss derived from the Wasserstein distance. This is done using a typical GAN objective with the addition of an equilibrium term to balance the discriminator and the generator. Our method has an easier training procedure and uses a simpler neural network architecture compared to typical GAN techniques.
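Concretely, writing L(v) = |v - D(v)| for the pixel-wise L1 reconstruction loss of the auto-encoder discriminator D, the objective proposed in the paper is:

    L_D      = L(x) - k_t * L(G(z))                        (discriminator loss)
    L_G      = L(G(z))                                      (generator loss)
    k_{t+1}  = k_t + lambda_k * (gamma * L(x) - L(G(z)))    (equilibrium control, with k_t kept in [0, 1])
    M_global = L(x) + |gamma * L(x) - L(G(z))|              (convergence measure)

Here gamma in [0, 1] is the diversity ratio that trades off image diversity against visual quality. These four quantities map directly onto d_loss, g_loss, the k_curr update and the measure variable in the code later in this post.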

The architecture given in the paper is a decoder-style generator paired with an auto-encoder discriminator (see the architecture figure in [1]).

A detailed walkthrough of BEGAN can be found in [4]. As for implementations, there is not much code online, and the top-ranked repositories on GitHub are quite large, which makes them hard to read. I mainly followed [3], trimming and slightly modifying the redundant parts. A few other repositories that are fairly readable:

[5]https://github.com/artcg/BEGAN

[6]https://github.com/Heumi/BEGAN-tensorflow

[7]https://github.com/k920049/BEGAN/blob/master/model/BEGAN.py

The goal here is to use BEGAN to generate different faces. The earlier post, Adversarial Neural Network Learning (2): Generating Cage Face Images with DCGAN (TensorFlow implementation), also generated faces, but that DCGAN experiment could only generate one person's face, whereas BEGAN can generate faces of different people. The experiment is based on the CelebA dataset and implements BEGAN in as little code as possible.

III. Implementing BEGAN

1. File structure

The files are organized as follows:

-- main.py                          (main training script)
-- model.py                         (BEGAN model definition)
-- utils.py                         (helper functions)
-- data                             (training data folder)
    |------ img_align_celeba_png
                |------ image01.png
                |------ image02.png
                |------ ...

2. The dataset

The experiment uses the CelebA dataset, whose official page is http://mmlab.ie.cuhk.edu.hk/projects/CelebA.html; opening that page shows that CelebA is a large face dataset.

The official site also gives an introduction to the dataset:

CelebFaces Attributes Dataset (CelebA) is a large-scale face attributes dataset with more than 200K celebrity images, each with 40 attribute annotations. The images in this dataset cover large pose variations and background clutter. CelebA has large diversities, large quantities, and rich annotations, including

  • 10,177 number of identities,

  • 202,599 number of face images, and

  • 5 landmark locations, 40 binary attributes annotations per image.

So the dataset contains 10,177 identities and 202,599 face images in total. To download the face images, scroll down the official page and choose Align&Cropped Images.

If that download link does not work, the data can also be downloaded from Baidu Cloud: https://pan.baidu.com/s/1eSNpdRG#list/path=%2FCelebA. After downloading, unzip the archive, create a data folder in the project root, and place the extracted images under './data/img_align_celeba_png/*.png'.

With the dataset in place, a quick look at the folder confirms that there are indeed 202,599 images; each image has a face roughly in the center, the faces differ considerably from one another, and every image is 178×218 pixels.
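An optional sanity check before training (a minimal sketch; it assumes the images were extracted to './data/img_align_celeba_png/' as described above):

import os
from glob import glob
from PIL import Image

# assumed location of the extracted CelebA images; adjust if your path differs
files = glob(os.path.join('./data/img_align_celeba_png', '*.png'))
print('number of images:', len(files))      # expect 202599

if files:
    with Image.open(files[0]) as img:
        print('image size:', img.size)      # expect (178, 218)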

3. Helper file: utils.py

This file holds the dataset preprocessing helpers: reading and center-cropping the images to 64×64, splitting the data into batches, assembling and saving result grids, and smoothing the loss curve. (The reference code also downloads the dataset automatically; that part has been removed here and is listed separately in Section V.) The code is as follows:

import math
import os

import matplotlib.pyplot as plt
import numpy as np
from PIL import Image


# Read an image from image_path and crop it; since the face sits roughly at the center of every image, a simple center crop is enough
def get_image(image_path, width, height, mode):
    """
    Read image from image_path
    :param image_path: Path of image
    :param width: Width of image
    :param height: Height of image
    :param mode: Mode of image
    :return: Image data
    """
    image = Image.open(image_path)

    if image.size != (width, height):  # HACK - Check if image is from the CELEBA dataset
        # Remove most pixels that aren't part of a face
        face_width = face_height = 108
        j = (image.size[0] - face_width) // 2
        i = (image.size[1] - face_height) // 2
        image = image.crop([j, i, j + face_width, i + face_height])
        image = image.resize([width, height], Image.BILINEAR)

    return np.array(image.convert(mode))


# Read a list of image files into a single batch array
def get_batch(image_files, width, height, mode):
    data_batch = np.array(
        [get_image(sample_file, width, height, mode) for sample_file in image_files]).astype(np.float32)

    # Make sure the images are in 4 dimensions
    if len(data_batch.shape) < 4:
        data_batch = data_batch.reshape(data_batch.shape + (1,))

    return data_batch


# Dataset class wrapping the two functions above
class Dataset(object):

    def __init__(self, data_files):
        """
        :param data_files: List of files in the dataset
        """
        IMAGE_WIDTH = 64
        IMAGE_HEIGHT = 64

        self.image_mode = 'RGB'
        image_channels = 3

        self.data_files = data_files
        self.shape = len(data_files), IMAGE_WIDTH, IMAGE_HEIGHT, image_channels

    def get_batches(self, batch_size):
        """
        Generate batches
        :param batch_size: Batch Size
        :return: Batches of data
        """
        IMAGE_MAX_VALUE = 255

        current_index = 0
        while current_index + batch_size <= self.shape[0]:
            data_batch = get_batch(
                self.data_files[current_index:current_index + batch_size],
                *self.shape[1:3], 
                self.image_mode)

            current_index += batch_size

            yield data_batch / IMAGE_MAX_VALUE - 0.5


# Arrange a set of generated images into a single grid image
def images_square_grid(images, mode):
    """
    Save images as a square grid
    :param images: Images to be used for the grid
    :param mode: The mode to use for images
    :return: Image of images in a square grid
    """
    # Get maximum size for square grid of images
    save_size = math.floor(np.sqrt(images.shape[0]))

    # Scale to 0-255
    images = (((images - images.min()) * 255) /
              (images.max() - images.min())).astype(np.uint8)

    # Put images in a square arrangement
    images_in_square = np.reshape(
        images[:save_size * save_size],
        (save_size, save_size, images.shape[1], images.shape[2], images.shape[3]))
    if mode == 'L':
        images_in_square = np.squeeze(images_in_square, 4)

    # Combine images to grid image
    new_im = Image.new(
        mode, (images.shape[1] * save_size, images.shape[2] * save_size))
    for col_i, col_images in enumerate(images_in_square):
        for image_i, image in enumerate(col_images):
            im = Image.fromarray(image, mode)
            new_im.paste(
                im, (col_i * images.shape[1], image_i * images.shape[2]))

    return new_im


# Plot the final results and save them to disk
def save_plot(data, title, image_mode=None, isImage=False):
    """
    Save images or plot to file on the out folder
    Can also save stacked plots
    """
    if not os.path.exists('out/'):
        os.makedirs('out/')

    fig = plt.figure()

    if isImage:
        cmap = None if image_mode == 'RGB' else 'gray'
        plt.imshow(data, cmap=cmap)

    else:

        if type(data) == list:
            for i in data:
                plt.plot(i)
        else:
            plt.plot(data)

    fig.savefig('out/' + title)
    plt.close(fig)


# Plot the generator output; uses the two functions above
def show_generator_output(sess, generator, input_z, example_z, out_channel_dim, image_mode, num):
    """
    Show example output for the generator
    :param sess: TensorFlow session
    :param generator: Generator function
    :param input_z: Input Z tensor (placeholder)
    :param example_z: Noise vectors to feed the generator
    :param out_channel_dim: The number of channels in the output image
    :param image_mode: The mode to use for images ("RGB" or "L")
    :param num: Name/index used for the output file
    """

    samples = sess.run(
        generator(input_z, out_channel_dim, False),
        feed_dict={input_z: example_z})

    images_grid = images_square_grid(samples, image_mode)
    save_plot(images_grid, '{}.png'.format(num), image_mode, True)


# Gaussian smoothing of a 1-D sequence (used to smooth the convergence-measure curve)
def smooth(values, degree=5):
    """
    By Scott W Harden from www.swharden.com
    """
    window = degree * 2 - 1
    weight = np.array([1.0] * window)
    weightGauss = []

    for i in range(window):
        i = i - degree + 1
        frac = i / float(window)
        gauss = 1 / (np.exp((4 * frac) ** 2))
        weightGauss.append(gauss)

    weight = np.array(weightGauss) * weight
    smoothed = [0.0] * (len(values) - window)

    for i in range(len(smoothed)):
        smoothed[i] = sum(np.array(values[i:i + window]) * weight) / sum(weight)

    return smoothed
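
Before wiring these helpers into the training loop, they can be exercised on their own. A minimal sketch (assuming the dataset path from Section III.1) that builds a Dataset, pulls one batch and checks its shape and value range:

import os
from glob import glob

import utils

# assumed dataset location, matching the file structure above
data_files = glob(os.path.join('./data/', 'img_align_celeba_png/*.png'))
dataset = utils.Dataset(data_files)
print('dataset shape:', dataset.shape)        # (202599, 64, 64, 3)

# take a single batch and inspect it
batch = next(dataset.get_batches(16))
print('batch shape:', batch.shape)            # (16, 64, 64, 3)
print('value range: [%.2f, %.2f]' % (batch.min(), batch.max()))   # roughly [-0.5, 0.5]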

4. Model file: model.py

Next is the BEGAN model file, which defines the network structure and the losses:

import tensorflow as tf
from tensorflow.python.ops import math_ops
from tensorflow.python.framework import ops
import numpy as np


class BEGAN(object):
    def __init__(self, place_holder=''):
        self.place_holder = place_holder
        # pass

    def model_inputs(self, image_width, image_height, image_channels, z_dim):
        """
        Create the model inputs/tensors
        """
        inputs_real = tf.placeholder(
            tf.float32, (None, image_width, image_height, image_channels), name='input_real')
        inputs_z = tf.placeholder(tf.float32, (None, z_dim), name='input_z')
        learning_rate = tf.placeholder(tf.float32, [], name='learning_rate')
        k_t = tf.placeholder(tf.float32, name='k_t')

        return inputs_real, inputs_z, learning_rate, k_t

    # default alpha is 0.2; 0.01 works best for this example
    # Function from TensorFlow v1.4 for backwards compatibility
    def leaky_relu(self, features, alpha=0.01, name=None):
        with ops.name_scope(name, "LeakyRelu", [features, alpha]):
            features = ops.convert_to_tensor(features, name="features")
            alpha = ops.convert_to_tensor(alpha, name="alpha")

            return math_ops.maximum(alpha * features, features)

    def fully_connected(self, x, output_shape):
        # flatten and dense
        shape = x.get_shape().as_list()
        dim = np.prod(shape[1:])

        x = tf.reshape(x, [-1, dim])
        x = tf.layers.dense(x, output_shape, activation=None)

        return x

    def decoder(self, h, n, img_dim, channel_dim):
        """
        Reconstruction network
        """
        h = tf.layers.dense(h, img_dim * img_dim * n, activation=None)
        h = tf.reshape(h, (-1, img_dim, img_dim, n))

        conv1 = tf.layers.conv2d(
            h, n, 3, padding="same", activation=self.leaky_relu)
        conv1 = tf.layers.conv2d(
            conv1, n, 3, padding="same", activation=self.leaky_relu)

        upsample1 = tf.image.resize_nearest_neighbor(
            conv1, size=(img_dim * 2, img_dim * 2))

        conv2 = tf.layers.conv2d(
            upsample1, n, 3, padding="same", activation=self.leaky_relu)
        conv2 = tf.layers.conv2d(
            conv2, n, 3, padding="same", activation=self.leaky_relu)

        upsample2 = tf.image.resize_nearest_neighbor(
            conv2, size=(img_dim * 4, img_dim * 4))

        conv3 = tf.layers.conv2d(
            upsample2, n, 3, padding="same", activation=self.leaky_relu)
        conv3 = tf.layers.conv2d(
            conv3, n, 3, padding="same", activation=self.leaky_relu)

        conv4 = tf.layers.conv2d(conv3, channel_dim, 3,
                                 padding="same", activation=None)

        return conv4

    def encoder(self, images, n, z_dim, channel_dim):
        """
        Feature extraction network
        """
        conv1 = tf.layers.conv2d(
            images, n, 3, padding="same", activation=self.leaky_relu)

        conv2 = tf.layers.conv2d(
            conv1, n, 3, padding="same", activation=self.leaky_relu)
        conv2 = tf.layers.conv2d(
            conv2, n * 2, 3, padding="same", activation=self.leaky_relu)

        subsample1 = tf.layers.conv2d(
            conv2, n * 2, 3, strides=2, padding='same')

        conv3 = tf.layers.conv2d(subsample1, n * 2, 3,
                                 padding="same", activation=self.leaky_relu)
        conv3 = tf.layers.conv2d(
            conv3, n * 3, 3, padding="same", activation=self.leaky_relu)

        subsample2 = tf.layers.conv2d(
            conv3, n * 3, 3, strides=2, padding='same')

        conv4 = tf.layers.conv2d(subsample2, n * 3, 3,
                                 padding="same", activation=self.leaky_relu)
        conv4 = tf.layers.conv2d(
            conv4, n * 3, 3, padding="same", activation=self.leaky_relu)

        h = self.fully_connected(conv4, z_dim)

        return h

    def discriminator(self, images, z_dim, channel_dim, reuse=False):
        """
        Create the discriminator network: The autoencoder
        """
        with tf.variable_scope('discriminator', reuse=reuse):
            x = self.encoder(images, 64, z_dim, channel_dim)
            x = self.decoder(x, 64, 64 // 4, channel_dim)

            return x

    def generator(self, z, channel_dim, is_train=True):
        """
        Create the generator network: only the decoder part
        """
        reuse = not is_train
        with tf.variable_scope('generator', reuse=reuse):
            x = self.decoder(z, 64, 64 // 4, channel_dim)

            return x

    def model_loss(self, input_real, input_z, channel_dim, z_dim, k_t):
        """
        Get the loss for the discriminator and generator
        """
        g_model_fake = self.generator(input_z, channel_dim, is_train=True)
        d_model_real = self.discriminator(input_real, z_dim, channel_dim)
        d_model_fake = self.discriminator(
            g_model_fake, z_dim, channel_dim, reuse=True)

        # l1 loss
        d_real = tf.reduce_mean(tf.abs(input_real - d_model_real))
        d_fake = tf.reduce_mean(tf.abs(g_model_fake - d_model_fake))

        d_loss = d_real - k_t * d_fake
        g_loss = d_fake

        return d_loss, g_loss, d_real, d_fake

    def model_opt(self, d_loss, g_loss, learning_rate, beta1, beta2=0.999):
        """
        Get optimization operations
        """
        # Get variables
        g_vars = tf.get_collection(
            tf.GraphKeys.GLOBAL_VARIABLES, "generator")
        d_vars = tf.get_collection(
            tf.GraphKeys.GLOBAL_VARIABLES, "discriminator")

        # Optimize
        d_train_opt = tf.train.AdamOptimizer(
            learning_rate, beta1=beta1, beta2=beta2).minimize(d_loss, var_list=d_vars)
        g_train_opt = tf.train.AdamOptimizer(
            learning_rate, beta1=beta1, beta2=beta2).minimize(g_loss, var_list=g_vars)

        return d_train_opt, g_train_opt
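
As a quick sanity check of the architecture (a minimal sketch, not part of the training code, assuming the class above is saved as model.py), the graph can be built once with dummy inputs to confirm that both the generator and the auto-encoder discriminator output 64×64×3 tensors:

import tensorflow as tf

from model import BEGAN

model = BEGAN()
with tf.Graph().as_default():
    # 64x64 RGB images and a 64-dimensional latent code, as in the training script below
    inputs_real, inputs_z, lrate, k_t = model.model_inputs(64, 64, 3, 64)
    d_loss, g_loss, d_real, d_fake = model.model_loss(inputs_real, inputs_z, 3, 64, k_t)

    # the generator scope was already created inside model_loss, so reuse it here for inspection
    fake = model.generator(inputs_z, 3, is_train=False)
    print('generator output:', fake.get_shape().as_list())    # [None, 64, 64, 3]
    print('d_loss/g_loss are scalars:', d_loss.get_shape(), g_loss.get_shape())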

5. Main file: main.py

With the two files above defined, the main file sets up the training procedure and plots the training results. Here is main.py:

from model import BEGAN
import tensorflow as tf
from glob import glob
import numpy as np
import utils
import math
import os


def train(model, epoch_count, batch_size, z_dim, start_learning_rate, beta1, beta2, get_batches, data_shape, image_mode):

    input_real, input_z, lrate, k_t = model.model_inputs(*(data_shape[1:]), z_dim)

    d_loss, g_loss, d_real, d_fake = model.model_loss(
        input_real, input_z, data_shape[3], z_dim, k_t)

    d_opt, g_opt = model.model_opt(d_loss, g_loss, lrate, beta1, beta2)

    losses = []
    iter = 0

    epoch_drop = 3

    lam = 1e-3
    gamma = 0.5
    k_curr = 0.0

    test_z = np.random.uniform(-1, 1, size=(16, z_dim))

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        for epoch_i in range(epoch_count):

            learning_rate = start_learning_rate * \
                math.pow(0.2, math.floor((epoch_i + 1) / epoch_drop))
            for batch_images in get_batches(batch_size):
                iter += 1
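                # rescale the batch from [-0.5, 0.5] (Dataset output) to [-1, 1]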
                batch_images *= 2

                batch_z = np.random.uniform(-1, 1, size=(batch_size, z_dim))

                _, d_real_curr = sess.run([d_opt, d_real], feed_dict={
                                          input_z: batch_z, input_real: batch_images, lrate: learning_rate, k_t: k_curr})

                _, d_fake_curr = sess.run([g_opt, d_fake], feed_dict={
                                          input_z: batch_z, input_real: batch_images, lrate: learning_rate, k_t: k_curr})

                # k_{t+1} = k_t + lambda_k * (gamma * L(x) - L(G(z))); the paper keeps k_t in [0, 1]
                k_curr = np.clip(k_curr + lam * (gamma * d_real_curr - d_fake_curr), 0.0, 1.0)

                # save convergence measure
                if iter % 100 == 0:
                    measure = d_real_curr + \
                        np.abs(gamma * d_real_curr - d_fake_curr)
                    losses.append(measure)

                    print("Epoch {}/{}, batch {}...".format(epoch_i + 1, epoch_count, iter),
                          'Convergence measure: {:.4}'.format(measure))

                # save test and batch images
                if iter % 700 == 0:
                    utils.show_generator_output(
                        sess, model.generator, input_z, batch_z, data_shape[3], image_mode, 'batch-' + str(iter))

                    utils.show_generator_output(
                        sess, model.generator, input_z, test_z, data_shape[3], image_mode, 'test-' + str(iter))

        print('Training steps: ', iter)

        losses = np.array(losses)

        utils.save_plot([losses, utils.smooth(losses)],
                         'convergence_measure.png')


if __name__ == '__main__':
    batch_size = 16
    z_dim = 64  
    learning_rate = 0.0001
    beta1 = 0.5
    beta2 = 0.999
    epochs = 20

    data_dir = './data/'

    model = BEGAN()

    celeba_dataset = utils.Dataset(glob(os.path.join(data_dir, 'img_align_celeba_png/*.png')))

    with tf.Graph().as_default():
        train(model,
              epochs,
              batch_size,
              z_dim,
              learning_rate,
              beta1,
              beta2,
              celeba_dataset.get_batches,
              celeba_dataset.shape,
              celeba_dataset.image_mode)

Running main.py directly starts the experiment.

IV. Experimental results

The experiment runs for 20 epochs with a batch size of 16, so each epoch contains a little over 12,000 batches (202,599 / 16 ≈ 12,662). Every 700 batches the generator produces one set of faces from fresh random noise and another set from the fixed test vectors defined before training, so two image grids are saved every 700 batches. After the 20 epochs, the convergence-measure curve is plotted automatically.

The evolution of the faces generated from random noise over training:

The images above are the generator outputs for random noise at successive checkpoints; for the fixed test vectors, the evolution looks like this:

I had not finished training at this point, but it was already clear that after roughly 8,000 batches the generated results look quite good. However, as the image quality improves, noise artifacts also become more and more noticeable.


Second update

In the end I trained for 253,240 batches in total (12,662 batches per epoch × 20 epochs), which took about two days on a GPU. The final loss curve is shown below (the y-axis is the loss/convergence measure, the x-axis the number of training steps; blue is the raw curve, orange the smoothed one):

The generated faces are shown below (top row: faces generated from random noise; bottom row: faces generated from the fixed test vectors; the checkpoints are roughly at batches ~100000, ~150000, ~200000 and ~250000):

V. Analysis

1. BEGAN can generate many different faces, and the experimental results are quite good.

2. The file structure is described in Section III.1.

3. The reference code includes a Python script for downloading the dataset. Since I downloaded the data myself, I removed that part; it is given below (with small modifications of my own). If you want to try it, simply run the code and the data will be downloaded into the './data/' folder:

import os
import hashlib
from urllib.request import urlretrieve
import zipfile
import shutil
from tqdm import tqdm


def download_extract(database_name, data_path):
    """
    Download and extract database
    :param database_name: Database name
    :param data_path: Directory to download and extract the data into
    """
    url = 'https://s3-us-west-1.amazonaws.com/udacity-dlnfd/datasets/celeba.zip'
    hash_code = '00d2c5bc6d35e252742224ab0c1e8fcb'
    extract_path = os.path.join(data_path, 'img_align_celeba')
    save_path = os.path.join(data_path, 'celeba.zip')
    extract_fn = _unzip

    if os.path.exists(extract_path):
        print('Found {} Data'.format(database_name))
        return

    if not os.path.exists(data_path):
        os.makedirs(data_path)

    if not os.path.exists(save_path):
        with DLProgress(unit='B', unit_scale=True, miniters=1, desc='Downloading {}'.format(database_name)) as pbar:
            urlretrieve(
                url,
                save_path,
                pbar.hook)

    assert hashlib.md5(open(save_path, 'rb').read()).hexdigest() == hash_code, \
        '{} file is corrupted.  Remove the file and try again.'.format(
            save_path)

    os.makedirs(extract_path)
    try:
        extract_fn(save_path, extract_path, database_name, data_path)
    except Exception as err:
        # Remove extraction folder if there is an error
        shutil.rmtree(extract_path)
        raise err

    # Remove compressed data
    os.remove(save_path)


def _unzip(save_path, _, database_name, data_path):
    """
    Unzip wrapper with the same interface as _ungzip
    :param save_path: The path of the gzip files
    :param database_name: Name of database
    :param data_path: Path to extract to
    :param _: HACK - Used to have to same interface as _ungzip
    """
    print('Extracting {}...'.format(database_name))
    with zipfile.ZipFile(save_path) as zf:
        zf.extractall(data_path)


class DLProgress(tqdm):
    """
    Handle Progress Bar while Downloading
    """
    last_block = 0

    def hook(self, block_num=1, block_size=1, total_size=None):
        """
        A hook function that will be called once on establishment of the network connection and
        once after each block read thereafter.
        :param block_num: A count of blocks transferred so far
        :param block_size: Block size in bytes
        :param total_size: The total size of the file. This may be -1 on older FTP servers which do not return
                            a file size in response to a retrieval request.
        """
        self.total = total_size
        self.update((block_num - self.last_block) * block_size)
        self.last_block = block_num


if __name__ == '__main__':
    data_dir='./data/'
    download_extract('celeba', data_dir)