Model composition

Model composition lets you combine multiple models to build sophisticated AI applications, such as RAG and AI agents. BentoML provides simple Service APIs for creating workflows in which models work together, either sequentially or in parallel.

[Diagram: BentoML multi-model composition (inference graph)]

You may want to use model composition in BentoML when you need to:

  • Process different types of data (for example, images and text) with different models

  • Improve accuracy and performance by combining the results of multiple models

  • Run different models on dedicated hardware (for example, GPUs and CPUs)

  • Orchestrate sequential steps, such as preprocessing, inference, and post-processing, with specialized models or Services

See also

For more information, see the blog post A Guide to Model Composition.

Examples

Model composition in BentoML can involve a single Service or multiple Services, depending on your application.

For each Service, you can use resources in the @bentoml.service decorator to configure the resources required for deployment, such as GPUs. Note that this field only takes effect on BentoCloud.
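
For instance, a minimal sketch of the pattern (ExampleService and its ping endpoint are hypothetical, and the resource values are illustrative):

import bentoml

# Illustrative resource requests; the resources field only takes effect on BentoCloud.
@bentoml.service(resources={"gpu": 1, "memory": "4Gi"})
class ExampleService:
    @bentoml.api
    def ping(self) -> str:
        return "pong"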

Run multiple models in one Service

You can run multiple models on the same hardware device and expose separate or combined APIs for them.

import bentoml
from bentoml.models import HuggingFaceModel
from transformers import pipeline
from typing import List

# Run two models in the same Service on the same hardware device
@bentoml.service(
    resources={"gpu": 1, "memory": "4GiB"},
    traffic={"timeout": 20},
)
class MultiModelService:
    # Retrieve model references from HF by specifying its HF ID
    model_a_path = HuggingFaceModel("FacebookAI/roberta-large-mnli")
    model_b_path = HuggingFaceModel("distilbert/distilbert-base-uncased")

    def __init__(self) -> None:
        # Initialize pipelines for each model
        self.pipeline_a = pipeline(task="zero-shot-classification", model=self.model_a_path, hypothesis_template="This text is about {}")
        self.pipeline_b = pipeline(task="sentiment-analysis", model=self.model_b_path)

    # Define an API for data processing with model A
    @bentoml.api
    def process_a(self, input_data: str, labels: List[str] = ["positive", "negative", "neutral"]) -> dict:
        return self.pipeline_a(input_data, labels)

    # Define an API for data processing with model B
    @bentoml.api
    def process_b(self, input_data: str) -> dict:
        return self.pipeline_b(input_data)[0]

    # Define an API endpoint that combines the processing of both models
    @bentoml.api
    def combined_process(self, input_data: str, labels: List[str] = ["positive", "negative", "neutral"]) -> dict:
        classification = self.pipeline_a(input_data, labels)
        sentiment = self.pipeline_b(input_data)[0]
        return {
            "classification": classification,
            "sentiment": sentiment
        }
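
After serving this file (for example, with bentoml serve), you can call its endpoints over HTTP. A minimal client sketch, assuming the Service is running locally on the default port:

import bentoml

# Assumes MultiModelService is being served locally, e.g. via `bentoml serve`.
with bentoml.SyncHTTPClient("http://localhost:3000") as client:
    result = client.combined_process(
        input_data="BentoML makes model serving easy.",
        labels=["positive", "negative", "neutral"],
    )
    print(result["classification"])
    print(result["sentiment"])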

Note

The HuggingFaceModel function returns the downloaded model path as a string. You must pass in the model ID as shown on Hugging Face (for example, HuggingFaceModel("FacebookAI/roberta-large-mnli")). For details, see Load and manage models.
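
A quick sketch of what this means in practice (the PathDemo class is hypothetical; printing the path is only for illustration):

import bentoml
from bentoml.models import HuggingFaceModel

@bentoml.service
class PathDemo:
    # Declared as a class attribute; at runtime this resolves to the local
    # directory where the model snapshot was downloaded.
    model_path = HuggingFaceModel("FacebookAI/roberta-large-mnli")

    def __init__(self) -> None:
        # The value is a plain str, so it can be passed to anything that
        # accepts a local model path, such as transformers.pipeline.
        print(self.model_path)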

Run and scale multiple models independently in separate Services

When your models need to scale independently or require different hardware, split them into separate Services.

Sequential execution

You can make models work in sequence, where the output of one model becomes the input of another. This is useful for creating pipelines in which data needs to be preprocessed before it is used for predictions.

import bentoml
from bentoml.models import HuggingFaceModel
from transformers import pipeline
from typing import Dict, Any


@bentoml.service(resources={"cpu": "2", "memory": "2Gi"})
class PreprocessingService:
    model_a_path = HuggingFaceModel("distilbert/distilbert-base-uncased")

    def __init__(self) -> None:
        # Initialize pipeline for model A
        self.pipeline_a = pipeline(task="text-classification", model=self.model_a_path)

    @bentoml.api
    def preprocess(self, input_data: str) -> Dict[str, Any]:
        # Dummy preprocessing steps
        return self.pipeline_a(input_data)[0]


@bentoml.service(resources={"gpu": 1, "memory": "4Gi"})
class InferenceService:
    model_b_path = HuggingFaceModel("distilbert/distilroberta-base")
    preprocessing_service = bentoml.depends(PreprocessingService)

    def __init__(self) -> None:
        # Initialize pipeline for model B
        self.pipeline_b = pipeline(task="text-classification", model=self.model_b_path)

    @bentoml.api
    async def predict(self, input_data: str) -> Dict[str, Any]:
        # Call the dependent Service first, then run this Service's own model.
        # This is dummy logic; replace it with your custom pipeline.
        preprocessed_data = await self.preprocessing_service.to_async.preprocess(input_data)
        final_result = self.pipeline_b(input_data)[0]
        return {
            "preprocessing_result": preprocessed_data,
            "final_result": final_result
        }

You can use bentoml.depends to access one Service from another. It takes the dependent Service class as an argument and allows you to call its available functions. For details, see Run distributed Services.

You can use a Service's .to_async property to convert its synchronous methods into asynchronous ones. Note that directly calling a synchronous blocking function in an asynchronous context is not recommended, as it may block the event loop.
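
As a sketch, reusing the PreprocessingService defined above (the Caller class is hypothetical and assumed to live in the same file):

import bentoml

@bentoml.service
class Caller:
    preprocessing_service = bentoml.depends(PreprocessingService)

    @bentoml.api
    async def run(self, text: str) -> dict:
        # Calling self.preprocessing_service.preprocess(text) directly here
        # would block the event loop; the .to_async wrapper avoids that.
        return await self.preprocessing_service.to_async.preprocess(text)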

Parallel execution

You can run multiple independent models at the same time and then combine their results. This is useful for ensemble models, where you aggregate predictions from different models to improve accuracy.

import asyncio
import bentoml
from bentoml.models import HuggingFaceModel
from transformers import pipeline
from typing import Dict, Any, List

@bentoml.service(resources={"gpu": 1, "memory": "4Gi"})
class ModelAService:
    model_a_path = HuggingFaceModel("FacebookAI/roberta-large-mnli")

    def __init__(self) -> None:
        # Initialize pipeline for model A
        self.pipeline_a = pipeline(task="zero-shot-classification", model=self.model_a_path, hypothesis_template="This text is about {}")

    @bentoml.api
    def predict(self, input_data: str, labels: List[str] = ["positive", "negative", "neutral"]) -> Dict[str, Any]:
        # Dummy prediction with model A
        return self.pipeline_a(input_data, labels)

@bentoml.service(resources={"gpu": 1, "memory": "4Gi"})
class ModelBService:
    model_b_path = HuggingFaceModel("distilbert/distilbert-base-uncased")

    def __init__(self) -> None:
        # Initialize pipeline for model B
        self.pipeline_b = pipeline(task="sentiment-analysis", model=self.model_b_path)

    @bentoml.api
    def predict(self, input_data: str) -> Dict[str, Any]:
        # Dummy prediction with model B
        return self.pipeline_b(input_data)[0]

@bentoml.service(resources={"cpu": "4", "memory": "8Gi"})
class EnsembleService:
    service_a = bentoml.depends(ModelAService)
    service_b = bentoml.depends(ModelBService)

    @bentoml.api
    async def ensemble_predict(self, input_data: str, labels: List[str] = ["positive", "negative", "neutral"]) -> Dict[str, Any]:
        result_a, result_b = await asyncio.gather(
            self.service_a.to_async.predict(input_data, labels),
            self.service_b.to_async.predict(input_data)
        )
        # Dummy aggregation
        return {
            "zero_shot_classification": result_a,
            "sentiment_analysis": result_b
        }
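
The example above simply returns both raw results. A real ensemble would usually aggregate them; here is a hedged sketch of one possible strategy (the aggregate helper is hypothetical and assumes the usual Hugging Face output shapes):

# Hypothetical aggregation helper. Zero-shot results carry parallel
# "labels"/"scores" lists sorted by score; sentiment results carry a
# single "label"/"score" pair.
def aggregate(zero_shot: dict, sentiment: dict) -> dict:
    top_label = zero_shot["labels"][0]
    top_score = zero_shot["scores"][0]
    agreement = top_label.lower() == sentiment["label"].lower()
    return {
        "label": top_label,
        # Average the two confidences when both models agree on the label.
        "confidence": (top_score + sentiment["score"]) / 2 if agreement else top_score,
        "models_agree": agreement,
    }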

Inference graph

You can create more complex workflows that combine parallel and sequential processing.

import asyncio
import typing as t

import transformers

import bentoml

MAX_LENGTH = 128
NUM_RETURN_SEQUENCE = 1

@bentoml.service(
    resources={"gpu": 1, "memory": "4Gi"}
)
class GPT2:
    model_path = bentoml.models.HuggingFaceModel("openai-community/gpt2")

    def __init__(self):
        self.generation_pipeline_1 = transformers.pipeline(
            task="text-generation",
            model=self.model_path,
        )

    @bentoml.api
    def generate(
        self,
        sentence: str,
        max_length: int = MAX_LENGTH,
        num_return_sequences: int = NUM_RETURN_SEQUENCE,
    ) -> t.List[t.Any]:
        # Forward generation parameters to the underlying pipeline.
        return self.generation_pipeline_1(
            sentence,
            max_length=max_length,
            num_return_sequences=num_return_sequences,
        )

@bentoml.service(
    resources={"gpu": 1, "memory": "4Gi"}
)
class DistilGPT2:
    model_path = bentoml.models.HuggingFaceModel("distilbert/distilgpt2")

    def __init__(self):
        self.generation_pipeline_2 = transformers.pipeline(
            task="text-generation",
            model=self.model_path,
        )

    @bentoml.api
    def generate(
        self,
        sentence: str,
        max_length: int = MAX_LENGTH,
        num_return_sequences: int = NUM_RETURN_SEQUENCE,
    ) -> t.List[t.Any]:
        # Forward generation parameters to the underlying pipeline.
        return self.generation_pipeline_2(
            sentence,
            max_length=max_length,
            num_return_sequences=num_return_sequences,
        )

@bentoml.service(
    resources={"cpu": "2", "memory": "2Gi"}
)
class BertBaseUncased:
    model_path = bentoml.models.HuggingFaceModel("google-bert/bert-base-uncased")

    def __init__(self):
        self.classification_pipeline = transformers.pipeline(
            task="text-classification",
            model=self.model_path,
            tokenizer=self.model_path,
        )

    @bentoml.api
    def classify_generated_texts(self, sentence: str) -> float:
        # Return only the confidence score of the top classification result.
        score = self.classification_pipeline(sentence)[0]["score"]
        return score

@bentoml.service(
    resources={"cpu": "4", "memory": "8Gi"}
)
class InferenceGraph:
    gpt2_generator = bentoml.depends(GPT2)
    distilgpt2_generator = bentoml.depends(DistilGPT2)
    bert_classifier = bentoml.depends(BertBaseUncased)

    @bentoml.api
    async def generate_score(
        self, original_sentence: str = "I have an idea!"
    ) -> t.List[t.Dict[str, t.Any]]:
        generated_sentences = [
            result[0]["generated_text"]
            for result in await asyncio.gather(
                self.gpt2_generator.to_async.generate(
                    original_sentence,
                    max_length=MAX_LENGTH,
                    num_return_sequences=NUM_RETURN_SEQUENCE,
                ),
                self.distilgpt2_generator.to_async.generate(
                    original_sentence,
                    max_length=MAX_LENGTH,
                    num_return_sequences=NUM_RETURN_SEQUENCE,
                ),
            )
        ]

        results = []
        for sentence in generated_sentences:
            score = await self.bert_classifier.to_async.classify_generated_texts(
                sentence
            )
            results.append(
                {
                    "generated": sentence,
                    "score": score,
                }
            )

        return results

This example creates a workflow that:

  1. Accepts a text prompt as input

  2. Generates new text using GPT2 and DistilGPT2 in parallel

  3. Scores each generated text sequentially using BERT

  4. Returns the generated text and its score
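
To exercise the full graph, a minimal client sketch (assuming the InferenceGraph Service is served locally on the default port):

import bentoml

# Assumes `bentoml serve` is running the InferenceGraph Service locally.
with bentoml.SyncHTTPClient("http://localhost:3000") as client:
    for item in client.generate_score(original_sentence="I have an idea!"):
        print(f"{item['score']:.3f}  {item['generated']}")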

Note

In some cases, you may want to stream the output of one LLM directly into another as input, to build a compound LLM system. BentoML does not support this yet, but it is on the roadmap. If you are interested in this topic, feel free to join the discussion in the BentoML Slack community or raise an issue on GitHub.