本文翻译自: 《Multi-GPU processing with data parallelism》, 如有侵权请联系删除, 仅限于学术交流, 请勿商用。 如有谬误, 请联系指出。
如果你使用类似 C++这样的语言在单核 CPU 上编写你的软件, 为使其能够在多个 GPU 上并行运行, 你可能需要从头开始重写你的软件。 但是在 TensorFlow 中并非如此。 由于其符号性质, tensorflow 可以隐藏所有这些复杂的过程, 使你无需在多个 CPU 和 GPU 上扩展程序。
让我们从在 CPU 上添加两个向量开始:
import tensorflow as tf
with tf.device(tf.DeviceSpec(device_type="CPU", device_index=0)):
a = tf.random_uniform([1000, 100])
b = tf.random_uniform([1000, 100])
c = a + b
tf.Session().run(c)
同样的事情在 GPU 上也可以简单地完成:
with tf.device(tf.DeviceSpec(device_type="GPU", device_index=0)):
a = tf.random_uniform([1000, 100])
b = tf.random_uniform([1000, 100])
c = a + b
但是, 如果我们有两个 GPU 并希望同时使用它们呢? 为此, 我们可以把数据分成两份, 并让每个 GPU 单独处理一个部分:
split_a = tf.split(a, 2)
split_b = tf.split(b, 2)
split_c = []
for i in range(2):
with tf.device(tf.DeviceSpec(device_type="GPU", device_index=i)):
split_c.append(split_a[i] + split_b[i])
c = tf.concat(split_c, axis=0)
让我们以更一般的形式重写它, 以便我们可以用任何其他操作集替换添加:
def make_parallel(fn, num_gpus, **kwargs):
in_splits = {}
for k, v in kwargs.items():
in_splits[k] = tf.split(v, num_gpus)
out_split = []
for i in range(num_gpus):
with tf.device(tf.DeviceSpec(device_type="GPU", device_index=i)):
with tf.variable_scope(tf.get_variable_scope(), reuse=tf.AUTO_REUSE):
out_split.append(fn(**{k : v[i] for k, v in in_splits.items()}))
return tf.concat(out_split, axis=0)
def model(a, b):
return a + b
c = make_parallel(model, 2, a=a, b=b)
你可以使用任何一个将张量作为输入并返回张量的函数来替换模型, 限定条件是输入和输出都必须在一个批次(batch)内。 值得注意的是, 我们还添加了一个变量作用域并将 reuse
属性设置为 true。 这个操作确保我们可以使用相同的变量来处理两个部分的数据。 如此操作让我们在下一个例子中变得很方便。
让我们看一个稍微更实际的例子。 我们想在多个 GPU 上训练神经网络。 在训练期间, 我们不仅需要计算前向传播, 还需要计算后向传播(梯度变化)。 但是我们如何并行化梯度计算呢? 事实证明这很简单。
回忆一下第一项我们想要把一个二阶多项式拟合到一组样本中。 我们对代码进行了一些重组, 以便在模型函数中进行大量的操作:
import numpy as np
import tensorflow as tf
def model(x, y):
w = tf.get_variable("w", shape=[3, 1])
f = tf.stack([tf.square(x), x, tf.ones_like(x)], 1)
yhat = tf.squeeze(tf.matmul(f, w), 1)
loss = tf.square(yhat - y)
return loss
x = tf.placeholder(tf.float32)
y = tf.placeholder(tf.float32)
loss = model(x, y)
train_op = tf.train.AdamOptimizer(0.1).minimize(
tf.reduce_mean(loss))
def generate_data():
x_val = np.random.uniform(-10.0, 10.0, size=100)
y_val = 5 * np.square(x_val) + 3
return x_val, y_val
sess = tf.Session()
sess.run(tf.global_variables_initializer())
for _ in range(1000):
x_val, y_val = generate_data()
_, loss_val = sess.run([train_op, loss], {x: x_val, y: y_val})
_, loss_val = sess.run([train_op, loss], {x: x_val, y: y_val})
print(sess.run(tf.contrib.framework.get_variables_by_name("w")))
现在让我们使用我们刚刚编写的 make_parallel
函数来并行化这个操作吧。 我们只需要从上面的代码中更改两行代码:
loss = make_parallel(model, 2, x=x, y=y)
train_op = tf.train.AdamOptimizer(0.1).minimize(
tf.reduce_mean(loss),
colocate_gradients_with_ops=True)
要并行化梯度的反向传播, 唯一需要改变的是将 colocate_gradients_with_ops
设置为true。 这确保了梯度操作可以在与初始操作相同的设备上运行。