
Visualizing Keras Feature Maps in TensorBoard

Preface

Back when I was using the TensorFlow framework, I already looked into feature-map visualization. The main idea is to unstack the feature maps and tile them into one large image. For details, see my earlier article, TensorBoard Feature-Map Visualization.

The core function of that article splits the feature map along the fourth (channel) dimension and concatenates the pieces. The returned result all_concact is still a four-dimensional tensor: the first dimension is batch_size, the last dimension is 1, and the middle two dimensions are the height and width of the tiled feature map.

def _concact_features(self, conv_output):
    """
    Reshape the feature maps and tile them into one large image.
    :param conv_output: multi-channel feature map
    :return: the tiled feature map
    """
    num_or_size_splits = conv_output.get_shape().as_list()[-1]
    each_convs = tf.split(conv_output, num_or_size_splits=num_or_size_splits, axis=3)
    # Side length of the square grid; if the channel count is not a perfect
    # square, channels beyond concact_size**2 are dropped.
    concact_size = int(math.sqrt(num_or_size_splits))
    all_concact = None
    for i in range(concact_size):
        row_concact = each_convs[i * concact_size]
        for j in range(concact_size - 1):
            row_concact = tf.concat([row_concact, each_convs[i * concact_size + j + 1]], 1)
        if i == 0:
            all_concact = row_concact
        else:
            all_concact = tf.concat([all_concact, row_concact], 2)
    return all_concact
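
A quick shape sanity check (a sketch assuming TF 1.x, with the function above defined at module level; since self is unused in the body we can pass None):

import math
import tensorflow as tf

# Hypothetical 64-channel feature map: 64 = 8 * 8, so the tiled grid is 8 x 8
x = tf.placeholder(tf.float32, [None, 28, 28, 64])
tiled = _concact_features(None, x)
print(tiled.get_shape().as_list())  # [None, 224, 224, 1]: an 8x8 grid of 28x28 maps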

Recently I have been using the Keras framework, where TensorBoard seems even easier to use: the built-in TensorBoard callback already plots loss and metric curves for us. However, anything image-related, such as feature maps and input images, is not implemented by Keras's built-in TensorBoard callback, so we have to add it ourselves.
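
For reference, the stock callback only needs to be appended to the callbacks list (model, x_train and y_train are placeholders here):

from keras.callbacks import TensorBoard

tb = TensorBoard(log_dir='./logs')
model.fit(x_train, y_train, epochs=10, callbacks=[tb])  # loss/metric curves appear in TensorBoard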

Ready-made material on this is a bit hard to find, so I experimented for a while. The main approach is to extend the original callback's source code with the functionality we need.

There are two main points to note:

  1. When you use the Keras API to grab an intermediate layer's output, the resulting tensor can only be used to define a summary; it does not hold actual values yet. To get real output values, you must feed data into the network;
  2. We only need to define the summaries once, in set_model(), using functions such as tf.summary.image(); to refresh them, repeatedly evaluate and write them in on_batch_end(), i.e. sess.run() followed by add_summary().

This may not be phrased very precisely or in professional terms; if it is hard to follow, read the implementation below first and then revisit these two points.
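
First, a minimal sketch of point 1 (model and x_batch are assumed to exist already): layer.output is only a symbolic tensor, and actual values appear only after data is fed through it:

from keras import backend as K

feature_tensor = model.layers[1].output              # symbolic tensor: defines a graph node, holds no data
fetch = K.function(model.inputs, [feature_tensor])   # compile a feed -> fetch function
feature_values = fetch([x_batch])[0]                 # real numpy values, produced by feeding x_batch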

Code Implementation

Below is the source code of the custom TensorBoard callback. The meaning of every constructor parameter of the custom MyTensorBoard class is documented in comments. The main modifications are a learning-rate curve and the display of feature maps.

We use self.model.layers to get every layer of the Keras model, iterate over them, and obtain each layer's output via layer.output. These outputs are then added to TensorBoard with tf.summary.image().

The implementation must respect the two points above. In short, the Keras APIs for retrieving intermediate layer outputs never carry data by themselves; they must be fed. On reflection this makes sense: Keras only wraps the TensorFlow computation graph (the network model); it does not run (sess.run) everything for us.

# coding=utf-8
"""
Custom TensorBoard callback that also displays feature maps
"""
from keras.callbacks import Callback
from keras import backend as K
import warnings
import math
import numpy as np


class MyTensorBoard(Callback):
    """TensorBoard basic visualizations.
    log_dir: the path of the directory where to save the log
        files to be parsed by TensorBoard.
    write_graph: whether to visualize the graph in TensorBoard.
        The log file can become quite large when
        write_graph is set to True.
    batch_size: size of batch of inputs to feed to the network
        for histograms computation.
    input_images: a batch of model input data, used to build the feed dict
        when evaluating the image summaries.
    write_features: whether to write feature maps to visualize as
        images in TensorBoard.
    update_features_freq: how often (in batches) to refresh the feature
        maps, i.e. they are re-evaluated every update_features_freq batches.
    update_freq: `'batch'` or `'epoch'` or integer. When using `'batch'`, writes
        the losses and metrics to TensorBoard after each batch. The same
        applies for `'epoch'`. If using an integer, let's say `10000`,
        the callback will write the metrics and losses to TensorBoard every
        10000 samples. Note that writing too frequently to TensorBoard
        can slow down your training.
    """

    def __init__(self, log_dir='./logs',
                 batch_size=64,
                 update_features_freq=1,
                 input_images=None,
                 write_graph=True,
                 write_features=False,
                 update_freq='epoch'):
        super(MyTensorBoard, self).__init__()
        global tf, projector
        try:
            import tensorflow as tf
            from tensorflow.contrib.tensorboard.plugins import projector
        except ImportError:
            raise ImportError('You need the TensorFlow module installed to '
                              'use TensorBoard.')

        if K.backend() != 'tensorflow':
            if write_graph:
                warnings.warn('You are not using the TensorFlow backend. '
                              'write_graph was set to False')
                write_graph = False
            if write_features:
                warnings.warn('You are not using the TensorFlow backend. '
                              'write_features was set to False')
                write_features = False

        self.input_images = input_images[0]  # keep only the images from the (images, labels) batch
        self.log_dir = log_dir
        self.merged = None
        self.im_summary = []
        self.lr_summary = None
        self.write_graph = write_graph
        self.write_features = write_features
        self.batch_size = batch_size
        self.update_features_freq = update_features_freq
        if update_freq == 'batch':
            # It is the same as writing as frequently as possible.
            self.update_freq = 1
        else:
            self.update_freq = update_freq
        self.samples_seen = 0
        self.samples_seen_at_last_write = 0

    def set_model(self, model):
        self.model = model
        if K.backend() == 'tensorflow':
            self.sess = K.get_session()
        if self.merged is None:
            # Display feature maps:
            # iterate over every layer of the network
            for layer in self.model.layers:
                # get the current layer's output and name
                feature_map = layer.output
                feature_map_name = layer.name.replace(':', '_')

                if self.write_features and len(K.int_shape(feature_map)) == 4:
                    # tile the feature map's channels into one large image
                    flat_concat_feature_map = self._concact_features(feature_map)
                    # the tiled feature map must be 4-D with a single channel
                    shape = K.int_shape(flat_concat_feature_map)
                    assert len(shape) == 4 and shape[-1] == 1
                    # write to tensorboard
                    self.im_summary.append(tf.summary.image(feature_map_name, flat_concat_feature_map, 4))  # the third argument sets how many images TensorBoard displays

            # plot the learning-rate curve
            self.lr_summary = tf.summary.scalar("learning_rate", self.model.optimizer.lr)

        self.merged = tf.summary.merge_all()

        if self.write_graph:
            self.writer = tf.summary.FileWriter(self.log_dir, self.sess.graph)
        else:
            self.writer = tf.summary.FileWriter(self.log_dir)

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}

        if self.validation_data:
            val_data = self.validation_data
            tensors = (self.model.inputs +
                       self.model.targets +
                       self.model.sample_weights)

            if self.model.uses_learning_phase:
                tensors += [K.learning_phase()]

            assert len(val_data) == len(tensors)
            val_size = val_data[0].shape[0]
            i = 0
            while i < val_size:
                step = min(self.batch_size, val_size - i)
                if self.model.uses_learning_phase:
                    # do not slice the learning phase
                    batch_val = [x[i:i + step] for x in val_data[:-1]]
                    batch_val.append(val_data[-1])
                else:
                    batch_val = [x[i:i + step] for x in val_data]
                assert len(batch_val) == len(tensors)
                feed_dict = dict(zip(tensors, batch_val))
                result = self.sess.run([self.merged], feed_dict=feed_dict)
                summary_str = result[0]
                self.writer.add_summary(summary_str, epoch)
                i += self.batch_size

        if self.update_freq == 'epoch':
            index = epoch
        else:
            index = self.samples_seen
        self._write_logs(logs, index)

    def _write_logs(self, logs, index):
        for name, value in logs.items():
            if name in ['batch', 'size']:
                continue
            summary = tf.Summary()
            summary_value = summary.value.add()
            if isinstance(value, np.ndarray):
                summary_value.simple_value = value.item()
            else:
                summary_value.simple_value = value
            summary_value.tag = name
            self.writer.add_summary(summary, index)
        self.writer.flush()

    def on_train_end(self, _):
        self.writer.close()

    def on_batch_end(self, batch, logs=None):
        if self.update_freq != 'epoch':
            self.samples_seen += logs['size']
            samples_seen_since = self.samples_seen - self.samples_seen_at_last_write
            if samples_seen_since >= self.update_freq:
                self._write_logs(logs, self.samples_seen)
                self.samples_seen_at_last_write = self.samples_seen

        # refresh the feature maps every update_features_freq batches
        if batch % self.update_features_freq == 0:
            # evaluate the image summaries
            feed_dict = dict(zip(self.model.inputs, self.input_images[np.newaxis, ...]))
            for i in range(len(self.im_summary)):
                summary = self.sess.run(self.im_summary[i], feed_dict)
                self.writer.add_summary(summary, self.samples_seen)

        # log the learning rate on every batch
        summary = self.sess.run(self.lr_summary, {self.model.optimizer.lr: K.eval(self.model.optimizer.lr)})
        self.writer.add_summary(summary, self.samples_seen)

    def _concact_features(self, conv_output):
        """
        對特徵圖進行reshape拼接
        :param conv_output:輸入多通道的特徵圖
        :return: all_concact
        """
        all_concact = None

        num_or_size_splits = conv_output.get_shape().as_list()[-1]
        each_convs = tf.split(conv_output, num_or_size_splits=num_or_size_splits, axis=3)

        if num_or_size_splits < 4:
            # feature maps with fewer than 4 channels are treated as the input image;
            # just concatenate them into a single strip
            concact_size = num_or_size_splits
            all_concact = each_convs[0]
            for i in range(concact_size - 1):
                all_concact = tf.concat([all_concact, each_convs[i + 1]], 1)
        else:
            concact_size = int(math.sqrt(num_or_size_splits))  # grid side length; channels beyond concact_size**2 are dropped
            for i in range(concact_size):
                row_concact = each_convs[i * concact_size]
                for j in range(concact_size - 1):
                    row_concact = tf.concat([row_concact, each_convs[i * concact_size + j + 1]], 1)
                if i == 0:
                    all_concact = row_concact
                else:
                    all_concact = tf.concat([all_concact, row_concact], 2)
        return all_concact

This custom class is essentially network-agnostic and can be used as-is. The only part you must supply yourself is the input_images argument, which is simply a batch of the network's input data. With Keras you typically define a data-reading generator function and pass it to model.fit_generator(). We just call its __next__ method (next() in Python 2) to obtain one batch to pass to our custom MyTensorBoard constructor.

For example:

# Define a generator function for reading data
# (assumes `import glob`, `import os`, and `import numpy as np` at module level)
def _get_data(self, path, batch_size, normalize):
     """
     Generator to be used with model.fit_generator()
     :param path: path to the .npz data files
     :param batch_size: batch_size
     :param normalize: whether to normalize the images
     :return:
     """
     while True:
         files = glob.glob(os.path.join(path, '*.npz'))
         np.random.shuffle(files)
         for npz in files:
             # Load pack into memory
             archive = np.load(npz)
             images = archive['images']
             offsets = archive['offsets']
             del archive
             self._shuffle_in_unison(images, offsets)

             # split into batches
             num_batches = int(len(offsets) / batch_size)
             images = np.array_split(images, num_batches)
             offsets = np.array_split(offsets, num_batches)
             while offsets:
                 batch_images = images.pop()
                 batch_offsets = offsets.pop()
                 if normalize:
                     batch_images = (batch_images - 127.5) / 127.5
                 yield batch_images, batch_offsets

train_loader = _get_data('/path/xxx', 128, True)
tb = MyTensorBoard(
    log_dir=self.config.tb_dir,
    input_images=train_loader.__next__(),  # use __next__() to fetch one batch of data
    batch_size=self.config.batch_size,
    update_features_freq=50,
    write_features=True,
    write_graph=True,
    update_freq='batch'
)
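
The callback is then passed to fit_generator like any other callback (steps_per_epoch and epochs below are placeholder values):

model.fit_generator(
    train_loader,
    steps_per_epoch=1000,  # placeholder
    epochs=10,             # placeholder
    callbacks=[tb]
)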

Results

  1. The learning-rate curve (the implementation of a custom exponential-decay learning rate is attached below);
    class LearningRate(Callback):
        """
        自定義學習率調整函式
        """
        def __init__(self, config):
            super(LearningRate, self).__init__()
            self.base_lr = config.learning_rate
            self.num_total_steps = config.num_epochs * int(np.ceil(config.num_datas / float(config.batch_size)))
            self.step = 0  # counts training batches
            # decay interval: chosen so that base_lr * 0.96**(num_total_steps / decay_steps) == min_lr
            self.decay_steps = int((math.log(0.96) * self.num_total_steps) / math.log(config.min_lr * 1.0 / self.base_lr))
            print("[DEBUG]: learning rate decay steps is %d ......" % self.decay_steps)
    
        def on_batch_begin(self, batch, logs=None):
            self.step += 1
            if self.step % self.decay_steps == 0:
                cur_lr = self.base_lr * math.pow(0.96, self.step / self.decay_steps)
                K.set_value(self.model.optimizer.lr, cur_lr)
    
    (Figure: the learning-rate curve as shown in TensorBoard)
  2. Feature-map display.
    (Figures: the network's layers and their feature maps as shown in TensorBoard)