This section explains defensive implementation patterns that protect your API from abuse.

Rate limiting is a fundamental defense mechanism that protects systems against API abuse, DoS attacks, and scraping.
| Algorithm | Characteristics | Pros | Cons |
|---|---|---|---|
| Fixed Window | Counts requests within a fixed time window | Simple to implement, memory-efficient | Bursts can occur at window boundaries |
| Sliding Window Log | Logs each request's timestamp | Precise control | High memory consumption |
| Sliding Window Counter | Weighted average of the previous and current windows | Good balance | Slightly complex |
| Token Bucket | Each request consumes a token | Allows bursts while enforcing limits | Requires parameter tuning |
| Leaky Bucket | Processes requests at a constant rate | Stable output rate | Poor at handling bursts |
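The Token Bucket row can be sketched in a few lines. This is a minimal, single-process illustration (class and parameter names are my own); a production deployment would back the state with a shared store such as Redis:

```python
import time


class TokenBucket:
    """Minimal token bucket: holds up to `capacity` tokens,
    refilled continuously at `refill_rate` tokens per second."""

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self, cost: int = 1) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.refill_rate)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


# 5-request burst allowed, then sustained 1 request/second
bucket = TokenBucket(capacity=5, refill_rate=1.0)
print([bucket.allow() for _ in range(6)])  # first 5 pass, the 6th is denied
```

Because tokens accumulate while the client is idle, short bursts are served immediately, yet the long-run rate never exceeds `refill_rate` — the "allows bursts while enforcing limits" trade-off from the table.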
```javascript
const rateLimit = require('express-rate-limit');
const RedisStore = require('rate-limit-redis');

// Global limit: applies to all API endpoints
const globalLimiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100,
  standardHeaders: true, // return RateLimit-* headers
  legacyHeaders: false,
  message: { error: 'Too many requests, please try again later.' },
});

// Authentication endpoints: stricter limits
const authLimiter = rateLimit({
  windowMs: 15 * 60 * 1000,
  max: 5, // limit logins to 5 per 15 minutes
  skipSuccessfulRequests: true, // do not count successful requests
});

// Distributed environments: Redis backend
const distributedLimiter = rateLimit({
  store: new RedisStore({
    sendCommand: (...args) => redisClient.sendCommand(args),
  }),
  windowMs: 60 * 1000,
  max: 30,
});

app.use(globalLimiter);
app.use('/api/auth', authLimiter);
```
```http
RateLimit-Limit: 100
RateLimit-Remaining: 42
RateLimit-Reset: 1672531200
Retry-After: 30
```
Beyond the IP address alone, applying rate limits with composite keys built from the API key, user ID, or endpoint minimizes the impact on legitimate users.
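Building such a composite key can be as simple as the following sketch (function and argument names are illustrative): prefer an authenticated identity over the IP, then scope by endpoint.

```python
from typing import Optional


def rate_limit_key(ip: str, endpoint: str,
                   api_key: Optional[str] = None,
                   user_id: Optional[str] = None) -> str:
    """Build a composite rate-limit key.

    Authenticated identities (API key, then user ID) take precedence over
    the IP address, so users behind a shared NAT are not penalized for
    each other's traffic. The endpoint is part of the key so that a limit
    on one route does not starve the others."""
    identity = api_key or user_id or ip
    return f"{identity}:{endpoint}"


print(rate_limit_key("203.0.113.7", "/api/users", api_key="key_abc"))
# → key_abc:/api/users
```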
Every API input may contain malicious data. Always validate on the server side.

Define a list of permitted values (allowlist). This is safer than a denylist (blocklist) because it also withstands new attack patterns.

Strictly check data types, maximum lengths, and numeric upper and lower bounds. This prevents buffer overflows at the root.

Rejecting invalid input is safer than trying to "fix" it. Sanitization can produce unexpected transformations.
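The three principles above, combined in one hypothetical Python validator (field name, length bounds, and character set are illustrative):

```python
import re

# Allowlist of characters -- safer than trying to strip "bad" ones
USERNAME_PATTERN = re.compile(r"[a-zA-Z0-9_-]+")


def validate_username(value) -> str:
    """Reject-don't-fix validation for a username field."""
    # Type check first: reject non-strings outright
    # (this also blocks NoSQL operator objects like {"$gt": ""})
    if not isinstance(value, str):
        raise ValueError("username must be a string")
    # Explicit length bounds
    if not (1 <= len(value) <= 32):
        raise ValueError("username length out of range")
    # Allowlisted characters only; anything else is rejected, not sanitized
    if not USERNAME_PATTERN.fullmatch(value):
        raise ValueError("username contains disallowed characters")
    return value
```

Note that the function never rewrites the input; on any violation it raises, which keeps the behavior predictable and avoids the surprise transformations that sanitizers introduce.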
```javascript
const Ajv = require('ajv');
const addFormats = require('ajv-formats');

const ajv = new Ajv({ allErrors: true, removeAdditional: true });
addFormats(ajv);

// Schema for the user-creation API
const createUserSchema = {
  type: 'object',
  required: ['name', 'email'],
  additionalProperties: false,
  properties: {
    name: {
      type: 'string',
      minLength: 1,
      maxLength: 100,
      pattern: '^[a-zA-Z0-9\\s\\-]+$', // restrict to allowed characters
    },
    email: {
      type: 'string',
      format: 'email',
      maxLength: 254,
    },
    age: {
      type: 'integer',
      minimum: 0,
      maximum: 150,
    },
  },
};

// Validation middleware
function validateBody(schema) {
  const validate = ajv.compile(schema);
  return (req, res, next) => {
    if (!validate(req.body)) {
      return res.status(400).json({
        error: 'Validation failed',
        details: validate.errors,
      });
    }
    next();
  };
}

app.post('/api/users', validateBody(createUserSchema), createUser);
```
| Attack | Countermeasure | Example |
|---|---|---|
| SQL injection | Parameterized queries, use an ORM | `db.query('SELECT * FROM users WHERE id = ?', [id])` |
| NoSQL injection | Type checks, strip `$` operators | Ensure inputs are strings (reject objects) |
| XSS (via API responses) | Explicit content type, output escaping | `Content-Type: application/json` |
| Path traversal | Strip path separators from input | Normalize with `path.basename()` |
| XXE (XML External Entity) | Disable external entity resolution | Turn it off in the XML parser settings |
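As an illustration of the path traversal row, here is a hedged Python sketch; `UPLOAD_DIR` is a hypothetical base directory, and the same pattern maps to `path.basename()` plus a prefix check in Node.js:

```python
import os

UPLOAD_DIR = "/var/app/uploads"  # hypothetical base directory


def safe_path(filename: str) -> str:
    """Neutralize path traversal: keep only the basename, then confirm
    the resolved path is still inside the base directory."""
    base = os.path.realpath(UPLOAD_DIR)
    name = os.path.basename(filename)  # strips '../' components
    full = os.path.realpath(os.path.join(base, name))
    # Defense in depth: even after basename(), verify containment
    if not full.startswith(base + os.sep):
        raise ValueError("path escapes upload directory")
    return full


print(safe_path("../../etc/passwd"))  # resolves under UPLOAD_DIR, not /etc
```

The containment check after `realpath()` is the important part: stripping separators alone is fragile, so the code verifies the final, symlink-resolved path rather than trusting the cleaned input.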
CORS is a mechanism for controlling the browser's same-origin policy. Misconfiguring it creates serious security risks.

`Access-Control-Allow-Origin: *` and `Access-Control-Allow-Credentials: true` must never be used together. Reserve the wildcard for public APIs only.
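This rule is easy to enforce mechanically. A startup-time sanity check might look like the following sketch (function name is illustrative):

```python
def check_cors_config(allow_origin: str, allow_credentials: bool) -> None:
    """Reject the one combination the CORS protocol forbids and browsers
    refuse to honor: a wildcard origin together with credentialed requests."""
    if allow_origin == "*" and allow_credentials:
        raise ValueError(
            "Access-Control-Allow-Origin: * cannot be combined with "
            "Access-Control-Allow-Credentials: true"
        )


check_cors_config("https://app.example.com", allow_credentials=True)  # OK
check_cors_config("*", allow_credentials=False)                       # OK: public API
```

Failing fast at boot is preferable to discovering in production that browsers silently drop credentialed responses served with a wildcard origin.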
```javascript
const cors = require('cors');

// Explicitly list the allowed origins
const allowedOrigins = [
  'https://app.example.com',
  'https://admin.example.com',
];

app.use(cors({
  origin(origin, callback) {
    // Allow server-to-server requests (no Origin header)
    if (!origin || allowedOrigins.includes(origin)) {
      callback(null, true);
    } else {
      callback(new Error('Not allowed by CORS'));
    }
  },
  methods: ['GET', 'POST', 'PUT', 'DELETE'],
  allowedHeaders: ['Content-Type', 'Authorization', 'X-API-Key'],
  credentials: true,
  maxAge: 86400, // preflight cache: 24 hours
}));
```
| Header | Purpose | Recommended value |
|---|---|---|
| Access-Control-Allow-Origin | Allowed origins | Explicit domain list |
| Access-Control-Allow-Methods | Allowed HTTP methods | Minimum required |
| Access-Control-Allow-Headers | Allowed request headers | Minimum required |
| Access-Control-Allow-Credentials | Allow sending cookies | `true` (only when authentication is needed) |
| Access-Control-Max-Age | Preflight cache duration (seconds) | 86400 (24 hours) |
| Access-Control-Expose-Headers | Response headers readable by JavaScript | Only required headers, e.g. RateLimit-* |
LLM APIs introduce new dimensions to rate limiting: token consumption, cost per request, and compute-intensive inference.
| Dimension | Traditional API | LLM API |
|---|---|---|
| Cost per request | Low, predictable | Variable, can be 100x+ (based on tokens) |
| Rate limit unit | Requests per time window | Tokens per minute (TPM) + Requests per minute (RPM) |
| Abuse pattern | Scraping, brute force, DDoS | Prompt injection, resource exhaustion, denial-of-wallet |
| Algorithm fit | Fixed/Sliding window | Token bucket (weighted by token count) |
```python
import time
from collections import defaultdict

import tiktoken


class TokenRateLimiter:
    """Rate limiter that counts tokens, not just requests."""

    def __init__(self, tokens_per_minute=100_000, requests_per_minute=60):
        self.tpm_limit = tokens_per_minute
        self.rpm_limit = requests_per_minute
        self.usage = defaultdict(lambda: {"tokens": [], "requests": []})
        self.encoder = tiktoken.encoding_for_model("gpt-4")

    def count_tokens(self, text: str) -> int:
        return len(self.encoder.encode(text))

    def check_limit(self, user_id: str, prompt: str) -> dict:
        now = time.time()
        window = now - 60  # 1-minute sliding window
        user = self.usage[user_id]

        # Clean up expired entries
        user["tokens"] = [(t, c) for t, c in user["tokens"] if t > window]
        user["requests"] = [t for t in user["requests"] if t > window]

        # Check RPM
        if len(user["requests"]) >= self.rpm_limit:
            return {"allowed": False, "reason": "RPM limit exceeded"}

        # Check TPM
        token_count = self.count_tokens(prompt)
        used_tokens = sum(c for _, c in user["tokens"])
        if used_tokens + token_count > self.tpm_limit:
            return {"allowed": False, "reason": "TPM limit exceeded"}

        # Record usage
        user["tokens"].append((now, token_count))
        user["requests"].append(now)
        return {"allowed": True, "tokens_used": token_count}
```
Related: LLM10: Model Denial of Service, ASI04: Cascading Hallucination Attacks
Prompt injection is the #1 risk for LLM applications. Apply defense-in-depth with input validation, structural separation, and output verification.
Strip or escape special tokens, instruction-like patterns, and control characters from user input before including in prompts.
Use delimiters, XML tags, or separate message roles to clearly isolate system instructions from user-provided content.
Validate LLM responses against expected schemas. Check for data leakage, instruction following, and malicious content before rendering.
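Output verification might look like the following sketch. The canary value, the expected response keys, and the assumption that the model returns JSON are all illustrative choices, not a fixed API:

```python
import json

CANARY = "c4n4ry-7f3a"  # hypothetical secret planted in the system prompt
REQUIRED_KEYS = {"answer", "sources"}  # expected response schema (illustrative)


def verify_llm_output(raw: str) -> dict:
    """Verify an LLM response before rendering: no system-prompt leakage,
    parseable JSON, and all expected keys present."""
    # Leakage check: if the canary appears, the model echoed its instructions
    if CANARY in raw:
        raise ValueError("response leaked system-prompt contents")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError("response is not valid JSON") from exc
    if not isinstance(data, dict):
        raise ValueError("response is not a JSON object")
    missing = REQUIRED_KEYS - data.keys()
    if missing:
        raise ValueError(f"response missing keys: {missing}")
    return data
```

The canary technique pairs with the indirect-injection row in the table below: a unique token is embedded in the system prompt, and its appearance in any output is treated as evidence that instructions were exfiltrated.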
| Attack | Countermeasure | Example |
|---|---|---|
| Direct Prompt Injection | Input sanitization + instruction/data separation | "Ignore previous instructions and..." |
| Indirect Prompt Injection | Sanitize RAG results + canary tokens | Malicious instructions hidden in retrieved documents |
| Context Stuffing | Token limits + input truncation | Overloading context window to push out system instructions |
| Parameter Tampering | Schema validation + typed parameters | Manipulating temperature, max_tokens, or model parameters |
```python
import re

from pydantic import BaseModel, Field, field_validator


class LLMRequest(BaseModel):
    """Validated LLM request with prompt injection defenses."""

    user_message: str = Field(..., max_length=4000)
    max_tokens: int = Field(default=1000, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)

    @field_validator("user_message")
    @classmethod
    def sanitize_prompt(cls, v: str) -> str:
        # Block known injection patterns
        patterns = [
            r"(?i)ignore\s+(previous|above|all)\s+(instructions?|prompts?)",
            r"(?i)you\s+are\s+now\s+",
            r"(?i)system\s*:\s*",
            r"(?i)\[INST\]|\[\/INST\]|<\|im_start\|>",
        ]
        for pattern in patterns:
            if re.search(pattern, v):
                raise ValueError("Input contains disallowed patterns")
        return v


# Usage with FastAPI
@app.post("/api/chat")
async def chat(request: LLMRequest):
    # Separate system instructions from user input
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"<user_input>{request.user_message}</user_input>"},
    ]
    return await call_llm(messages, request.max_tokens, request.temperature)
```
Related: LLM01: Prompt Injection, LLM02: Insecure Output Handling, ASI02: Prompt Injection via Tool Results
```http
# Disable MIME type sniffing
X-Content-Type-Options: nosniff

# Prevent iframe embedding
X-Frame-Options: DENY

# Enforce HTTPS
Strict-Transport-Security: max-age=31536000; includeSubDomains

# CSP: the API returns only JSON, so block all script execution
Content-Security-Policy: default-src 'none'; frame-ancestors 'none'

# Restrict referrer information
Referrer-Policy: no-referrer

# Restrict browser features
Permissions-Policy: geolocation=(), camera=(), microphone=()
```
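The same baseline can be kept in one place as a framework-agnostic helper, a sketch to be wired into whatever response middleware your framework provides (function name and merge policy are my own choices):

```python
SECURITY_HEADERS = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
    "Content-Security-Policy": "default-src 'none'; frame-ancestors 'none'",
    "Referrer-Policy": "no-referrer",
    "Permissions-Policy": "geolocation=(), camera=(), microphone=()",
}


def apply_security_headers(headers: dict) -> dict:
    """Merge the baseline security headers into a response header dict.

    Values the handler already set take precedence, so individual routes
    can relax a header deliberately rather than by accident."""
    merged = dict(SECURITY_HEADERS)
    merged.update(headers)
    return merged
```

Centralizing the baseline this way means a new endpoint cannot ship without the headers; opting out requires an explicit, reviewable override.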