1. 快速开始

在大数据浪潮的推动下,有标签训练数据的规模取得了飞速的增长。现在人们通常用数百万甚至上千万的有标签图像来训练图像分类器(如,ImageNet包含1400万幅图像,涵盖两万多个种类),用成千上万小时的语音数据来训练语音模型(如,Deep Speech 2系统使用了11940小时的语音数据以及超过200万句表述来训练语音识别模型)。在真实的业务场景中,训练数据的规模可以达到上述数据集的数十倍甚至数百倍,如此庞大的数据需要消耗大量的计算资源和训练时间使模型达到收敛状态(数天时间)。

为了提高模型的训练效率,分布式训练应运而生,其中基于参数服务器的分布式训练为一种常见的中心化共享参数的同步方式。与单机训练不同的是在参数服务器分布式训练中,各个节点充当着不同的角色:

  • 训练节点:该节点负责完成数据读取、前向计算、反向梯度计算等过程,并将计算出的梯度上传至服务节点。

  • 服务节点:在收到所有训练节点传来的梯度后,该节点会将梯度聚合并更新参数。最后将参数发送给训练节点,开始新一轮的训练。

根据参数更新的方式不同,可以分为同步/异步/Geo异步三种:

  • 同步训练:在同步参数服务器分布式训练中,所有训练节点的进度保持一致。每训练完一个Batch后,训练节点会上传梯度,然后开始等待服务节点返回更新后的参数。服务节点拿到所有训练节点上传的梯度后,才会对参数进行更新。因此,在任何一个时间点,所有训练节点都处于相同的训练阶段。

  • 异步训练:与同步训练不同,在异步训练中任何两个训练节点之间的参数更新都互不影响。每一个训练节点完成训练、上传梯度后,服务节点都会立即更新参数并将结果返回至相应的训练节点。拿到最新的参数后,该训练节点会立即开始新一轮的训练。

  • GEO异步训练:与前两种训练不同,GEO异步训练也是一种异步训练模式,每个训练节点本地都会拥有完整的训练流程(前向-反向-参数优化),训练过程中任何两个训练节点之间的参数更新都互不影响。每训练到一定的轮次(Step) 训练节点会将本地的参数计算一次差值(Step间隔带来的参数差值),将差值发送给服务端累计更新,并拿到最新的参数后,该训练节点会立即开始新一轮的训练。

下面我们将通过例子,为您介绍同步/异步/Geo异步训练在Fleet中的实现,本次快速开始的示例代码位于https://github.com/PaddlePaddle/FleetX/tree/develop/examples/wide_and_deep。

1.1. 实用样例

1.1.1. 导入依赖

import paddle
import os
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker

1.1.2. 定义分布式模式并初始化

通过fleet.init()接口,用户可以定义训练相关的环境,注意此环境是用户预先在环境变量中配置好的,包括:训练节点个数,服务节点个数,当前节点的序号,服务节点完整的IP:PORT列表等。

# 当前参数服务器模式只支持静态图模式, 因此训练前必须指定`paddle.enable_static()`
paddle.enable_static()
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)

1.1.3. 加载模型及数据

# 模型定义参考examples/wide_and_deep中model.py
from model import net
from reader import data_reader

feeds, predict, avg_cost = net()

train_reader = paddle.batch(data_reader(), batch_size=4)
reader.decorate_sample_list_generator(train_reader)

1.1.4. 定义同步训练 Strategy 及 Optimizer

在Fleet API中,用户可以使用fleet.DistributedStrategy()接口定义自己想要使用的分布式策略。

其中a_sync选项用于定义参数服务器相关的策略,当其被设定为False时,分布式训练将在同步的模式下进行。反之,当其被设定成True时,分布式训练将在异步的模式下进行。

# 定义异步训练
dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = True

# 定义同步训练
dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = False

# 定义Geo异步训练, Geo异步目前只支持SGD优化算法
dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = True
dist_strategy.a_sync_configs = {"k_steps": 100}

optimizer = paddle.optimizer.SGD(learning_rate=0.0001)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)

1.1.5. 开始训练

完成模型及训练策略以后,我们就可以开始训练模型了。因为在参数服务器模式下会有不同的角色,所以根据不同节点分配不同的任务。

对于服务器节点,首先用init_server()接口对其进行初始化,然后启动服务并开始监听由训练节点传来的梯度。

同样对于训练节点,用init_worker()接口进行初始化后, 开始执行训练任务。运行exe.run()接口开始训练,并得到训练中每一步的损失值。

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
else:
    exe = paddle.static.Executor(paddle.CPUPlace())
    exe.run(paddle.static.default_startup_program())

    fleet.init_worker()

    for epoch_id in range(1):
        reader.start()
        try:
            while True:
                loss_val = exe.run(program=paddle.static.default_main_program(),
                                   fetch_list=[avg_cost.name])
                loss_val = np.mean(loss_val)
                print("TRAIN ---> pass: {} loss: {}\n".format(epoch_id,
                                                              loss_val))
        except paddle.core.EOFException:
            reader.reset()

    fleet.stop_worker()

1.1.6. 运行训练脚本

定义完训练脚本后,我们就可以用fleetrun指令运行分布式任务了。其中server_num, worker_num分别为服务节点和训练节点的数量。在本例中,服务节点有1个,训练节点有两个。

fleetrun --server_num=1 --worker_num=2 train.py