Workers AI ストリーミング SSE プロキシ
Workers AI のストリームを SSE に変換し、クライアント切断時に中断し、ランタイムのキルスイッチでガードする
概要
このレシピでは、Workers AI のチャットモデルをプロキシし、そのレスポンスを
Server-Sent Events (SSE) としてブラウザにストリーミングする Worker エンドポイントを
構築します。単に「env.AI.run を呼ぶだけ」のサンプルでは省かれがちな、3 つの本番運用上の
課題を扱います。
Workers AI の
ReadableStreamをtext/event-streamレスポンスに変換する。クライアントが切断したら処理中の AI リクエストを中断し、閉じられたタブが Workers AI の クォータを消費し続けないようにする。
ランタイムのキルスイッチ(ダッシュボードで設定する単なる環境変数)で、再デプロイなしに
503を返す。
本記事は zudo-text 同期サーバの POST / ハンドラを下敷きにしています。
AI モデルのバインディング
Workers AI は wrangler.toml の [ai] バインディング経由で公開されます。
[ai]
binding = "AI"これにより Worker 内で env.AI が使えるようになります。
Warning
[ai] binding = "AI" [env.staging.ai] binding = "AI"すべての名前付き環境に、それぞれ専用の [env.<name>.ai] ブロックが必要です。 :::
ストリーミングでモデルを呼び出す
モデル ID は @cf/... 形式で渡し、stream: true を指定します。ストリーミングを有効に すると、run() は完成した JSON オブジェクトではなく ReadableStream を返します。
const KIMI_ MODEL = "@cf/ moonshotai/ kimi- k2. 6"; const kimiStream = await env. AI. run(KIMI_ MODEL, { messages, stream: true, });このストリームは SSE 形式のフレーム、すなわち data: {...}\n\n という行を送出し、最後に 終端の data: [DONE]\n\n で終わります。
AI ストリームを SSE に変換する
Workers AI のストリームをそのまま素通しすることはできません。フレームをパースし、自前の SSE イベントとして再送出することで、ブラウザがきれいな text/event-stream を受け取れる ようにします。下記のイベント名(content_block_delta、message_stop、done、error)は このアプリ独自の取り決めです。フロントエンドのコンシューマが期待する任意の名前を使って かまいません。
const encoder = new TextEncoder(); function encodeSseEvent(event: string, data: unknown): Uint8Array { const payload = typeof data = = = "string" ? data : JSON. stringify(data); return encoder. encode(`event: ${event}\ ndata: ${payload}\ n\ n`); } const stream = new ReadableStream<Uint8Array>({ async start(controller) { const send = (event: string, data: unknown) = > { controller. enqueue(encodeSseEvent(event, data)); }; const kimiStream = await env. AI. run(KIMI_ MODEL, { messages, stream: true }); const reader = kimiStream. getReader(); const dec = new TextDecoder(); let buf = ""; let index = 0; while (true) { const { value, done } = await reader. read(); if (done) break; buf += dec. decode(value, { stream: true }); / / Split on SSE frame boundaries (double newline). const parts = buf. split("\ n\ n"); buf = parts. pop() ? ? ""; for (const part of parts) { const line = part. replace(/ ^data: \ s*/ m, ""). trim(); if (!line || line = = = "[DONE]") continue; let parsed: unknown; try { parsed = JSON. parse(line); } catch { continue; / / malformed chunk — skip } / / Workers AI chat- completions shape: / / { choices: [{ delta: { content: ". . . " } }] } / / Older models use { response: ". . . " } — accept both. let text: string | null = null; if (parsed & & typeof parsed = = = "object" & & "choices" in parsed) { const choices = (parsed as { choices: Array<{ delta? : { content? : string } }> }). choices; text = choices? . [0]? . delta? . content ? ? null; } else if (parsed & & typeof parsed = = = "object" & & "response" in parsed) { const r = (parsed as { response: unknown }). response; text = typeof r = = = "string" ? r : null; } if (text != = null & & text != = "") { send("content_ block_ delta", { index, text }); index++; } } } send("message_ stop", {}); send("done", {}); controller. close(); }, }); return new Response(stream, { status: 200, headers: { "Content- Type": "text/ event- stream", "Cache- Control": "no- cache, no- transform", }, });:::note[no-transform を付ける理由]Cache-Control: no-cache, no-transform を指定すると、Cloudflare がストリームを バッファリングしたり書き換えたりしなくなります。これがないと中間装置がチャンクを溜め込み、 「ストリーミングらしさ」が失われることがあります。 :::
クライアント切断時に中断する
ストリーミングの途中でユーザーがタブを閉じた場合、処理中の AI リクエストは最後まで走らせて 誰も読まないレスポンスにクォータを請求するのではなく、すぐに止めたいはずです。補完的な 2 つのフックがあり、両方を使うべきです。
受信リクエストの
signalはクライアントがいなくなると発火します。これをリッスンします。ReadableStreamのcancel()コールバックは、プラットフォームが送出側レスポンスを 破棄したときに発火します。これが実際に破棄を保証する側です。
両方を 1 つの AbortController に束ね、その signal を読み取りループ内でチェックします。
const aborter = new AbortController(); / / (1) Client request signal → abort. const clientSignal: AbortSignal | undefined = req. signal; const onClientAbort = () = > aborter. abort(); clientSignal? . addEventListener("abort", onClientAbort, { once: true }); const stream = new ReadableStream<Uint8Array>({ async start(controller) { const send = (event: string, data: unknown) = > { if (aborter. signal. aborted) return; controller. enqueue(encodeSseEvent(event, data)); }; const reader = kimiStream. getReader(); while (true) { if (aborter. signal. aborted) break; / / stop reading from Workers AI const { value, done } = await reader. read(); if (done) break; / / . . . parse and send as above. . . } clientSignal? . removeEventListener("abort", onClientAbort); controller. close(); }, / / (2) Response torn down by the platform → abort. cancel() { aborter. abort(); }, });aborter が発火すると、次の if (aborter.signal.aborted) break でループを抜け、reader が 解放されるため、Workers AI からそれ以上フレームが引き出されることはありません。
ランタイムのキルスイッチ
コストの急増、不正利用、あるいはモデルの不調など、エンドポイントを今すぐ止めたい場面が あります。デプロイを出さずにそれを実現するのが、リクエスト時に読み取る単なる環境変数です。 これがサーキットブレーカーになります。
/ / Kill switch — short- circuit before any heavy work. / / "1" = disabled; any other value (including unset) = enabled. if (env. AI_ CHAT_ DISABLED = = = "1") { return Response. json({ error: "ai_ unavailable" }, { status: 503 }); }Cloudflare ダッシュボードの Settings → Variables and Secrets で設定します。
AI_CHAT_DISABLED = "1"リクエストごとに読み取られるため、ダッシュボードで切り替えると次のリクエストから反映され、再デプロイは不要です。チェックがちょうど === "1" である点に注意してください。未設定や その他の値ではエンドポイントは有効なままなので、デフォルト(変数なし)は「オン」です。
:::tip[wrangler.toml に記しておく] 値自体はダッシュボードに置かれますが、wrangler.toml にコメントを残しておくと、次に読む人が この変数の存在と "1" の意味を把握できます。
# Kill switch for the chat endpoint. # Set AI_CHAT_DISABLED = "1" in the Cloudflare dashboard (Settings → # Variables and Secrets) to return 503 without a redeploy. Unset or any # other value leaves the endpoint enabled. # [vars] # AI_CHAT_DISABLED = "0"