批量推理作业¶
某些 AI 驱动的任务最适合批量推理,例如 RAG 系统的嵌入生成、推荐系统的定期更新,或用于特征提取的批量图像处理。
使用 BentoML 和 BentoCloud,您可以高效地管理这些批量推理作业,并具有以下几个关键优势
按需部署:仅在需要时部署模型,并在作业完成后终止部署,确保您只为您使用的资源付费。您可以运行一次性或定期批量推理作业。
自动扩展:根据作业的流量需求自动扩展您的资源。
用于推理的专用硬件:在专用 GPU 上运行模型推理,确保推理任务不干扰批处理。
本文档解释了如何使用 BentoML 和 BentoCloud 运行批量推理作业。
创建作业¶
以下示例演示了作业执行的完整生命周期。
步骤 1:准备 BentoML 项目¶
确保您已有一个现有的 BentoML 项目或 Bento。下面的示例是 RAG 系统中典型的 BentoML Service 设置,其中端点 ingest_pdf_batch
和 ingest_text_batch
用于文件的批量摄取。它们可以计算文档的嵌入并将其写入向量数据库进行索引。与可能需要持续可用的常规服务不同,这些端点可以按需激活,这使得它们非常适合批量推理作业,因为资源仅在作业执行期间消耗。
...
@bentoml.service(
resources={
"gpu": 1,
},
traffic={
"timeout": 30,
"concurrency": 5,
"external_queue": True,
}
)
class RAGService:
# Initialization setup
...
@bentoml.api
def ingest_pdf_batch(self, pdf: Annotated[Path, bentoml.validators.ContentType("application/pdf")]) -> str:
import pypdf
reader = pypdf.PdfReader(pdf)
texts = []
for page in reader.pages:
text = page.extract_text()
texts.append(text)
all_text = "".join(texts)
doc = Document(text=all_text)
# Insert document into vector index and persist to storage
if self.index is None:
self.index = VectorStoreIndex.from_documents(
[doc], storage_context=self.storage_context
)
else:
self.index.insert(doc)
self.index.storage_context.persist()
return "Successfully Loaded Document"
@bentoml.api
def ingest_text_batch(self, txt: Annotated[Path, bentoml.validators.ContentType("text/plain")]) -> str:
with open(txt) as f:
text = f.read()
doc = Document(text=text)
# Insert document into vector index and persist to storage
if self.index is None:
self.index = VectorStoreIndex.from_documents(
[doc], storage_context=self.storage_context
)
else:
self.index.insert(doc)
self.index.storage_context.persist()
return "Successfully Loaded Document"
@bentoml.api
def query(self, query: str) -> str:
# Implementation code for query handling
...
您可以在 rag-tutorials 仓库中找到完整的示例代码。
步骤 2:创建部署¶
要将此 BentoML 项目部署为批量作业,请创建一个脚本,使用特定的配置来启动部署。
import bentoml
# Define the path to your BentoML project or the Bento package
BENTO_PATH = "./path_to_your_project"
DEPLOYMENT_NAME = "my_batch_job"
# Create a Deployment
deployment = bentoml.deployment.create(
bento=BENTO_PATH,
name=DEPLOYMENT_NAME,
scaling_min=1,
scaling_max=3
)
# Optionally, wait for the Deployment to become ready
deployment.wait_until_ready(timeout=3600)
步骤 3:针对部署运行推理¶
一旦您的部署处于活动状态,您就可以通过创建调用其端点的客户端与其交互。下面是一个使用该客户端执行文件摄取任务的脚本。
import bentoml
from pathlib import Path
deployment = bentoml.deployment.get(name=DEPLOYMENT_NAME)
# Get synchronous HTTP client for the Deployment
client = deployment.get_client()
# Call the available endpoints to ingest files
result = client.ingest_text_batch(txt=Path("file_to_ingest.txt"))
步骤 4:清理¶
作业完成后,终止部署以节省资源非常重要。
import bentoml
# Clean-up: terminate the Deployment after job completion
bentoml.deployment.terminate(name=DEPLOYMENT_NAME)
# Optionally check and print the final status
final_status = bentoml.deployment.get(name=DEPLOYMENT_NAME).get_status()
print("Final status:", final_status.to_dict())
调度作业¶
要自动化和调度您的批量推理任务,您可以使用各种最适合您的操作环境和需求的作业调度工具。以下是一些常用的调度器