[IBM Spectrum LSF DOC]扩展:自定义插件和集成开发
|
wanlinwang
|
9 min read
LSF提供强大的扩展机制,允许管理员和开发者通过ELIM、ESUB、ECHKPT等插件自定义集群行为,实现深度定制和第三方集成。
扩展点概览
LSF主要扩展机制:
| 扩展点 |
用途 |
语言 |
| ELIM |
外部负载信息管理器 |
Shell/C/Python |
| ESUB |
作业提交过滤器 |
Shell/C/Perl |
| EEXEC |
作业执行包装器 |
Shell |
| ECHKPNT |
Checkpoint/Restart |
C |
| Pre/Post Exec |
作业前后脚本 |
Shell |
ELIM - 外部负载信息管理器
功能
报告自定义资源给LSF:
- GPU状态和利用率
- 许可证可用性
- 专有硬件(FPGA、加速卡)
- 应用级资源
编写ELIM脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| #!/bin/sh
# elim.gpu - 报告GPU资源
echo "BEGIN" # 必须的起始标记
# 查询NVIDIA GPU
nvidia-smi --query-gpu=index,utilization.gpu --format=csv,noheader | \
while IFS=',' read gpu_id util; do
# 格式: 更新周期(秒) 资源名 资源值
echo "10 gpu_${gpu_id} 1"
echo "10 gpu_${gpu_id}_util ${util%% *}"
done
echo "END" # 必须的结束标记
|
配置ELIM
1
2
3
| # lsf.conf
LSF_ELIM=$LSF_SERVERDIR/elim.gpu
LSF_ELIM_UPDATE_INTERVAL=10
|
使用自定义资源
1
2
| # 请求特定GPU
bsub -R "select[gpu_0>0 && gpu_0_util<50]" ./cuda_app
|
ESUB - 作业提交过滤器
功能
拦截和修改作业提交:
- 自动添加资源需求
- 执行策略检查
- 记录作业元数据
- 自动化环境设置
编写ESUB脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| #!/bin/bash
# esub.policy - 作业提交策略
# ESUB接收环境变量:
# LSB_SUB_* 包含所有提交参数
# 示例:自动为Python作业添加Conda环境
if echo "$LSB_SUB_COMMAND_LINE" | grep -q "python"; then
echo "#BSUB -env \"all, CONDA_ENV=pytorch\""
fi
# 示例:检查项目代码合法性
if [ -z "$LSB_SUB_PROJECT_NAME" ]; then
echo "Error: Project name required" >&2
exit 1
fi
# 示例:大内存作业自动分配特定队列
MEM_REQ=$(echo "$LSB_SUB_RES_REQ" | grep -oP 'mem=\K[0-9]+')
if [ "$MEM_REQ" -gt 64000 ]; then
echo "#BSUB -q bigmem"
fi
exit 0 # 0=允许提交,非0=拒绝
|
配置ESUB
1
2
| # lsf.conf
LSB_ESUB=$LSF_SERVERDIR/esub.policy
|
EEXEC - 作业执行包装器
功能
在作业前后执行自定义代码:
- 环境初始化
- 许可证checkout/checkin
- 日志记录
- 数据暂存
EEXEC脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| #!/bin/bash
# eexec.wrapper
# 作业前:准备环境
setup_environment() {
echo "Setting up job $LSB_JOBID"
# Checkout许可证
/opt/license/checkout vcs_license
# 创建临时目录
export JOB_TEMPDIR=/scratch/$LSB_JOBID
mkdir -p $JOB_TEMPDIR
cd $JOB_TEMPDIR
# 加载依赖
module load gcc/9.3
module load python/3.9
}
# 作业后:清理
cleanup() {
echo "Cleaning up job $LSB_JOBID"
# Checkin许可证
/opt/license/checkin vcs_license
# 清理临时文件
rm -rf $JOB_TEMPDIR
}
# 设置trap确保清理执行
trap cleanup EXIT
# 准备环境
setup_environment
# 执行实际作业
eval "$@"
EXIT_CODE=$?
# cleanup会被trap自动调用
exit $EXIT_CODE
|
配置EEXEC
1
2
3
4
5
| # lsb.queues
Begin Queue
QUEUE_NAME = vcs_queue
EEXEC = $LSF_SERVERDIR/eexec.wrapper
End Queue
|
Pre/Post Exec脚本
作业级别前后处理
1
2
3
4
| # 提交时指定
bsub -E "./pre_exec.sh" \
-Ep "./post_exec.sh" \
./main_job.sh
|
Pre-Exec示例
1
2
3
4
5
6
7
8
9
10
11
12
| #!/bin/bash
# pre_exec.sh - 在作业主体前运行
# 下载数据
aws s3 cp s3://mybucket/input.tar.gz .
tar xzf input.tar.gz
# 验证环境
if ! command -v matlab &> /dev/null; then
echo "MATLAB not found" >&2
exit 1
fi
|
Post-Exec示例
1
2
3
4
5
6
7
8
9
10
| #!/bin/bash
# post_exec.sh - 在作业主体后运行(无论成功或失败)
# 上传结果
if [ -f results.dat ]; then
aws s3 cp results.dat s3://mybucket/results/
fi
# 发送通知
curl -X POST webhookurl -d "Job $LSB_JOBID completed"
|
ECHKPNT - Checkpoint/Restart
C语言接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| #include <lsf/lsbatch.h>
int main() {
// 注册checkpoint函数
ls_checkpoint_register(my_checkpoint_func);
// 主计算循环
while (!done) {
do_computation();
}
return 0;
}
int my_checkpoint_func() {
// 保存状态到文件
save_state("checkpoint.dat");
return 0; // 成功
}
|
Python封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| import lsf
def checkpoint():
"""保存当前状态"""
state = get_current_state()
with open('checkpoint.pkl', 'wb') as f:
pickle.dump(state, f)
# 注册checkpoint函数
lsf.register_checkpoint(checkpoint, interval=1800) # 30分钟
# 从checkpoint恢复
if lsf.has_checkpoint():
state = lsf.load_checkpoint()
restore_state(state)
|
资源映射(Resource Mapping)
elim.resource_map
映射外部资源名到LSF资源:
1
2
3
4
5
6
7
8
9
| /* elim_resource_map.c */
#include <stdio.h>
int main() {
// 映射Slurm gres到LSF资源
printf("gpu:%s\n", getenv("SLURM_GPUS"));
printf("mem:%s\n", getenv("SLURM_MEM_PER_NODE"));
return 0;
}
|
动态主机组(Dynamic Host Groups)
基于资源自动分组
1
2
3
4
5
6
7
8
9
10
11
12
13
| #!/bin/bash
# elim.hostgroup
# 根据GPU数量动态分组
GPU_COUNT=$(nvidia-smi --list-gpus | wc -l)
if [ $GPU_COUNT -ge 8 ]; then
echo "HOSTGROUP gpu_rich"
elif [ $GPU_COUNT -ge 1 ]; then
echo "HOSTGROUP gpu_medium"
else
echo "HOSTGROUP cpu_only"
fi
|
第三方集成示例
Ansible集成
1
2
3
4
5
6
7
8
9
10
11
12
13
| # deploy_lsf.yml
- name: Deploy ELIM script
copy:
src: elim.gpu
dest: /opt/lsf/10.1/linux/etc/
mode: '0755'
notify: restart lim
- name: Configure ELIM
lineinfile:
path: /opt/lsf/conf/lsf.conf
line: 'LSF_ELIM=/opt/lsf/10.1/linux/etc/elim.gpu'
notify: reconfig lsf
|
Prometheus监控
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| # lsf_exporter.py
from prometheus_client import Gauge, start_http_server
import subprocess
# 定义指标
lsf_jobs_pending = Gauge('lsf_jobs_pending', 'Number of pending jobs')
lsf_jobs_running = Gauge('lsf_jobs_running', 'Number of running jobs')
def collect_metrics():
# 调用bjobs统计
result = subprocess.check_output(['bjobs', '-sum'])
# 解析输出
lsf_jobs_pending.set(parse_pending(result))
lsf_jobs_running.set(parse_running(result))
if __name__ == '__main__':
start_http_server(9100)
while True:
collect_metrics()
time.sleep(30)
|
C API编程
LSF C API示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| #include <lsf/lsbatch.h>
int main() {
struct submit submit_req;
struct submitReply submit_reply;
// 初始化LSF
if (lsb_init("my_app") < 0) {
lsb_perror("lsb_init");
return 1;
}
// 准备提交请求
memset(&submit_req, 0, sizeof(submit_req));
submit_req.command = "./my_job";
submit_req.queue = "normal";
submit_req.numProcessors = 4;
// 提交作业
if (lsb_submit(&submit_req, &submit_reply) < 0) {
lsb_perror("lsb_submit");
return 1;
}
printf("Job submitted: %ld\n", submit_reply.jobId);
return 0;
}
|
编译:
1
| gcc -o submit_job submit_job.c -llsf -lbat
|
调试扩展脚本
日志记录
1
2
3
4
| # 在ELIM/ESUB中添加日志
LOG_FILE=/var/log/lsf/esub.log
echo "[$(date)] Processing job" >> $LOG_FILE
env | grep LSB_ >> $LOG_FILE
|
测试ELIM
1
2
3
4
5
6
7
8
| # 手动运行ELIM查看输出
$LSF_SERVERDIR/elim.gpu
# 应输出:
# BEGIN
# 10 gpu_0 1
# 10 gpu_0_util 25
# END
|
测试ESUB
1
2
3
4
5
6
7
| # 模拟环境变量
export LSB_SUB_COMMAND_LINE="python train.py"
export LSB_SUB_PROJECT_NAME="ml_project"
# 运行ESUB
$LSF_SERVERDIR/esub.policy
echo "Exit code: $?"
|
最佳实践
- 错误处理:ELIM/ESUB必须健壮,崩溃会影响整个集群
- 性能:ELIM频繁调用,避免耗时操作
- 幂等性:ESUB可能被多次调用,确保幂等
- 版本控制:扩展脚本纳入版本管理
- 文档化:记录脚本用途和依赖
总结
LSF的扩展机制为管理员提供了极大的灵活性,可以深度定制集群行为以适应特定需求。通过ELIM、ESUB等插件,LSF可以无缝集成到复杂的企业IT环境中。
参考资源:
Support the Creator
If you found this article helpful, consider supporting.