APIを悪用から守るための防御的な実装パターンを解説します。
レート制限は、APIの乱用・DoS攻撃・スクレイピングからシステムを保護するための基本的な防御策です。
| アルゴリズム | 特徴 | メリット | デメリット |
|---|---|---|---|
| Fixed Window | 固定時間枠でカウント | 実装が簡単、メモリ効率が良い | ウィンドウ境界でバースト発生 |
| Sliding Window Log | リクエスト時刻を記録 | 正確な制御 | メモリ消費大 |
| Sliding Window Counter | 前後ウィンドウの加重平均 | バランスが良い | やや複雑 |
| Token Bucket | トークンを消費してアクセス | バーストを許容しつつ制限 | パラメータ調整が必要 |
| Leaky Bucket | 一定速度でリクエスト処理 | 出力レートが安定 | バースト対応が苦手 |
const rateLimit = require('express-rate-limit'); const RedisStore = require('rate-limit-redis'); // グローバル制限: 全APIに適用 const globalLimiter = rateLimit({ windowMs: 15 * 60 * 1000, // 15分 max: 100, standardHeaders: true, // RateLimit-* ヘッダー返却 legacyHeaders: false, message: { error: 'Too many requests, please try again later.' }, }); // 認証エンドポイント用: より厳しい制限 const authLimiter = rateLimit({ windowMs: 15 * 60 * 1000, max: 5, // ログイン試行は15分で5回まで skipSuccessfulRequests: true, // 成功時はカウントしない }); // 分散環境向け: Redis バックエンド const distributedLimiter = rateLimit({ store: new RedisStore({ sendCommand: (...args) => redisClient.sendCommand(args) }), windowMs: 60 * 1000, max: 30, }); app.use(globalLimiter); app.use('/api/auth', authLimiter);
RateLimit-Limit: 100 RateLimit-Remaining: 42 RateLimit-Reset: 1672531200 Retry-After: 30
IPアドレスだけでなく、APIキー・ユーザーID・エンドポイント単位など複合的なキーでレート制限を行うことで、正当なユーザーへの影響を最小限に抑えられます。
すべてのAPI入力は悪意あるデータを含む可能性があります。サーバーサイドで必ず検証してください。
許可する値のリストを定義。Denylist(禁止リスト)より安全。新しい攻撃パターンにも対応可能。
データ型、最大文字数、数値の上下限を厳密にチェック。バッファオーバーフロー対策の基本。
不正入力は「修正」よりも「拒否」が安全。サニタイズは予期しない変換を生む可能性がある。
const Ajv = require('ajv'); const addFormats = require('ajv-formats'); const ajv = new Ajv({ allErrors: true, removeAdditional: true }); addFormats(ajv); // ユーザー作成APIのスキーマ const createUserSchema = { type: 'object', required: ['name', 'email'], additionalProperties: false, properties: { name: { type: 'string', minLength: 1, maxLength: 100, pattern: '^[a-zA-Z0-9\\s\\-]+$', // 許可文字を限定 }, email: { type: 'string', format: 'email', maxLength: 254, }, age: { type: 'integer', minimum: 0, maximum: 150, }, }, }; // バリデーションミドルウェア function validateBody(schema) { const validate = ajv.compile(schema); return (req, res, next) => { if (!validate(req.body)) { return res.status(400).json({ error: 'Validation failed', details: validate.errors, }); } next(); }; } app.post('/api/users', validateBody(createUserSchema), createUser);
| 攻撃 | 対策 | 例 |
|---|---|---|
| SQLインジェクション | パラメータ化クエリ、ORM使用 | db.query('SELECT * FROM users WHERE id = ?', [id]) |
| NoSQLインジェクション | 型チェック、$演算子のサニタイズ | 入力が文字列であることを確認(Object拒否) |
| XSS(APIレスポンス経由) | Content-Type指定、エスケープ | Content-Type: application/json |
| パストラバーサル | 入力からパスセパレータを除去 | path.basename()で正規化 |
| XXE(XML External Entity) | 外部エンティティ解決を無効化 | XMLパーサー設定で無効化 |
CORSはブラウザのSame-Originポリシーを制御する仕組みです。設定ミスは重大なセキュリティリスクになります。
Access-Control-Allow-Origin: * と Access-Control-Allow-Credentials: true は同時に使えません。ワイルドカードの使用は公開APIのみに限定してください。
const cors = require('cors'); // 許可するオリジンを明示的に指定 const allowedOrigins = [ 'https://app.example.com', 'https://admin.example.com', ]; app.use(cors({ origin(origin, callback) { // サーバー間通信(originなし)は許可 if (!origin || allowedOrigins.includes(origin)) { callback(null, true); } else { callback(new Error('Not allowed by CORS')); } }, methods: ['GET', 'POST', 'PUT', 'DELETE'], allowedHeaders: ['Content-Type', 'Authorization', 'X-API-Key'], credentials: true, maxAge: 86400, // プリフライトキャッシュ: 24時間 }));
| ヘッダー | 用途 | 推奨値 |
|---|---|---|
Access-Control-Allow-Origin | 許可するオリジン | 明示的なドメイン指定 |
Access-Control-Allow-Methods | 許可するHTTPメソッド | 必要最小限 |
Access-Control-Allow-Headers | 許可するリクエストヘッダー | 必要最小限 |
Access-Control-Allow-Credentials | Cookie送信の許可 | true(認証必要時のみ) |
Access-Control-Max-Age | プリフライトキャッシュ秒数 | 86400(24時間) |
Access-Control-Expose-Headers | JSから読めるレスポンスヘッダー | RateLimit-*等 必要なもののみ |
LLM APIは、レート制限に新たな考慮事項をもたらします。Token消費量、リクエストあたりのコスト、計算集約型の推論処理への対応が必要です。
| 観点 | 従来のAPI | LLM API |
|---|---|---|
| リクエストあたりのコスト | 低コスト、予測可能 | 可変、Token数に応じて100倍以上になる場合あり |
| レート制限の単位 | 時間枠あたりのリクエスト数 | TPM(Tokens per minute)+ RPM(Requests per minute) |
| 悪用パターン | スクレイピング、ブルートフォース、DDoS | Prompt injection、リソース枯渇、denial-of-wallet |
| 適合するアルゴリズム | 固定ウィンドウ / スライディングウィンドウ | Token bucket(Token数による重み付け) |
import tiktoken import time from collections import defaultdict class TokenRateLimiter: """Rate limiter that counts tokens, not just requests.""" def __init__(self, tokens_per_minute=100_000, requests_per_minute=60): self.tpm_limit = tokens_per_minute self.rpm_limit = requests_per_minute self.usage = defaultdict(lambda: {"tokens": [], "requests": []}) self.encoder = tiktoken.encoding_for_model("gpt-4") def count_tokens(self, text: str) -> int: return len(self.encoder.encode(text)) def check_limit(self, user_id: str, prompt: str) -> dict: now = time.time() window = now - 60 # 1分間のスライディングウィンドウ user = self.usage[user_id] # 期限切れエントリのクリーンアップ user["tokens"] = [(t, c) for t, c in user["tokens"] if t > window] user["requests"] = [t for t in user["requests"] if t > window] # RPMのチェック if len(user["requests"]) >= self.rpm_limit: return {"allowed": False, "reason": "RPM limit exceeded"} # TPMのチェック token_count = self.count_tokens(prompt) used_tokens = sum(c for _, c in user["tokens"]) if used_tokens + token_count > self.tpm_limit: return {"allowed": False, "reason": "TPM limit exceeded"} # 使用量を記録 user["tokens"].append((now, token_count)) user["requests"].append(now) return {"allowed": True, "tokens_used": token_count}
Prompt injectionはLLMアプリケーションにおける最大のリスクです。入力検証、構造的分離、出力検証による多層防御を実施してください。
ユーザー入力をプロンプトに含める前に、特殊Token、命令のようなパターン、制御文字を除去またはエスケープしてください。
デリミタ、XMLタグ、またはメッセージロールの分離を使用して、システム命令とユーザー提供コンテンツを明確に隔離してください。
LLMの応答を期待されるスキーマに対して検証してください。レンダリング前に、データ漏洩、命令の追従、悪意あるコンテンツの有無を確認してください。
| 攻撃 | 対策 | 例 |
|---|---|---|
| 直接的Prompt Injection | 入力サニタイズ + 命令/データの分離 | 「前の指示を無視して...」 |
| 間接的Prompt Injection | RAG結果のサニタイズ + カナリアToken | 取得ドキュメントに隠された悪意ある命令 |
| コンテキストスタッフィング | Token制限 + 入力の切り詰め | コンテキストウィンドウを埋め尽くしてシステム命令を押し出す |
| パラメータ改ざん | スキーマバリデーション + 型付きパラメータ | temperature、max_tokens、モデルパラメータの操作 |
from pydantic import BaseModel, Field, field_validator import re class LLMRequest(BaseModel): """Validated LLM request with prompt injection defenses.""" user_message: str = Field(..., max_length=4000) max_tokens: int = Field(default=1000, ge=1, le=4096) temperature: float = Field(default=0.7, ge=0.0, le=2.0) @field_validator("user_message") @classmethod def sanitize_prompt(cls, v: str) -> str: # 既知のインジェクションパターンをブロック patterns = [ r"(?i)ignore\s+(previous|above|all)\s+(instructions?|prompts?)", r"(?i)you\s+are\s+now\s+", r"(?i)system\s*:\s*", r"(?i)\[INST\]|\[\/INST\]|<\|im_start\|>", ] for pattern in patterns: if re.search(pattern, v): raise ValueError("Input contains disallowed patterns") return v # FastAPIでの使用例 @app.post("/api/chat") async def chat(request: LLMRequest): # システム命令とユーザー入力を分離 messages = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": f"<user_input>{request.user_message}</user_input>"}, ] return await call_llm(messages, request.max_tokens, request.temperature)
関連: LLM01: Prompt Injection、LLM02: 安全でない出力処理、ASI02: ツール結果を介したPrompt Injection
# コンテンツタイプの自動判別を無効化 X-Content-Type-Options: nosniff # iframe埋め込みを禁止 X-Frame-Options: DENY # HTTPS強制 Strict-Transport-Security: max-age=31536000; includeSubDomains # CSP: APIはJSONのみ返すためscript実行を全て禁止 Content-Security-Policy: default-src 'none'; frame-ancestors 'none' # Referrer情報の制限 Referrer-Policy: no-referrer # ブラウザの機能制限 Permissions-Policy: geolocation=(), camera=(), microphone=()