zudo-cloudflare-wisdom
GitHub リポジトリ

検索したい単語を入力

いつでも検索バーを開ける

Workers AI ストリーミング SSE プロキシ

Workers AI のストリームを SSE に変換し、クライアント切断時に中断し、ランタイムのキルスイッチでガードする

概要

このレシピでは、Workers AI のチャットモデルをプロキシし、そのレスポンスを Server-Sent Events (SSE) としてブラウザにストリーミングする Worker エンドポイントを 構築します。単に「env.AI.run を呼ぶだけ」のサンプルでは省かれがちな、3 つの本番運用上の 課題を扱います。

  • Workers AI の ReadableStreamtext/event-stream レスポンスに変換する。

  • クライアントが切断したら処理中の AI リクエストを中断し、閉じられたタブが Workers AI の クォータを消費し続けないようにする。

  • ランタイムのキルスイッチ(ダッシュボードで設定する単なる環境変数)で、再デプロイなしに 503 を返す。

本記事は zudo-text 同期サーバの POST /api/v1/chat ハンドラを下敷きにしています。

AI モデルのバインディング

Workers AI は wrangler.toml[ai] バインディング経由で公開されます。

[ai]
binding = "AI"

これにより Worker 内で env.AI が使えるようになります。

Warning

[ai] binding = "AI" [env.staging.ai] binding = "AI"

すべての名前付き環境に、それぞれ専用の [env.<name>.ai] ブロックが必要です。 :::

ストリーミングでモデルを呼び出す

モデル ID は @cf/... 形式で渡し、stream: true を指定します。ストリーミングを有効に すると、run() は完成した JSON オブジェクトではなく ReadableStream を返します。

const KIMI_MODEL = "@cf/moonshotai/kimi-k2.6"; const kimiStream = await env.AI.run(KIMI_MODEL, { messages, stream: true, });

このストリームは SSE 形式のフレーム、すなわち data: {...}\n\n という行を送出し、最後に 終端の data: [DONE]\n\n で終わります。

AI ストリームを SSE に変換する

Workers AI のストリームをそのまま素通しすることはできません。フレームをパースし、自前の SSE イベントとして再送出することで、ブラウザがきれいな text/event-stream を受け取れる ようにします。下記のイベント名(content_block_deltamessage_stopdoneerror)は このアプリ独自の取り決めです。フロントエンドのコンシューマが期待する任意の名前を使って かまいません。

const encoder = new TextEncoder(); function encodeSseEvent(event: string, data: unknown): Uint8Array { const payload = typeof data === "string" ? data : JSON.stringify(data); return encoder.encode(`event: ${event}\ndata: ${payload}\n\n`); } const stream = new ReadableStream<Uint8Array>({ async start(controller) { const send = (event: string, data: unknown) => { controller.enqueue(encodeSseEvent(event, data)); }; const kimiStream = await env.AI.run(KIMI_MODEL, { messages, stream: true }); const reader = kimiStream.getReader(); const dec = new TextDecoder(); let buf = ""; let index = 0; while (true) { const { value, done } = await reader.read(); if (done) break; buf += dec.decode(value, { stream: true }); // Split on SSE frame boundaries (double newline). const parts = buf.split("\n\n"); buf = parts.pop() ?? ""; for (const part of parts) { const line = part.replace(/^data:\s*/m, "").trim(); if (!line || line === "[DONE]") continue; let parsed: unknown; try { parsed = JSON.parse(line); } catch { continue; // malformed chunk — skip } // Workers AI chat-completions shape: //   { choices: [{ delta: { content: "..." } }] } // Older models use { response: "..." } — accept both. let text: string | null = null; if (parsed && typeof parsed === "object" && "choices" in parsed) { const choices = (parsed as { choices: Array<{ delta?: { content?: string } }> }).choices; text = choices?.[0]?.delta?.content ?? null; } else if (parsed && typeof parsed === "object" && "response" in parsed) { const r = (parsed as { response: unknown }).response; text = typeof r === "string" ? r : null; } if (text !== null && text !== "") { send("content_block_delta", { index, text }); index++; } } } send("message_stop", {}); send("done", {}); controller.close(); }, }); return new Response(stream, { status: 200, headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache, no-transform", }, });

:::note[no-transform を付ける理由]Cache-Control: no-cache, no-transform を指定すると、Cloudflare がストリームを バッファリングしたり書き換えたりしなくなります。これがないと中間装置がチャンクを溜め込み、 「ストリーミングらしさ」が失われることがあります。 :::

クライアント切断時に中断する

ストリーミングの途中でユーザーがタブを閉じた場合、処理中の AI リクエストは最後まで走らせて 誰も読まないレスポンスにクォータを請求するのではなく、すぐに止めたいはずです。補完的な 2 つのフックがあり、両方を使うべきです。

  1. 受信リクエストの signal はクライアントがいなくなると発火します。これをリッスンします。

  2. ReadableStreamcancel() コールバックは、プラットフォームが送出側レスポンスを 破棄したときに発火します。これが実際に破棄を保証する側です。

両方を 1 つの AbortController に束ね、その signal を読み取りループ内でチェックします。

const aborter = new AbortController(); // (1) Client request signal → abort. const clientSignal: AbortSignal | undefined = req.signal; const onClientAbort = () => aborter.abort(); clientSignal?.addEventListener("abort", onClientAbort, { once: true }); const stream = new ReadableStream<Uint8Array>({ async start(controller) { const send = (event: string, data: unknown) => { if (aborter.signal.aborted) return; controller.enqueue(encodeSseEvent(event, data)); }; const reader = kimiStream.getReader(); while (true) { if (aborter.signal.aborted) break; // stop reading from Workers AI const { value, done } = await reader.read(); if (done) break; // ...parse and send as above... } clientSignal?.removeEventListener("abort", onClientAbort); controller.close(); }, // (2) Response torn down by the platform → abort. cancel() { aborter.abort(); }, });

aborter が発火すると、次の if (aborter.signal.aborted) break でループを抜け、reader が 解放されるため、Workers AI からそれ以上フレームが引き出されることはありません。

ランタイムのキルスイッチ

コストの急増、不正利用、あるいはモデルの不調など、エンドポイントを今すぐ止めたい場面が あります。デプロイを出さずにそれを実現するのが、リクエスト時に読み取る単なる環境変数です。 これがサーキットブレーカーになります。

// Kill switch — short-circuit before any heavy work. // "1" = disabled; any other value (including unset) = enabled. if (env.AI_CHAT_DISABLED === "1") { return Response.json({ error: "ai_unavailable" }, { status: 503 }); }

Cloudflare ダッシュボードの Settings → Variables and Secrets で設定します。

AI_CHAT_DISABLED = "1"

リクエストごとに読み取られるため、ダッシュボードで切り替えると次のリクエストから反映され、再デプロイは不要です。チェックがちょうど === "1" である点に注意してください。未設定や その他の値ではエンドポイントは有効なままなので、デフォルト(変数なし)は「オン」です。

:::tip[wrangler.toml に記しておく] 値自体はダッシュボードに置かれますが、wrangler.toml にコメントを残しておくと、次に読む人が この変数の存在と "1" の意味を把握できます。

# Kill switch for the chat endpoint. # Set AI_CHAT_DISABLED = "1" in the Cloudflare dashboard (Settings → # Variables and Secrets) to return 503 without a redeploy. Unset or any # other value leaves the endpoint enabled. # [vars] # AI_CHAT_DISABLED = "0"

関連

Revision History

Takeshi Takatsudo作成: 2026-05-29T05:33:39+09:00更新: 2026-05-29T05:33:39+09:00