使用FleetAPI进行分布式训练

    下面会针对Fleet API最常见的两种使用场景,用一个模型做示例,目的是让用户有快速上手体验的模板。

    • 假设我们定义MLP网络如下:

    • 定义一个在内存生成数据的Reader如下:

      1. def gen_data():
      2. return {"x": np.random.random(size=(128, 32)).astype('float32'),
      3. "y": np.random.randint(2, size=(128, 1)).astype('int64')}
    • 单机Trainer定义

      1. import paddle
      2. from nets import mlp
      3. from utils import gen_data
      4. input_x = paddle.static.data(name="x", shape=[None, 32], dtype='float32')
      5. input_y = paddle.static.data(name="y", shape=[None, 1], dtype='int64')
      6. cost = mlp(input_x, input_y)
      7. optimizer = paddle.optimizer.SGD(learning_rate=0.01)
      8. optimizer.minimize(cost)
      9. place = paddle.CUDAPlace(0)
      10. exe = paddle.static.Executor(place)
      11. exe.run(paddle.static.default_startup_program())
      12. step = 1001
      13. for i in range(step):
      14. cost_val = exe.run(feed=gen_data(), fetch_list=[cost.name])
      15. print("step%d cost=%f" % (i, cost_val[0]))
    • Parameter Server训练方法

      参数服务器方法对于大规模数据,简单模型的并行训练非常适用,我们基于单机模型的定义给出使用Parameter Server进行训练的示例如下:

    • Collective训练方法

      1. import paddle
      2. import paddle.distributed.fleet.base.role_maker as role_maker
      3. import paddle.distributed.fleet as fleet
      4. from nets import mlp
      5. from utils import gen_data
      6. input_x = paddle.static.data(name="x", shape=[None, 32], dtype='float32')
      7. input_y = paddle.static.data(name="y", shape=[None, 1], dtype='int64')
      8. cost = mlp(input_x, input_y)
      9. optimizer = paddle.optimizer.SGD(learning_rate=0.01)
      10. role = role_maker.PaddleCloudRoleMaker(is_collective=True)
      11. fleet.init(role)
      12. optimizer = fleet.distributed_optimizer(optimizer)
      13. optimizer.minimize(cost)
      14. place = paddle.CUDAPlace(0)
      15. exe = paddle.static.Executor(place)
      16. exe.run(paddle.static.default_startup_program())
      17. step = 1001
      18. cost_val = exe.run(
      19. program=paddle.static.default_main_program(),
      20. feed=gen_data(),
      21. fetch_list=[cost.name])
      22. print("worker_index: %d, step%d cost = %f" %
      23. (fleet.worker_index(), i, cost_val[0]))
    • init(role_maker=None)

      • fleet初始化,需要在使用fleet其他接口前先调用,用于定义多机的环境配置
    • is_worker()

      • Parameter Server训练中使用,判断当前节点是否是Worker节点,是则返回True,否则返回False
    • is_server(model_dir=None)

      • Parameter Server训练中使用,判断当前节点是否是Server节点,是则返回True,否则返回False
    • run_server()

      • Parameter Server训练中使用,用来启动server端服务
    • init_worker()

      • Parameter Server训练中使用,用来启动worker端服务
    • stop_worker()

      • 训练结束后,停止worker
      • 分布式优化算法装饰器,用户可带入单机optimizer,并配置分布式训练策略,返回一个分布式的optimizer

    RoleMaker

    • PaddleCloudRoleMaker

      • 描述:PaddleCloudRoleMaker是一个高级封装,支持使用paddle.distributed.launch或者paddle.distributed.launch_ps启动脚本

      • Parameter Server训练示例:

        1. import paddle
        2. paddle.enable_static()
        3. import paddle.distributed.fleet.base.role_maker as role_maker
        4. import paddle.distributed.fleet as fleet
        5. role = role_maker.PaddleCloudRoleMaker()
        6. fleet.init(role)
      • 启动方法:

      • Collective训练示例:

        1. import paddle
        2. paddle.enable_static()
        3. import paddle.distributed.fleet.base.role_maker as role_maker
        4. import paddle.distributed.fleet as fleet
        5. role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        6. fleet.init(role)
      • 启动方法:

        1. python -m paddle.distributed.launch trainer.py
    • UserDefinedRoleMaker

      • 示例: