LSF提供强大的扩展机制,允许管理员和开发者通过ELIM、ESUB、ECHKPT等插件自定义集群行为,实现深度定制和第三方集成。

扩展点概览

LSF主要扩展机制:

扩展点 用途 语言
ELIM 外部负载信息管理器 Shell/C/Python
ESUB 作业提交过滤器 Shell/C/Perl
EEXEC 作业执行包装器 Shell
ECHKPNT Checkpoint/Restart C
Pre/Post Exec 作业前后脚本 Shell

ELIM - 外部负载信息管理器

功能

报告自定义资源给LSF:

  • GPU状态和利用率
  • 许可证可用性
  • 专有硬件(FPGA、加速卡)
  • 应用级资源

编写ELIM脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/bin/sh
# elim.gpu - 报告GPU资源

echo "BEGIN"     # 必须的起始标记

# 查询NVIDIA GPU
nvidia-smi --query-gpu=index,utilization.gpu --format=csv,noheader | \
while IFS=',' read gpu_id util; do
    # 格式: 更新周期(秒) 资源名 资源值
    echo "10 gpu_${gpu_id} 1"
    echo "10 gpu_${gpu_id}_util ${util%% *}"
done

echo "END"       # 必须的结束标记

配置ELIM

1
2
3
# lsf.conf
LSF_ELIM=$LSF_SERVERDIR/elim.gpu
LSF_ELIM_UPDATE_INTERVAL=10

使用自定义资源

1
2
# 请求特定GPU
bsub -R "select[gpu_0>0 && gpu_0_util<50]" ./cuda_app

ESUB - 作业提交过滤器

功能

拦截和修改作业提交:

  • 自动添加资源需求
  • 执行策略检查
  • 记录作业元数据
  • 自动化环境设置

编写ESUB脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/bin/bash
# esub.policy - 作业提交策略

# ESUB接收环境变量:
# LSB_SUB_* 包含所有提交参数

# 示例:自动为Python作业添加Conda环境
if echo "$LSB_SUB_COMMAND_LINE" | grep -q "python"; then
    echo "#BSUB -env \"all, CONDA_ENV=pytorch\""
fi

# 示例:检查项目代码合法性
if [ -z "$LSB_SUB_PROJECT_NAME" ]; then
    echo "Error: Project name required" >&2
    exit 1
fi

# 示例:大内存作业自动分配特定队列
MEM_REQ=$(echo "$LSB_SUB_RES_REQ" | grep -oP 'mem=\K[0-9]+')
if [ "$MEM_REQ" -gt 64000 ]; then
    echo "#BSUB -q bigmem"
fi

exit 0  # 0=允许提交,非0=拒绝

配置ESUB

1
2
# lsf.conf
LSB_ESUB=$LSF_SERVERDIR/esub.policy

EEXEC - 作业执行包装器

功能

在作业前后执行自定义代码:

  • 环境初始化
  • 许可证checkout/checkin
  • 日志记录
  • 数据暂存

EEXEC脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#!/bin/bash
# eexec.wrapper

# 作业前:准备环境
setup_environment() {
    echo "Setting up job $LSB_JOBID"
    
    # Checkout许可证
    /opt/license/checkout vcs_license
    
    # 创建临时目录
    export JOB_TEMPDIR=/scratch/$LSB_JOBID
    mkdir -p $JOB_TEMPDIR
    cd $JOB_TEMPDIR
    
    # 加载依赖
    module load gcc/9.3
    module load python/3.9
}

# 作业后:清理
cleanup() {
    echo "Cleaning up job $LSB_JOBID"
    
    # Checkin许可证
    /opt/license/checkin vcs_license
    
    # 清理临时文件
    rm -rf $JOB_TEMPDIR
}

# 设置trap确保清理执行
trap cleanup EXIT

# 准备环境
setup_environment

# 执行实际作业
eval "$@"
EXIT_CODE=$?

# cleanup会被trap自动调用
exit $EXIT_CODE

配置EEXEC

1
2
3
4
5
# lsb.queues
Begin Queue
QUEUE_NAME = vcs_queue
EEXEC = $LSF_SERVERDIR/eexec.wrapper
End Queue

Pre/Post Exec脚本

作业级别前后处理

1
2
3
4
# 提交时指定
bsub -E "./pre_exec.sh" \
     -Ep "./post_exec.sh" \
     ./main_job.sh

Pre-Exec示例

1
2
3
4
5
6
7
8
9
10
11
12
#!/bin/bash
# pre_exec.sh - 在作业主体前运行

# 下载数据
aws s3 cp s3://mybucket/input.tar.gz .
tar xzf input.tar.gz

# 验证环境
if ! command -v matlab &> /dev/null; then
    echo "MATLAB not found" >&2
    exit 1
fi

Post-Exec示例

1
2
3
4
5
6
7
8
9
10
#!/bin/bash
# post_exec.sh - 在作业主体后运行(无论成功或失败)

# 上传结果
if [ -f results.dat ]; then
    aws s3 cp results.dat s3://mybucket/results/
fi

# 发送通知
curl -X POST webhookurl -d "Job $LSB_JOBID completed"

ECHKPNT - Checkpoint/Restart

C语言接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <lsf/lsbatch.h>

int main() {
    // 注册checkpoint函数
    ls_checkpoint_register(my_checkpoint_func);
    
    // 主计算循环
    while (!done) {
        do_computation();
    }
    
    return 0;
}

int my_checkpoint_func() {
    // 保存状态到文件
    save_state("checkpoint.dat");
    return 0;  // 成功
}

Python封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import lsf

def checkpoint():
    """保存当前状态"""
    state = get_current_state()
    with open('checkpoint.pkl', 'wb') as f:
        pickle.dump(state, f)

# 注册checkpoint函数
lsf.register_checkpoint(checkpoint, interval=1800)  # 30分钟

# 从checkpoint恢复
if lsf.has_checkpoint():
    state = lsf.load_checkpoint()
    restore_state(state)

资源映射(Resource Mapping)

elim.resource_map

映射外部资源名到LSF资源:

1
2
3
4
5
6
7
8
9
/* elim_resource_map.c */
#include <stdio.h>

int main() {
    // 映射Slurm gres到LSF资源
    printf("gpu:%s\n", getenv("SLURM_GPUS"));
    printf("mem:%s\n", getenv("SLURM_MEM_PER_NODE"));
    return 0;
}

动态主机组(Dynamic Host Groups)

基于资源自动分组

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/bin/bash
# elim.hostgroup

# 根据GPU数量动态分组
GPU_COUNT=$(nvidia-smi --list-gpus | wc -l)

if [ $GPU_COUNT -ge 8 ]; then
    echo "HOSTGROUP gpu_rich"
elif [ $GPU_COUNT -ge 1 ]; then
    echo "HOSTGROUP gpu_medium"
else
    echo "HOSTGROUP cpu_only"
fi

第三方集成示例

Ansible集成

1
2
3
4
5
6
7
8
9
10
11
12
13
# deploy_lsf.yml
- name: Deploy ELIM script
  copy:
    src: elim.gpu
    dest: /opt/lsf/10.1/linux/etc/
    mode: '0755'
  notify: restart lim

- name: Configure ELIM
  lineinfile:
    path: /opt/lsf/conf/lsf.conf
    line: 'LSF_ELIM=/opt/lsf/10.1/linux/etc/elim.gpu'
  notify: reconfig lsf

Prometheus监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# lsf_exporter.py
from prometheus_client import Gauge, start_http_server
import subprocess

# 定义指标
lsf_jobs_pending = Gauge('lsf_jobs_pending', 'Number of pending jobs')
lsf_jobs_running = Gauge('lsf_jobs_running', 'Number of running jobs')

def collect_metrics():
    # 调用bjobs统计
    result = subprocess.check_output(['bjobs', '-sum'])
    # 解析输出
    lsf_jobs_pending.set(parse_pending(result))
    lsf_jobs_running.set(parse_running(result))

if __name__ == '__main__':
    start_http_server(9100)
    while True:
        collect_metrics()
        time.sleep(30)

C API编程

LSF C API示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <lsf/lsbatch.h>

int main() {
    struct submit submit_req;
    struct submitReply submit_reply;
    
    // 初始化LSF
    if (lsb_init("my_app") < 0) {
        lsb_perror("lsb_init");
        return 1;
    }
    
    // 准备提交请求
    memset(&submit_req, 0, sizeof(submit_req));
    submit_req.command = "./my_job";
    submit_req.queue = "normal";
    submit_req.numProcessors = 4;
    
    // 提交作业
    if (lsb_submit(&submit_req, &submit_reply) < 0) {
        lsb_perror("lsb_submit");
        return 1;
    }
    
    printf("Job submitted: %ld\n", submit_reply.jobId);
    return 0;
}

编译:

1
gcc -o submit_job submit_job.c -llsf -lbat

调试扩展脚本

日志记录

1
2
3
4
# 在ELIM/ESUB中添加日志
LOG_FILE=/var/log/lsf/esub.log
echo "[$(date)] Processing job" >> $LOG_FILE
env | grep LSB_ >> $LOG_FILE

测试ELIM

1
2
3
4
5
6
7
8
# 手动运行ELIM查看输出
$LSF_SERVERDIR/elim.gpu

# 应输出:
# BEGIN
# 10 gpu_0 1
# 10 gpu_0_util 25
# END

测试ESUB

1
2
3
4
5
6
7
# 模拟环境变量
export LSB_SUB_COMMAND_LINE="python train.py"
export LSB_SUB_PROJECT_NAME="ml_project"

# 运行ESUB
$LSF_SERVERDIR/esub.policy
echo "Exit code: $?"

最佳实践

  1. 错误处理:ELIM/ESUB必须健壮,崩溃会影响整个集群
  2. 性能:ELIM频繁调用,避免耗时操作
  3. 幂等性:ESUB可能被多次调用,确保幂等
  4. 版本控制:扩展脚本纳入版本管理
  5. 文档化:记录脚本用途和依赖

总结

LSF的扩展机制为管理员提供了极大的灵活性,可以深度定制集群行为以适应特定需求。通过ELIM、ESUB等插件,LSF可以无缝集成到复杂的企业IT环境中。


参考资源