分布式学习指南
本指南介绍 LightGBM 中的分布式学习。分布式学习允许使用多台机器来生成单个模型。
首先请按照快速入门了解如何使用 LightGBM。
LightGBM 分布式学习工作原理
本节介绍 LightGBM 中的分布式学习工作原理。要了解如何在各种编程语言和框架中进行此操作,请参阅集成。
选择合适的并行算法
LightGBM 目前提供 3 种分布式学习算法。
并行算法 |
使用方法 |
---|---|
数据并行 |
|
特征并行 |
|
投票并行 |
|
这些算法适用于不同的场景,如下表所示
#数据量小 |
#数据量大 |
|
---|---|---|
#特征数少 |
特征并行 |
数据并行 |
#特征数多 |
特征并行 |
投票并行 |
有关这些并行算法的更多详细信息,请参阅分布式学习中的优化。
集成
本节介绍如何在各种编程语言和框架中运行分布式 LightGBM 训练。要了解 LightGBM 中分布式学习的一般工作原理,请参阅LightGBM 分布式学习工作原理。
Apache Spark
Apache Spark 用户可以使用 SynapseML 进行基于 LightGBM 的机器学习工作流。此项目非 LightGBM 维护者所维护。
请参阅此 SynapseML 示例,了解有关在 Spark 上使用 LightGBM 的更多信息。
注意
SynapseML
非 LightGBM 维护者所维护。错误报告或功能请求应提交至 https://github.com/microsoft/SynapseML/issues。
Dask
从版本 3.2.0 开始添加。
LightGBM 的 Python 包通过 Dask 支持分布式学习。此集成由 LightGBM 维护者维护。
警告
Dask 集成仅在 Linux 上测试过。
Dask 示例
有关使用 lightgbm.dask
的示例代码,请参阅这些 Dask 示例。
使用 Dask 进行训练
本节详细介绍如何使用 Dask 执行 LightGBM 分布式训练。
配置 Dask 集群
分配线程
设置 Dask 集群进行训练时,请为每个 Dask 工作进程至少分配两个线程。如果不这样做,训练可能会大大变慢,因为通信工作和训练工作会互相阻塞。
如果您没有其他重要进程与 Dask 竞争资源,只需接受您选择的 dask.distributed
集群的默认 nthreads
。
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=3)
client = Client(cluster)
管理内存
使用 Dask 诊断面板或您首选的监控工具来监控训练期间 Dask 工作进程的内存消耗。如Dask 工作进程文档中所述,如果内存消耗过高,Dask 工作进程将自动开始将数据溢出到磁盘。这会大大减慢计算速度,因为磁盘 I/O 通常比从内存读取相同数据慢得多。
当内存负载达到 60% 时,[Dask 将] 把最近最少使用的数据溢出到磁盘
为了降低达到内存限制的风险,请考虑在运行任何数据加载或训练代码之前重启每个工作进程。
client.restart()
设置训练数据
lightgbm.dask
中的 estimator 期望以 Dask DataFrame、Dask Array 或(在某些情况下)Dask Series 格式提供类似矩阵或数组的数据。有关如何创建此类数据结构的更多信息,请参阅Dask DataFrame 文档和Dask Array 文档。
在设置训练时,lightgbm
会将一个工作进程上的所有分区连接成单个数据集。分布式训练然后以每个 Dask 工作进程对应一个 LightGBM 工作进程的方式进行。
使用 Dask 为 LightGBM 训练设置数据分区时,请尝试遵循以下建议
确保集群中的每个工作进程都有部分训练数据
尝试为每个工作进程分配大致相同的数据量,尤其是在数据集较小的情况下
如果您计划在相同数据上训练多个模型(例如,调优超参数),请在训练之前使用
client.persist()
一次性实现数据
使用特定的 Dask 客户端
在大多数情况下,您无需告知 lightgbm.dask
使用特定的 Dask 客户端。默认情况下,将使用 distributed.default_client()
返回的客户端。
但是,如果您在同一会话中有多个活动的客户端,您可能需要显式控制 LightGBM 使用的 Dask 客户端。这在更复杂的工作流程中很有用,例如在不同的 Dask 集群上运行多个训练作业。
LightGBM 的 Dask estimator 支持设置属性 client
来控制使用的客户端。
import lightgbm as lgb
from distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
# option 1: keyword argument in constructor
dask_model = lgb.DaskLGBMClassifier(client=client)
# option 2: set_params() after construction
dask_model = lgb.DaskLGBMClassifier()
dask_model.set_params(client=client)
使用特定端口
在训练开始时,lightgbm.dask
会建立一个 LightGBM 网络,其中每个 Dask 工作进程运行一个长时间运行的任务,充当 LightGBM 工作进程。训练期间,LightGBM 工作进程通过 TCP 套接字相互通信。默认情况下,创建这些套接字时会使用随机开放端口。
如果用于训练的集群中 Dask 工作进程之间的通信受到防火墙规则的限制,您必须准确告知 LightGBM 使用哪些端口。
选项 1:提供地址和端口的特定列表
LightGBM 支持参数 machines
,它是一个逗号分隔的字符串,其中每个条目指向一个工作进程(主机名或 IP)以及该工作进程将接受连接的端口。如果您将此参数提供给 lightgbm.dask
中的 estimator,LightGBM 将不会随机搜索端口。
例如,考虑在以下每个 IP 地址上运行一个 Dask 工作进程的情况
10.0.1.0
10.0.2.0
10.0.3.0
您可以编辑防火墙规则以允许在这些主机上的每个主机上额外开放一个端口的流量,然后直接提供 machines
。
import lightgbm as lgb
machines = "10.0.1.0:12401,10.0.2.0:12402,10.0.3.0:15000"
dask_model = lgb.DaskLGBMRegressor(machines=machines)
如果您在集群中的物理主机上运行多个 Dask 工作进程,请确保该 IP 地址有多个条目,并使用不同的端口。例如,如果您正在运行一个 nprocs=2
(每台机器 2 个 Dask 工作进程)的集群,您可能需要在每台主机上额外打开两个端口,然后如下所示提供 machines
。
import lightgbm as lgb
machines = ",".join([
"10.0.1.0:16000",
"10.0.1.0:16001",
"10.0.2.0:16000",
"10.0.2.0:16001",
])
dask_model = lgb.DaskLGBMRegressor(machines=machines)
警告
提供 machines
让您完全控制训练的网络细节,但它也使训练过程变得脆弱。如果您使用 machines
且出现以下任何情况,训练将失败
machines
中提到的任何端口在训练开始时未打开训练数据的某些分区由不在
machines
中列出的机器持有machines
中提到的某些机器不持有任何训练数据
选项 2:指定每个工作进程使用的端口
如果您在每台主机上只运行一个 Dask 工作进程,并且能够可靠地识别每台主机上都开放的端口,那么使用 machines
会不必要地复杂。如果指定了 local_listen_port
且未指定 machines
,LightGBM 将不会随机搜索端口,而是会将 LightGBM 网络中的地址列表限制为持有部分训练数据的 Dask 工作进程。
例如,考虑在以下每个 IP 地址上运行一个 Dask 工作进程的情况
10.0.1.0
10.0.2.0
10.0.3.0
您可以编辑防火墙规则以允许任何工作进程之间通过一个端口进行通信,然后通过参数 local_listen_port
提供该端口。
import lightgbm as lgb
dask_model = lgb.DaskLGBMRegressor(local_listen_port=12400)
警告
提供 local_listen_port
比 machines
稍微不那么脆弱,因为 LightGBM 会自动确定哪些工作进程持有部分训练数据。但是,使用此方法时,如果出现以下任何情况,训练可能会失败
参数
local_listen_port
指定的端口在任何工作主机上未开放任何机器上运行着多个 Dask 工作进程
使用 Dask 的自定义目标函数
从版本 4.0.0 开始添加。
可以通过提供用 Python 编写的自定义目标函数来定制 Boosting 过程。有关如何实现此类函数的详细信息,请参阅 Dask API 文档。
警告
与 lightgbm.dask
一起使用的自定义目标函数将由每个工作进程在其本地数据上调用。
按照下面的示例使用自定义实现的 regression_l2
目标。
import dask.array as da
import lightgbm as lgb
import numpy as np
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
X = da.random.random((1000, 10), (500, 10))
y = da.random.random((1000,), (500,))
def custom_l2_obj(y_true, y_pred):
grad = y_pred - y_true
hess = np.ones(len(y_true))
return grad, hess
dask_model = lgb.DaskLGBMRegressor(
objective=custom_l2_obj
)
dask_model.fit(X, y)
使用 Dask 进行预测
lightgbm.dask
中的 estimator 可用于根据存储在 Dask 集合中的数据创建预测。在该接口中,.predict()
期望输入 Dask Array 或 Dask DataFrame,并返回一个 Dask Array 格式的预测结果。
请参阅Dask 预测示例,获取一些展示如何执行基于 Dask 的预测的示例代码。
对于模型评估,请考虑使用dask-ml 中的评估指标函数。这些函数旨在提供与 sklearn.metrics
中等效函数相同的 API,但它们使用 Dask 支持的分布式计算来计算指标,而无需将所有输入数据全部放在一台机器上。
保存 Dask 模型
使用 Dask 训练后,您有几种保存拟合模型的选项。
选项 1:序列化 Dask estimator
LightGBM 的 Dask estimator 可以直接使用 cloudpickle
、joblib
或 pickle
进行序列化。
import dask.array as da
import pickle
import lightgbm as lgb
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
X = da.random.random((1000, 10), (500, 10))
y = da.random.random((1000,), (500,))
dask_model = lgb.DaskLGBMRegressor()
dask_model.fit(X, y)
with open("dask-model.pkl", "wb") as f:
pickle.dump(dask_model, f)
以这种方式保存的模型以后可以使用保存它时使用的任何序列化库进行加载。
import pickle
with open("dask-model.pkl", "rb") as f:
dask_model = pickle.load(f)
注意
如果您显式设置了 Dask 客户端(参见使用特定的 Dask 客户端),它在序列化 estimator 时将不会被保存。从磁盘加载 Dask estimator 时,如果您需要使用特定客户端,可以在加载后使用 dask_model.set_params(client=client)
添加。
选项 2:序列化 sklearn estimator
从 lightgbm.dask
可用的 estimator 可以转换为 lightgbm.sklearn
中等效类的实例。选择此选项可让您使用 Dask 进行训练,但在预测时避免依赖任何 Dask 库。
import dask.array as da
import joblib
import lightgbm as lgb
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
X = da.random.random((1000, 10), (500, 10))
y = da.random.random((1000,), (500,))
dask_model = lgb.DaskLGBMRegressor()
dask_model.fit(X, y)
# convert to sklearn equivalent
sklearn_model = dask_model.to_local()
print(type(sklearn_model))
#> lightgbm.sklearn.LGBMRegressor
joblib.dump(sklearn_model, "sklearn-model.joblib")
以这种方式保存的模型以后可以使用保存它时使用的任何序列化库进行加载。
import joblib
sklearn_model = joblib.load("sklearn-model.joblib")
选项 3:保存 LightGBM Booster
LightGBM 中最低级别的模型对象是 lightgbm.Booster
。训练后,您可以从 Dask estimator 中提取 Booster。
import dask.array as da
import lightgbm as lgb
from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
X = da.random.random((1000, 10), (500, 10))
y = da.random.random((1000,), (500,))
dask_model = lgb.DaskLGBMRegressor()
dask_model.fit(X, y)
# get underlying Booster object
bst = dask_model.booster_
从此以后,您可以使用以下任何方法保存 Booster
使用
cloudpickle
、joblib
或pickle
序列化bst.dump_model()
:将模型转储为字典,可以将其写入 JSONbst.model_to_string()
:将模型转储为内存中的字符串bst.save_model()
:将bst.model_to_string()
的输出写入文本文件
Kubeflow
Kubeflow 用户还可以使用 Kubeflow XGBoost Operator 进行基于 LightGBM 的机器学习工作流。有关更多详细信息,请参阅此示例。
Kubeflow 的 LightGBM 集成非 LightGBM 维护者所维护。
注意
Kubeflow 的 LightGBM 集成非 LightGBM 维护者所维护。错误报告或功能请求应提交至 https://github.com/kubeflow/fairing/issues 或 https://github.com/kubeflow/xgboost-operator/issues。
LightGBM CLI
准备工作
默认情况下,LightGBM 的分布式学习使用基于 socket 的通信。
如果您需要构建支持 MPI 的分布式版本,请参阅安装指南。
Socket 版本
需要收集所有希望运行分布式学习的机器的 IP,并为所有机器分配一个 TCP 端口(此处假定为 12345),并更改防火墙规则以允许此端口(12345)的入站流量。然后将这些 IP 和端口写入一个文件(假定为 mlist.txt
),如下所示
machine1_ip 12345
machine2_ip 12345
MPI 版本
需要收集所有希望运行分布式学习的机器的 IP(或主机名)。然后将这些 IP 写入一个文件(假定为 mlist.txt
),如下所示
machine1_ip
machine2_ip
注意:对于 Windows 用户,需要启动“smpd”来启动 MPI 服务。更多详细信息可在此处找到。
运行分布式学习
Socket 版本
在配置文件中编辑以下参数
tree_learner=your_parallel_algorithm
,在此处编辑your_parallel_algorithm
(例如 feature/data)。num_machines=your_num_machines
,在此处编辑your_num_machines
(例如 4)。machine_list_file=mlist.txt
,mlist.txt
在准备工作部分创建。local_listen_port=12345
,12345
在准备工作部分分配。将数据文件、可执行文件、配置文件和
mlist.txt
复制到所有机器。在所有机器上运行以下命令,您需要将
your_config_file
更改为实际的配置文件。对于 Windows:
lightgbm.exe config=your_config_file
对于 Linux:
./lightgbm config=your_config_file
MPI 版本
在配置文件中编辑以下参数
tree_learner=your_parallel_algorithm
,在此处编辑your_parallel_algorithm
(例如 feature/data)。num_machines=your_num_machines
,在此处编辑your_num_machines
(例如 4)。将数据文件、可执行文件、配置文件和
mlist.txt
复制到所有机器。注意:MPI 需要在所有机器上的相同路径中运行。
在一台机器上运行以下命令(无需在所有机器上运行),需要将
your_config_file
更改为实际的配置文件。对于 Windows
mpiexec.exe /machinefile mlist.txt lightgbm.exe config=your_config_file
对于 Linux
mpiexec --machinefile mlist.txt ./lightgbm config=your_config_file
示例
Ray
Ray 是一个基于 Python 的分布式计算框架。在 Ray 官方 GitHub 组织内维护的 lightgbm_ray 项目可用于使用 ray
执行分布式 LightGBM 训练。
请参阅lightgbm_ray 文档获取使用示例。
注意
lightgbm_ray
非 LightGBM 维护者所维护。错误报告或功能请求应提交至 https://github.com/ray-project/lightgbm_ray/issues。
Mars
Mars 是一个基于张量的大规模数据计算框架。在 Mars GitHub 仓库内维护的 LightGBM 集成可用于使用 pymars
执行分布式 LightGBM 训练。
请参阅mars 文档获取使用示例。
注意
Mars
非 LightGBM 维护者所维护。错误报告或功能请求应提交至 https://github.com/mars-project/mars/issues。