Rate Limiting

Rate limiting is a fundamental defense mechanism that protects systems from API abuse, DoS attacks, and scraping.

Algorithm Comparison

| Algorithm | How It Works | Pros | Cons |
|---|---|---|---|
| Fixed Window | Counts requests within a fixed time window | Simple to implement, memory-efficient | Bursts can occur at window boundaries |
| Sliding Window Log | Records a timestamp for every request | Precise control | High memory consumption |
| Sliding Window Counter | Weighted average of the previous and current windows | Good balance | Slightly complex |
| Token Bucket | Consumes a token per access | Allows bursts while still enforcing the limit | Requires parameter tuning |
| Leaky Bucket | Processes requests at a constant rate | Stable output rate | Poor at handling bursts |
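To make the comparison concrete, the "sliding window counter" row above can be sketched in a few lines. This is an illustrative, in-memory implementation; the class and parameter names are my own, not from any library:

```python
import time

class SlidingWindowCounter:
    """Sliding-window counter: a weighted average of the previous and
    current fixed windows approximates a true sliding window."""

    def __init__(self, limit: int, window_seconds: float = 60.0):
        self.limit = limit
        self.window = window_seconds
        self.current_start = time.monotonic()
        self.current_count = 0
        self.previous_count = 0

    def allow(self) -> bool:
        now = time.monotonic()
        elapsed = now - self.current_start
        if elapsed >= self.window:
            # Roll the window forward; if more than a full window has
            # passed with no traffic, the previous count is stale.
            self.previous_count = self.current_count if elapsed < 2 * self.window else 0
            self.current_count = 0
            self.current_start = now
            elapsed = 0.0
        # Weight the previous window by the fraction it still overlaps
        weight = (self.window - elapsed) / self.window
        estimated = self.previous_count * weight + self.current_count
        if estimated < self.limit:
            self.current_count += 1
            return True
        return False
```

The weighting smooths out the boundary bursts that plague the fixed window, at the cost of storing only two counters per key.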

Implementing Rate Limiting in Express.js

JavaScript (Express): Multi-tier rate limiting
const rateLimit = require('express-rate-limit');
const RedisStore = require('rate-limit-redis');

// Global limit: applies to all API routes
const globalLimiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100,
  standardHeaders: true,    // return RateLimit-* headers
  legacyHeaders: false,
  message: { error: 'Too many requests, please try again later.' },
});

// Stricter limit for authentication endpoints
const authLimiter = rateLimit({
  windowMs: 15 * 60 * 1000,
  max: 5,  // at most 5 login attempts per 15 minutes
  skipSuccessfulRequests: true, // do not count successful requests
});

// Redis-backed store for distributed environments
const distributedLimiter = rateLimit({
  store: new RedisStore({ sendCommand: (...args) => redisClient.sendCommand(args) }),
  windowMs: 60 * 1000,
  max: 30,
});

app.use(globalLimiter);
app.use('/api/auth', authLimiter);

Response Headers

HTTP Response Headers (RFC 6585 / draft-ietf-httpapi-ratelimit-headers)
RateLimit-Limit: 100
RateLimit-Remaining: 42
RateLimit-Reset: 1672531200
Retry-After: 30

Rate-Limit Keys

In addition to the IP address, applying rate limits keyed on composites such as API key, user ID, or endpoint minimizes the impact on legitimate users.
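As a sketch of that idea, a composite key can be built by preferring the most specific identifier available and scoping it to the endpoint (the function and argument names here are hypothetical):

```python
from typing import Optional

def rate_limit_key(api_key: Optional[str], user_id: Optional[str],
                   client_ip: str, endpoint: str) -> str:
    """Prefer the API key, then the user ID, then the client IP as the
    rate-limit subject, and scope the bucket to the endpoint."""
    subject = api_key or user_id or client_ip
    return f"{subject}:{endpoint}"
```

Authenticated callers are then throttled individually instead of sharing one IP-wide bucket (e.g. many users behind a corporate NAT).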

🔍 Input Validation

All API input can contain malicious data. Always validate on the server side.

Validation Principles

Allow-List Approach

Define a list of permitted values. This is safer than a deny list (block list) because it also covers novel attack patterns.

Type, Length, and Range

Strictly check data types, maximum lengths, and numeric upper and lower bounds. This prevents problems such as buffer overflows at the source.

Sanitization vs. Rejection

Rejecting invalid input is safer than trying to "fix" it; sanitization can produce unexpected transformations.
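The three principles above can be combined in one small reject-on-failure validator. This is a sketch; the field names and bounds are invented for illustration:

```python
# Allow list of permitted values, not a block list of bad ones
ALLOWED_SORT_FIELDS = {"name", "email", "created_at"}

def validate_query(sort: str, limit: int) -> None:
    """Reject invalid input outright instead of trying to sanitize it."""
    # Allow-list check on the value itself
    if not isinstance(sort, str) or sort not in ALLOWED_SORT_FIELDS:
        raise ValueError("sort must be one of the allowed fields")
    # Type and range check on the numeric parameter
    if not isinstance(limit, int) or not (1 <= limit <= 100):
        raise ValueError("limit must be an integer between 1 and 100")
```

Because the function raises rather than rewrites, a malicious value never reaches downstream code in any form.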

Validation with JSON Schema

JavaScript (Express + Ajv): Schema validation
const Ajv = require('ajv');
const addFormats = require('ajv-formats');

const ajv = new Ajv({ allErrors: true, removeAdditional: true });
addFormats(ajv);

// Schema for the user-creation API
const createUserSchema = {
  type: 'object',
  required: ['name', 'email'],
  additionalProperties: false,
  properties: {
    name: {
      type: 'string',
      minLength: 1,
      maxLength: 100,
      pattern: '^[a-zA-Z0-9\\s\\-]+$', // restrict to an allowed character set
    },
    email: {
      type: 'string',
      format: 'email',
      maxLength: 254,
    },
    age: {
      type: 'integer',
      minimum: 0,
      maximum: 150,
    },
  },
};

// Validation middleware
function validateBody(schema) {
  const validate = ajv.compile(schema);
  return (req, res, next) => {
    if (!validate(req.body)) {
      return res.status(400).json({
        error: 'Validation failed',
        details: validate.errors,
      });
    }
    next();
  };
}

app.post('/api/users', validateBody(createUserSchema), createUser);

Common Attacks and Validation Countermeasures

| Attack | Countermeasure | Example |
|---|---|---|
| SQL injection | Parameterized queries, use an ORM | db.query('SELECT * FROM users WHERE id = ?', [id]) |
| NoSQL injection | Type checking, strip $ operators | Ensure input is a string (reject objects) |
| XSS (via API responses) | Set the content type, escape output | Content-Type: application/json |
| Path traversal | Remove path separators from input | Normalize with path.basename() |
| XXE (XML External Entities) | Disable external entity resolution | Turn off in the XML parser settings |
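The path traversal row deserves a closer look, since stripping separators alone is easy to get subtly wrong. A Python sketch of the same idea combines basename with a check that the resolved path stays inside the expected directory (BASE_DIR is a hypothetical upload root):

```python
import os

BASE_DIR = os.path.realpath("/var/app/uploads")  # hypothetical upload root

def safe_upload_path(filename: str) -> str:
    """Drop any directory components, then confirm the resolved path
    still lives under BASE_DIR (defense in depth)."""
    name = os.path.basename(filename)  # "../../etc/passwd" -> "passwd"
    resolved = os.path.realpath(os.path.join(BASE_DIR, name))
    if os.path.commonpath([resolved, BASE_DIR]) != BASE_DIR:
        raise ValueError("path traversal attempt detected")
    return resolved
```

The second check catches cases the first misses, such as symlinks inside the upload directory pointing elsewhere.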

🌐 CORS (Cross-Origin Resource Sharing)

CORS is a mechanism for relaxing the browser's same-origin policy. Misconfiguring it can create serious security risks.

Dangerous Configuration

Access-Control-Allow-Origin: * and Access-Control-Allow-Credentials: true must never be used together. Restrict the wildcard to public APIs only.

Secure CORS Configuration

JavaScript (Express): CORS configuration
const cors = require('cors');

// Explicitly list the allowed origins
const allowedOrigins = [
  'https://app.example.com',
  'https://admin.example.com',
];

app.use(cors({
  origin(origin, callback) {
    // Allow server-to-server requests (no Origin header)
    if (!origin || allowedOrigins.includes(origin)) {
      callback(null, true);
    } else {
      callback(new Error('Not allowed by CORS'));
    }
  },
  methods: ['GET', 'POST', 'PUT', 'DELETE'],
  allowedHeaders: ['Content-Type', 'Authorization', 'X-API-Key'],
  credentials: true,
  maxAge: 86400,  // preflight cache: 24 hours
}));

CORS Header Reference

| Header | Purpose | Recommended Value |
|---|---|---|
| Access-Control-Allow-Origin | Allowed origins | Explicitly named domains |
| Access-Control-Allow-Methods | Allowed HTTP methods | The minimum required |
| Access-Control-Allow-Headers | Allowed request headers | The minimum required |
| Access-Control-Allow-Credentials | Allow sending cookies | true (only when authentication is needed) |
| Access-Control-Max-Age | Preflight cache duration (seconds) | 86400 (24 hours) |
| Access-Control-Expose-Headers | Response headers readable from JavaScript | Only necessary headers such as RateLimit-* |

🤖 AI / LLM API Rate Limiting

LLM APIs introduce new dimensions to rate limiting: token consumption, cost per request, and compute-intensive inference.

Traditional API vs. LLM API Rate Limiting

| Dimension | Traditional API | LLM API |
|---|---|---|
| Cost per request | Low, predictable | Variable, can be 100x+ (based on tokens) |
| Rate limit unit | Requests per time window | Tokens per minute (TPM) + requests per minute (RPM) |
| Abuse pattern | Scraping, brute force, DDoS | Prompt injection, resource exhaustion, denial-of-wallet |
| Algorithm fit | Fixed/sliding window | Token bucket (weighted by token count) |
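The "token bucket weighted by token count" fit can be sketched without any LLM tooling. Here a whitespace split stands in for a real tokenizer such as tiktoken, an assumption made purely to keep the example self-contained:

```python
import time

class WeightedTokenBucket:
    """Token bucket where each request consumes bucket capacity
    proportional to the number of LLM tokens in the prompt."""

    def __init__(self, tokens_per_minute: float):
        self.capacity = tokens_per_minute
        self.refill_rate = tokens_per_minute / 60.0  # tokens per second
        self.available = tokens_per_minute
        self.last = time.monotonic()

    def allow(self, prompt: str) -> bool:
        cost = len(prompt.split())  # stand-in for a real tokenizer
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.available = min(self.capacity,
                             self.available + (now - self.last) * self.refill_rate)
        self.last = now
        if self.available >= cost:
            self.available -= cost
            return True
        return False
```

A short prompt barely dents the budget, while one long prompt can exhaust it, which is exactly the denial-of-wallet behavior a per-request counter would miss.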

Token-Aware Rate Limiting (Python)

Python: Token-Based Rate Limiter
import tiktoken
import time
from collections import defaultdict

class TokenRateLimiter:
    """Rate limiter that counts tokens, not just requests."""

    def __init__(self, tokens_per_minute=100_000, requests_per_minute=60):
        self.tpm_limit = tokens_per_minute
        self.rpm_limit = requests_per_minute
        self.usage = defaultdict(lambda: {"tokens": [], "requests": []})
        self.encoder = tiktoken.encoding_for_model("gpt-4")

    def count_tokens(self, text: str) -> int:
        return len(self.encoder.encode(text))

    def check_limit(self, user_id: str, prompt: str) -> dict:
        now = time.time()
        window = now - 60  # 1-minute sliding window
        user = self.usage[user_id]

        # Clean up expired entries
        user["tokens"] = [(t, c) for t, c in user["tokens"] if t > window]
        user["requests"] = [t for t in user["requests"] if t > window]

        # Check RPM
        if len(user["requests"]) >= self.rpm_limit:
            return {"allowed": False, "reason": "RPM limit exceeded"}

        # Check TPM
        token_count = self.count_tokens(prompt)
        used_tokens = sum(c for _, c in user["tokens"])
        if used_tokens + token_count > self.tpm_limit:
            return {"allowed": False, "reason": "TPM limit exceeded"}

        # Record usage
        user["tokens"].append((now, token_count))
        user["requests"].append(now)
        return {"allowed": True, "tokens_used": token_count}

OWASP References

Related: LLM10: Model Denial of Service, ASI04: Cascading Hallucination Attacks

🛡 Prompt Injection & AI Input Validation

Prompt injection is the #1 risk for LLM applications. Apply defense-in-depth with input validation, structural separation, and output verification.

Prompt Sanitization

Strip or escape special tokens, instruction-like patterns, and control characters from user input before including in prompts.

Structural Input Separation

Use delimiters, XML tags, or separate message roles to clearly isolate system instructions from user-provided content.

Output Verification

Validate LLM responses against expected schemas. Check for data leakage, instruction following, and malicious content before rendering.
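Output verification can be sketched as a small post-processing gate. The canary value and required keys below are hypothetical; the point is to reject responses that are malformed or that echo system-prompt content:

```python
import json

# Hypothetical secret embedded in the system prompt; if it ever appears
# in a response, the model has leaked instructions it should not reveal.
CANARY = "X7Q-CANARY-91"

def verify_llm_output(raw: str, required_keys: set) -> dict:
    """Gate an LLM response: no canary leakage, valid JSON object,
    and all expected keys present."""
    if CANARY in raw:
        raise ValueError("response leaks system prompt content")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError("response is not valid JSON") from exc
    if not isinstance(data, dict):
        raise ValueError("response must be a JSON object")
    missing = required_keys - data.keys()
    if missing:
        raise ValueError(f"response missing keys: {sorted(missing)}")
    return data
```

Only responses that pass every check are rendered or forwarded downstream.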

Prompt Injection Attack Vectors & Mitigations

| Attack | Mitigation | Example |
|---|---|---|
| Direct Prompt Injection | Input sanitization + instruction/data separation | "Ignore previous instructions and..." |
| Indirect Prompt Injection | Sanitize RAG results + canary tokens | Malicious instructions hidden in retrieved documents |
| Context Stuffing | Token limits + input truncation | Overloading the context window to push out system instructions |
| Parameter Tampering | Schema validation + typed parameters | Manipulating temperature, max_tokens, or model parameters |

Structured Input Validation (Python / Pydantic)

Python: Pydantic Validation for LLM Requests
from pydantic import BaseModel, Field, field_validator
import re

class LLMRequest(BaseModel):
    """Validated LLM request with prompt injection defenses."""
    user_message: str = Field(..., max_length=4000)
    max_tokens: int = Field(default=1000, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)

    @field_validator("user_message")
    @classmethod
    def sanitize_prompt(cls, v: str) -> str:
        # Block known injection patterns
        patterns = [
            r"(?i)ignore\s+(previous|above|all)\s+(instructions?|prompts?)",
            r"(?i)you\s+are\s+now\s+",
            r"(?i)system\s*:\s*",
            r"(?i)\[INST\]|\[\/INST\]|<\|im_start\|>",
        ]
        for pattern in patterns:
            if re.search(pattern, v):
                raise ValueError("Input contains disallowed patterns")
        return v

# Usage with FastAPI
@app.post("/api/chat")
async def chat(request: LLMRequest):
    # Separate system instructions from user input
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"<user_input>{request.user_message}</user_input>"},
    ]
    return await call_llm(messages, request.max_tokens, request.temperature)

OWASP References

Related: LLM01: Prompt Injection, LLM02: Insecure Output Handling, ASI02: Prompt Injection via Tool Results

🔒 Other Recommended Security Headers

HTTP Response Headers: Recommended settings
# Disable MIME type sniffing
X-Content-Type-Options: nosniff

# Prevent embedding in iframes
X-Frame-Options: DENY

# Enforce HTTPS
Strict-Transport-Security: max-age=31536000; includeSubDomains

# CSP: the API returns only JSON, so block all script execution
Content-Security-Policy: default-src 'none'; frame-ancestors 'none'

# Restrict referrer information
Referrer-Policy: no-referrer

# Restrict browser features
Permissions-Policy: geolocation=(), camera=(), microphone=()
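One way to apply these headers uniformly is a tiny middleware. This sketch uses plain WSGI so it works under any Python framework; the middleware name is my own:

```python
# The recommended headers from the section above
SECURITY_HEADERS = [
    ("X-Content-Type-Options", "nosniff"),
    ("X-Frame-Options", "DENY"),
    ("Strict-Transport-Security", "max-age=31536000; includeSubDomains"),
    ("Content-Security-Policy", "default-src 'none'; frame-ancestors 'none'"),
    ("Referrer-Policy", "no-referrer"),
    ("Permissions-Policy", "geolocation=(), camera=(), microphone=()"),
]

def security_headers_middleware(app):
    """Wrap a WSGI app so every response carries the headers above."""
    def wrapped(environ, start_response):
        def start_with_headers(status, headers, exc_info=None):
            # Append the security headers to whatever the app already set
            return start_response(status, headers + SECURITY_HEADERS, exc_info)
        return app(environ, start_with_headers)
    return wrapped
```

Centralizing the headers in one place means no individual route can forget them.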