
Building a Deep Learning Dialogue System from Scratch -- An Introduction to the tf.contrib.seq2seq API

This post takes a look at the APIs under tf.contrib.seq2seq from the source-code angle. The files and functions in this package were all mentioned in the previous post, which covered how they relate to each other and how to use them, so if you are not interested in the source code itself, feel free to stop here~~

BasicDecoder and dynamic_decode

For simplicity, let's start from the entry point of decoding, the dynamic_decode function:

    dynamic_decode(
        decoder,
        output_time_major=False,
        impute_finished=False,
        maximum_iterations=None,
        parallel_iterations=32,
        swap_memory=False,
        scope=None
    )

- decoder: a BasicDecoder, BeamSearchDecoder, or user-defined decoder instance
- output_time_major: as in the RNN APIs; True yields outputs shaped step * batch_size * ..., False yields batch_size * step * ...
- impute_finished: Boolean. When True, the last state of finished sequences is copied through and their outputs are zeroed, which makes execution more stable, guarantees that the final state and outputs hold correct values, and keeps finished steps out of back-propagation. It does slow the program down, though.
- maximum_iterations: the maximum number of decoding steps. For training it is usually set to decoder_inputs_length; at inference just set whatever maximum sequence length you want. Decoding stops when <eos> is produced or the maximum step count is reached.

Simply put, dynamic_decode first runs the decoder's initialize function to set up the decoding-time state and other variables, then repeatedly calls the decoder's step function to decode round after round. If I were writing it, it might just be a for loop, but the source is more involved because of all the condition checks needed to keep the program running correctly and to raise errors.
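Conceptually, the control flow boils down to something like this (my own plain-Python paraphrase of the loop, not the actual source; initialize and step follow the decoder interface shown later):

    # A conceptual sketch of dynamic_decode as a plain Python loop.
    # decoder.initialize() returns (finished, first_inputs, initial_state);
    # decoder.step() returns (outputs, next_state, next_inputs, finished).
    def simple_dynamic_decode(decoder, maximum_iterations):
        finished, inputs, state = decoder.initialize()
        outputs, time = [], 0
        while not all(finished) and time < maximum_iterations:
            step_outputs, state, inputs, finished = decoder.step(time, inputs, state)
            outputs.append(step_outputs)
            time += 1
        return outputs, state

The real main body is a control_flow_ops.while_loop, which is also a good chance to learn how that function is used: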

    while_loop(cond, body, loop_vars, shape_invariants=None,
               parallel_iterations=10, back_prop=True,
               swap_memory=False, name=None)

cond is the loop condition and body is the loop body; both are functions. loop_vars are the variables the loop works with; cond and body take the same arguments, namely loop_vars, though cond usually only uses a few of them to decide whether the loop should end, while most of them are consumed by body. parallel_iterations is the number of loop iterations allowed to run in parallel.
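Here is a toy example of the calling convention (nothing to do with seq2seq, just cond/body/loop_vars in action):

    import tensorflow as tf

    def cond(i, total):
        # cond may use only some of the loop variables
        return i < 5

    def body(i, total):
        # body must return new values for every loop variable, same structure
        return i + 1, total + i

    i_final, total_final = tf.while_loop(cond, body,
                                         loop_vars=[tf.constant(0), tf.constant(0)])
    with tf.Session() as sess:
        print(sess.run([i_final, total_final]))  # [5, 10]

With that in mind, the condition below simply checks whether every entry of finished has become True (the loop keeps running while some sequence is unfinished), and the body essentially runs decoder.step(time, inputs, state) followed by a series of assignments and checks: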

    def condition(unused_time, unused_outputs_ta, unused_state, unused_inputs,
                  finished, unused_sequence_lengths):
      return math_ops.logical_not(math_ops.reduce_all(finished))

    def body(time, outputs_ta, state, inputs, finished, sequence_lengths):
      # ====== 1. Call the step function to get this step's outputs and state, the next input (produced by the helper), and the decoder_finished flag
      (next_outputs, decoder_state, next_inputs, decoder_finished) = decoder.step(time, inputs, state)
      # ====== 2. Decide whether decoding has finished, combining decoder_finished with whether time has reached maximum_iterations
      next_finished = math_ops.logical_or(decoder_finished, finished)
      if maximum_iterations is not None:
        next_finished = math_ops.logical_or(
            next_finished, time + 1 >= maximum_iterations)
      next_sequence_lengths = array_ops.where(
          math_ops.logical_and(math_ops.logical_not(finished), next_finished),
          array_ops.fill(array_ops.shape(sequence_lengths), time + 1),
          sequence_lengths)

      nest.assert_same_structure(state, decoder_state)
      nest.assert_same_structure(outputs_ta, next_outputs)
      nest.assert_same_structure(inputs, next_inputs)
      # ====== 3. If impute_finished is True, zero out next_outputs once a sequence has finished so it does not feed back-propagation, and copy decoder_state through as the next state. That extra copying is why enabling it costs some time
      if impute_finished:
        emit = nest.map_structure(lambda out, zero: array_ops.where(finished, zero, out), next_outputs, zero_outputs)
      else:
        emit = next_outputs

      # Copy through states past finish
      def _maybe_copy_state(new, cur):
        # TensorArrays and scalar states get passed through.
        if isinstance(cur, tensor_array_ops.TensorArray):
          pass_through = True
        else:
          new.set_shape(cur.shape)
          pass_through = (new.shape.ndims == 0)
        return new if pass_through else array_ops.where(finished, cur, new)

      if impute_finished:
        next_state = nest.map_structure(_maybe_copy_state, decoder_state, state)
      else:
        next_state = decoder_state
      # ====== 4. Write this step's outputs and return the loop variables.
      outputs_ta = nest.map_structure(lambda ta, out: ta.write(time, out), outputs_ta, emit)
      return (time + 1, outputs_ta, next_state, next_inputs, next_finished, next_sequence_lengths)
    # Run the decoding loop with the condition and body defined above
    res = control_flow_ops.while_loop(condition, body,
        loop_vars=[initial_time, initial_outputs_ta, initial_state, initial_inputs, initial_finished, initial_sequence_lengths, ],
        parallel_iterations=parallel_iterations, swap_memory=swap_memory)

Having read the code above, the natural question is what decoder.step() actually does. You can think of it as rolling the RNN cell forward one step; on top of that, since we are decoding, it adds operations such as using the helper to pick the output answer and converting it into the next step's input. It looks like this:

      def step(self, time, inputs, state, name=None):
        with ops.name_scope(name, "BasicDecoderStep", (time, inputs, state)):
          cell_outputs, cell_state = self._cell(inputs, state)
          if self._output_layer is not None:
            # If an output layer was configured, project the cell outputs through it
            cell_outputs = self._output_layer(cell_outputs)
          # Pick the desired answer from the outputs, e.g. greedily choose the highest-probability word, or sample from some distribution as scheduled sampling does
          sample_ids = self._helper.sample(time=time, outputs=cell_outputs, state=cell_state)
          # Turn the chosen result into the next step's input: during training it is simply the next decoder input; at inference the chosen word is embedded
          (finished, next_inputs, next_state) = self._helper.next_inputs(time=time, outputs=cell_outputs, state=cell_state, sample_ids=sample_ids)
        outputs = BasicDecoderOutput(cell_outputs, sample_ids)  # a namedtuple bundling both as this step's outputs
        return (outputs, next_state, next_inputs, finished)

TrainingHelper and GreedyEmbeddingHelper in the helper file

Next, let's look at what the sample and next_inputs functions of the different helper classes do.

TrainingHelper

      def sample(self, time, outputs, name=None, **unused_kwargs):
        with ops.name_scope(name, "TrainingHelperSample", [time, outputs]):
          # Take the argmax over outputs as the sampled ids
          sample_ids = math_ops.cast(math_ops.argmax(outputs, axis=-1), dtypes.int32)
          return sample_ids

      def next_inputs(self, time, outputs, state, name=None, **unused_kwargs):
        with ops.name_scope(name, "TrainingHelperNextInputs", [time, outputs, state]):
          next_time = time + 1
          finished = (next_time >= self._sequence_length)
          all_finished = math_ops.reduce_all(finished)
          # Read the next value directly from decoder_inputs as the next decoding input
          def read_from_ta(inp):
            return inp.read(next_time)
          next_inputs = control_flow_ops.cond(
              all_finished, lambda: self._zero_inputs,
              lambda: nest.map_structure(read_from_ta, self._input_tas))
          return (finished, next_inputs, state)
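To see how these pieces fit together at training time, here is a minimal, hedged sketch; rnn_size, vocab_size, emb_dim, and the placeholder tensors are made-up stand-ins for whatever your model actually defines:

    import tensorflow as tf
    from tensorflow.contrib import seq2seq

    rnn_size, vocab_size, emb_dim, max_time = 128, 10000, 100, 20
    decoder_inputs_embedded = tf.placeholder(tf.float32, [None, max_time, emb_dim])
    decoder_lengths = tf.placeholder(tf.int32, [None])
    encoder_final_state = tf.placeholder(tf.float32, [None, rnn_size])

    cell = tf.nn.rnn_cell.GRUCell(rnn_size)
    output_layer = tf.layers.Dense(vocab_size)   # maps cell outputs to the vocabulary

    # Feed the ground-truth (time-shifted) decoder inputs at every step
    helper = seq2seq.TrainingHelper(inputs=decoder_inputs_embedded,
                                    sequence_length=decoder_lengths)
    decoder = seq2seq.BasicDecoder(cell=cell, helper=helper,
                                   initial_state=encoder_final_state,
                                   output_layer=output_layer)
    outputs, final_state, final_lengths = seq2seq.dynamic_decode(
        decoder, impute_finished=True,
        maximum_iterations=tf.reduce_max(decoder_lengths))
    # outputs.rnn_output: [batch_size, max_time, vocab_size]
    # outputs.sample_id:  [batch_size, max_time]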

GreedyEmbeddingHelper

      def sample(self, time, outputs, state, name=None):
        del time, state  # unused by sample_fn
        if not isinstance(outputs, ops.Tensor):
          raise TypeError("Expected outputs to be a single Tensor, got: %s" %type(outputs))
        # Take the argmax over outputs as the sampled ids
        sample_ids = math_ops.cast(math_ops.argmax(outputs, axis=-1), dtypes.int32)
        return sample_ids

      def next_inputs(self, time, outputs, state, sample_ids, name=None):
        del time, outputs  # unused by next_inputs_fn
        finished = math_ops.equal(sample_ids, self._end_token)
        all_finished = math_ops.reduce_all(finished)
        # Embed sample_ids to get the word vectors fed in at the next step
        next_inputs = control_flow_ops.cond(
            all_finished,
            # If we're finished, the next_inputs value doesn't matter
            lambda: self._start_inputs,
            lambda: self._embedding_fn(sample_ids))
        return (finished, next_inputs, state)
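And the inference-time counterpart, reusing the names from the training sketch above (again a hedged sketch; assume id 1 is <go> and id 2 is <eos>):

    embedding_matrix = tf.get_variable("embedding", [vocab_size, emb_dim])
    batch_size = tf.shape(decoder_lengths)[0]
    start_tokens = tf.fill([batch_size], 1)      # assumed <go> id
    end_token = 2                                # assumed <eos> id

    helper = seq2seq.GreedyEmbeddingHelper(embedding=embedding_matrix,
                                           start_tokens=start_tokens,
                                           end_token=end_token)
    decoder = seq2seq.BasicDecoder(cell, helper, encoder_final_state, output_layer)
    outputs, _, _ = seq2seq.dynamic_decode(decoder, maximum_iterations=50)
    predicted_ids = outputs.sample_id            # [batch_size, <=50]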

BahdanauAttention and LuongAttention in the attention_wrapper file

You may have noticed that none of the code above involves the attention mechanism, so where does it come in? The TF source wraps attention around the RNNCell, just like DropoutWrapper does: an AttentionWrapper is provided that bakes attention into the RNNCell, so the attention mechanism runs every time self._cell(inputs, state) is called. One more thing: this attention implementation borrows ideas from Memory Networks, so many variables are named memory, query, keys, values and so on; don't let that confuse you. Also, this post only covers the code of these two attention mechanisms; for the underlying theory see my earlier post: 從頭實現一個深度學習對話系統–Seq-to-Seq模型詳解
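For context, here is a hedged sketch of how AttentionWrapper is typically wired up (encoder_outputs and encoder_lengths are assumed names for the encoder's results; rnn_size and encoder_final_state as above):

    # encoder_outputs: [batch_size, max_time, rnn_size]; encoder_lengths: [batch_size]
    attention = seq2seq.LuongAttention(num_units=rnn_size,
                                       memory=encoder_outputs,
                                       memory_sequence_length=encoder_lengths)
    attn_cell = seq2seq.AttentionWrapper(tf.nn.rnn_cell.GRUCell(rnn_size),
                                         attention,
                                         attention_layer_size=rnn_size)
    # The wrapped cell carries its own state structure; seed it with the encoder state
    batch_size = tf.shape(encoder_lengths)[0]
    init_state = attn_cell.zero_state(batch_size, tf.float32).clone(
        cell_state=encoder_final_state)

Every call of attn_cell(inputs, state) now attends over encoder_outputs before producing its output.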

LuongAttention

First, the computation of the similarity score:

    def _luong_score(query, keys, scale):
      # Computes the similarity score between query and memory, expressed directly as an inner product
      depth = query.get_shape()[-1]
      key_units = keys.get_shape()[-1]
      dtype = query.dtype
      # query is the current decoder state, [batch_size, rnn_size]; keys are the memory vectors, [batch_size, max_time, rnn_size]
      # so query must be expanded by one dimension before the two can be multiplied
      query = array_ops.expand_dims(query, 1) #[batch_size, 1, rnn_size]
      score = math_ops.matmul(query, keys, transpose_b=True) # [batch_size, 1, max_time]
      score = array_ops.squeeze(score, [1]) # [batch_size, max_time]: the similarity between the query and each of the max_time memories

      if scale:
        g = variable_scope.get_variable(
            "attention_g", dtype=dtype, initializer=1.)
        score = g * score
      return score
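In formula form (my reading of the code): since the memory is first projected to keys by memory_layer, a Dense layer whose weight matrix we can call $W$, this computes the general Luong score

$$\mathrm{score}(h_t, \bar{h}_s) = h_t^\top W \bar{h}_s,$$

optionally multiplied by the learned scalar $g$ when scale=True.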

Next, the definition of the LuongAttention class:

    class LuongAttention(_BaseAttentionMechanism):
      def __init__(self, num_units, memory, memory_sequence_length=None,
                   scale=False, probability_fn=None,
                   score_mask_value=float("-inf"), name="LuongAttention"):
        # probability_fn normalizes the scores; softmax is used by default
        if probability_fn is None:
          probability_fn = nn_ops.softmax
        wrapped_probability_fn = lambda score, _: probability_fn(score)
        # Invoke the constructor of the _BaseAttentionMechanism base class
        super(LuongAttention, self).__init__(query_layer=None,
            memory_layer=layers_core.Dense(num_units, name="memory_layer", use_bias=False),
            memory=memory, probability_fn=wrapped_probability_fn, memory_sequence_length=memory_sequence_length,
            score_mask_value=score_mask_value, name=name)
        self._num_units = num_units
        self._scale = scale
        self._name = name

      def __call__(self, query, previous_alignments):
        with variable_scope.variable_scope(None, "luong_attention", [query]):
          # Compute the scores
          score = _luong_score(query, self._keys, self._scale)
        # Normalize them into alignments
        alignments = self._probability_fn(score, previous_alignments)
        return alignments

BahdanauAttention

The BahdanauAttention class definition itself differs little from LuongAttention above, so I won't paste it here; if you are interested, have a look at the source. Its score function is:

    def _bahdanau_score(processed_query, keys, normalize):
      dtype = processed_query.dtype
      num_units = keys.shape[2].value or array_ops.shape(keys)[2]
      processed_query = array_ops.expand_dims(processed_query, 1)
      v = variable_scope.get_variable("attention_v", [num_units], dtype=dtype)
      if normalize:
        # Scalar used in weight normalization
        g = variable_scope.get_variable("attention_g", dtype=dtype, initializer=math.sqrt((1. / num_units)))
        # Bias added prior to the nonlinearity
        b = variable_scope.get_variable("attention_b", [num_units], dtype=dtype, initializer=init_ops.zeros_initializer())
        # normed_v = g * v / ||v||
        normed_v = g * v * math_ops.rsqrt(math_ops.reduce_sum(math_ops.square(v)))
        return math_ops.reduce_sum(normed_v * math_ops.tanh(keys + processed_query + b), [2])
      else:
        return math_ops.reduce_sum(v * math_ops.tanh(keys + processed_query), [2])
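Written out, this is the standard Bahdanau (additive) score, where processed_query is $W_1 h_t$ and keys are $W_2 \bar{h}_s$:

$$\mathrm{score}(h_t, \bar{h}_s) = v^\top \tanh(W_1 h_t + W_2 \bar{h}_s)$$

With normalize=True, $v$ is replaced by the weight-normalized $g \cdot v / \lVert v \rVert$ and a bias $b$ is added inside the tanh.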

The source implements several other attention mechanisms as well, but I won't go into them here.

BeamSearchDecoder in the beam_search_decoder file

_tile_batch

There is quite a lot of code in the beam_search part, but reading it made me happy. Why? Because it uses exactly the method I had thought of myself before ever looking at the source: tile the inputs beam_size times. It's only a small detail, but I could brag about it for ages ==# Anyway, let's look at the code.

As mentioned back when we implemented the chatbot, to use beam_search you first need to run the encoder's output, state, and length through the tile_batch function, expanding the batch dimension from batch_size to batch_size*beam_size. I won't repeat why here; let's just see what the function actually does:

    def _tile_batch(t, multiplier):
      t = ops.convert_to_tensor(t, name="t")
      shape_t = array_ops.shape(t)
      if t.shape.ndims is None or t.shape.ndims < 1:
        raise ValueError("t must have statically known rank")
      tiling = [1] * (t.shape.ndims + 1)
      tiling[1] = multiplier
      tiled_static_batch_size = (t.shape[0].value * multiplier if t.shape[0].value is not None else None)
      # Expand t by one dimension, then replicate it with the tile function
      tiled = array_ops.tile(array_ops.expand_dims(t, 1), tiling)
      # Reshape the tiled tensor into [batch_size*beam_size, ...]
      tiled = array_ops.reshape(tiled, array_ops.concat(([shape_t[0] * multiplier], shape_t[1:]), 0))
      tiled.set_shape(tensor_shape.TensorShape([tiled_static_batch_size]).concatenate(t.shape[1:]))
      return tiled

The following example shows what the function accomplishes:

    a = tf.constant([[1,2,3], [4,5,6]]) # batch_size is 2, shape [2, 3]
    tiling = [1, 3, 1] # take beam_size = 3
    tiled = tf.tile(tf.expand_dims(a, 1), tiling) # replicate each row of a three times
    sess.run(tiled)
    # Output: array([[[1, 2, 3],
    #                 [1, 2, 3],
    #                 [1, 2, 3]],
    #                [[4, 5, 6],
    #                 [4, 5, 6],
    #                 [4, 5, 6]]])
    tiled = tf.reshape(tiled, tf.concat(([6], [3]), 0)) # 6 = 2*3; reshape to [6, 3]
    sess.run(tiled)
    # Output: array([[1, 2, 3],
    #                [1, 2, 3],
    #                [1, 2, 3],
    #                [4, 5, 6],
    #                [4, 5, 6],
    #                [4, 5, 6]])
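In practice you call the public tf.contrib.seq2seq.tile_batch, which applies this per-tensor _tile_batch across a possibly nested structure. A hedged sketch, reusing the assumed encoder names from earlier and an assumed beam_width:

    beam_width = 5
    tiled_outputs = seq2seq.tile_batch(encoder_outputs, multiplier=beam_width)
    tiled_state = seq2seq.tile_batch(encoder_final_state, multiplier=beam_width)
    tiled_lengths = seq2seq.tile_batch(encoder_lengths, multiplier=beam_width)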

BeamSearchDecoder

As we know, BeamSearchDecoder is just another Decoder class, like BasicDecoder, except that it does not need a helper. Here is its definition (with some uninteresting validation code removed):


    def __init__(self,
                 cell,
                 embedding,
                 start_tokens,
                 end_token,
                 initial_state,
                 beam_width,
                 output_layer=None,
                 length_penalty_weight=0.0):
      # This function is mostly bookkeeping: it initializes the variables needed before decoding starts
      self._cell = cell
      self._output_layer = output_layer

      # Note: embedding can be either an embedding-matrix variable or a callable lookup function.
      if callable(embedding):
        self._embedding_fn = embedding
      else:
        self._embedding_fn = (
            lambda ids: embedding_ops.embedding_lookup(embedding, ids))

      self._start_tokens = ops.convert_to_tensor(start_tokens, dtype=dtypes.int32, name="start_tokens")
      self._end_token = ops.convert_to_tensor(end_token, dtype=dtypes.int32, name="end_token")

      self._batch_size = array_ops.size(start_tokens)
      self._beam_width = beam_width
      self._length_penalty_weight = length_penalty_weight
      self._initial_cell_state = nest.map_structure(self._maybe_split_batch_beams, initial_state, self._cell.state_size)
      # Tile start_tokens to [batch_size, beam_width] and embed them to get the start word vectors
      self._start_tokens = array_ops.tile(array_ops.expand_dims(self._start_tokens, 1), [1, self._beam_width])
      self._start_inputs = self._embedding_fn(self._start_tokens)
      # The finished flags get the same [batch_size, beam_width] shape
      self._finished = array_ops.zeros([self._batch_size, self._beam_width], dtype=dtypes.bool)
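Hooked up end to end, a hedged inference sketch using the tiled tensors from the tile_batch step and the earlier assumed names:

    decoder = seq2seq.BeamSearchDecoder(cell=cell,
                                        embedding=embedding_matrix,
                                        start_tokens=start_tokens,
                                        end_token=end_token,
                                        initial_state=tiled_state,
                                        beam_width=beam_width,
                                        output_layer=output_layer,
                                        length_penalty_weight=0.0)
    # impute_finished is typically left False with beam search
    outputs, _, _ = seq2seq.dynamic_decode(decoder, maximum_iterations=50)
    # outputs.predicted_ids: [batch_size, max_time, beam_width]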

Now for the step function. Remember that step is what while_loop calls on every decoding step, so this is where the main work happens. As in BasicDecoder, it first runs cell_outputs, next_cell_state = self._cell(inputs, cell_state) to advance the RNNCell and get this step's output and state, then reshapes the output from [batch_size*beam_size, vocab_size] into [batch_size, beam_size, vocab_size], and finally calls _beam_search_step() to choose the outputs and produce the next step's input; this last part plays the role the helper class plays for BasicDecoder.

Before reading the code, one concept needs to be clear: we want beam_size sequences in total, but some of them may emit the <eos> symbol before reaching the maximum length, i.e. some sequences finish decoding early while others keep decoding until the last step. So how do we mark whether a sequence has finished? By adding finished and length variables that record, for each sequence, whether it has ended and its final length. A large part of _beam_search_step is devoted to exactly this bookkeeping. (Personally I feel it would have been simpler to just decode everything to the maximum length and, when converting ids back to strings, ignore whatever follows an eos.)

Now let's walk through the implementation of _beam_search_step:

    def _beam_search_step(time, logits, next_cell_state, beam_state, batch_size,
                          beam_width, end_token, length_penalty_weight):
      """Performs a single step of Beam Search Decoding.
      Args:
        time: the decoding step, starting from 0. At the first step all inputs are the start_token, so only the first beam's top beam_size outputs are taken
        logits: the cell's outputs, [batch_size*beam_size, vocab_size], reshaped to [batch_size, beam_width, vocab_size] before being passed in
        next_cell_state: the next-step state returned by the cell
        beam_state:  An instance of `BeamSearchDecoderState`.
        batch_size: The batch size for this input.
        beam_width: Python int.  The size of the beams.
        end_token: The int32 end token.
        length_penalty_weight: Float weight to penalize length. Disabled with 0.0.
      """
      static_batch_size = tensor_util.constant_value(batch_size)

      # Calculate the current lengths of the predictions
      prediction_lengths = beam_state.lengths
      previously_finished = beam_state.finished
      # Log-softmax the cell's output probabilities; beams that have already finished are masked so that only eos can follow, the rest stay unchanged. Then add the running log-probs of each sequence so that the highest-probability sequences can be picked later
      step_log_probs = nn_ops.log_softmax(logits)
      step_log_probs = _mask_probs(step_log_probs, end_token, previously_finished)
      total_probs = array_ops.expand_dims(beam_state.log_probs, 2) + step_log_probs

      # For sequences that have not finished decoding yet, add 1 to their length.
      vocab_size = logits.shape[-1].value or array_ops.shape(logits)[-1]
      lengths_to_add = array_ops.one_hot(
          indices=array_ops.tile(array_ops.reshape(end_token, [1, 1]), [batch_size, beam_width]),
          depth=vocab_size,
          on_value=constant_op.constant(0, dtype=dtypes.int64),
          off_value=constant_op.constant(1, dtype=dtypes.int64),
          dtype=dtypes.int64)
      add_mask = (1 - math_ops.to_int64(previously_finished))
      lengths_to_add = array_ops.expand_dims(add_mask, 2) * lengths_to_add
      new_prediction_lengths = (lengths_to_add + array_ops.expand_dims(prediction_lengths, 2))

      # Recompute each sequence's score based on its length, e.g. penalizing length when long sequences are undesirable
      scores = _get_scores(
          log_probs=total_probs,
          sequence_lengths=new_prediction_lengths,
          length_penalty_weight=length_penalty_weight)

      time = ops.convert_to_tensor(time, name="time")
      # On the first step only the first beam's outputs matter; afterwards the top K is computed over all beams
      scores_shape = array_ops.shape(scores)
      scores_flat = control_flow_ops.cond(
          time > 0, lambda: array_ops.reshape(scores, [batch_size, -1]),
          lambda: scores[:, 0])
      num_available_beam = control_flow_ops.cond(
          time > 0, lambda: math_ops.reduce_prod(scores_shape[1:]),
          lambda: math_ops.reduce_prod(scores_shape[2:]))

      # next_beam_size is the minimum of beam_width and num_available_beam, because at the final decoding
      # stage there may be fewer than beam_width sequences still producing normal outputs; then the
      # next_beam_size highest-scoring sequences are kept
      next_beam_size = math_ops.minimum(ops.convert_to_tensor(beam_width, dtype=dtypes.int32, name="beam_width"), num_available_beam)
      next_beam_scores, word_indices = nn_ops.top_k(scores_flat, k=next_beam_size)
      # Reshape the result to [static_batch_size, beam_width]: after every step, only the beam_width most
      # probable sequences are kept for each sample in the batch
      next_beam_scores.set_shape([static_batch_size, beam_width])
      word_indices.set_shape([static_batch_size, beam_width])

      # Pick out the probs, beam_ids, and states according to the chosen predictions
      next_beam_probs = _tensor_gather_helper(
          gather_indices=word_indices,
          gather_from=total_probs,
          batch_size=batch_size,
          range_size=beam_width * vocab_size,
          gather_shape=[-1],
          name="next_beam_probs")
      # Note: just doing the following
      #   math_ops.to_int32(word_indices % vocab_size,
      #       name="next_beam_word_ids")
      # would be a lot cleaner but for reasons unclear, that hides the results of
      # the op which prevents capturing it with tfdbg debug ops.
      raw_next_word_ids = math_ops.mod(word_indices, vocab_size,
                                       name="next_beam_word_ids")
      next_word_ids = math_ops.to_int32(raw_next_word_ids)
      next_beam_ids = math_ops.to_int32(word_indices / vocab_size,
                                        name="next_beam_parent_ids")

      # Append new ids to current predictions
      previously_finished = _tensor_gather_helper(
          gather_indices=next_beam_ids,
          gather_from=previously_finished,
          batch_size=batch_size,
          range_size=beam_width,
          gather_shape=[-1])
      next_finished = math_ops.logical_or(previously_finished,
                                          math_ops.equal(next_word_ids, end_token),
                                          name="next_beam_finished")

      # Calculate the length of the next predictions.
      # 1. Finished beams remain unchanged
      # 2. Beams that are now finished (EOS predicted) remain unchanged
      # 3. Beams that are not yet finished have their length increased by 1
      lengths_to_add = math_ops.to_int64(
          math_ops.not_equal(next_word_ids, end_token))
      lengths_to_add = (1 - math_ops.to_int64(next_finished)) * lengths_to_add
      next_prediction_len = _tensor_gather_helper(
          gather_indices=next_beam_ids,
          gather_from=beam_state.lengths,
          batch_size=batch_size,
          range_size=beam_width,
          gather_shape=[-1])
      next_prediction_len += lengths_to_add

      # Pick out the cell_states according to the next_beam_ids. We use a
      # different gather_shape here because the cell_state tensors, i.e.
      # the tensors that would be gathered from, all have dimension
      # greater than two and we need to preserve those dimensions.
      # pylint: disable=g-long-lambda
      next_cell_state = nest.map_structure(
          lambda gather_from: _maybe_tensor_gather_helper(
              gather_indices=next_beam_ids,
              gather_from=gather_from,
              batch_size=batch_size,
              range_size=beam_width,
              gather_shape=[batch_size * beam_width, -1]),
          next_cell_state)
      # pylint: enable=g-long-lambda

      next_state = BeamSearchDecoderState(
          cell_state=next_cell_state,
          log_probs=next_beam_probs,
          lengths=next_prediction_len,
          finished=next_finished)

      output = BeamSearchDecoderOutput(
          scores=next_beam_scores,
          predicted_ids=next_word_ids,
          parent_ids=next_beam_ids)

      return output, next_state

And that completes our rough walk through the tf.contrib.seq2seq source. Hopefully you now see it in a new light and can start writing code of your own. One thing worth trying is to write your own Helper class on top of CustomHelper to build a customized seq2seq model~~
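As a parting hint, here is a minimal hedged sketch of what that could look like; the three callbacks match CustomHelper's constructor, and embedding_matrix, end_token, batch_size, and start_inputs (the embedded <go> tokens) are the same kind of assumed names as before:

    def initialize_fn():
        finished = tf.tile([False], [batch_size])
        return finished, start_inputs

    def sample_fn(time, outputs, state):
        # e.g. greedy sampling; any custom rule can go here
        return tf.cast(tf.argmax(outputs, axis=-1), tf.int32)

    def next_inputs_fn(time, outputs, state, sample_ids):
        finished = tf.equal(sample_ids, end_token)
        next_inputs = tf.nn.embedding_lookup(embedding_matrix, sample_ids)
        return finished, next_inputs, state

    helper = seq2seq.CustomHelper(initialize_fn, sample_fn, next_inputs_fn)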