Tensorflow

本文翻译自: 《Multi-GPU processing with data parallelism》, 如有侵权请联系删除, 仅限于学术交流, 请勿商用。 如有谬误, 请联系指出。

如果你使用类似 C++这样的语言在单核 CPU 上编写你的软件, 为使其能够在多个 GPU 上并行运行, 你可能需要从头开始重写你的软件。 但是在 TensorFlow 中并非如此。 由于其符号性质, tensorflow 可以隐藏所有这些复杂的过程, 使你无需在多个 CPU 和 GPU 上扩展程序。

让我们从在 CPU 上添加两个向量开始:

import tensorflow as tf

with tf.device(tf.DeviceSpec(device_type="CPU", device_index=0)):
   a = tf.random_uniform([1000, 100])
   b = tf.random_uniform([1000, 100])
   c = a + b

tf.Session().run(c)

同样的事情在 GPU 上也可以简单地完成:

with tf.device(tf.DeviceSpec(device_type="GPU", device_index=0)):
    a = tf.random_uniform([1000, 100])
    b = tf.random_uniform([1000, 100])
    c = a + b

但是, 如果我们有两个 GPU 并希望同时使用它们呢? 为此, 我们可以把数据分成两份, 并让每个 GPU 单独处理一个部分:

split_a = tf.split(a, 2)
split_b = tf.split(b, 2)

split_c = []
for i in range(2):
    with tf.device(tf.DeviceSpec(device_type="GPU", device_index=i)):
        split_c.append(split_a[i] + split_b[i])

c = tf.concat(split_c, axis=0)

让我们以更一般的形式重写它, 以便我们可以用任何其他操作集替换添加:

def make_parallel(fn, num_gpus, **kwargs):
    in_splits = {}
    for k, v in kwargs.items():
        in_splits[k] = tf.split(v, num_gpus)

    out_split = []
    for i in range(num_gpus):
        with tf.device(tf.DeviceSpec(device_type="GPU", device_index=i)):
            with tf.variable_scope(tf.get_variable_scope(), reuse=tf.AUTO_REUSE):
                out_split.append(fn(**{k : v[i] for k, v in in_splits.items()}))

    return tf.concat(out_split, axis=0)

def model(a, b):
    return a + b

c = make_parallel(model, 2, a=a, b=b)

你可以使用任何一个将张量作为输入并返回张量的函数来替换模型, 限定条件是输入和输出都必须在一个批次(batch)内。 值得注意的是, 我们还添加了一个变量作用域并将 reuse 属性设置为 true。 这个操作确保我们可以使用相同的变量来处理两个部分的数据。 如此操作让我们在下一个例子中变得很方便。

让我们看一个稍微更实际的例子。 我们想在多个 GPU 上训练神经网络。 在训练期间, 我们不仅需要计算前向传播, 还需要计算后向传播(梯度变化)。 但是我们如何并行化梯度计算呢? 事实证明这很简单。

回忆一下第一项我们想要把一个二阶多项式拟合到一组样本中。 我们对代码进行了一些重组, 以便在模型函数中进行大量的操作:

import numpy as np
import tensorflow as tf

def model(x, y):
    w = tf.get_variable("w", shape=[3, 1])

    f = tf.stack([tf.square(x), x, tf.ones_like(x)], 1)
    yhat = tf.squeeze(tf.matmul(f, w), 1)

    loss = tf.square(yhat - y)
    return loss

x = tf.placeholder(tf.float32)
y = tf.placeholder(tf.float32)

loss = model(x, y)

train_op = tf.train.AdamOptimizer(0.1).minimize(
    tf.reduce_mean(loss))

def generate_data():
    x_val = np.random.uniform(-10.0, 10.0, size=100)
    y_val = 5 * np.square(x_val) + 3
    return x_val, y_val

sess = tf.Session()
sess.run(tf.global_variables_initializer())
for _ in range(1000):
    x_val, y_val = generate_data()
    _, loss_val = sess.run([train_op, loss], {x: x_val, y: y_val})

_, loss_val = sess.run([train_op, loss], {x: x_val, y: y_val})
print(sess.run(tf.contrib.framework.get_variables_by_name("w")))

现在让我们使用我们刚刚编写的 make_parallel 函数来并行化这个操作吧。 我们只需要从上面的代码中更改两行代码:

loss = make_parallel(model, 2, x=x, y=y)

train_op = tf.train.AdamOptimizer(0.1).minimize(
    tf.reduce_mean(loss),
    colocate_gradients_with_ops=True)

要并行化梯度的反向传播, 唯一需要改变的是将 colocate_gradients_with_ops 设置为true。 这确保了梯度操作可以在与初始操作相同的设备上运行。