
こんにちは。カミナシで「カミナシ 設備保全」サービスの開発を行っている澤木です。
今回はReact Router v7でReadableStreamを利用したデータの逐次表示を実装する方法について紹介します。
現在私たちのチームではLLMを使った機能開発を行っているのですが、LLMは生成処理に時間がかかるため全てのデータ生成を待ってから表示するとユーザー体験が悪化してしまいます。 そのためストリーミングで逐次データを受け取り、生成された部分から少しずつ表示していくという実装が一般的かと思います。
React Router v7にもストリーミング機能は提供されているのですが、これはあくまで遅延読み込みを実現するためのものでありデータの一部分を読み取りながら表示していくユースケースには対応していません。
そこで今回はReact Router v7のストリーミング機能は使わず、ReadableStream APIを直接使用してストリーミング処理を実装しました。
React Router v7のストリーミング機能について
ReadableStreamを使ったストリーミングの実装に入る前に、React Router v7のストリーミング機能について簡単に説明します。
React Router v7ではloader関数から未解決のPromiseを返すことで、Promiseの解決を待たずにコンポーネントをレンダリングすることができます。コンポーネント側ではSuspenseでPromiseの解決を待つことで、読み込みに時間がかかるデータを遅延読み込みすることが可能になります。
しかしこの機能はあくまでPromise単位での遅延読み込みであり、チャンクでデータを少しずつ受け取るというユースケースには対応していないため別の方法で実装を行う必要があります。
実装方針
実装当初はReact Router v7の標準的な方法に則りpageにactionを実装してFormからsubmitする形で実装しようとしたのですが、useActionDataで取得できるデータはReact Router側でJSONとしてパースされてしまうためReadableStreamをそのまま扱うことができませんでした
export const action = async ({ request }: LoaderFunctionArgs) => { // ReadableStreamを返す return new Response(new ReadableStream({ ... })); }; export default function Page() { const actionData = useActionData(); // JSONとしてパースされてしまう // ReadableStreamを扱えないためストリーミング表示ができない }
このままではReadableStreamを直接扱うことができないため、pageのactionとして実装するのではなくResource Routesでactionを実装し、クライアント側ではfetch APIでResource Routesを直接呼び出す形で実装を行いました。
Resource RoutesはUIコンポーネントを持たないルートとして実装できるため、ストリーミングレスポンスを返すAPIエンドポイントとして利用することができます。
actionの実装
今回は例として、BedrockモデルのAPIを呼び出してReadableStreamとしてレスポンスを返すaction関数を実装します。
BedrockモデルをはじめとするLLM APIでは、ストリーミング形式のレスポンスがAsyncIterableとして返されることが多いため、これをReadableStreamに変換して返します。BedrockモデルのInvokeModelWithResponseStreamCommandではチャンク毎にJSON形式のデータが返されるため、コンテンツ内のテキスト部分だけを抽出してクライアント側に返却するようにしています。
このactionはUIコンポーネントを持たないResource Routesとして/invoke-modelに実装し、fetch APIでfetch("/invoke-model", { method: "POST" })のように呼び出せるようにしています。
import { BedrockRuntimeClient, InvokeModelWithResponseStreamCommand, } from "@aws-sdk/client-bedrock-runtime"; import type { LoaderFunctionArgs } from "react-router"; export const action = async ({ request }: LoaderFunctionArgs) => { // リクエストの取得 const formData = await request.formData(); const prompt = formData.get("prompt"); // bedrockモデルのinvoke const client = new BedrockRuntimeClient({ region: "ap-northeast-1", }); const payload = { anthropic_version: "bedrock-2023-05-31", max_tokens: 1000, messages: [ { role: "user", content: [{ type: "text", text: prompt }], }, ], }; const command = new InvokeModelWithResponseStreamCommand({ contentType: "application/json", body: JSON.stringify(payload), modelId: "apac.anthropic.claude-sonnet-4-20250514-v1:0", }); const apiResponse = await client.send(command); // ReadableStreamとしてレスポンスを返す return new Response( new ReadableStream({ async start(controller) { // AsyncIterableをReadableStreamに変換 for await (const item of apiResponse?.body ?? []) { const chunk = JSON.parse(new TextDecoder().decode(item.chunk?.bytes)); // 生成内容のテキストのみ取り出す if (chunk.type === "content_block_delta") { const text = chunk.delta.text; controller.enqueue(text); } } controller.close(); }, }), ); };
クライアント側の実装
クライアント側ではfetch APIを使ってactionを呼び出し、ReadableStreamを受け取ってチャンク毎にデータを処理します。先ほど実装したactionはResource Routes(/invoke-model)として実装しているため、fetch APIで直接呼び出すことができます。
読み出し側ではReadableStreamからreaderを取得しreader.read()でストリームが終了するまでチャンクを逐次読み取り、setMessageでメッセージにチャンクのテキストを順次追加していきます。messageのstateにはテキストが逐次追加されていくため、UI上では生成された部分から少しずつ表示されていくようになります。
import { useCallback, useRef, useState } from "react"; export const useReadableStream = () => { const [message, setMessage] = useState(""); const submit = useCallback((formData: FormData) => { fetch("/invoke-model", { method: "POST", body: formData, }) .then(async (response) => { if (!response.ok || !response.body) { return; } const reader = response.body.getReader(); const decoder = new TextDecoder(); while (true) { // チャンクの読み取り const { done, value } = await reader.read(); if (done) { break; } // チャンクのデコードとstateの更新 const chunk = decoder.decode(value, { stream: true }); setMessage((prev) => prev + chunk); } }) .catch((error) => { console.error("Fetch error:", error); }); }, []); return { submit, message, }; }; function Sample() { const { submit, message } = useReadableStream(); return ( <div> <form onSubmit={(e) => { e.preventDefault(); const formData = new FormData(e.currentTarget); submit(formData); }} > <textarea name="prompt" /> <button type="submit">Submit</button> </form> <p>{message}</p> </div> ); }
クライアント側でJSON形式のデータを扱う場合
Bedrockモデルのようにチャンク毎にJSON形式でデータが返される場合(またはクライアント側に構造化されたデータを返したい場合)、テキストとしてではなくJSONとしてパースして扱いたいケースがあるかと思います。そのような場合は、JSON Linesのように改行区切りでJSONを返し、クライアント側ではチャンクのテキストをバッファで保持しながら改行で分割してパースする実装が必要になります。
今回のBedrockモデルの例では、Bedrock側から返されるチャンクが1つのJSONとなっているため、各チャンク毎に改行文字を追加してクライアントへ返却し、クライアント側ではJSON形式で扱えるように実装を行います。
import { BedrockRuntimeClient, InvokeModelWithResponseStreamCommand, } from "@aws-sdk/client-bedrock-runtime"; import { useCallback, useRef, useState } from "react"; import type { LoaderFunctionArgs } from "react-router"; export const action = async ({ request }: LoaderFunctionArgs) => { // リクエストの取得 const formData = await request.formData(); const prompt = formData.get("prompt"); // bedrockモデルのinvoke const client = new BedrockRuntimeClient({ region: "ap-northeast-1", }); const payload = { anthropic_version: "bedrock-2023-05-31", max_tokens: 1000, messages: [ { role: "user", content: [{ type: "text", text: prompt }], }, ], }; const command = new InvokeModelWithResponseStreamCommand({ contentType: "application/json", body: JSON.stringify(payload), modelId: "apac.anthropic.claude-sonnet-4-20250514-v1:0", }); const apiResponse = await client.send(command); // エンコーダーとデコーダーの初期化 const encoder = new TextEncoder(); const decoder = new TextDecoder(); // ReadableStreamとしてレスポンスを返す return new Response( new ReadableStream({ async start(controller) { for await (const item of apiResponse?.body ?? []) { // チャンクをテキストとしてデコード const json = decoder.decode(item.chunk?.bytes); // 各チャンクごとに改行を追加して返却 controller.enqueue(encoder.encode(`${json}\n`)); } controller.close(); }, }), ); }; export const useReadableStream = () => { // JSONデータを配列で保持 const [data, setData] = useState<any[]>([]); // チャンクのバッファ const buffer = useRef(""); const submit = useCallback((formData: FormData) => { fetch("/invoke-model", { method: "POST", body: formData, }) .then(async (response) => { if (!response.ok || !response.body) { return; } buffer.current = ""; const reader = response.body.getReader(); const decoder = new TextDecoder(); while (true) { const { done, value } = await reader.read(); if (done) { // ストリーム終了時に残ったバッファを処理 if (buffer.current.trim() !== "") { try { const json = JSON.parse(buffer.current); setData((prev) => [...prev, json]); } catch (e) { console.log("JSON parse error:", e); } } break; } // チャンクをバッファに保持 const chunk = decoder.decode(value, { stream: true }); buffer.current += chunk; // 改行で分割して最終行のみをバッファに残す const lines = buffer.current.split("\n"); buffer.current = lines.pop() || ""; // 各行をJSONとしてパース for (const line of lines) { if (line.trim() === "") continue; try { const json = JSON.parse(line); setData((prev) => [...prev, json]); } catch (e) { console.error("JSON parse error:", e); } } } }) .catch((error) => { console.error("Fetch error:", error); }); }, []); return { submit, data, }; };
おわりに
React Router v7でReadableStreamを使ったストリーミング表示の実装方法について紹介しました。
useActionDataを使わないなどReact Router v7の標準的な使い方からは少し逸脱した実装になっていますが、Resource Routesを活用することで通常のReact Router v7のアーキテクチャから大きく外れることなくストリーミング処理を実装できたかと思います。
今回はLLMの呼び出しを例に挙げましたが、ストリーミング処理の実装パターンは大きなファイルのダウンロード進捗表示やリアルタイムデータの表示など、様々なユースケースに応用できるかと思います。