Explains defensive implementation patterns to protect your API from abuse.
Rate limiting is a fundamental defense mechanism to protect your system from API abuse, DoS attacks, and scraping.
| Algorithm | Characteristics | Pros | Cons |
|---|---|---|---|
| Fixed Window | Counts within a fixed time window | Simple to implement, memory efficient | Bursts occur at window boundaries |
| Sliding Window Log | Records request timestamps | Precise control | High memory consumption |
| Sliding Window Counter | Weighted average of previous and current windows | Good balance | Somewhat complex |
| Token Bucket | Consumes tokens for access | Allows bursts while enforcing limits | Requires parameter tuning |
| Leaky Bucket | Processes requests at a constant rate | Stable output rate | Poor at handling bursts |
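To make the trade-offs above concrete, here is a minimal token bucket in Python. It is a sketch, not a production limiter: the capacity and refill rate are illustrative, and a real deployment would store state in Redis rather than in-process.

```python
import time

class TokenBucket:
    """Minimal token bucket: allows bursts up to `capacity`,
    refilling at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self, cost: int = 1) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

The burst-friendliness noted in the table comes from the starting balance: a fresh bucket admits `capacity` requests back-to-back, then settles to the steady refill rate.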
```javascript
const rateLimit = require('express-rate-limit');
const RedisStore = require('rate-limit-redis');

// Global limit: applied to all APIs
const globalLimiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100,
  standardHeaders: true, // Returns RateLimit-* headers
  legacyHeaders: false,
  message: { error: 'Too many requests, please try again later.' },
});

// For authentication endpoints: stricter limits
const authLimiter = rateLimit({
  windowMs: 15 * 60 * 1000,
  max: 5, // Login attempts limited to 5 per 15 minutes
  skipSuccessfulRequests: true, // Successful requests are not counted
});

// For distributed environments: Redis backend
const distributedLimiter = rateLimit({
  store: new RedisStore({
    sendCommand: (...args) => redisClient.sendCommand(args),
  }),
  windowMs: 60 * 1000,
  max: 30,
});

app.use(globalLimiter);
app.use('/api/auth', authLimiter);
```

With `standardHeaders: true`, clients see their remaining budget in the response:

```http
RateLimit-Limit: 100
RateLimit-Remaining: 42
RateLimit-Reset: 1672531200
Retry-After: 30
```

By keying rate limits on composite identifiers, such as API key, user ID, or endpoint in addition to IP address, you avoid throttling legitimate users who share an address behind NAT or a corporate proxy.
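A sketch of how such a composite key might be derived. The `req` dict and its field names stand in for whatever request object your framework provides:

```python
def rate_limit_key(req: dict) -> str:
    """Compose a rate-limit key from the most specific identity available,
    falling back to IP address for anonymous traffic."""
    if req.get("api_key"):
        ident = f"key:{req['api_key']}"
    elif req.get("user_id"):
        ident = f"user:{req['user_id']}"
    else:
        ident = f"ip:{req['ip']}"
    # Scope the limit per endpoint so one noisy route cannot exhaust others
    return f"{ident}:{req['path']}"
```

The resulting string becomes the counter key in whatever store backs the limiter (e.g. the Redis store above).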
All API inputs may contain malicious data. Always validate on the server side.
Define a list of permitted values and reject everything else. An allowlist is safer than a denylist (blocklist) because it stays effective against attack patterns that have not been seen before.
Strictly check data types, maximum string lengths, and numeric upper and lower bounds. Enforcing size limits is a basic defense against buffer overflows and oversized-payload attacks.
Rejecting invalid input outright is safer than attempting to "fix" it: sanitization can produce unexpected transformations that themselves become exploitable.
```javascript
const Ajv = require('ajv');
const addFormats = require('ajv-formats');

const ajv = new Ajv({ allErrors: true, removeAdditional: true });
addFormats(ajv);

// Schema for user creation API
const createUserSchema = {
  type: 'object',
  required: ['name', 'email'],
  additionalProperties: false,
  properties: {
    name: {
      type: 'string',
      minLength: 1,
      maxLength: 100,
      pattern: '^[a-zA-Z0-9\\s\\-]+$', // Restrict to allowed characters
    },
    email: {
      type: 'string',
      format: 'email',
      maxLength: 254,
    },
    age: {
      type: 'integer',
      minimum: 0,
      maximum: 150,
    },
  },
};

// Validation middleware
function validateBody(schema) {
  const validate = ajv.compile(schema);
  return (req, res, next) => {
    if (!validate(req.body)) {
      return res.status(400).json({
        error: 'Validation failed',
        details: validate.errors,
      });
    }
    next();
  };
}

app.post('/api/users', validateBody(createUserSchema), createUser);
```
| Attack | Countermeasure | Example |
|---|---|---|
| SQL Injection | Parameterized queries, use an ORM | `db.query('SELECT * FROM users WHERE id = ?', [id])` |
| NoSQL Injection | Type checking, sanitize `$` operators | Ensure input is a string (reject objects) |
| XSS (via API response) | Specify Content-Type, escape output | `Content-Type: application/json` |
| Path Traversal | Remove path separators from input | Normalize with `path.basename()` |
| XXE (XML External Entity) | Disable external entity resolution | Disable in XML parser settings |
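As a minimal demonstration of the parameterized-query defense, here is a Python sketch using the built-in `sqlite3` module; the table and data are illustrative. The driver binds the value as data, so the query structure can never be altered by input:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'alice')")

def get_user(name):
    # The ? placeholder binds `name` strictly as a value, so a payload
    # like "alice' OR '1'='1" is compared literally and matches nothing.
    return conn.execute(
        "SELECT id, name FROM users WHERE name = ?", (name,)
    ).fetchall()
```

String concatenation into the SQL text is what opens the injection hole; placeholders keep code and data on separate channels.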
CORS is a mechanism that controls the browser's Same-Origin Policy. Misconfiguration can lead to serious security risks.
Access-Control-Allow-Origin: * and Access-Control-Allow-Credentials: true cannot be used together. Restrict wildcard usage to public APIs only.
```javascript
const cors = require('cors');

// Explicitly specify allowed origins
const allowedOrigins = [
  'https://app.example.com',
  'https://admin.example.com',
];

app.use(cors({
  origin(origin, callback) {
    // Allow server-to-server communication (no origin)
    if (!origin || allowedOrigins.includes(origin)) {
      callback(null, true);
    } else {
      callback(new Error('Not allowed by CORS'));
    }
  },
  methods: ['GET', 'POST', 'PUT', 'DELETE'],
  allowedHeaders: ['Content-Type', 'Authorization', 'X-API-Key'],
  credentials: true,
  maxAge: 86400, // Preflight cache: 24 hours
}));
```
| Header | Purpose | Recommended Value |
|---|---|---|
| Access-Control-Allow-Origin | Allowed origins | Explicit domain specification |
| Access-Control-Allow-Methods | Allowed HTTP methods | Minimum required only |
| Access-Control-Allow-Headers | Allowed request headers | Minimum required only |
| Access-Control-Allow-Credentials | Allow cookie transmission | true (only when authentication is required) |
| Access-Control-Max-Age | Preflight cache duration in seconds | 86400 (24 hours) |
| Access-Control-Expose-Headers | Response headers readable by JavaScript | Only necessary headers such as RateLimit-* |
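The allowlist logic is framework-independent. This Python sketch computes the CORS response headers for a given `Origin` value, echoing the origin back only when it is on the allowlist (the domains are the same illustrative ones used above, and never `*` alongside credentials):

```python
from typing import Optional

ALLOWED_ORIGINS = {"https://app.example.com", "https://admin.example.com"}

def cors_headers(origin: Optional[str]) -> dict:
    """Build CORS response headers for a request's Origin header."""
    if origin not in ALLOWED_ORIGINS:
        # No CORS headers at all: the browser blocks the cross-origin read
        return {}
    return {
        "Access-Control-Allow-Origin": origin,  # echo, never "*"
        "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE",
        "Access-Control-Allow-Headers": "Content-Type, Authorization, X-API-Key",
        "Access-Control-Allow-Credentials": "true",
        "Access-Control-Max-Age": "86400",
        "Vary": "Origin",  # keep caches from reusing one origin's response
    }
```

Echoing the validated origin (plus `Vary: Origin`) is what lets credentialed requests work without ever resorting to the forbidden `*` + credentials combination.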
LLM APIs introduce new dimensions to rate limiting: token consumption, cost per request, and compute-intensive inference.
| Dimension | Traditional API | LLM API |
|---|---|---|
| Cost per request | Low, predictable | Variable, can be 100x+ (based on tokens) |
| Rate limit unit | Requests per time window | Tokens per minute (TPM) + Requests per minute (RPM) |
| Abuse pattern | Scraping, brute force, DDoS | Prompt injection, resource exhaustion, denial-of-wallet |
| Algorithm fit | Fixed/Sliding window | Token bucket (weighted by token count) |
```python
import time
from collections import defaultdict

import tiktoken

class TokenRateLimiter:
    """Rate limiter that counts tokens, not just requests."""

    def __init__(self, tokens_per_minute=100_000, requests_per_minute=60):
        self.tpm_limit = tokens_per_minute
        self.rpm_limit = requests_per_minute
        self.usage = defaultdict(lambda: {"tokens": [], "requests": []})
        self.encoder = tiktoken.encoding_for_model("gpt-4")

    def count_tokens(self, text: str) -> int:
        return len(self.encoder.encode(text))

    def check_limit(self, user_id: str, prompt: str) -> dict:
        now = time.time()
        window = now - 60  # 1-minute sliding window
        user = self.usage[user_id]

        # Clean up expired entries
        user["tokens"] = [(t, c) for t, c in user["tokens"] if t > window]
        user["requests"] = [t for t in user["requests"] if t > window]

        # Check RPM
        if len(user["requests"]) >= self.rpm_limit:
            return {"allowed": False, "reason": "RPM limit exceeded"}

        # Check TPM
        token_count = self.count_tokens(prompt)
        used_tokens = sum(c for _, c in user["tokens"])
        if used_tokens + token_count > self.tpm_limit:
            return {"allowed": False, "reason": "TPM limit exceeded"}

        # Record usage
        user["tokens"].append((now, token_count))
        user["requests"].append(now)
        return {"allowed": True, "tokens_used": token_count}
```
Related: LLM10: Model Denial of Service, ASI04: Cascading Hallucination Attacks
Prompt injection is the #1 risk for LLM applications. Apply defense-in-depth with input validation, structural separation, and output verification.
Strip or escape special tokens, instruction-like patterns, and control characters from user input before including in prompts.
Use delimiters, XML tags, or separate message roles to clearly isolate system instructions from user-provided content.
Validate LLM responses against expected schemas. Check for data leakage, instruction following, and malicious content before rendering.
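A minimal sketch of the output-verification step, assuming the model is instructed to return JSON with an `answer` field; the schema and the secret-detection patterns are illustrative, not exhaustive:

```python
import json
import re

SECRET_PATTERNS = [
    re.compile(r"sk-[A-Za-z0-9]{20,}"),    # API-key-like strings
    re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),  # SSN-like patterns
]

def verify_llm_output(raw: str) -> dict:
    """Validate an LLM response before rendering it to the user."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        raise ValueError("Response is not valid JSON")
    # Schema check: shape must match what the prompt asked for
    if not isinstance(data, dict) or "answer" not in data:
        raise ValueError("Response does not match expected schema")
    # Leakage check: scan for strings that look like secrets
    for pattern in SECRET_PATTERNS:
        if pattern.search(str(data["answer"])):
            raise ValueError("Response contains data resembling a secret")
    return data
```

Rejected responses can be retried, logged for review, or replaced with a safe fallback, depending on the application.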
| Attack | Countermeasure | Example |
|---|---|---|
| Direct Prompt Injection | Input sanitization + instruction/data separation | "Ignore previous instructions and..." |
| Indirect Prompt Injection | Sanitize RAG results + canary tokens | Malicious instructions hidden in retrieved documents |
| Context Stuffing | Token limits + input truncation | Overloading context window to push out system instructions |
| Parameter Tampering | Schema validation + typed parameters | Manipulating temperature, max_tokens, or model parameters |
```python
import re

from pydantic import BaseModel, Field, field_validator

class LLMRequest(BaseModel):
    """Validated LLM request with prompt injection defenses."""

    user_message: str = Field(..., max_length=4000)
    max_tokens: int = Field(default=1000, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)

    @field_validator("user_message")
    @classmethod
    def sanitize_prompt(cls, v: str) -> str:
        # Block known injection patterns
        patterns = [
            r"(?i)ignore\s+(previous|above|all)\s+(instructions?|prompts?)",
            r"(?i)you\s+are\s+now\s+",
            r"(?i)system\s*:\s*",
            r"(?i)\[INST\]|\[\/INST\]|<\|im_start\|>",
        ]
        for pattern in patterns:
            if re.search(pattern, v):
                raise ValueError("Input contains disallowed patterns")
        return v

# Usage with FastAPI
@app.post("/api/chat")
async def chat(request: LLMRequest):
    # Separate system instructions from user input
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"<user_input>{request.user_message}</user_input>"},
    ]
    return await call_llm(messages, request.max_tokens, request.temperature)
```
Related: LLM01: Prompt Injection, LLM02: Insecure Output Handling, ASI02: Prompt Injection via Tool Results
```http
# Disable automatic content type detection
X-Content-Type-Options: nosniff

# Prevent iframe embedding
X-Frame-Options: DENY

# Enforce HTTPS
Strict-Transport-Security: max-age=31536000; includeSubDomains

# CSP: Since APIs only return JSON, block all script execution
Content-Security-Policy: default-src 'none'; frame-ancestors 'none'

# Restrict Referrer information
Referrer-Policy: no-referrer

# Restrict browser features
Permissions-Policy: geolocation=(), camera=(), microphone=()
```
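One way to apply these headers consistently is a small helper merged into every response. A framework-agnostic Python sketch (the header values mirror the list above; hook it into whatever middleware mechanism your framework offers):

```python
SECURITY_HEADERS = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
    "Content-Security-Policy": "default-src 'none'; frame-ancestors 'none'",
    "Referrer-Policy": "no-referrer",
    "Permissions-Policy": "geolocation=(), camera=(), microphone=()",
}

def apply_security_headers(headers: dict) -> dict:
    """Merge the baseline security headers into a response header dict,
    without overwriting values the handler already set."""
    return {**SECURITY_HEADERS, **headers}
```

Centralizing the defaults this way guarantees that a newly added endpoint cannot silently ship without them.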