API Call Best Practices

Overview

In real-world applications, calling large language model APIs efficiently, reliably, and economically is a key engineering concern. This chapter presents best practices for API calls, covering request optimization, concurrency control, streaming output, caching strategies, and more, to help you build production-grade AI applications.

Request Optimization

Setting Parameters Appropriately

python
from openai import OpenAI

client = OpenAI()

def optimized_request(prompt: str, task_type: str = "general"):
    # Per-task sampling presets: creative work gets high temperature
    # and repetition penalties; analytical and code tasks run cool
    configs = {
        "creative": {
            "temperature": 0.9,
            "top_p": 0.95,
            "presence_penalty": 0.6,
            "frequency_penalty": 0.5
        },
        "analytical": {
            "temperature": 0.3,
            "top_p": 0.9,
            "presence_penalty": 0.1,
            "frequency_penalty": 0.1
        },
        "code": {
            "temperature": 0.2,
            "top_p": 0.9,
            "presence_penalty": 0.0,
            "frequency_penalty": 0.0
        },
        "general": {
            "temperature": 0.7,
            "top_p": 0.9
        }
    }
    
    config = configs.get(task_type, configs["general"])
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1000,
        **config
    )
    
    return response.choices[0].message.content

creative_text = optimized_request("Write a poem", "creative")
code_result = optimized_request("Implement quicksort", "code")

Trimming Prompts

python
def trim_prompt(prompt: str, max_length: int = 4000) -> str:
    """Trim a prompt to roughly max_length characters, cutting at line boundaries."""
    if len(prompt) <= max_length:
        return prompt
    
    lines = prompt.split('\n')
    result = []
    current_length = 0
    
    # Keep whole lines until the character budget runs out
    for line in lines:
        if current_length + len(line) + 1 <= max_length:
            result.append(line)
            current_length += len(line) + 1
        else:
            break
    
    return '\n'.join(result) + '\n...[content truncated]'

long_prompt = "..." * 5000
trimmed = trim_prompt(long_prompt)
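
Character counts only approximate token limits. A token-accurate variant is sketched below, assuming the tiktoken package and its o200k_base encoding (used by the gpt-4o model family); trim_prompt_by_tokens is an illustrative name:

python
import tiktoken

def trim_prompt_by_tokens(prompt: str, max_tokens: int = 4000) -> str:
    # o200k_base is the tokenizer used by the gpt-4o model family
    enc = tiktoken.get_encoding("o200k_base")
    tokens = enc.encode(prompt)
    if len(tokens) <= max_tokens:
        return prompt
    # Keep the first max_tokens tokens and mark the cut
    return enc.decode(tokens[:max_tokens]) + "\n...[content truncated]"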

Batch Processing

python
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def process_single(prompt: str):
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def process_batch(prompts: list, batch_size: int = 5):
    results = []
    
    for i in range(0, len(prompts), batch_size):
        batch = prompts[i:i + batch_size]
        tasks = [process_single(prompt) for prompt in batch]
        batch_results = await asyncio.gather(*tasks)
        results.extend(batch_results)
        
        # Brief pause between batches to ease rate-limit pressure
        await asyncio.sleep(1)
    
    return results

prompts = [
    "Translate: Hello",
    "Translate: World",
    "Translate: AI"
]

results = asyncio.run(process_batch(prompts))
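
One caveat: if any task in a batch raises, await asyncio.gather(*tasks) re-raises the first failure and the other results from that batch are lost. A failure-tolerant sketch (process_batch_tolerant is an illustrative name) passes return_exceptions=True so errors can be filtered out of the results:

python
async def process_batch_tolerant(prompts: list, batch_size: int = 5):
    results = []
    for i in range(0, len(prompts), batch_size):
        batch = prompts[i:i + batch_size]
        tasks = [process_single(p) for p in batch]
        # Failures come back as exception objects instead of raising
        batch_results = await asyncio.gather(*tasks, return_exceptions=True)
        results.extend(r for r in batch_results if not isinstance(r, Exception))
        await asyncio.sleep(1)
    return results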

Concurrency Control

Rate Limiter

python
import asyncio
import time
from collections import deque

class RateLimiter:
    def __init__(self, max_requests: int, time_window: float):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = deque()
        self.lock = asyncio.Lock()
    
    async def acquire(self):
        async with self.lock:
            now = time.time()
            
            # Drop timestamps that have left the sliding window
            while self.requests and self.requests[0] <= now - self.time_window:
                self.requests.popleft()
            
            if len(self.requests) >= self.max_requests:
                # Sleep until the oldest request exits the window; holding
                # the lock here deliberately serializes waiting callers
                sleep_time = self.requests[0] + self.time_window - now
                await asyncio.sleep(sleep_time)
                now = time.time()
                while self.requests and self.requests[0] <= now - self.time_window:
                    self.requests.popleft()
            
            self.requests.append(now)

limiter = RateLimiter(max_requests=10, time_window=60.0)

async def rate_limited_request(prompt: str):
    await limiter.acquire()
    
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    
    return response.choices[0].message.content
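
Note the design choice: this is a sliding-window limiter that remembers recent request timestamps, so bursts are allowed as long as the last time_window seconds contain fewer than max_requests calls. A usage sketch (limited_batch is an illustrative name):

python
async def limited_batch(prompts: list):
    # Each call blocks on the limiter before hitting the API
    return await asyncio.gather(
        *(rate_limited_request(p) for p in prompts)
    )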

Semaphore Control

python
class ConcurrencyController:
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def execute(self, func, *args, **kwargs):
        # At most max_concurrent coroutines run func at any one time
        async with self.semaphore:
            return await func(*args, **kwargs)

controller = ConcurrencyController(max_concurrent=3)

async def controlled_batch_request(prompts: list):
    tasks = [
        controller.execute(process_single, prompt)
        for prompt in prompts
    ]
    return await asyncio.gather(*tasks)

Queue Processing

python
import asyncio
from typing import Callable, Any

class RequestQueue:
    def __init__(self, processor: Callable, max_workers: int = 3):
        self.queue = asyncio.Queue()
        self.processor = processor
        self.max_workers = max_workers
        self.workers = []
    
    async def add_request(self, data: Any):
        await self.queue.put(data)
    
    async def worker(self, worker_id: int):
        while True:
            try:
                data = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0
                )
            except asyncio.TimeoutError:
                continue
            
            try:
                result = await self.processor(data)
                print(f"Worker {worker_id} processed: {result[:50]}...")
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
            finally:
                # Always mark the item done, even on failure, so that
                # queue.join() in stop() cannot hang
                self.queue.task_done()
    
    async def start(self):
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.max_workers)
        ]
    
    async def stop(self):
        await self.queue.join()
        for worker in self.workers:
            worker.cancel()

async def process_item(item: dict):
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": item["prompt"]}]
    )
    return response.choices[0].message.content

queue = RequestQueue(process_item, max_workers=3)

async def main():
    await queue.start()
    
    for i in range(10):
        await queue.add_request({"prompt": f"Question {i}: What is AI?"})
    
    # stop() waits for the queue to drain before cancelling workers
    await queue.stop()

asyncio.run(main())

Streaming Output

Basic Streaming

python
def stream_response(prompt: str):
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    full_response = ""
    for chunk in stream:
        # Each chunk carries an incremental delta, which may be empty
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            full_response += content
            print(content, end="", flush=True)
    
    print()
    return full_response

stream_response("Write a Python tutorial")
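
Streamed responses carry no token usage by default. Recent versions of the Chat Completions API accept a stream_options parameter that appends a final usage-only chunk; verify that your provider supports it:

python
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Write a Python tutorial"}],
    stream=True,
    # Request a trailing chunk carrying token counts
    stream_options={"include_usage": True}
)

for chunk in stream:
    # The final chunk has an empty choices list and a usage field
    if chunk.usage:
        print(f"\nTokens used: {chunk.usage.total_tokens}")
    elif chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)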

Async Streaming

python
async def async_stream_response(prompt: str):
    stream = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    # An async generator cannot `return` a value, so this function
    # only yields chunks; callers accumulate the full text if needed
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

async def process_stream():
    async for content in async_stream_response("Tell a story"):
        print(content, end="", flush=True)

asyncio.run(process_stream())

Streaming to a File

python
async def stream_to_file(prompt: str, output_file: str):
    stream = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
    with open(output_file, 'w', encoding='utf-8') as f:
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                f.write(content)
                f.flush()

asyncio.run(stream_to_file(
    "Write an article about AI",
    "output.txt"
))
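
The file writes above are synchronous and briefly block the event loop between chunks. If that matters, the third-party aiofiles package (an extra dependency, not part of the OpenAI SDK) performs writes in a thread pool; a sketch:

python
import aiofiles

async def stream_to_file_async(prompt: str, output_file: str):
    stream = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    # aiofiles offloads blocking file I/O, keeping the loop responsive
    async with aiofiles.open(output_file, 'w', encoding='utf-8') as f:
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                await f.write(chunk.choices[0].delta.content)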

Caching Strategies

Simple In-Memory Cache

python
import hashlib
from functools import wraps

class SimpleCache:
    def __init__(self):
        self.cache = {}
    
    def _hash_key(self, messages: list) -> str:
        content = str(messages)
        return hashlib.md5(content.encode()).hexdigest()
    
    def get(self, messages: list):
        key = self._hash_key(messages)
        return self.cache.get(key)
    
    def set(self, messages: list, response: str):
        key = self._hash_key(messages)
        self.cache[key] = response
    
    def clear(self):
        self.cache.clear()

cache = SimpleCache()

def cached_chat(messages: list, use_cache: bool = True):
    if use_cache:
        cached_response = cache.get(messages)
        if cached_response is not None:
            print("Returning cached response")
            return cached_response
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    
    result = response.choices[0].message.content
    cache.set(messages, result)
    
    return result
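
SimpleCache grows without bound. A minimal LRU variant, sketched here with collections.OrderedDict (the capacity figure is illustrative), evicts the least recently used entry once full:

python
from collections import OrderedDict

class LRUCache(SimpleCache):
    def __init__(self, capacity: int = 1024):
        super().__init__()
        self.cache = OrderedDict()
        self.capacity = capacity

    def get(self, messages: list):
        key = self._hash_key(messages)
        if key in self.cache:
            self.cache.move_to_end(key)  # mark as recently used
            return self.cache[key]
        return None

    def set(self, messages: list, response: str):
        key = self._hash_key(messages)
        self.cache[key] = response
        self.cache.move_to_end(key)
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict the oldest entry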

Redis Cache

python
import redis
import json
import hashlib

class RedisCache:
    def __init__(self, host='localhost', port=6379, db=0):
        self.client = redis.Redis(host=host, port=port, db=db)
        self.ttl = 3600  # entries expire after one hour
    
    def _hash_key(self, messages: list) -> str:
        content = json.dumps(messages, ensure_ascii=False)
        return f"llm_cache:{hashlib.md5(content.encode()).hexdigest()}"
    
    def get(self, messages: list):
        key = self._hash_key(messages)
        cached = self.client.get(key)
        
        if cached:
            return json.loads(cached)
        return None
    
    def set(self, messages: list, response: str):
        key = self._hash_key(messages)
        self.client.setex(key, self.ttl, json.dumps(response, ensure_ascii=False))
    
    def delete(self, messages: list):
        key = self._hash_key(messages)
        self.client.delete(key)

redis_cache = RedisCache()

def redis_cached_chat(messages: list):
    cached = redis_cache.get(messages)
    if cached:
        return cached
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    
    result = response.choices[0].message.content
    redis_cache.set(messages, result)
    
    return result

Semantic Cache

python
import numpy as np
from typing import List, Optional, Tuple

class SemanticCache:
    def __init__(self, similarity_threshold: float = 0.95):
        self.cache: List[Tuple[np.ndarray, str]] = []
        self.similarity_threshold = similarity_threshold
    
    def cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
    
    def get_embedding(self, text: str) -> np.ndarray:
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return np.array(response.data[0].embedding)
    
    def get(self, query: str) -> Optional[str]:
        query_embedding = self.get_embedding(query)
        
        # Linear scan over cached entries; fine while the cache is small
        for cached_embedding, cached_response in self.cache:
            similarity = self.cosine_similarity(query_embedding, cached_embedding)
            
            if similarity >= self.similarity_threshold:
                print(f"Semantic cache hit (similarity: {similarity:.4f})")
                return cached_response
        
        return None
    
    def set(self, query: str, response: str):
        embedding = self.get_embedding(query)
        self.cache.append((embedding, response))

semantic_cache = SemanticCache()

def semantically_cached_chat(query: str):
    cached = semantic_cache.get(query)
    if cached:
        return cached
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": query}]
    )
    
    result = response.choices[0].message.content
    semantic_cache.set(query, result)
    
    return result
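
Each lookup above loops over the cache in Python, comparing embeddings one at a time. A vectorized sketch (VectorizedSemanticCache is an illustrative name) stacks the cached embeddings and scores them in a single NumPy operation, which matters once the cache grows:

python
class VectorizedSemanticCache(SemanticCache):
    def get(self, query: str) -> Optional[str]:
        if not self.cache:
            return None

        q = self.get_embedding(query)
        # One matrix-vector product scores every cached entry at once
        matrix = np.stack([emb for emb, _ in self.cache])
        sims = matrix @ q / (np.linalg.norm(matrix, axis=1) * np.linalg.norm(q))

        best = int(np.argmax(sims))
        if sims[best] >= self.similarity_threshold:
            return self.cache[best][1]
        return None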

Timeouts and Retries

Setting Timeouts

python
import httpx
from openai import APITimeoutError

# 60s total budget per request, 5s to establish the connection
client_with_timeout = OpenAI(
    timeout=httpx.Timeout(60.0, connect=5.0)
)

try:
    response = client_with_timeout.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}]
    )
except APITimeoutError as e:
    print(f"Request timed out: {e}")

Retry Mechanism

python
import time
from functools import wraps

def retry(max_retries: int = 3, delay: float = 1.0, backoff: float = 2.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            current_delay = delay
            
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    retries += 1
                    
                    if retries >= max_retries:
                        raise
                    
                    print(f"Retry {retries}, waiting {current_delay}s...")
                    time.sleep(current_delay)
                    current_delay *= backoff
            
            return None
        return wrapper
    return decorator

@retry(max_retries=3, delay=1.0, backoff=2.0)
def robust_chat(messages: list):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return response.choices[0].message.content
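
Before rolling your own, note that the v1 OpenAI Python SDK already retries rate-limit and transient server errors with exponential backoff; two retries is the default, and the count is configurable on the client:

python
# The SDK retries certain failures itself; the default is 2
client_with_retries = OpenAI(max_retries=5)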

Exponential Backoff

python
import random

def exponential_backoff(
    func,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
):
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt == max_retries - 1:
                    raise
                
                # Double the delay each attempt, capped at max_delay
                delay = min(base_delay * (2 ** attempt), max_delay)
                # Up to 10% random jitter avoids synchronized retries
                jitter = random.uniform(0, delay * 0.1)
                total_delay = delay + jitter
                
                print(f"Retry {attempt + 1}/{max_retries}, waiting {total_delay:.2f}s")
                time.sleep(total_delay)
    
    return wrapper
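
Unlike the decorator above, exponential_backoff is applied by wrapping a function directly. A usage sketch (plain_chat and backoff_chat are illustrative names):

python
def plain_chat(messages: list):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    return response.choices[0].message.content

# Failures back off roughly 1s, 2s, 4s, ... plus jitter
backoff_chat = exponential_backoff(plain_chat)
result = backoff_chat([{"role": "user", "content": "Hello"}])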

Monitoring and Logging

Request Logging

python
import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def logged_chat(messages: list, **kwargs):
    start_time = time.time()
    
    logger.info(f"Request started - messages: {len(messages)}")
    
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            **kwargs
        )
        
        elapsed = time.time() - start_time
        usage = response.usage
        
        logger.info(
            f"Request succeeded - "
            f"elapsed: {elapsed:.2f}s, "
            f"input tokens: {usage.prompt_tokens}, "
            f"output tokens: {usage.completion_tokens}"
        )
        
        return response.choices[0].message.content
    
    except Exception as e:
        elapsed = time.time() - start_time
        logger.error(f"Request failed - elapsed: {elapsed:.2f}s, error: {e}")
        raise

Performance Monitoring

python
from dataclasses import dataclass
from typing import List
import statistics

@dataclass
class RequestMetric:
    timestamp: float
    duration: float
    input_tokens: int
    output_tokens: int
    success: bool

class PerformanceMonitor:
    def __init__(self):
        self.metrics: List[RequestMetric] = []
    
    def record(self, metric: RequestMetric):
        self.metrics.append(metric)
    
    def get_stats(self, window_seconds: int = 3600):
        now = time.time()
        recent_metrics = [
            m for m in self.metrics
            if now - m.timestamp <= window_seconds
        ]
        
        if not recent_metrics:
            return None
        
        durations = [m.duration for m in recent_metrics]
        success_rate = sum(1 for m in recent_metrics if m.success) / len(recent_metrics)
        
        return {
            "total_requests": len(recent_metrics),
            "success_rate": success_rate,
            "avg_duration": statistics.mean(durations),
            "median_duration": statistics.median(durations),
            "p95_duration": statistics.quantiles(durations, n=20)[18] if len(durations) >= 20 else max(durations),
            "total_input_tokens": sum(m.input_tokens for m in recent_metrics),
            "total_output_tokens": sum(m.output_tokens for m in recent_metrics)
        }

monitor = PerformanceMonitor()

def monitored_chat(messages: list):
    start_time = time.time()
    
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
        
        metric = RequestMetric(
            timestamp=start_time,
            duration=time.time() - start_time,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            success=True
        )
        monitor.record(metric)
        
        return response.choices[0].message.content
    
    except Exception:
        metric = RequestMetric(
            timestamp=start_time,
            duration=time.time() - start_time,
            input_tokens=0,
            output_tokens=0,
            success=False
        )
        monitor.record(metric)
        raise
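
To act on the collected metrics, pull a snapshot periodically; for example:

python
stats = monitor.get_stats(window_seconds=3600)
if stats:
    print(f"Requests: {stats['total_requests']}, "
          f"success rate: {stats['success_rate']:.1%}, "
          f"avg latency: {stats['avg_duration']:.2f}s, "
          f"p95 latency: {stats['p95_duration']:.2f}s")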

Cost Control

Token Budget Control

python
class BudgetController:
    def __init__(self, daily_budget: float, price_per_1k_tokens: float = 0.00015):
        self.daily_budget = daily_budget
        # A single blended rate applied to all tokens (see note below)
        self.price_per_1k_tokens = price_per_1k_tokens
        self.daily_usage = 0.0
        self.last_reset = time.time()
    
    def check_budget(self, estimated_tokens: int) -> bool:
        # Reset the running total once 24 hours have passed
        if time.time() - self.last_reset > 86400:
            self.daily_usage = 0.0
            self.last_reset = time.time()
        
        estimated_cost = (estimated_tokens / 1000) * self.price_per_1k_tokens
        
        return (self.daily_usage + estimated_cost) <= self.daily_budget
    
    def record_usage(self, tokens: int):
        cost = (tokens / 1000) * self.price_per_1k_tokens
        self.daily_usage += cost
    
    def get_remaining_budget(self) -> float:
        return max(0, self.daily_budget - self.daily_usage)

budget = BudgetController(daily_budget=10.0)

def budget_aware_chat(messages: list, estimated_tokens: int = 1000):
    if not budget.check_budget(estimated_tokens):
        raise Exception(f"Budget exceeded; remaining: ${budget.get_remaining_budget():.4f}")
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages
    )
    
    total_tokens = response.usage.prompt_tokens + response.usage.completion_tokens
    budget.record_usage(total_tokens)
    
    return response.choices[0].message.content
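
The controller applies one blended per-token rate, but providers typically price input and output tokens differently. A split-rate sketch (the rates shown are assumptions; check your provider's current price list):

python
class SplitRateBudgetController(BudgetController):
    def record_usage_split(self, input_tokens: int, output_tokens: int,
                           input_price_per_1k: float = 0.00015,
                           output_price_per_1k: float = 0.0006):
        # Input and output tokens are billed at different rates
        self.daily_usage += (input_tokens / 1000) * input_price_per_1k
        self.daily_usage += (output_tokens / 1000) * output_price_per_1k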

Model Fallback

python
class ModelSelector:
    def __init__(self):
        # Informational tier table; select_model below keys off task
        # complexity and remaining budget directly
        self.models = [
            {"name": "gpt-4o", "cost_tier": 3, "capability": "high"},
            {"name": "gpt-4o-mini", "cost_tier": 2, "capability": "medium"},
            {"name": "gpt-3.5-turbo", "cost_tier": 1, "capability": "low"}
        ]
    
    def select_model(self, task_complexity: str, budget_remaining: float):
        if task_complexity == "high" and budget_remaining > 5.0:
            return "gpt-4o"
        elif task_complexity in ["high", "medium"] and budget_remaining > 1.0:
            return "gpt-4o-mini"
        else:
            return "gpt-3.5-turbo"

selector = ModelSelector()

def adaptive_chat(messages: list, task_complexity: str = "medium"):
    model = selector.select_model(task_complexity, budget.get_remaining_budget())
    
    print(f"Selected model: {model}")
    
    response = client.chat.completions.create(
        model=model,
        messages=messages
    )
    
    return response.choices[0].message.content

Summary

This chapter covered best practices for API calls:

  1. Request optimization - sensible parameters, trimmed prompts, batch processing
  2. Concurrency control - rate limiters, semaphores, queue processing
  3. Streaming output - synchronous and asynchronous streaming
  4. Caching strategies - in-memory, Redis, and semantic caches
  5. Timeouts and retries - timeout settings, retry mechanisms, exponential backoff
  6. Monitoring and logging - request logs, performance monitoring
  7. Cost control - budget enforcement, model fallback

These practices help you build stable, efficient, and economical AI applications. The next chapter takes a closer look at error handling and retry strategies.
