引言

集成ChatGPT API看似简单,但要在生产环境中稳定运行,需要考虑错误处理、成本控制、响应优化等多个方面。本文总结了我在多个项目中积累的最佳实践。

1. 错误处理与重试机制

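OpenAI API可能会因为网络问题、请求限流或服务器过载而失败,必须实现健壮的错误处理(注意:本文示例基于 openai<1.0 的旧版 Python SDK,`openai.ChatCompletion` 和 `openai.error` 在 1.0 及之后的版本中已被移除):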

import time

import openai
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

# Transient failures worth retrying: rate limiting, server-side API errors,
# dropped connections, request timeouts and temporary overload.
_RETRYABLE_ERRORS = (
    openai.error.RateLimitError,
    openai.error.APIError,
    openai.error.APIConnectionError,
    openai.error.Timeout,
    openai.error.ServiceUnavailableError,
)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type(_RETRYABLE_ERRORS),
)
def call_openai_with_retry(messages, model="gpt-4"):
    """Call the chat completion endpoint, retrying transient failures.

    Up to 3 attempts are made, with an exponential wait between attempts.
    Only the error types in ``_RETRYABLE_ERRORS`` trigger a retry.

    Args:
        messages: Chat messages passed through to the API unchanged.
        model: Name of the model to query.

    Returns:
        The API response, or ``None`` when a non-retryable error occurred
        (it is printed and swallowed so callers get a best-effort result).

    Raises:
        tenacity.RetryError: When every attempt failed with a retryable error.
    """
    try:
        return openai.ChatCompletion.create(
            model=model,
            messages=messages,
            temperature=0.7,
            max_tokens=1500,
        )
    except openai.error.RateLimitError as e:
        print(f"Rate limit hit: {e}")
        raise  # re-raise so the retry decorator can schedule another attempt
    except _RETRYABLE_ERRORS as e:
        print(f"API error: {e}")
        raise  # re-raise so the retry decorator can schedule another attempt
    except Exception as e:
        # Anything else (e.g. an invalid request) will not succeed on retry.
        print(f"Unexpected error: {e}")
        return None

2. Token管理与成本控制

精确控制token使用是成本控制的关键:

import tiktoken

def count_tokens(text, model="gpt-4"):
    """Return the number of tokens *text* encodes to for *model*.

    Args:
        text: The string to measure.
        model: Model name used to select the tokenizer.

    Returns:
        The token count as an int.
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # tiktoken does not know this model name; fall back to the encoding
        # used by the gpt-4 / gpt-3.5-turbo family instead of crashing.
        encoding = tiktoken.get_encoding("cl100k_base")
    # disallowed_special=() makes special-token markers that appear in
    # ordinary text (e.g. "<|endoftext|>") count as plain text rather than
    # raising ValueError.
    return len(encoding.encode(text, disallowed_special=()))

def optimize_prompt(system_prompt, user_prompt, max_tokens=4000, model="gpt-4"):
    """Truncate the user prompt so both prompts fit within a token budget.

    The system prompt is never modified. When the combined token count
    exceeds *max_tokens*, the user prompt is cut to the remaining budget
    minus a 100-token safety margin.

    Args:
        system_prompt: System message text, always returned unchanged.
        user_prompt: User message text, truncated if necessary.
        max_tokens: Combined token budget for both prompts.
        model: Model name used to select the tokenizer.

    Returns:
        A ``(system_prompt, user_prompt)`` tuple.
    """
    system_tokens = count_tokens(system_prompt, model)
    user_tokens = count_tokens(user_prompt, model)

    if system_tokens + user_tokens <= max_tokens:
        return system_prompt, user_prompt

    # Clamp at zero: a negative slice bound would drop tokens from the END
    # and keep almost the whole prompt, silently blowing the budget.
    available = max(0, max_tokens - system_tokens - 100)

    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Unknown model name: use the gpt-4 / gpt-3.5-turbo family encoding.
        encoding = tiktoken.get_encoding("cl100k_base")

    tokens = encoding.encode(user_prompt, disallowed_special=())
    # NOTE(review): cutting on a token boundary can split a multi-byte
    # character, in which case the decoded text may end in U+FFFD - confirm
    # whether callers need that stripped.
    truncated = encoding.decode(tokens[:available])
    return system_prompt, truncated

# Usage example
system = "You are a helpful assistant."
user = "请详细解释量子计算的原理..." * 1000  # deliberately over-long input

# Rebind both names to the prompts that fit the default token budget.
system, user = optimize_prompt(system, user)

3. 流式响应实现

对于长文本生成,使用流式响应提升用户体验:

import openai

def stream_chat_completion(messages, model="gpt-4"):
    """Stream a chat completion, echoing it to stdout as it arrives.

    Args:
        messages: Chat messages passed through to the API unchanged.
        model: Name of the model to query.

    Returns:
        The full response text, i.e. all streamed fragments joined together.
    """
    response = openai.ChatCompletion.create(
        model=model,
        messages=messages,
        stream=True,
        temperature=0.7,
    )

    collected_messages = []
    for chunk in response:
        # Guard against chunks that carry no choices at all (reportedly sent
        # by some deployments) so indexing [0] cannot raise IndexError.
        if not chunk.choices:
            continue
        # Chunks with only a role or a finish reason have no 'content'.
        content = chunk.choices[0].delta.get('content')
        if content:
            collected_messages.append(content)
            print(content, end='', flush=True)

    return ''.join(collected_messages)

# Flask SSE实现
from flask import Flask, Response, stream_with_context

# Flask application object; the /chat route below is registered on it.
app = Flask(__name__)

@app.route('/chat', methods=['POST'])
def chat():
    """Stream a chat completion to the client as Server-Sent Events.

    Expects a JSON request body with an optional ``messages`` list. Each
    content fragment is emitted as one SSE ``data:`` event whose payload is
    a JSON object with a single ``content`` key.

    Returns:
        A streaming ``text/event-stream`` response.
    """
    # Local imports keep this snippet self-contained: it uses both names
    # without importing them.
    import json

    from flask import request

    # Parse the body BEFORE streaming starts, so a malformed request fails
    # with a normal HTTP error rather than breaking a half-sent stream.
    payload = request.get_json() or {}
    messages = payload.get('messages', [])

    def generate():
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            stream=True,
        )

        for chunk in response:
            # Skip chunks with no choices so indexing [0] cannot fail.
            if not chunk.choices:
                continue
            content = chunk.choices[0].delta.get('content')
            if content:
                yield f"data: {json.dumps({'content': content})}\n\n"

    return Response(stream_with_context(generate()),
                    mimetype='text/event-stream')

4. 上下文管理

有效管理对话上下文,避免token浪费:

class ConversationManager:
    """Keep a bounded chat history for building API requests.

    The history is limited both by message count (``max_history``) and by
    the summed token count of message contents (``max_tokens``). The oldest
    messages are evicted first regardless of role, so a leading system
    message can be dropped as well.
    """

    def __init__(self, max_history=10, max_tokens=3000):
        """Create an empty history with the given limits."""
        self.history = []
        self.max_history = max_history
        self.max_tokens = max_tokens

    def add_message(self, role, content):
        """Append a message, then trim the history to the limits."""
        self.history.append({"role": role, "content": content})
        self._trim_history()

    def _trim_history(self):
        """Drop the oldest messages until both limits are satisfied.

        At least one message is always kept by the token limit, even if
        that single message exceeds ``max_tokens`` on its own.
        """
        # Enforce the message-count limit with a single front deletion.
        excess = len(self.history) - self.max_history
        if excess > 0:
            del self.history[:excess]

        # Count each message once, then subtract as messages are evicted,
        # instead of re-tokenizing the whole history after every removal.
        counts = [count_tokens(msg["content"]) for msg in self.history]
        total_tokens = sum(counts)
        drop = 0
        while total_tokens > self.max_tokens and len(counts) - drop > 1:
            total_tokens -= counts[drop]
            drop += 1
        if drop:
            del self.history[:drop]

    def get_messages(self):
        """Return a shallow copy of the history list.

        The list is new, but the message dicts inside it are shared.
        """
        return list(self.history)

    def clear(self):
        """Discard the whole history."""
        self.history = []

# Usage example
conv = ConversationManager(max_history=5)  # keep at most 5 messages
conv.add_message("user", "你好")
conv.add_message("assistant", "你好!有什么可以帮助你的?")

5. 缓存策略

对重复查询实现缓存,显著降低成本:

import hashlib
import redis
import json

class OpenAICache:
    """Cache of OpenAI responses stored through a Redis-style client.

    Entries are keyed by a hash of the messages plus the model name, and
    stored as JSON with a fixed expiry.
    """

    def __init__(self, redis_client):
        """Remember the client and set the entry lifetime to 3600 seconds."""
        self.redis = redis_client
        self.ttl = 3600  # expires after one hour

    def _generate_key(self, messages, model):
        """Build the cache key for a (messages, model) pair."""
        # sort_keys makes the key independent of dict key order.
        payload = json.dumps(messages, sort_keys=True) + model
        digest = hashlib.md5(payload.encode()).hexdigest()
        return "openai:" + digest

    def get(self, messages, model):
        """Return the decoded cached response, or None when absent."""
        raw = self.redis.get(self._generate_key(messages, model))
        return json.loads(raw) if raw else None

    def set(self, messages, model, response):
        """Store *response* as JSON under the pair's key, with expiry."""
        self.redis.setex(
            self._generate_key(messages, model),
            self.ttl,
            json.dumps(response),
        )

# Cache-aware API call
def cached_chat_completion(messages, model="gpt-4"):
    """Return a chat completion, served from the cache when possible.

    Reads the module-level name ``redis_client``, which is not defined in
    this snippet and must be provided by the surrounding code.

    On a cache hit the JSON-decoded cached value is returned; on a miss the
    object returned by the API call is cached and returned as-is.
    """
    cache = OpenAICache(redis_client)

    hit = cache.get(messages, model)
    if not hit:
        # Nothing cached: query the API and remember the answer.
        fresh = openai.ChatCompletion.create(model=model, messages=messages)
        cache.set(messages, model, fresh)
        return fresh

    return hit

6. 模型选择策略

根据任务复杂度选择合适的模型,平衡成本与质量:

def select_model_for_task(task_type, complexity="medium"):
    """Look up the model configured for a task type at a complexity tier.

    Unknown tiers, and task types not listed under the given tier, fall
    back to "gpt-3.5-turbo".
    """
    # Flat routing table keyed by (complexity tier, task type).
    routes = {
        ("simple", "classification"): "gpt-3.5-turbo",
        ("simple", "extraction"): "gpt-3.5-turbo",
        ("simple", "translation"): "gpt-3.5-turbo",
        ("medium", "summarization"): "gpt-3.5-turbo-16k",
        ("medium", "qa"): "gpt-3.5-turbo",
        ("medium", "code"): "gpt-3.5-turbo",
        ("complex", "reasoning"): "gpt-4",
        ("complex", "creative"): "gpt-4",
        ("complex", "analysis"): "gpt-4",
    }
    return routes.get((complexity, task_type), "gpt-3.5-turbo")

# Cost comparison (per 1K tokens), split into input and output prices
MODEL_PRICING = {
    name: {"input": input_price, "output": output_price}
    for name, input_price, output_price in (
        ("gpt-3.5-turbo", 0.0015, 0.002),
        ("gpt-3.5-turbo-16k", 0.003, 0.004),
        ("gpt-4", 0.03, 0.06),
        ("gpt-4-32k", 0.06, 0.12),
    )
}

7. 监控与日志

建立完善的监控体系:

import time
import logging
from dataclasses import dataclass
from typing import Optional

@dataclass
class APIMetrics:
    """Metrics recorded for a single OpenAI API call."""

    latency: float  # elapsed time of the call, in seconds
    tokens_input: int  # prompt tokens reported by the API (0 on failure)
    tokens_output: int  # completion tokens reported by the API (0 on failure)
    model: str  # model name the call was made with
    success: bool  # True when the call returned without raising
    error_type: Optional[str] = None  # exception class name on failure

def call_openai_with_metrics(messages, model="gpt-4"):
    """Call the chat completion endpoint and record metrics for the call.

    Args:
        messages: Chat messages passed through to the API unchanged.
        model: Name of the model to query.

    Returns:
        A ``(response, metrics)`` tuple, where *metrics* is an
        ``APIMetrics`` holding latency and token usage.

    Raises:
        Exception: Any error from the call is logged with failure metrics
            and then re-raised unchanged.
    """
    # perf_counter is monotonic, so the measured latency cannot be skewed
    # by system clock adjustments the way time.time() differences can.
    start_time = time.perf_counter()

    try:
        response = openai.ChatCompletion.create(
            model=model,
            messages=messages,
        )

        usage = response['usage']
        metrics = APIMetrics(
            latency=time.perf_counter() - start_time,
            tokens_input=usage['prompt_tokens'],
            tokens_output=usage['completion_tokens'],
            model=model,
            success=True,
        )

        # Lazy %-formatting: the message is only built if it is emitted.
        logging.info("OpenAI API call: %s", metrics)

        return response, metrics

    except Exception as e:
        metrics = APIMetrics(
            latency=time.perf_counter() - start_time,
            tokens_input=0,
            tokens_output=0,
            model=model,
            success=False,
            error_type=type(e).__name__,
        )
        logging.error("OpenAI API error: %s", metrics)
        raise

总结

生产环境的ChatGPT API集成需要关注:

  1. 健壮的错误处理 - 重试机制必不可少
  2. 精确的token管理 - 使用tiktoken计算和控制
  3. 流式响应 - 提升长文本生成的用户体验
  4. 上下文管理 - 智能修剪历史记录
  5. 响应缓存 - 降低重复查询成本
  6. 模型选择 - 根据任务选择性价比最优的模型
  7. 全面监控 - 记录延迟、token使用、错误率

💡 工具推荐:如果你在开发中需要处理大量数据文件,推荐试试DataForge Pro——一个轻量级Python数据处理工具。它比Excel快100倍,支持百万行数据的实时搜索和转换,而且完全离线运行,非常适合配合AI API进行数据预处理。


本文首发于 WD Tech Blog,转载请注明出处。