125 changes: 125 additions & 0 deletions docs/en/advance/turbomind_priority_scheduling.md
@@ -0,0 +1,125 @@
# TurboMind Priority Scheduling

TurboMind supports request-level priority scheduling. This is useful when one service handles different classes of traffic at the same time, such as interactive requests, background batch jobs, regular users, and high-priority users. After priority scheduling is enabled, requests with higher priority are admitted to inference first, while requests that have already started are kept whenever possible to reduce extra KV cache swapping overhead.

Priority scheduling only takes effect with the TurboMind backend. The default scheduling policy is `fifo`; in that mode, requests are still scheduled by arrival order and the `priority` field does not change the scheduling order.

## Enable Priority Scheduling

Start the API Server with `--schedule-policy priority`:

```bash
lmdeploy serve api_server internlm/internlm2_5-7b-chat \
    --backend turbomind \
    --schedule-policy priority
```

When creating a pipeline in Python, enable it through `TurbomindEngineConfig`:

```python
from lmdeploy import GenerationConfig, TurbomindEngineConfig, pipeline

backend_config = TurbomindEngineConfig(schedule_policy='priority')
pipe = pipeline('internlm/internlm2_5-7b-chat', backend_config=backend_config)

response = pipe(
    'Introduce LMDeploy',
    gen_config=GenerationConfig(priority=0, max_new_tokens=256),
)
```

`schedule_policy` supports the following values:

| Value | Behavior |
| ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `fifo` | Default policy. Schedule requests by arrival order and preserve the original behavior. |
| `priority` | Schedule by request priority. Smaller values are admitted first; requests with the same priority keep FIFO order; already-started requests are kept before new requests. |

## Set Request Priority

Set request priority with the `priority` field. It must be an integer in `[0, 255]`. Smaller values mean higher priority, so `0` is the highest priority and `255` is the lowest. The default value is `0`. Out-of-range values and non-integer types (strings, floats, booleans, and so on) are rejected by validation. In the OpenAI-compatible API, `null` is treated as not set, which is equivalent to `0`.
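
The validation rules above can be sketched in plain Python. This is an illustration only, not LMDeploy's implementation, and `normalize_priority` is a hypothetical helper name. It assumes the documented behavior: an unset value maps to `0`, only true integers are accepted (so `True` is rejected even though `bool` subclasses `int`), and values outside `[0, 255]` are rejected.

```python
def normalize_priority(value):
    """Hypothetical helper: return a valid priority or raise ValueError."""
    if value is None:
        return 0  # an unset priority falls back to the default
    # `type(...) is int` rejects bool, float, and str, unlike isinstance().
    if type(value) is not int:
        raise ValueError('priority must be an integer')
    if not 0 <= value <= 255:
        raise ValueError('priority must be in range [0, 255]')
    return value
```

A client-side check like this can surface a bad value before the request is sent, instead of waiting for the server to reject it.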

### OpenAI-Compatible API

Both `/v1/chat/completions` and `/v1/completions` support the LMDeploy extension field `priority`.

```bash
curl http://localhost:23333/v1/chat/completions \
    -H "Content-Type: application/json" \
    -d '{
        "model": "internlm/internlm2_5-7b-chat",
        "messages": [{"role": "user", "content": "Summarize this text"}],
        "priority": 0,
        "max_tokens": 256
    }'
```

When using the OpenAI Python SDK, pass this extension field through `extra_body`:

```python
from openai import OpenAI

client = OpenAI(api_key='EMPTY', base_url='http://localhost:23333/v1')

response = client.chat.completions.create(
    model='internlm/internlm2_5-7b-chat',
    messages=[{'role': 'user', 'content': 'Summarize this text'}],
    max_tokens=256,
    extra_body={'priority': 0},
)
```

Use a larger value for lower-priority requests, such as background work:

```python
response = client.chat.completions.create(
    model='internlm/internlm2_5-7b-chat',
    messages=[{'role': 'user', 'content': 'Generate a long background report'}],
    max_tokens=1024,
    extra_body={'priority': 32},
)
```

### Pipeline

For pipeline calls, use `GenerationConfig.priority`:

```python
from lmdeploy import GenerationConfig

# `pipe` is the pipeline created earlier with schedule_policy='priority'.
urgent = pipe(
    'Quickly review this code',
    gen_config=GenerationConfig(priority=0, max_new_tokens=256),
)

background = pipe(
    'Generate a long offline analysis report',
    gen_config=GenerationConfig(priority=32, max_new_tokens=1024),
)
```

Priority only affects scheduling when requests compete for resources, so its effect is most visible under concurrent load. A single synchronous request with no contention is scheduled the same way at any priority, and priority never changes the output content.
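
Because priority matters only under contention, a client needs several requests in flight to observe it. The sketch below is one way to do that with a thread pool. It assumes a caller-supplied `send(prompt, priority)` function, which is hypothetical: for example, a thin wrapper around `client.chat.completions.create(..., extra_body={'priority': priority})`. `run_concurrently` is likewise an illustrative name, not part of LMDeploy.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_concurrently(send, jobs, max_workers=8):
    """Submit (prompt, priority) jobs at once; return results in completion order.

    `send` is a hypothetical caller-supplied function that issues one request.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the job that produced it.
        futures = {pool.submit(send, prompt, priority): (prompt, priority)
                   for prompt, priority in jobs}
        return [(futures[f], f.result()) for f in as_completed(futures)]
```

Completion order also depends on how many tokens each request generates, so time to first token is usually a more direct signal of scheduling order than total latency.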

## Scheduling Semantics

After `schedule_policy='priority'` is enabled, TurboMind uses request priority in two stages:

1. Before requests enter the engine, the waiting queue admits requests with smaller `priority` values first.
2. After requests enter the engine, already-started requests are kept first. Requests that have not started are then ordered by `priority` and arrival order.

Therefore, `priority` is a non-preemptive priority policy:

- A high-priority request can overtake lower-priority requests that are still waiting.
- A new high-priority request does not preempt a lower-priority request that has already started.
- Requests with the same priority are processed in arrival order.
- A continuous stream of high-priority requests may make lower-priority requests wait longer. The current policy does not include aging, deadlines, quotas, or weighted fair scheduling.

This policy favors throughput and stability. It is a good fit for online services that need traffic differentiation while avoiding frequent preemption and KV cache swaps.
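
The semantics above can be modeled with a small priority queue. This is a toy model under simplifying assumptions, not TurboMind's scheduler: `PriorityAdmission` and its `slots` parameter are hypothetical, and a fixed slot count stands in for real resource limits such as KV cache capacity.

```python
import heapq
from itertools import count


class PriorityAdmission:
    """Toy model of non-preemptive priority admission (not TurboMind's code)."""

    def __init__(self, slots):
        self.slots = slots      # max requests running at once
        self.running = []       # started requests; never evicted in this model
        self._waiting = []      # heap of (priority, arrival_seq, name)
        self._seq = count()     # arrival counter used for FIFO tie-breaking

    def submit(self, name, priority=0):
        heapq.heappush(self._waiting, (priority, next(self._seq), name))

    def admit(self):
        """Fill free slots from the waiting queue, smallest priority first."""
        while self._waiting and len(self.running) < self.slots:
            _, _, name = heapq.heappop(self._waiting)
            self.running.append(name)
        return list(self.running)

    def finish(self, name):
        self.running.remove(name)


sched = PriorityAdmission(slots=1)
sched.submit('batch-1', priority=128)
sched.admit()                      # batch-1 starts: nothing else is waiting
sched.submit('batch-2', priority=128)
sched.submit('chat-1', priority=0)
still_running = sched.admit()      # chat-1 does not preempt batch-1
sched.finish('batch-1')
after_finish = sched.admit()       # chat-1 overtakes batch-2 in the queue
print(still_running, after_finish)  # ['batch-1'] ['chat-1']
```

The `(priority, arrival_seq)` heap key gives both properties at once: smaller priority values are popped first, and equal priorities fall back to arrival order. The model also shows the starvation risk: `batch-2` waits for as long as priority-0 requests keep arriving.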

## Usage Tips

- Reserve smaller values, such as `0` or `1`, for the most urgent requests.
- Use middle values, such as `16` or `32`, for normal online requests.
- Use larger values, such as `128` or `255`, for background, offline, or delay-tolerant requests.
- If all requests use the default value `0`, they share one priority class and are scheduled in plain FIFO order.
- This feature only changes scheduling order. It does not change sampling parameters, output quality, or token generation rules.
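
One way to keep these values consistent across a codebase is to name the tiers once and derive the request field from them. The sketch below is a suggestion, not part of LMDeploy: `PriorityTier` and `extra_body_for` are hypothetical names, and the numeric values are taken from the tips above.

```python
from enum import IntEnum


class PriorityTier(IntEnum):
    """Hypothetical tiers; the values follow the suggestions above."""
    URGENT = 0
    ONLINE = 16
    BACKGROUND = 128


def extra_body_for(tier):
    """Build the `extra_body` payload for an OpenAI SDK call."""
    return {'priority': int(tier)}
```

With the OpenAI Python SDK, this would be passed as `extra_body=extra_body_for(PriorityTier.ONLINE)`. For pipeline calls, `int(PriorityTier.ONLINE)` can be passed as `GenerationConfig(priority=...)`, since validation requires a plain `int`.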
1 change: 1 addition & 0 deletions docs/en/index.rst
@@ -102,6 +102,7 @@ Documentation
   advance/long_context.md
   advance/chat_template.md
   advance/debug_turbomind.md
   advance/turbomind_priority_scheduling.md
   advance/structed_output.md
   advance/pytorch_multinodes.md
   advance/pytorch_profiling.md
125 changes: 125 additions & 0 deletions docs/zh_cn/advance/turbomind_priority_scheduling.md
@@ -0,0 +1,125 @@
# TurboMind 优先级调度

TurboMind 支持按请求优先级进行调度。该功能适合在同一个服务中同时处理多类请求,例如交互式请求、后台批处理任务、普通用户请求和高优先级用户请求。启用后,服务会优先让优先级更高的请求进入推理,同时尽量保留已经开始执行的请求,减少 KV cache 交换带来的额外开销。

优先级调度只在 TurboMind 后端生效。默认调度策略是 `fifo`,此时请求仍按到达顺序处理,`priority` 字段不会改变调度顺序。

## 启用优先级调度

启动 API Server 时设置 `--schedule-policy priority`:

```bash
lmdeploy serve api_server internlm/internlm2_5-7b-chat \
    --backend turbomind \
    --schedule-policy priority
```

在 Python 中创建 pipeline 时,可以通过 `TurbomindEngineConfig` 启用:

```python
from lmdeploy import GenerationConfig, TurbomindEngineConfig, pipeline

backend_config = TurbomindEngineConfig(schedule_policy='priority')
pipe = pipeline('internlm/internlm2_5-7b-chat', backend_config=backend_config)

response = pipe(
    '介绍一下 LMDeploy',
    gen_config=GenerationConfig(priority=0, max_new_tokens=256),
)
```

`schedule_policy` 支持以下取值:

| 取值 | 行为 |
| ---------- | ----------------------------------------------------------------------------------------------------- |
| `fifo` | 默认策略。按请求到达顺序调度,保持原有行为。 |
| `priority` | 按请求优先级调度。数值更小的请求优先进入推理;同优先级请求保持 FIFO;已经开始执行的请求优先于新请求。 |

## 设置请求优先级

请求优先级通过 `priority` 设置,取值范围为 `[0, 255]`。数值越小,优先级越高;`0` 是最高优先级,`255` 是最低优先级。未设置时默认值为 `0`。`priority` 必须是整数;超出范围或使用字符串、浮点数、布尔值等非整数类型会被校验拒绝。`null` 会被当作未设置(等价于 `0`)。

### OpenAI 兼容接口

`/v1/chat/completions` 和 `/v1/completions` 都支持 LMDeploy 扩展字段 `priority`。

```bash
curl http://localhost:23333/v1/chat/completions \
    -H "Content-Type: application/json" \
    -d '{
        "model": "internlm/internlm2_5-7b-chat",
        "messages": [{"role": "user", "content": "请总结这段文本"}],
        "priority": 0,
        "max_tokens": 256
    }'
```

使用 OpenAI Python SDK 时,可以通过 `extra_body` 传入该扩展字段:

```python
from openai import OpenAI

client = OpenAI(api_key='EMPTY', base_url='http://localhost:23333/v1')

response = client.chat.completions.create(
    model='internlm/internlm2_5-7b-chat',
    messages=[{'role': 'user', 'content': '请总结这段文本'}],
    max_tokens=256,
    extra_body={'priority': 0},
)
```

普通优先级请求可以使用更大的数值:

```python
response = client.chat.completions.create(
    model='internlm/internlm2_5-7b-chat',
    messages=[{'role': 'user', 'content': '后台生成一份长报告'}],
    max_tokens=1024,
    extra_body={'priority': 32},
)
```

### Pipeline

Pipeline 调用中使用 `GenerationConfig.priority`:

```python
from lmdeploy import GenerationConfig

urgent = pipe(
    '帮我快速检查这段代码',
    gen_config=GenerationConfig(priority=0, max_new_tokens=256),
)

background = pipe(
    '生成一份较长的离线分析报告',
    gen_config=GenerationConfig(priority=32, max_new_tokens=1024),
)
```

优先级调度主要在服务并发处理多个请求时体现效果。单个同步请求没有其他请求竞争资源时,设置不同优先级不会改变输出内容。

## 调度语义

启用 `schedule_policy='priority'` 后,TurboMind 会在两个阶段使用请求优先级:

1. 请求进入 engine 前,等待队列优先取出 `priority` 数值更小的请求。
2. 请求进入 engine 后,已经开始执行的请求优先保留;尚未开始执行的请求再按 `priority` 和到达顺序排序。

因此,`priority` 是非抢占式优先级策略:

- 高优先级请求可以超过仍在等待的低优先级请求。
- 高优先级新请求不会抢占已经开始执行的低优先级请求。
- 同优先级请求按到达顺序处理。
- 持续到来的高优先级请求可能让低优先级请求等待更久;当前策略不包含 aging、deadline、配额或加权公平调度。

这种策略更偏向吞吐和稳定性,适合希望区分请求等级,同时避免频繁抢占和 KV cache swap 的在线服务。

## 使用建议

- 为最紧急的请求保留较小的优先级,例如 `0` 或 `1`。
- 为普通在线请求使用中间值,例如 `16` 或 `32`。
- 为后台、离线或可延迟请求使用更大的值,例如 `128` 或 `255`。
- 如果所有请求都使用默认值 `0`,调度效果等价于同优先级 FIFO。
- 该功能只影响调度顺序,不改变采样参数、输出质量或 token 生成规则。
1 change: 1 addition & 0 deletions docs/zh_cn/index.rst
@@ -103,6 +103,7 @@ LMDeploy 工具箱提供以下核心功能:
   advance/long_context.md
   advance/chat_template.md
   advance/debug_turbomind.md
   advance/turbomind_priority_scheduling.md
   advance/structed_output.md
   advance/pytorch_multinodes.md
   advance/pytorch_profiling.md
1 change: 1 addition & 0 deletions lmdeploy/cli/cli.py
@@ -76,6 +76,7 @@ def add_parser_chat():
        ArgumentHelper.communicator(tb_group)
        ArgumentHelper.cp(tb_group)
        ArgumentHelper.async_(tb_group)
        ArgumentHelper.schedule_policy(tb_group)

        # speculative decoding
        ArgumentHelper.add_spec_group(parser)
2 changes: 2 additions & 0 deletions lmdeploy/cli/serve.py
@@ -155,6 +155,7 @@ def add_parser_api_server():
        ArgumentHelper.num_tokens_per_iter(tb_group)
        ArgumentHelper.max_prefill_iters(tb_group)
        ArgumentHelper.async_(tb_group)
        ArgumentHelper.schedule_policy(tb_group)
        ArgumentHelper.communicator(tb_group)
        ArgumentHelper.dist_init_addr(tb_group)

@@ -273,6 +274,7 @@ def api_server(args):
num_tokens_per_iter=args.num_tokens_per_iter,
max_prefill_iters=args.max_prefill_iters,
async_=args.async_,
schedule_policy=args.schedule_policy,
communicator=args.communicator,
enable_metrics=not args.disable_metrics,
hf_overrides=args.hf_overrides)
10 changes: 10 additions & 0 deletions lmdeploy/cli/utils.py
@@ -598,6 +598,16 @@ def async_(parser):
                                   help='Enable async execution (default: 1, enabled). '
                                   'Set to 0 to disable async mode, 1 to enable it.')

    @staticmethod
    def schedule_policy(parser):
        return parser.add_argument('--schedule-policy',
                                   type=str,
                                   default='fifo',
                                   choices=['fifo', 'priority'],
                                   help='TurboMind scheduling policy. '
                                   '"fifo" preserves existing behavior; "priority" admits lower request priority '
                                   'values first and avoids preempting started requests.')

    @staticmethod
    def max_prefill_token_num(parser):
        return parser.add_argument('--max-prefill-token-num',
13 changes: 13 additions & 0 deletions lmdeploy/messages.py
@@ -108,6 +108,8 @@ class GenerationConfig:
            Must be non-negative; values below 0 are treated as 0.
        repetition_ngram_threshold: The number of times an n-gram must be repeated to trigger early stop.
            Must be non-negative; values below 0 are treated as 0.
        priority: TurboMind scheduling priority. Smaller values have higher priority.
            Must be in range [0, 255]. Defaults to 0.
    """

    n: int = 1
@@ -146,6 +148,9 @@ class GenerationConfig:
    repetition_ngram_size: int = 0
    repetition_ngram_threshold: int = 0

    # TurboMind scheduling priority. Smaller values have higher priority.
    priority: int = 0

    def convert_stop_bad_words_to_ids(self, tokenizer: Tokenizer):
        """Convert stop_words/bad_words to ids and append the ids to
        stop_token_ids/bad_token_ids."""
@@ -198,6 +203,8 @@ def __post_init__(self):
        if self.repetition_ngram_size <= 0 or self.repetition_ngram_threshold <= 0:
            self.repetition_ngram_size = 0
            self.repetition_ngram_threshold = 0
        assert type(self.priority) is int, 'priority must be an integer'
        assert 0 <= self.priority <= 255, 'priority must be in range [0, 255]'


@pydantic_dataclass
@@ -267,6 +274,10 @@ class TurbomindEngineConfig:
        hf_overrides: Huggingface overrides for the model.
            It can be used to override the default config of the model
        enable_metrics: enable metrics system
        schedule_policy: TurboMind scheduling policy.
            `fifo` preserves existing behavior. `priority` admits lower
            GenerationConfig.priority values first and keeps already-started
            requests before new requests inside TurboMind.
    """

    dtype: str = 'auto'
@@ -305,6 +316,7 @@ class TurbomindEngineConfig:
    communicator: str = 'nccl'
    hf_overrides: dict[str, Any] | None = None
    enable_metrics: bool = True
    schedule_policy: Literal['fifo', 'priority'] = 'fifo'

    def __post_init__(self):
        """Check input validation."""
@@ -318,6 +330,7 @@ def __post_init__(self):
            'invalid max_prefill_token_num'
        assert self.num_tokens_per_iter >= 0, 'invalid num_tokens_per_iter'
        assert self.async_ in (0, 1), 'async_ must be 0 (disabled) or 1 (enabled)'
        assert self.schedule_policy in ('fifo', 'priority'), 'invalid schedule_policy'


@dataclass
2 changes: 2 additions & 0 deletions lmdeploy/serve/openai/api_server.py
@@ -465,6 +465,7 @@ async def chat_completions_v1(request: ChatCompletionRequest, raw_request: Reque
logits_processors=logits_processors,
min_new_tokens=request.min_new_tokens,
min_p=request.min_p,
priority=request.priority or 0,
random_seed=random_seed,
spaces_between_special_tokens=request.spaces_between_special_tokens,
migration_request=migration_request,
@@ -760,6 +761,7 @@ async def completions_v1(request: CompletionRequest, raw_request: Request = None
stop_words=request.stop,
skip_special_tokens=request.skip_special_tokens,
min_p=request.min_p,
priority=request.priority or 0,
random_seed=random_seed,
spaces_between_special_tokens=request.spaces_between_special_tokens,
migration_request=migration_request,