# API Calling Best Practices

## Overview

In real-world applications, calling large-model APIs efficiently, reliably, and economically is a key engineering concern. This chapter covers best practices for API calls — request optimization, concurrency control, streaming output, caching strategies, and more — to help you build production-grade AI applications.
## Request Optimization

### Setting Parameters Appropriately
```python
from openai import OpenAI

client = OpenAI()

def optimized_request(prompt: str, task_type: str = "general"):
    # Sampling parameters tuned per task type: creative work benefits from
    # more randomness; analytical and code tasks want low temperature.
    configs = {
        "creative": {
            "temperature": 0.9,
            "top_p": 0.95,
            "presence_penalty": 0.6,
            "frequency_penalty": 0.5
        },
        "analytical": {
            "temperature": 0.3,
            "top_p": 0.9,
            "presence_penalty": 0.1,
            "frequency_penalty": 0.1
        },
        "code": {
            "temperature": 0.2,
            "top_p": 0.9,
            "presence_penalty": 0.0,
            "frequency_penalty": 0.0
        },
        "general": {
            "temperature": 0.7,
            "top_p": 0.9
        }
    }
    config = configs.get(task_type, configs["general"])
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1000,
        **config
    )
    return response.choices[0].message.content

creative_text = optimized_request("Write a poem", "creative")
code_result = optimized_request("Implement quicksort", "code")
```
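One related knob worth knowing: the Chat Completions API also accepts a best-effort `seed` parameter, which can make analytical or test runs more repeatable. A minimal sketch — note that the service does not guarantee identical outputs even with a fixed seed:

```python
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Implement quicksort"}],
    temperature=0.2,
    seed=42,  # best-effort reproducibility; identical outputs are not guaranteed
)
```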
### Trimming the Prompt

```python
def trim_prompt(prompt: str, max_length: int = 4000) -> str:
    """Truncate an over-long prompt at line boundaries."""
    if len(prompt) <= max_length:
        return prompt
    lines = prompt.split('\n')
    result = []
    current_length = 0
    for line in lines:
        if current_length + len(line) + 1 <= max_length:
            result.append(line)
            current_length += len(line) + 1  # +1 for the newline
        else:
            break
    return '\n'.join(result) + '\n...[content truncated]'

long_prompt = "..." * 5000
trimmed = trim_prompt(long_prompt)
```
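Character counts are only a rough proxy for tokens. When precise budgeting matters, truncating on actual tokens is safer; a sketch using the tiktoken package (assuming it is installed; `o200k_base` is the encoding used by the gpt-4o family):

```python
import tiktoken

def trim_prompt_by_tokens(prompt: str, max_tokens: int = 4000) -> str:
    enc = tiktoken.get_encoding("o200k_base")
    tokens = enc.encode(prompt)
    if len(tokens) <= max_tokens:
        return prompt
    # Decode only the tokens that fit the budget
    return enc.decode(tokens[:max_tokens]) + "\n...[content truncated]"
```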
### Batch Processing

```python
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def process_single(prompt: str):
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_batch(prompts: list, batch_size: int = 5):
    results = []
    for i in range(0, len(prompts), batch_size):
        batch = prompts[i:i + batch_size]
        tasks = [process_single(prompt) for prompt in batch]
        batch_results = await asyncio.gather(*tasks)
        results.extend(batch_results)
        await asyncio.sleep(1)  # brief pause between batches to ease rate-limit pressure
    return results

prompts = [
    "Translate: Hello",
    "Translate: World",
    "Translate: AI"
]
results = asyncio.run(process_batch(prompts))
```
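As written, a single failed request raises out of `asyncio.gather` and discards the rest of the batch. A variant that keeps partial results — failures come back as exception objects you can filter afterwards:

```python
async def process_batch_safe(prompts: list, batch_size: int = 5):
    results = []
    for i in range(0, len(prompts), batch_size):
        batch = prompts[i:i + batch_size]
        tasks = [process_single(p) for p in batch]
        # return_exceptions=True: a failure becomes a result value
        # instead of cancelling the whole batch
        batch_results = await asyncio.gather(*tasks, return_exceptions=True)
        results.extend(batch_results)
    return results
```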
## Concurrency Control

### Rate Limiter
```python
import asyncio
import time
from collections import deque

class RateLimiter:
    """Sliding-window limiter: at most max_requests per time_window seconds."""
    def __init__(self, max_requests: int, time_window: float):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = deque()  # timestamps of recent requests
        self.lock = asyncio.Lock()

    async def acquire(self):
        # The lock is held while sleeping, which serializes waiting callers.
        async with self.lock:
            now = time.time()
            # Drop timestamps that have left the window
            while self.requests and self.requests[0] <= now - self.time_window:
                self.requests.popleft()
            if len(self.requests) >= self.max_requests:
                # Wait until the oldest request exits the window
                sleep_time = self.requests[0] - (now - self.time_window)
                await asyncio.sleep(sleep_time)
                self.requests.popleft()
            self.requests.append(time.time())  # record the actual send time

limiter = RateLimiter(max_requests=10, time_window=60.0)

async def rate_limited_request(prompt: str):
    await limiter.acquire()
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
```
### Semaphore-Based Control

```python
class ConcurrencyController:
    """Cap the number of coroutines running simultaneously."""
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def execute(self, func, *args, **kwargs):
        async with self.semaphore:
            return await func(*args, **kwargs)

controller = ConcurrencyController(max_concurrent=3)

async def controlled_batch_request(prompts: list):
    tasks = [
        controller.execute(process_single, prompt)
        for prompt in prompts
    ]
    return await asyncio.gather(*tasks)
```
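The rate limiter and the semaphore solve different problems — requests per time window versus simultaneous in-flight connections — and in practice the two are often stacked. A minimal sketch reusing the `limiter` and `controller` defined above:

```python
async def safe_request(prompt: str):
    await limiter.acquire()  # respect the requests-per-window budget
    # then cap the number of simultaneous in-flight calls
    return await controller.execute(process_single, prompt)
```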
### Queue-Based Processing

```python
import asyncio
from typing import Callable, Any

class RequestQueue:
    """Fixed pool of workers draining a shared request queue."""
    def __init__(self, processor: Callable, max_workers: int = 3):
        self.queue = asyncio.Queue()
        self.processor = processor
        self.max_workers = max_workers
        self.workers = []

    async def add_request(self, data: Any):
        await self.queue.put(data)

    async def worker(self, worker_id: int):
        while True:
            try:
                data = await asyncio.wait_for(self.queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue  # queue empty; poll again
            try:
                result = await self.processor(data)
                print(f"Worker {worker_id} processed: {result[:50]}...")
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
            finally:
                # Mark the item done even on failure,
                # otherwise queue.join() in stop() would hang
                self.queue.task_done()

    async def start(self):
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.max_workers)
        ]

    async def stop(self):
        await self.queue.join()  # wait for all queued items to be processed
        for worker in self.workers:
            worker.cancel()

async def process_item(item: dict):
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": item["prompt"]}]
    )
    return response.choices[0].message.content

queue = RequestQueue(process_item, max_workers=3)

async def main():
    await queue.start()
    for i in range(10):
        await queue.add_request({"prompt": f"Question {i}: What is AI?"})
    await queue.stop()

asyncio.run(main())
```
## Streaming Output

### Basic Streaming
```python
def stream_response(prompt: str):
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    full_response = ""
    for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            full_response += content
            print(content, end="", flush=True)  # render tokens as they arrive
    print()
    return full_response

stream_response("Write a Python tutorial")
```
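Streaming also makes latency visible: time to first token (TTFT) is what the user actually perceives, and it is worth measuring separately from total time. A small illustrative sketch (not part of the API):

```python
import time

def stream_with_timing(prompt: str):
    start = time.time()
    first_token_time = None
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    chunks = []
    for chunk in stream:
        if chunk.choices[0].delta.content:
            if first_token_time is None:
                first_token_time = time.time() - start  # time to first token
            chunks.append(chunk.choices[0].delta.content)
    total = time.time() - start
    print(f"TTFT: {first_token_time:.2f}s, total: {total:.2f}s")
    return "".join(chunks)
```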
### Async Streaming

```python
async def async_stream_response(prompt: str):
    stream = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    # This is an async generator: it yields chunks as they arrive.
    # An async generator cannot `return` a value, so callers that need
    # the full text accumulate the chunks themselves.
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

async def process_stream():
    async for content in async_stream_response("Tell me a story"):
        print(content, end="", flush=True)

asyncio.run(process_stream())
```
### Streaming to a File

```python
async def stream_to_file(prompt: str, output_file: str):
    stream = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    with open(output_file, 'w', encoding='utf-8') as f:
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                f.write(content)
                f.flush()  # make partial output visible immediately

asyncio.run(stream_to_file(
    "Write an article about AI",
    "output.txt"
))
```
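Note that `open()` and `write()` above are blocking calls inside a coroutine; for small outputs that is harmless, but under heavy concurrency a non-blocking variant avoids stalling the event loop. A sketch using the third-party aiofiles package (an assumption that it is installed — it is not part of the SDK):

```python
import aiofiles

async def stream_to_file_nonblocking(prompt: str, output_file: str):
    stream = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async with aiofiles.open(output_file, 'w', encoding='utf-8') as f:
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                await f.write(chunk.choices[0].delta.content)
```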
## Caching Strategies

### Simple In-Memory Cache
```python
import hashlib

class SimpleCache:
    def __init__(self):
        self.cache = {}

    def _hash_key(self, messages: list) -> str:
        content = str(messages)
        return hashlib.md5(content.encode()).hexdigest()

    def get(self, messages: list):
        key = self._hash_key(messages)
        return self.cache.get(key)

    def set(self, messages: list, response: str):
        key = self._hash_key(messages)
        self.cache[key] = response

    def clear(self):
        self.cache.clear()

cache = SimpleCache()

def cached_chat(messages: list, use_cache: bool = True):
    if use_cache:
        cached_response = cache.get(messages)
        if cached_response:
            print("Using cached response")
            return cached_response
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    result = response.choices[0].message.content
    cache.set(messages, result)
    return result
```
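The same idea packages neatly as a decorator, so any chat function can be made cache-aware without changing its body. A sketch — messages are serialized to JSON to get a stable, hashable key:

```python
import json
from functools import wraps

def cache_llm(func):
    store = {}
    @wraps(func)
    def wrapper(messages: list, **kwargs):
        key = json.dumps(messages, ensure_ascii=False, sort_keys=True)
        if key not in store:
            store[key] = func(messages, **kwargs)
        return store[key]
    return wrapper

@cache_llm
def chat(messages: list):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return response.choices[0].message.content
```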
### Redis Cache

```python
import redis
import json
import hashlib

class RedisCache:
    def __init__(self, host='localhost', port=6379, db=0):
        self.client = redis.Redis(host=host, port=port, db=db)
        self.ttl = 3600  # cache entries expire after one hour

    def _hash_key(self, messages: list) -> str:
        content = json.dumps(messages, ensure_ascii=False)
        return f"llm_cache:{hashlib.md5(content.encode()).hexdigest()}"

    def get(self, messages: list):
        key = self._hash_key(messages)
        cached = self.client.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, messages: list, response: str):
        key = self._hash_key(messages)
        self.client.setex(key, self.ttl, json.dumps(response, ensure_ascii=False))

    def delete(self, messages: list):
        key = self._hash_key(messages)
        self.client.delete(key)

redis_cache = RedisCache()

def redis_cached_chat(messages: list):
    cached = redis_cache.get(messages)
    if cached:
        return cached
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    result = response.choices[0].message.content
    redis_cache.set(messages, result)
    return result
```
### Semantic Cache

```python
import numpy as np
from typing import List, Optional, Tuple

class SemanticCache:
    """Cache keyed by embedding similarity rather than exact text match."""
    def __init__(self, similarity_threshold: float = 0.95):
        self.cache: List[Tuple[np.ndarray, str]] = []
        self.similarity_threshold = similarity_threshold

    def cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

    def get_embedding(self, text: str) -> np.ndarray:
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return np.array(response.data[0].embedding)

    def get(self, query: str) -> Optional[str]:
        query_embedding = self.get_embedding(query)
        for cached_embedding, cached_response in self.cache:
            similarity = self.cosine_similarity(query_embedding, cached_embedding)
            if similarity >= self.similarity_threshold:
                print(f"Found similar cached entry (similarity: {similarity:.4f})")
                return cached_response
        return None

    def set(self, query: str, response: str):
        embedding = self.get_embedding(query)
        self.cache.append((embedding, response))

semantic_cache = SemanticCache()

def semantically_cached_chat(query: str):
    cached = semantic_cache.get(query)
    if cached:
        return cached
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": query}]
    )
    result = response.choices[0].message.content
    semantic_cache.set(query, result)
    return result
```
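The linear scan in `get` is fine for a handful of entries but degrades as the cache grows; at scale you would reach for a vector index (FAISS, a vector database, and so on). As a middle step, a sketch that replaces the Python loop with a single vectorized NumPy pass over the class above:

```python
class VectorizedSemanticCache(SemanticCache):
    def get(self, query: str):
        if not self.cache:
            return None
        q = self.get_embedding(query)
        matrix = np.stack([emb for emb, _ in self.cache])
        # Cosine similarity against every cached embedding at once
        sims = (matrix @ q) / (np.linalg.norm(matrix, axis=1) * np.linalg.norm(q))
        best = int(np.argmax(sims))
        if sims[best] >= self.similarity_threshold:
            return self.cache[best][1]
        return None
```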
## Timeouts and Retries

### Setting Timeouts
```python
import httpx

# Overall 60s budget per request, 5s to establish the connection
client_with_timeout = OpenAI(
    timeout=httpx.Timeout(60.0, connect=5.0)
)

try:
    response = client_with_timeout.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}]
    )
except Exception as e:
    print(f"Request failed (possibly timed out): {e}")
```
### Retry Mechanism

```python
import time
from functools import wraps

def retry(max_retries: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """Retry decorator with a fixed multiplier between attempts."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            current_delay = delay
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    retries += 1
                    if retries >= max_retries:
                        raise  # out of attempts; propagate the original error
                    print(f"Retry {retries}/{max_retries}, waiting {current_delay}s...")
                    time.sleep(current_delay)
                    current_delay *= backoff
            return None
        return wrapper
    return decorator

@retry(max_retries=3, delay=1.0, backoff=2.0)
def robust_chat(messages: list):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return response.choices[0].message.content
```
### Exponential Backoff

```python
import random
import time

def exponential_backoff(
    func,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
):
    """Wrap func with exponentially growing, jittered retry delays."""
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt == max_retries - 1:
                    raise
                # Double the delay each attempt, capped at max_delay
                delay = min(base_delay * (2 ** attempt), max_delay)
                # Add up to 10% jitter so clients don't retry in lockstep
                jitter = random.uniform(0, delay * 0.1)
                total_delay = delay + jitter
                print(f"Retry {attempt + 1}/{max_retries}, waiting {total_delay:.2f}s")
                time.sleep(total_delay)
    return wrapper
```
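Blindly retrying every exception wastes time and budget on errors that will never succeed, such as an invalid request. The SDK raises typed exceptions, so filtering is straightforward; a sketch of the usual retryable set:

```python
from openai import APIConnectionError, APITimeoutError, RateLimitError

def is_retryable(exc: Exception) -> bool:
    # Transient conditions worth retrying; validation errors are not
    return isinstance(exc, (APIConnectionError, APITimeoutError, RateLimitError))
```

A production backoff loop would check this predicate and re-raise immediately when it returns False.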
## Monitoring and Logging

### Request Logging
```python
import logging
import time

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def logged_chat(messages: list, **kwargs):
    start_time = time.time()
    logger.info(f"Request started - message count: {len(messages)}")
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            **kwargs
        )
        elapsed = time.time() - start_time
        usage = response.usage
        logger.info(
            f"Request succeeded - "
            f"elapsed: {elapsed:.2f}s, "
            f"input tokens: {usage.prompt_tokens}, "
            f"output tokens: {usage.completion_tokens}"
        )
        return response.choices[0].message.content
    except Exception as e:
        elapsed = time.time() - start_time
        logger.error(f"Request failed - elapsed: {elapsed:.2f}s, error: {e}")
        raise
```
### Performance Monitoring

```python
import statistics
import time
from dataclasses import dataclass
from typing import List

@dataclass
class RequestMetric:
    timestamp: float
    duration: float
    input_tokens: int
    output_tokens: int
    success: bool

class PerformanceMonitor:
    def __init__(self):
        self.metrics: List[RequestMetric] = []

    def record(self, metric: RequestMetric):
        self.metrics.append(metric)

    def get_stats(self, window_seconds: int = 3600):
        now = time.time()
        recent_metrics = [
            m for m in self.metrics
            if now - m.timestamp <= window_seconds
        ]
        if not recent_metrics:
            return None
        durations = [m.duration for m in recent_metrics]
        success_rate = sum(1 for m in recent_metrics if m.success) / len(recent_metrics)
        return {
            "total_requests": len(recent_metrics),
            "success_rate": success_rate,
            "avg_duration": statistics.mean(durations),
            "median_duration": statistics.median(durations),
            # quantiles(n=20)[18] is the 95th percentile; fall back to max for small samples
            "p95_duration": statistics.quantiles(durations, n=20)[18] if len(durations) >= 20 else max(durations),
            "total_input_tokens": sum(m.input_tokens for m in recent_metrics),
            "total_output_tokens": sum(m.output_tokens for m in recent_metrics)
        }

monitor = PerformanceMonitor()

def monitored_chat(messages: list):
    start_time = time.time()
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
        metric = RequestMetric(
            timestamp=start_time,
            duration=time.time() - start_time,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            success=True
        )
        monitor.record(metric)
        return response.choices[0].message.content
    except Exception:
        metric = RequestMetric(
            timestamp=start_time,
            duration=time.time() - start_time,
            input_tokens=0,
            output_tokens=0,
            success=False
        )
        monitor.record(metric)
        raise
```
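A quick usage sketch reading back the aggregated numbers:

```python
# Inspect the last hour of traffic
stats = monitor.get_stats(window_seconds=3600)
if stats:
    print(
        f"requests={stats['total_requests']}, "
        f"success={stats['success_rate']:.1%}, "
        f"p95={stats['p95_duration']:.2f}s"
    )
```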
## Cost Control

### Token Budget Control
```python
class BudgetController:
    def __init__(self, daily_budget: float, price_per_1k_tokens: float = 0.00015):
        self.daily_budget = daily_budget  # USD per day
        self.price_per_1k_tokens = price_per_1k_tokens
        self.daily_usage = 0.0
        self.last_reset = time.time()

    def check_budget(self, estimated_tokens: int) -> bool:
        # Reset the counter once a day (86400 seconds)
        if time.time() - self.last_reset > 86400:
            self.daily_usage = 0.0
            self.last_reset = time.time()
        estimated_cost = (estimated_tokens / 1000) * self.price_per_1k_tokens
        return (self.daily_usage + estimated_cost) <= self.daily_budget

    def record_usage(self, tokens: int):
        cost = (tokens / 1000) * self.price_per_1k_tokens
        self.daily_usage += cost

    def get_remaining_budget(self) -> float:
        return max(0, self.daily_budget - self.daily_usage)

budget = BudgetController(daily_budget=10.0)

def budget_aware_chat(messages: list, estimated_tokens: int = 1000):
    if not budget.check_budget(estimated_tokens):
        raise Exception(f"Budget exceeded; remaining: ${budget.get_remaining_budget():.4f}")
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    total_tokens = response.usage.prompt_tokens + response.usage.completion_tokens
    budget.record_usage(total_tokens)
    return response.choices[0].message.content
```
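One simplification above: real pricing usually differs for input and output tokens, so a flat per-1K rate over- or under-counts. A sketch that prices the two sides separately — the rates below are placeholders, not current list prices:

```python
# Placeholder rates - look up the actual prices for your model and date
INPUT_PRICE_PER_1K = 0.00015
OUTPUT_PRICE_PER_1K = 0.0006

def estimate_cost(prompt_tokens: int, completion_tokens: int) -> float:
    return (prompt_tokens / 1000) * INPUT_PRICE_PER_1K \
         + (completion_tokens / 1000) * OUTPUT_PRICE_PER_1K
```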
### Model Fallback

```python
class ModelSelector:
    def __init__(self):
        # Candidate models ordered from most to least expensive
        self.models = [
            {"name": "gpt-4o", "cost_tier": 3, "capability": "high"},
            {"name": "gpt-4o-mini", "cost_tier": 2, "capability": "medium"},
            {"name": "gpt-3.5-turbo", "cost_tier": 1, "capability": "low"}
        ]

    def select_model(self, task_complexity: str, budget_remaining: float):
        # Use the strongest model only for complex tasks with budget to spare
        if task_complexity == "high" and budget_remaining > 5.0:
            return "gpt-4o"
        elif task_complexity in ["high", "medium"] and budget_remaining > 1.0:
            return "gpt-4o-mini"
        else:
            return "gpt-3.5-turbo"

selector = ModelSelector()

def adaptive_chat(messages: list, task_complexity: str = "medium"):
    model = selector.select_model(task_complexity, budget.get_remaining_budget())
    print(f"Selected model: {model}")
    response = client.chat.completions.create(
        model=model,
        messages=messages
    )
    return response.choices[0].message.content
```
## Summary

This chapter covered best practices for API calls:

- Request optimization - sensible parameter settings, prompt trimming, batch processing
- Concurrency control - rate limiters, semaphores, queue-based processing
- Streaming output - synchronous and asynchronous streaming
- Caching strategies - in-memory, Redis, and semantic caches
- Timeouts and retries - timeout settings, retry mechanisms, exponential backoff
- Monitoring and logging - request logs, performance monitoring
- Cost control - budget enforcement, model fallback

Together, these practices help you build stable, efficient, and economical AI applications. The next chapter takes a closer look at error handling and retry strategies.