批量推理作业

某些 AI 驱动的任务最适合批量推理,例如 RAG 系统的嵌入生成、推荐系统的定期更新,或用于特征提取的批量图像处理。

使用 BentoML 和 BentoCloud,您可以高效地管理这些批量推理作业,并具有以下几个关键优势

  • 按需部署:仅在需要时部署模型,并在作业完成后终止部署,确保您只为您使用的资源付费。您可以运行一次性或定期批量推理作业。

  • 自动扩展:根据作业的流量需求自动扩展您的资源。

  • 用于推理的专用硬件:在专用 GPU 上运行模型推理,确保推理任务不干扰批处理。

本文档解释了如何使用 BentoML 和 BentoCloud 运行批量推理作业。

创建作业

以下示例演示了作业执行的完整生命周期。

步骤 1:准备 BentoML 项目

确保您已有一个现有的 BentoML 项目或 Bento。下面的示例是 RAG 系统中典型的 BentoML Service 设置,其中端点 ingest_pdf_batchingest_text_batch 用于文件的批量摄取。它们可以计算文档的嵌入并将其写入向量数据库进行索引。与可能需要持续可用的常规服务不同,这些端点可以按需激活,这使得它们非常适合批量推理作业,因为资源仅在作业执行期间消耗。

...
@bentoml.service(
    resources={
        "gpu": 1,
    },
    traffic={
        "timeout": 30,
        "concurrency": 5,
        "external_queue": True,
    }
)
class RAGService:
    # Initialization setup
    ...

    @bentoml.api
    def ingest_pdf_batch(self, pdf: Annotated[Path, bentoml.validators.ContentType("application/pdf")]) -> str:

        import pypdf
        reader = pypdf.PdfReader(pdf)
        texts = []
        for page in reader.pages:
            text = page.extract_text()
            texts.append(text)
        all_text = "".join(texts)
        doc = Document(text=all_text)
        # Insert document into vector index and persist to storage
        if self.index is None:
            self.index = VectorStoreIndex.from_documents(
                [doc], storage_context=self.storage_context
            )
        else:
            self.index.insert(doc)

        self.index.storage_context.persist()
        return "Successfully Loaded Document"


    @bentoml.api
    def ingest_text_batch(self, txt: Annotated[Path, bentoml.validators.ContentType("text/plain")]) -> str:

        with open(txt) as f:
            text = f.read()

        doc = Document(text=text)

        # Insert document into vector index and persist to storage
        if self.index is None:
            self.index = VectorStoreIndex.from_documents(
                [doc], storage_context=self.storage_context
            )
        else:
            self.index.insert(doc)

        self.index.storage_context.persist()
        return "Successfully Loaded Document"

    @bentoml.api
    def query(self, query: str) -> str:
        # Implementation code for query handling
        ...

您可以在 rag-tutorials 仓库中找到完整的示例代码。

步骤 2:创建部署

要将此 BentoML 项目部署为批量作业,请创建一个脚本,使用特定的配置来启动部署。

import bentoml

# Define the path to your BentoML project or the Bento package
BENTO_PATH = "./path_to_your_project"
DEPLOYMENT_NAME = "my_batch_job"

# Create a Deployment
deployment = bentoml.deployment.create(
    bento=BENTO_PATH,
    name=DEPLOYMENT_NAME,
    scaling_min=1,
    scaling_max=3
)

# Optionally, wait for the Deployment to become ready
deployment.wait_until_ready(timeout=3600)

步骤 3:针对部署运行推理

一旦您的部署处于活动状态,您就可以通过创建调用其端点的客户端与其交互。下面是一个使用该客户端执行文件摄取任务的脚本。

import bentoml
from pathlib import Path

deployment = bentoml.deployment.get(name=DEPLOYMENT_NAME)

# Get synchronous HTTP client for the Deployment
client = deployment.get_client()
# Call the available endpoints to ingest files
result = client.ingest_text_batch(txt=Path("file_to_ingest.txt"))

步骤 4:清理

作业完成后,终止部署以节省资源非常重要。

import bentoml

# Clean-up: terminate the Deployment after job completion
bentoml.deployment.terminate(name=DEPLOYMENT_NAME)

# Optionally check and print the final status
final_status = bentoml.deployment.get(name=DEPLOYMENT_NAME).get_status()
print("Final status:", final_status.to_dict())

调度作业

要自动化和调度您的批量推理任务,您可以使用各种最适合您的操作环境和需求的作业调度工具。以下是一些常用的调度器