流式响应

流式处理允许您通过将大型或增量数据分块发送到客户端来高效地处理它们,从而提供更好的实时用户体验。BentoML 支持流式响应用于各种应用,例如大型语言模型 (LLM) 输出和音频合成。

LLM 输出

在 BentoML 中,您可以使用 Python 生成器流式传输 LLM 输出。以下是一个使用 OpenAI API 的示例

service.py
import bentoml
from typing import Literal, Generator
from pydantic import BaseModel

# Define message structure for LLM input
class Message(BaseModel):
    content: str
    role: Literal['assistant', 'user', 'system']

@bentoml.service
class LLMExample:
    def __init__(self) -> None:
        # Initialize your model configuration
        # A dummy example here
        self.model_id = MODEL_ID

    @bentoml.api
    async def generate(self, prompt: str) -> Generator[str, None, None]:
        # Yields text chunks from the LLM response
        from openai import AsyncOpenAI

        # Initialize OpenAI client
        client = AsyncOpenAI()
        message = Message(role="user", content=prompt)

        # Call OpenAI's chat completion API with streaming enabled
        completion = await client.chat.completions.create(
            model=self.model_id,
            messages=[message.model_dump()], # type: ignore
            stream=True,
        )

        # Stream and yield the response chunks
        async for chunk in completion:
            yield chunk.choices[0].delta.content or ""

有关更多实际示例,请参阅 如何使用 BentoML 和 vLLM 提供不同的 LLM 服务

音频字节

音频流对于文本转语音 (TTS)、实时语音助手和实时音频处理等应用至关重要。这些用例通常需要构建一个 WebSocket 服务器来将音频数据流式传输到客户端。

以下是一个配置 WebSocket 服务器以便在 BentoML 中流式传输音频字节的示例。

service.py
import bentoml
from fastapi import FastAPI, WebSocket
from typing import Generator

# Create a FastAPI app
app = FastAPI()

@bentoml.service
@bentoml.asgi_app(app) # Integrate FastAPI app with BentoML
class TTSExample:
    def __init__(self) -> None:
        # Initialize your TTS engine here
        self.engine = self.setup_tts_engine()

    def setup_tts_engine(self):
        # Configure your TTS engine here
        pass

    def synthesize(self, text: str) -> Generator[bytes, None, None]:
        # Implement your TTS logic here
        pass

    # Define a WebSocket endpoint for streaming audio
    @app.websocket("/ws")
    async def speech(self, websocket: WebSocket):
        await websocket.accept()
        try:
            while True:
                # Receive text from client
                data = await websocket.receive_text()
                # Stream audio chunks back to client
                for chunk in self.engine.synthesize(data):
                    await websocket.send_bytes(chunk)
        except Exception as e:
            print(f"Error in WebSocket connection: {e}")
        finally:
            await websocket.close()

了解更多

有关更多实际示例,请参阅 如何使用开源模型构建语音助手