1. 程式人生 > >tensorflow源碼學習之五 -- 同步訓練和異步訓練

tensorflow源碼學習之五 -- 同步訓練和異步訓練

stack location warning 可能 oss implicit mov -i ner

同步和異步訓練是由optimizer來決定的。

1. 同步訓練

同步訓練需要使用SyncReplicasOptimizer,參考https://www.tensorflow.org/api_docs/python/tf/train/SyncReplicasOptimizer 。其他optimizer都屬於異步訓練方式。

同步訓練實現在sync_replicas_optimizer.py文件中的def apply_gradient()方法中。假設有n個參數:

對於PS,需要創建n個參數收集器(每個參數對應一個收集器),每一個worker將自己計算得到的grad梯度推送到收集器上(推送是使用Send/Recv OP實現的)。每個參數收集器收集到所有的worker的推送值時,對所有的值求平均,然後更新參數的值。當所有的參數都更新完成之後,對global_step加1,並將global_step推送到每個worker的token_queue中,worker更新global_step,開始下一次訓練。

對於Worker,從PS拉取需要的參數,計算grad梯度值,然後將grad推送到相應的參數收集器。推送之後從token_queue中拉取新的global_step(拉取不到新的global_step 就等待?),繼續下一次訓練。

2. 異步訓練

訓練代碼中使用的是GradientDescentOptimizer(繼承了Optimizer),調用其minimize()方法,minimize()方法就是先調用compute_gradients()然後調用apply_gradient()方法。

異步訓練的實現在optimizer.py文件中的def apply_gradient()方法中(GradientDescentOptimizer沒有重寫Optimizer的apply_gradient()方法)。參考https://stackoverflow.com/questions/43147435/how-does-asynchronous-training-work-in-distributed-tensorflow。

對於Worker,worker從PS拉取需要的參數,拉取過程是沒有鎖的,因此拉取的值可能包含了其他worker的修改,也可能沒包含。計算gard梯度值,然後將grad梯度值發送給相應PS。

對於PS,ps收到grad值之後根據優化算法(如,SGD, SGD with Momentum, Adagrad, Adam, etc.)來更新參數。

:在異步訓練中,假設worker1讀取參數w1,worker2再讀取參數w1,然後worker1更新梯度,worker2再更新梯度,worker1更新的梯度就被worker2覆蓋掉了。如果想對修改做同步,GradientDescentOptimizer的構造函數提供了use_locking參數。

代碼邏輯如下:

def apply_gradients(self, grads_and_vars, global_step=None, name=None):
  """Apply gradients to variables.

  This is the second part of `minimize()`. It returns an `Operation` that
  applies gradients.

  Args:
    grads_and_vars: List of (gradient, variable) pairs as returned by
      `compute_gradients()`.
    global_step: Optional `Variable` to increment by one after the
      variables have been updated.
    name: Optional name for the returned operation.  Default to the
      name passed to the `Optimizer` constructor.

  Returns:
    An `Operation` that applies the specified gradients. If `global_step`
    was not None, that operation also increments `global_step`.

  Raises:
    TypeError: If `grads_and_vars` is malformed.
    ValueError: If none of the variables have gradients.
    RuntimeError: If you should use `_distributed_apply()` instead.
  """
  # This is a default implementation of apply_gradients() that can be shared
  # by most optimizers.  It relies on the subclass implementing the following
  # methods: _create_slots(), _prepare(), _apply_dense(), and _apply_sparse().

  # Handle DistributionStrategy case.
  if distribute_lib.get_cross_tower_context():
    raise RuntimeError("Use `_distributed_apply()` instead of "
                       "`apply_gradients()` in a cross-tower context.")
  # TODO(isaprykin): Get rid of `has_distribution_strategy()` check by
  # always calling _distributed_apply(), using the default distribution
  # as needed.
  if distribute_lib.has_distribution_strategy():
    grads_and_vars = get_filtered_grad_fn(lambda _: grads_and_vars)()
    return distribute_lib.get_tower_context().merge_call(
        self._distributed_apply, grads_and_vars, global_step, name)

  # No DistributionStrategy case.
  grads_and_vars = tuple(grads_and_vars)  # Make sure repeat iteration works.
  if not grads_and_vars:
    raise ValueError("No variables provided.")
  converted_grads_and_vars = []
  for g, v in grads_and_vars:
    if g is not None:
      try:
        # Convert the grad to Tensor or IndexedSlices if necessary.
        g = ops.convert_to_tensor_or_indexed_slices(g)
      except TypeError:
        raise TypeError(
            "Gradient must be convertible to a Tensor"
            " or IndexedSlices, or None: %s" % g)
      if not isinstance(g, (ops.Tensor, ops.IndexedSlices)):
        raise TypeError(
            "Gradient must be a Tensor, IndexedSlices, or None: %s" % g)
    p = _get_processor(v) # _RefVariableProcessor
    converted_grads_and_vars.append((g, v, p)) # v._ref() = Tensor("weights/Variable:0", shape=(784, 10), dtype=float32_ref, device=/job:ps/task:0)
  # ((<tf.Tensor ‘train/gradients/softmax/MatMul_grad/tuple/control_dependency_1:0‘ shape=(784, 10) dtype=float32>, <tf.Variable ‘weights/Variable:0‘ shape=(784, 10) dtype=float32_ref>, <tensorflow.python.training.optimizer._RefVariableProcessor object at 0x7f6798012410>), (<tf.Tensor ‘train/gradients/softmax/Add_grad/tuple/control_dependency_1:0‘ shape=(10,) dtype=float32>, <tf.Variable ‘biases/Variable:0‘ shape=(10,) dtype=float32_ref>, <tensorflow.python.training.optimizer._RefVariableProcessor object at 0x7f67980124d0>))
  converted_grads_and_vars = tuple(converted_grads_and_vars)
  var_list = [v for g, v, _ in converted_grads_and_vars if g is not None]
  if not var_list:
    raise ValueError("No gradients provided for any variable: %s." %
                     ([str(v) for _, _, v in converted_grads_and_vars],))
  with ops.init_scope():
    self._create_slots(var_list)
  update_ops = []
  with ops.name_scope(name, self._name) as name:
    self._prepare()
    for grad, var, processor in converted_grads_and_vars:
      if grad is None:
        continue
      # We colocate all ops created in _apply_dense or _apply_sparse
      # on the same device as the variable.
      # TODO(apassos): figure out how to get the variable name here.
      if context.executing_eagerly() or isinstance(
          var,
          resource_variable_ops.ResourceVariable) and not var._in_graph_mode:  # pylint: disable=protected-access
        scope_name = ""
      else:
        scope_name = var.op.name # var.op = {name: "weights/Variable", op: "VariableV2", device: "/job:ps/task:0"}
      with ops.name_scope("update_" + scope_name), ops.colocate_with(var):
        update_ops.append(processor.update_op(self, grad)) # 111行 def update_op() 更新op,worker->ps
    if global_step is None:
      apply_updates = self._finish(update_ops, name)
    else:
      with ops.control_dependencies([self._finish(update_ops, "update")]):
        with ops.colocate_with(global_step):
          if isinstance(global_step, resource_variable_ops.ResourceVariable):
            # TODO(apassos): the implicit read in assign_add is slow; consider
            # making it less so.
            apply_updates = resource_variable_ops.assign_add_variable_op(
                global_step.handle,
                ops.convert_to_tensor(1, dtype=global_step.dtype),
                name=name)
          else:
            apply_updates = state_ops.assign_add(global_step, 1, name=name)

    if not context.executing_eagerly():
      if isinstance(apply_updates, ops.Tensor):
        apply_updates = apply_updates.op
      train_op = ops.get_collection_ref(ops.GraphKeys.TRAIN_OP)
      if apply_updates not in train_op:
        train_op.append(apply_updates)

    return apply_updates

apply_gradients()方法中調用了update_ops.append(processor.update_op(self, grad))方法:

def update_op(self, optimizer, g):
  if isinstance(g, ops.Tensor): # update_op = {name: "train/GradientDescent/update_weights/Variable/ApplyGradientDescent",op: "ApplyGradientDescent", input: "weights/Variable", input: "train/GradientDescent/learning_rate", input: "train/gradients/softmax/MatMul_grad/tuple/control_dependency_1", device: "/job:ps/task:0"}
    update_op = optimizer._apply_dense(g, self._v)  # pylint: disable=protected-access
    if self._v.constraint is not None:
      with ops.control_dependencies([update_op]):
        return self._v.assign(self._v.constraint(self._v))
    else:
      return update_op # return
  else:
    assert isinstance(g, ops.IndexedSlices), ("Gradient ", g, " is neither a "
                                              "tensor nor IndexedSlices.")
    if self._v.constraint is not None:
      raise RuntimeError(
          "Cannot use a constraint function on a sparse variable.")
    # pylint: disable=protected-access
    return optimizer._apply_sparse_duplicate_indices(g, self._v)

update_op(self, grad))方法調用了optimizer的_app_dense()方法,由於這裏的optimizer是GradientDescentOptimizer,所以是調用GradientDescentOptimizer的_app_dense()方法:

def _apply_dense(self, grad, var):
  return training_ops.apply_gradient_descent(
      var, # <tf.Variable ‘weights/Variable:0‘ shape=(784, 10) dtype=float32_ref>
      math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype), # <tf.Tensor ‘train/GradientDescent/learning_rate:0‘ shape=() dtype=float32>
      grad, # Tensor("train/gradients/softmax/MatMul_grad/tuple/control_dependency_1:0", shape=(784, 10), dtype=float32, device=/job:worker/task:0)
      use_locking=self._use_locking).op # false

又調用了apply_gradient_descent()方法:

def apply_gradient_descent(var, alpha, delta, use_locking=False, name=None):
  r"""Update ‘*var‘ by subtracting ‘alpha‘ * ‘delta‘ from it.

  Args:
    var: A mutable `Tensor`. Must be one of the following types: `float32`, `float64`, `int32`, `uint8`, `int16`, `int8`, `complex64`, `int64`, `qint8`, `quint8`, `qint32`, `bfloat16`, `uint16`, `complex128`, `half`, `uint32`, `uint64`.
      Should be from a Variable().
    alpha: A `Tensor`. Must have the same type as `var`.
      Scaling factor. Must be a scalar.
    delta: A `Tensor`. Must have the same type as `var`. The change.
    use_locking: An optional `bool`. Defaults to `False`.
      If `True`, the subtraction will be protected by a lock;

      otherwise the behavior is undefined, but may exhibit less contention.
    name: A name for the operation (optional).

  Returns:
    A mutable `Tensor`. Has the same type as `var`.
  """
  _ctx = _context._context
  if _ctx is None or not _ctx._eager_context.is_eager:
    if use_locking is None:
      use_locking = False
    use_locking = _execute.make_bool(use_locking, "use_locking")
    _, _, _op = _op_def_lib._apply_op_helper(
        "ApplyGradientDescent", var=var, alpha=alpha, delta=delta,
        use_locking=use_locking, name=name)
    _result = _op.outputs[:]
    _inputs_flat = _op.inputs
    _attrs = ("T", _op.get_attr("T"), "use_locking",
              _op.get_attr("use_locking"))
    _execute.record_gradient(
      "ApplyGradientDescent", _inputs_flat, _attrs, _result, name)
    _result, = _result
    return _result

  else:
    raise RuntimeError("apply_gradient_descent op does not support eager execution. Arg ‘out‘ is a ref.")

又調用了_apply_op_helper()方法:

# keywords = {‘var‘: <tf.Variable ‘weights/Variable:0‘ shape=(784, 10) dtype=float32_ref>, ‘alpha‘: <tf.Tensor ‘train/GradientDescent/learning_rate:0‘ shape=() dtype=float32>, ‘use_locking‘: False, ‘delta‘: <tf.Tensor ‘train/gradients/softmax/MatMul_grad/tuple/control_dependency_1:0‘ shape=(784, 10) dtype=float32>}
def _apply_op_helper(self, op_type_name, name=None, **keywords):
  """Implementation of apply_op that returns output_structure, op."""
  op_info = self._ops.get(op_type_name, None)
  if op_info is None:
    raise RuntimeError("Unrecognized Op name " + op_type_name)
  op_def = op_info.op_def

  # Fill in the list of default types for all "type" attrs.  This
  # will be used to choose a preferred dtype to convert to in the
  # absence of input type information.
  #
  # TODO(b/31302892): Currently the defaults don‘t work in the right
  # way if you have two inputs, one of whose type resolution depends
  # on the other.  Handling this will require restructuring this code
  # significantly.
  default_type_attr_map = {}
  for attr_def in op_def.attr:
    if attr_def.type != "type":
      continue
    key = attr_def.name
    if attr_def.HasField("default_value"):
      default_type_attr_map[key] = dtypes.as_dtype(
          attr_def.default_value.type)

  # Requires that op_def has passed validation (using the C++
  # ValidateOpDef() from ../framework/op_def_util.h).
  attrs = {}
  inputs = []
  input_types = []
  with g.as_default(), ops.name_scope(name) as scope:
    # keywords = {‘var‘: <tf.Variable ‘weights/Variable:0‘ shape=(784, 10) dtype=float32_ref>, ‘alpha‘: <tf.Tensor ‘train/GradientDescent/learning_rate:0‘ shape=() dtype=float32>, ‘use_locking‘: False, ‘delta‘: <tf.Tensor ‘train/gradients/softmax/MatMul_grad/tuple/control_dependency_1:0‘ shape=(784, 10) dtype=float32>}
    ...
    # NOTE(mrry): We add an explicit colocation constraint between
    # the newly created op and any of its reference-typed inputs.
    must_colocate_inputs = [val for arg, val in zip(op_def.input_arg, inputs)
                            if arg.is_ref]
    with _MaybeColocateWith(must_colocate_inputs):
      # Add Op to graph
      op = g.create_op(op_type_name, inputs, output_types, name=scope,
                       input_types=input_types, attrs=attr_protos,
                       op_def=op_def)
    return output_structure, op_def.is_stateful, op

該方法比較長,最後是調用了create_op():

def create_op(
    self,
    op_type,
    inputs,
    dtypes,  # pylint: disable=redefined-outer-name
    input_types=None,
    name=None,
    attrs=None,
    op_def=None,
    compute_shapes=True,
    compute_device=True):
  """Creates an `Operation` in this graph.

  This is a low-level interface for creating an `Operation`. Most
  programs will not call this method directly, and instead use the
  Python op constructors, such as `tf.constant()`, which add ops to
  the default graph.

  Args:
    op_type: The `Operation` type to create. This corresponds to the
      `OpDef.name` field for the proto that defines the operation.
    inputs: A list of `Tensor` objects that will be inputs to the `Operation`.
    dtypes: A list of `DType` objects that will be the types of the tensors
      that the operation produces.
    input_types: (Optional.) A list of `DType`s that will be the types of
      the tensors that the operation consumes. By default, uses the base
      `DType` of each input in `inputs`. Operations that expect
      reference-typed inputs must specify `input_types` explicitly.
    name: (Optional.) A string name for the operation. If not specified, a
      name is generated based on `op_type`.
    attrs: (Optional.) A dictionary where the key is the attribute name (a
      string) and the value is the respective `attr` attribute of the
      `NodeDef` proto that will represent the operation (an `AttrValue`
      proto).
    op_def: (Optional.) The `OpDef` proto that describes the `op_type` that
      the operation will have.
    compute_shapes: (Optional.) Deprecated. Has no effect (shapes are always
      computed).
    compute_device: (Optional.) If True, device functions will be executed
      to compute the device property of the Operation.

  Raises:
    TypeError: if any of the inputs is not a `Tensor`.
    ValueError: if colocation conflicts with existing device assignment.

  Returns:
    An `Operation` object.
  """
  del compute_shapes

  self._check_not_finalized()
  for idx, a in enumerate(inputs):
    if not isinstance(a, Tensor):
      raise TypeError("Input #%d is not a tensor: %s" % (idx, a))
  if name is None:
    name = op_type
  # If a names ends with a ‘/‘ it is a "name scope" and we use it as-is,
  # after removing the trailing ‘/‘.
  if name and name[-1] == "/":
    name = _name_from_scope_name(name)
  else:
    name = self.unique_name(name)

  node_def = _NodeDef(op_type, name, device=None, attrs=attrs)

  input_ops = set([t.op for t in inputs])
  control_inputs = self._control_dependencies_for_inputs(input_ops)
  # _create_op_helper mutates the new Operation. `_mutation_lock` ensures a
  # Session.run call cannot occur between creating and mutating the op.
  with self._mutation_lock():
    ret = Operation(
        node_def,
        self,
        inputs=inputs,
        output_types=dtypes,
        control_inputs=control_inputs,
        input_types=input_types,
        original_op=self._default_original_op,
        op_def=op_def)
    self._create_op_helper(ret, compute_device=compute_device)
  return ret

又調用了_create_op_helper():

def _create_op_helper(self, op, compute_device=True):
  """Common logic for creating an op in this graph."""
  # Apply any additional attributes requested. Do not overwrite any existing
  # attributes.
  for key, value in self._attr_scope_map.items():
    try:
      op.get_attr(key)
    except ValueError:
      if callable(value):
        value = value(op.node_def)
        if not isinstance(value, (type(None), attr_value_pb2.AttrValue)):
          raise TypeError(
              "Callable for scope map key ‘%s‘ must return either None or "
              "an AttrValue protocol buffer; but it returned: %s" % (key,
                                                                     value))
      if value:
        op._set_attr(key, value)  # pylint: disable=protected-access

  # Apply a kernel label if one has been specified for this op type.
  try:
    kernel_label = self._op_to_kernel_label_map[op.type]
    op._set_attr("_kernel",  # pylint: disable=protected-access
                 attr_value_pb2.AttrValue(s=compat.as_bytes(kernel_label)))
  except KeyError:
    pass

  # Apply the overriding op type for gradients if one has been specified for
  # this op type.
  try:
    mapped_op_type = self._gradient_override_map[op.type]
    op._set_attr("_gradient_op_type",  # pylint: disable=protected-access
                 attr_value_pb2.AttrValue(s=compat.as_bytes(mapped_op_type)))
  except KeyError:
    pass

  self._record_op_seen_by_control_dependencies(op)

  if compute_device:
    self._apply_device_functions(op)

  if self._colocation_stack:
    all_colocation_groups = []
    for colocation_op in self._colocation_stack:
      all_colocation_groups.extend(colocation_op.colocation_groups())
      if colocation_op.device:
        # Make this device match the device of the colocated op, to provide
        # consistency between the device and the colocation property.
        if (op.device and pydev.canonical_name(op.device) !=
            pydev.canonical_name(colocation_op.device)):
          logging.warning("Tried to colocate %s with an op %s that had "
                          "a different device: %s vs %s. Postponing "
                          "error-checking until all devices are assigned.",
                          op.name, colocation_op.name, op.device,
                          colocation_op.device)
        else:
          op._set_device(colocation_op.device)  # pylint: disable=protected-access

    all_colocation_groups = sorted(set(all_colocation_groups))
    # pylint: disable=protected-access
    op._set_attr("_class", attr_value_pb2.AttrValue(
        list=attr_value_pb2.AttrValue.ListValue(s=all_colocation_groups)))
    # pylint: enable=protected-access

  # Sets "container" attribute if
  # (1) self._container is not None
  # (2) "is_stateful" is set in OpDef
  # (3) "container" attribute is in OpDef
  # (4) "container" attribute is None
  if self._container and op.op_def.is_stateful:
    try:
      container_attr = op.get_attr("container")
    except ValueError:
      # "container" attribute is not in OpDef
      pass
    else:
      if not container_attr:
        op._set_attr("container", attr_value_pb2.AttrValue(  # pylint: disable=protected-access
            s=compat.as_bytes(self._container)))

其中有段邏輯

  if compute_device:
    self._apply_device_functions(op)

compute_device = True,會接著調用_apply_device_functions(op):

def _apply_device_functions(self, op):
  """Applies the current device function stack to the given operation."""
  # Apply any device functions in reverse order, so that the most recently
  # pushed function has the first chance to apply a device to the op.
  # We apply here because the result can depend on the Operation‘s
  # signature, which is computed in the Operation constructor.
  for device_function in reversed(self._device_function_stack):
    if device_function is None:
      break
    op._set_device(device_function(op))  # pylint: disable=protected-access

這個方法裏為op分配的設備。分配策略為replica_device_setter()方法設置的策略。

參考:
[1] http://jcf94.com/2018/01/13/2018-01-13-tfunpacking/ (session.run())
[2] http://jcf94.com/2018/01/23/2018-01-23-tfunpacking2/ (tf數據流模型和自動求導)
[3] http://jcf94.com/2018/02/28/2018-02-28-tfunpacking3/ (graph和node)
[4] http://jcf94.com/2018/03/07/2018-03-07-tfunpacking4/ (device)
[5] http://jcf94.com/2018/03/09/2018-03-09-tfunpacking5/ (distributed)
[6] https://www.tensorflow.org/deploy/distributed (distributed tensorflow)
[7] https://stackoverflow.com/questions/43147435/how-does-asynchronous-training-work-in-distributed-tensorflow (asynchronous training in distributed tensorflow)

tensorflow源碼學習之五 -- 同步訓練和異步訓練