创建在线 API 服务¶
BentoML Service 是 BentoML 项目的核心构建块,它允许您定义机器学习模型的服务逻辑。本页面解释了 BentoML Service。
Service 定义¶
BentoML Service 使用基于类的定义。每个类代表一个独立的 Service,可以执行特定任务,例如预处理数据或使用 ML 模型进行预测。您可以使用装饰器 @bentoml.service
来注释一个类,表明它是一个 BentoML Service。按照惯例,您在 service.py
文件中定义一个(或多个)Service。更多信息,请参见 Service 装饰器。
这是来自 Hello world 的 Service 定义示例。
from __future__ import annotations
import bentoml
from transformers import pipeline
@bentoml.service(
resources={"cpu": "2"},
traffic={"timeout": 10},
)
class Summarization:
model_path = bentoml.models.HuggingFaceModel("sshleifer/distilbart-cnn-12-6")
def __init__(self) -> None:
# Load model into pipeline
self.pipeline = pipeline('summarization', model=self.model_path)
@bentoml.api
def summarize(self, text: str) -> str:
result = self.pipeline(text)
return result[0]['summary_text']
类中被定义为可访问的 HTTP API 端点的方法用 @bentoml.api
装饰器修饰。这使得它们在 Service 部署后可以被调用。
注意
这个 Service 使用 HuggingFaceModel
方法从 Hugging Face 下载了一个预训练模型。您可以在 Service 类中使用您自己的模型。更多信息,请参见 加载和管理模型。
测试 Service 代码¶
使用 bentoml serve
测试您的 Service,它会启动一个本地模型服务器并暴露定义的 API 端点。
bentoml serve
按照惯例,BentoML Service 通常在 service.py
文件中定义,但您可以使用 <module_name>:<attribute_name>
格式指定任何模块和属性名称。属性名称对应于您模块中定义的 Service 类。如果您的 Service 在不同的模块中,请相应地更新命令。
bentoml serve mymodule:MyService
默认情况下,服务器可在 https://:3000/ 访问。具体来说,bentoml serve
执行以下操作:
将 API 代码转化为 REST API 端点。默认的 HTTP 方法是
POST
。管理定义的 Service 实例的生命周期。
根据方法名称创建一个 URL 路由。在此示例中,它是
https://:3000/summarize
。此路由可以自定义。
Service 配置¶
您使用 @bentoml.service
装饰器来指定 Service 级别配置,例如资源需求和超时设置。这些配置在您本地提供 Service 服务以及在 BentoCloud 上部署生成的 Bento(或将 Bento 镜像作为容器部署到 Kubernetes 等环境)时都适用。例如:
@bentoml.service(
resources={"memory": "500MiB"},
traffic={"timeout": 10},
)
class Summarization:
# Service definition here
所有配置字段都是可选的,并带有默认值。这允许您根据您的特定用例和部署环境对 Service 进行精细调整和优化。
Service API¶
BentoML 中的 @bentoml.api
装饰器是为 BentoML Service 定义 API 端点的关键组件。此装饰器将一个普通的 Python 函数转换为 API 端点,为其提供了作为 Web API 端点所需的附加能力:
@bentoml.api
def summarize(self, text: str) -> str:
result = self.pipeline(text)
return result[0]['summary_text']
您可以自定义 Service API 的输入和输出逻辑。请参阅 定义输入和输出类型 了解更多信息。
自定义路由路径¶
每个 API 端点都有一个唯一的路由(URL 路径)。默认情况下,路由是根据函数名称派生的,但您可以使用 route
参数进行自定义。
@bentoml.api(route="/custom/url/name")
def summarize(self, text: str) -> str:
result = self.pipeline(text)
return result[0]['summary_text']
推理上下文¶
您可以通过在 Service API 函数中添加 bentoml.Context
来获取推理调用的上下文。此参数允许您访问有关传入请求的信息(例如客户端头部)并修改传出响应(例如设置响应头部、cookie 或 HTTP 状态码)。此外,您还可以通过 ctx.state
属性读取和写入全局状态字典,这是一个 每个工作进程 的字典,可以在不同的 API 端点之间读取和写入。
@bentoml.api
def summarize(self, text: str, ctx: bentoml.Context) -> str:
# Get request headers
request_headers = ctx.request.headers
result = self.pipeline(text)
# Set response headers, cookies, and status code
ctx.response.status_code = 202
ctx.response.cookies = [
bentoml.Cookie(
key="key",
value="value",
max_age=None,
expires=None,
path="/summarize",
domain=None,
secure=True,
httponly=True,
samesite="None"
)
]
# Add a custom header to the response
ctx.response.headers.append("X-Custom-Header", "value")
return result[0]['summary_text']
生命周期钩子¶
BentoML 的生命周期钩子提供了一种在 Service 生命周期的特定阶段插入自定义逻辑的方式。
部署钩子 (
@bentoml.on_deployment
): 在生成 Service 工作进程 之前执行全局设置操作。无论工作进程数量如何,它们只运行一次,非常适合一次性初始化。关闭钩子 (
@bentoml.on_shutdown
): 当 BentoML Service 关闭时运行清理逻辑。它们可以执行关闭连接和释放资源等任务,以确保优雅关闭。
您可以使用装饰器设置生命周期钩子。详细信息请参见 配置生命周期钩子。
同步和异步 API¶
BentoML Service 中的 API 可以定义为 Python 中的同步函数或异步协程。
基本用法¶
对于同步逻辑,BentoML 会创建一个最优大小的工作进程池来处理执行。同步 API 简单明了,适用于大多数模型服务场景。这是一个同步 API 的示例:
import bentoml
@bentoml.service(name="iris_classifier", resources={"cpu": "200m", "memory": "512Mi"})
class IrisClassifier:
iris_model = bentoml.models.BentoModel("iris_sklearn:latest")
preprocessing = bentoml.depends(Preprocessing)
def __init__(self):
import joblib
self.model = joblib.load(self.iris_model.path_of("model.pkl"))
@bentoml.api
def classify(self, input_series: np.ndarray) -> np.ndarray:
return self.model.predict(input_series)
然而,对于想要最大化性能和吞吐量的场景,同步 API 可能不足。当处理逻辑是 IO 密集型且支持异步模型执行时,异步 API 是理想的选择。这是一个示例:
import bentoml
from vllm import AsyncEngineArgs, AsyncLLMEngine, SamplingParams
from typing import Optional, AsyncGenerator, List
SAMPLING_PARAM = SamplingParams(max_tokens=4096)
@bentoml.service(workers=1, resources={"gpu": "1"}, envs=[{"name": "HF_TOKEN"}])
class VLLMService:
model = bentoml.models.HuggingFaceModel("meta-llama/Meta-Llama-3.1-8B-Instruct")
def __init__(self) -> None:
ENGINE_ARGS = AsyncEngineArgs(model=self.model)
self.engine = AsyncLLMEngine.from_engine_args(ENGINE_ARGS)
self.request_id = 0
@bentoml.api
async def generate(self, prompt: str = "Explain superconductors like I'm five years old", tokens: Optional[List[int]] = None) -> AsyncGenerator[str, None]:
stream = await self.engine.add_request(self.request_id, prompt, SAMPLING_PARAM, prompt_token_ids=tokens)
self.request_id += 1
async for request_output in stream:
yield request_output.outputs[0].text
异步 API 的实现更有效,因为当调用异步方法时,事件循环变得可用以处理其他请求,而当前请求正在等待方法结果。此外,BentoML 会根据可用的 CPU 核心数自动配置理想的并行度。这使得在常见用例中无需进一步配置事件循环。
警告
避免在异步 API 中实现阻塞逻辑,因为此类操作会阻塞 IO 事件循环,导致 /readyz
等健康检查端点无法正常工作。
同步转换为异步¶
对于机器学习推理任务,尽管传统上是同步执行的,但由于不同原因可能需要异步执行,例如:
并行运行任务
使用支持异步连接的数据库等资源
然而,直接在异步上下文中调用同步阻塞函数通常被认为是不好的做法,因为这会阻塞事件循环,导致性能下降和响应性降低。在这种情况下,您可以使用 Service 的 .to_async
属性,它允许您将 Service 的同步方法转换为异步方法。这可以实现非阻塞执行并提高 IO 密集型操作的性能。这是一个示例:
...
@bentoml.service(
traffic={"timeout": 600},
workers=4,
resources={
"memory": "4Gi"
},
)
class GreetingCardService:
# Services StableLMService, SDXLTurboService, and XTTSService are previously defined
# Retrieve these Services using `bentoml.depends` so that their methods can be called directly
stablelm = bentoml.depends(StableLMService)
sdxl = bentoml.depends(SDXLTurboService)
xtts = bentoml.depends(XTTSService)
@bentoml.api
async def generate_card(
self,
context: bentoml.Context,
message: str = "Happy new year!",
) -> Annotated[Path, bentoml.validators.ContentType("video/*")]:
greeting_message = await self.stablelm.enhance_message(message)
sdxl_prompt_tmpl = "a happy and heart-warming greeting card based on greeting message {message}"
sdxl_prompt = sdxl_prompt_tmpl.format(message=greeting_message)
# Run `txt2img` and `synthesize` operations in parallel
audio_path, image = await asyncio.gather(
self.xtts.to_async.synthesize(greeting_message),
self.sdxl.to_async.txt2img(sdxl_prompt)
)
image_path = os.path.join(context.temp_dir, "output.png")
image.save(image_path)
cmd = ["ffmpeg", "-loop", "1", "-i", str(image_path), "-i", str(audio_path), "-shortest"]
output_path = os.path.join(context.temp_dir, "output.mp4")
cmd.append(output_path)
subprocess.run(cmd)
return Path(output_path)
注意
bentoml.depends()
通常用于服务间通信,因为它允许您直接调用另一个 Service 中的 BentoML Service API 方法,就好像它们是本地类函数一样。更多信息,请参见 运行分布式服务。
在此示例中,.to_async
属性将同步方法(SDXLTurboService
和 XTTSService
的 txt2img
和 synthesize
方法)转换为其异步版本,使得 generate_card
方法可以与 asyncio.gather
并发地执行多个异步操作。
任务¶
BentoML 中的任务允许您在后台执行长时间运行的操作,通过类似任务队列的 API 进行管理。这些后台任务非常适合批处理和图像或视频生成等场景,在这些场景中您不需要立即或同步获取结果。
要定义任务端点,请在 Service 构造函数中使用 @bentoml.task
装饰器。更多信息,请参见 异步任务队列。
将遗留 Runner 转换为 Service¶
Runner 是 BentoML 1.1 中的一个遗留概念,它表示可以在远程 Python worker 上执行并独立扩展的计算单元。在 BentoML 1.1 中,Service 是使用 Service
和 Runner
组件共同定义的,其中一个 Service 可以包含一个或多个 Runner。从 BentoML 1.2 开始,框架已得到精简,使用 Python 类来定义 BentoML Service。
为了在从 1.1 迁移到 1.2+ 时最大程度地减少代码更改,您可以使用 bentoml.runner_service()
函数将 Runner 转换为 Service。这是一个示例:
import bentoml
import numpy as np
# Create a legacy runner
sample_legacy_runner = bentoml.models.get("model_name:version").to_runner()
# Create an internal Service
SampleService = bentoml.runner_service(runner = sample_legacy_runner)
# Use the @bentoml.service decorator to mark a class as a Service
@bentoml.service(
resources={"cpu": "2", "memory": "500MiB"},
workers=1,
traffic={"timeout": 20},
)
# Define the BentoML Service
class MyService:
# Integrate the internal Service using bentoml.depends() to inject it as a dependency
sample_model_runner = bentoml.depends(SampleService)
# Define Service API and IO schema
@bentoml.api
def classify(self, input_series: np.ndarray) -> np.ndarray:
# Use the internal Service for prediction
result = self.sample_model_runner.predict.run(input_series)
return result