集成ChatGPT API看似简单,但要在生产环境中稳定运行,需要考虑错误处理、成本控制、响应优化等多个方面。本文总结了我在多个项目中积累的最佳实践。
OpenAI API可能会因为网络问题或服务器过载而失败,必须实现健壮的错误处理:
import time

import openai
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    # Only these two error types trigger another attempt; every other
    # exception is handled inside the function body and never reaches tenacity.
    retry=retry_if_exception_type(
        (openai.error.RateLimitError, openai.error.APIError)
    ),
)
def call_openai_with_retry(messages, model="gpt-4"):
    """Call the chat completion endpoint, retrying on rate-limit/API errors.

    Args:
        messages: Chat messages forwarded unchanged to
            ``openai.ChatCompletion.create``.
        model: Name of the model to request.

    Returns:
        The object returned by ``openai.ChatCompletion.create``, or ``None``
        when an exception other than ``RateLimitError`` / ``APIError`` was
        raised (that case is printed and not retried).

    Raises:
        ``RateLimitError`` and ``APIError`` are re-raised so the decorator can
        retry, up to three attempts. NOTE(review): once attempts are
        exhausted tenacity raises its own error (``RetryError`` unless
        ``reraise`` is configured) - confirm against the installed version.
    """
    try:
        response = openai.ChatCompletion.create(
            model=model,
            messages=messages,
            temperature=0.7,
            max_tokens=1500,
        )
        return response
    except openai.error.RateLimitError as e:
        print(f"Rate limit hit: {e}")
        raise  # re-raise so the retry decorator sees it
    except openai.error.APIError as e:
        print(f"API error: {e}")
        raise  # re-raise so the retry decorator sees it
    except Exception as e:
        # Best-effort path: anything else is reported and turned into None,
        # so callers must check for a None result.
        print(f"Unexpected error: {e}")
        return None
精确控制token使用是成本控制的关键:
import tiktoken
def count_tokens(text, model="gpt-4"):
    """Return the number of tokens *text* encodes to for *model*.

    Args:
        text: The string to measure.
        model: Model name used to pick the tokenizer.

    Returns:
        The length of the encoded token sequence.
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Model names tiktoken cannot map raise KeyError; fall back to the
        # encoding used by the gpt-4 / gpt-3.5-turbo family so counting still
        # works (the count is then an approximation for other models).
        encoding = tiktoken.get_encoding("cl100k_base")
    # disallowed_special=() makes special-token markers that appear in the
    # text (e.g. "<|endoftext|>") count as ordinary text instead of raising.
    return len(encoding.encode(text, disallowed_special=()))
def optimize_prompt(system_prompt, user_prompt, max_tokens=4000,
                    model="gpt-4", reserve=100):
    """Truncate the user prompt so both prompts fit a token budget.

    The system prompt is never modified; only the user prompt is shortened.

    Args:
        system_prompt: System message text, returned unchanged.
        user_prompt: User message text, truncated from the end if needed.
        max_tokens: Combined token budget for both prompts.
        model: Model name used to select the tokenizer.
        reserve: Safety margin (in tokens) subtracted from the budget when
            truncation is required.

    Returns:
        A ``(system_prompt, user_prompt)`` tuple; the second element is the
        original user prompt when everything fits, otherwise a truncated one
        (an empty string when the system prompt alone uses up the budget).
    """
    system_tokens = count_tokens(system_prompt, model)
    encoding = tiktoken.encoding_for_model(model)
    # Encode once and reuse the ids for both the size check and the cut.
    user_token_ids = encoding.encode(user_prompt, disallowed_special=())

    if system_tokens + len(user_token_ids) <= max_tokens:
        return system_prompt, user_prompt

    # Clamp at zero: a negative value would make the slice below count from
    # the END of the list and keep almost the whole prompt.
    available = max(0, max_tokens - system_tokens - reserve)
    # NOTE(review): cutting on a token boundary can presumably split a
    # multi-byte character; verify how decode() renders that for CJK input.
    truncated = encoding.decode(user_token_ids[:available])
    return system_prompt, truncated
# Usage example: build an oversized user prompt and let optimize_prompt
# shorten it to the default budget.
system = "You are a helpful assistant."
user = "请详细解释量子计算的原理..." * 1000 # very long text
system, user = optimize_prompt(system, user)
对于长文本生成,使用流式响应提升用户体验:
import openai
def stream_chat_completion(messages, model="gpt-4"):
    """Stream a chat completion, echoing text to stdout as it arrives.

    Args:
        messages: Chat messages forwarded to ``openai.ChatCompletion.create``.
        model: Name of the model to request (defaults to the previously
            hard-coded ``"gpt-4"``).

    Returns:
        The full response text, i.e. all streamed content pieces joined.
    """
    response = openai.ChatCompletion.create(
        model=model,
        messages=messages,
        stream=True,
        temperature=0.7,
    )
    pieces = []
    for chunk in response:
        # Chunks without a non-empty 'content' entry in the delta are skipped.
        content = chunk.choices[0].delta.get('content')
        if content:
            pieces.append(content)
            # flush so each piece is shown immediately rather than buffered
            print(content, end='', flush=True)
    return ''.join(pieces)
# Flask SSE实现
from flask import Flask, Response, stream_with_context
app = Flask(__name__)


@app.route('/chat', methods=['POST'])
def chat():
    """Stream a chat completion to the client as Server-Sent Events.

    Expects a JSON body with a ``messages`` list and responds with a
    ``text/event-stream`` whose events carry ``{"content": ...}`` payloads.
    """
    # Imported locally: the module-level flask import does not include
    # `request`, and `json` is only imported further down the file.
    import json
    from flask import request

    # Parse the body before streaming starts so a bad request fails with a
    # normal error response instead of breaking an already-started stream.
    payload = request.get_json() or {}
    messages = payload.get('messages', [])

    def generate():
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            stream=True,
        )
        for chunk in response:
            content = chunk.choices[0].delta.get('content')
            if content:
                # One SSE event per content piece; the blank line ends it.
                yield f"data: {json.dumps({'content': content})}\n\n"

    return Response(stream_with_context(generate()),
                    mimetype='text/event-stream')
有效管理对话上下文,避免token浪费:
class ConversationManager:
    """Keep a bounded chat history, trimmed by message count and token total."""

    def __init__(self, max_history=10, max_tokens=3000):
        """Create an empty history.

        Args:
            max_history: Maximum number of messages retained.
            max_tokens: Maximum summed token count of retained message
                contents (as measured by ``count_tokens``).
        """
        self.history = []
        self.max_history = max_history
        self.max_tokens = max_tokens

    def add_message(self, role, content):
        """Append a ``{"role", "content"}`` message, then trim the history."""
        self.history.append({"role": role, "content": content})
        self._trim_history()

    def _trim_history(self):
        """Drop the oldest messages until both limits are satisfied.

        The most recent message is always kept by the token-based pass, even
        if it alone exceeds ``max_tokens``.
        """
        # Pass 1: enforce the message-count limit in a single slice delete.
        excess = len(self.history) - self.max_history
        if excess > 0:
            del self.history[:excess]

        # Pass 2: enforce the token limit. Each message is tokenised once and
        # the running total is decremented, instead of re-tokenising the whole
        # history after every removal.
        token_counts = [count_tokens(msg["content"]) for msg in self.history]
        total_tokens = sum(token_counts)
        drop = 0
        while total_tokens > self.max_tokens and drop < len(self.history) - 1:
            total_tokens -= token_counts[drop]
            drop += 1
        if drop:
            del self.history[:drop]

    def get_messages(self):
        """Return a shallow copy of the history list.

        The list is new, but the message dicts inside it are shared with the
        manager.
        """
        return self.history.copy()

    def clear(self):
        """Discard all stored messages."""
        self.history = []
# Usage example: keep at most five messages and record one exchange.
conv = ConversationManager(max_history=5)
conv.add_message("user", "你好")
conv.add_message("assistant", "你好!有什么可以帮助你的?")
对重复查询实现缓存,显著降低成本:
import hashlib
import redis
import json
class OpenAICache:
    """Cache of chat completion responses stored through a Redis-style client."""

    def __init__(self, redis_client, ttl=3600):
        """Wrap *redis_client*.

        Args:
            redis_client: Client object providing ``get`` and ``setex``.
            ttl: Expiry passed to ``setex`` for every entry, in seconds
                (defaults to one hour).
        """
        self.redis = redis_client
        self.ttl = ttl

    def _generate_key(self, messages, model):
        """Build the cache key from the messages and the model name."""
        # sort_keys makes the key independent of dict key order; md5 is used
        # only as a compact fingerprint here, not for security.
        content = json.dumps(messages, sort_keys=True) + model
        return f"openai:{hashlib.md5(content.encode()).hexdigest()}"

    def get(self, messages, model):
        """Return the cached, JSON-decoded response, or ``None`` on a miss."""
        key = self._generate_key(messages, model)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, messages, model, response):
        """Store *response* as JSON under the derived key with the TTL."""
        key = self._generate_key(messages, model)
        self.redis.setex(key, self.ttl, json.dumps(response))
# 使用缓存的调用
def cached_chat_completion(messages, model="gpt-4"):
    """Return a chat completion, serving it from the cache when possible.

    On a miss the API is called and the fresh response is stored before it
    is returned.
    """
    store = OpenAICache(redis_client)

    # Cache hit: hand back the stored response without calling the API.
    hit = store.get(messages, model)
    if hit:
        return hit

    # Cache miss: call the API, remember the answer, return it.
    fresh = openai.ChatCompletion.create(model=model, messages=messages)
    store.set(messages, model, fresh)
    return fresh
根据任务复杂度选择合适的模型,平衡成本与质量:
def select_model_for_task(task_type, complexity="medium"):
    """Pick a model name for a task type at a given complexity level.

    Any (complexity, task type) pair that is not listed resolves to
    "gpt-3.5-turbo".
    """
    fallback = "gpt-3.5-turbo"
    # Flat routing table keyed by (complexity, task_type).
    routing = {
        ("simple", "classification"): "gpt-3.5-turbo",
        ("simple", "extraction"): "gpt-3.5-turbo",
        ("simple", "translation"): "gpt-3.5-turbo",
        ("medium", "summarization"): "gpt-3.5-turbo-16k",
        ("medium", "qa"): "gpt-3.5-turbo",
        ("medium", "code"): "gpt-3.5-turbo",
        ("complex", "reasoning"): "gpt-4",
        ("complex", "creative"): "gpt-4",
        ("complex", "analysis"): "gpt-4",
    }
    return routing.get((complexity, task_type), fallback)
# Cost comparison (per 1K tokens), split into input and output rates per model.
# NOTE(review): amounts are presumably USD list prices from when the article
# was written - confirm against current pricing before relying on them.
MODEL_PRICING = {
    "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
    "gpt-3.5-turbo-16k": {"input": 0.003, "output": 0.004},
    "gpt-4": {"input": 0.03, "output": 0.06},
    "gpt-4-32k": {"input": 0.06, "output": 0.12}
}
建立完善的监控体系:
import time
import logging
from dataclasses import dataclass
from typing import Optional
@dataclass
class APIMetrics:
    """Measurements recorded for a single OpenAI API call."""

    # Elapsed time of the call, in seconds.
    latency: float
    # Prompt token count reported in the response usage (0 on failure).
    tokens_input: int
    # Completion token count reported in the response usage (0 on failure).
    tokens_output: int
    # Model name that was requested.
    model: str
    # True when the call returned a response, False when it raised.
    success: bool
    # Class name of the raised exception; None for successful calls.
    error_type: Optional[str] = None
def call_openai_with_metrics(messages, model="gpt-4"):
    """Call the chat completion endpoint and record latency/token metrics.

    Args:
        messages: Chat messages forwarded to ``openai.ChatCompletion.create``.
        model: Name of the model to request.

    Returns:
        A ``(response, metrics)`` tuple, where ``metrics`` is an
        ``APIMetrics`` describing the successful call.

    Raises:
        Any exception raised while calling the API or reading its usage data
        is logged with failure metrics and then re-raised unchanged.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments the way a wall-clock difference can.
    start_time = time.perf_counter()
    try:
        response = openai.ChatCompletion.create(
            model=model,
            messages=messages,
        )
        latency = time.perf_counter() - start_time
        usage = response['usage']
        metrics = APIMetrics(
            latency=latency,
            tokens_input=usage['prompt_tokens'],
            tokens_output=usage['completion_tokens'],
            model=model,
            success=True,
        )
        # Record the successful call.
        logging.info("OpenAI API call: %s", metrics)
        return response, metrics
    except Exception as e:
        latency = time.perf_counter() - start_time
        metrics = APIMetrics(
            latency=latency,
            tokens_input=0,
            tokens_output=0,
            model=model,
            success=False,
            error_type=type(e).__name__,
        )
        logging.error("OpenAI API error: %s", metrics)
        raise  # metrics are logged only; the caller still sees the error
生产环境的ChatGPT API集成需要关注以下几点:健壮的错误处理与重试、token用量控制、流式响应、对话上下文管理、响应缓存、按任务选择合适的模型,以及监控与日志。
💡 工具推荐:如果你在开发中需要处理大量数据文件,推荐试试DataForge Pro——一个轻量级Python数据处理工具。它比Excel快100倍,支持百万行数据的实时搜索和转换,而且完全离线运行,非常适合配合AI API进行数据预处理。
本文首发于 WD Tech Blog,转载请注明出处。