diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index 09c8c7162ee9..2dacfd55ed19 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -914,12 +914,18 @@ export abstract class Runnable< // eslint-disable-next-line no-param-reassign config.callbacks = copiedCallbacks; } + const abortController = new AbortController(); // Call the runnable in streaming mode, // add each chunk to the output stream const outerThis = this; async function consumeRunnableStream() { try { - const runnableStream = await outerThis.stream(input, config); + const runnableStream = await outerThis.stream(input, { + ...config, + signal: options?.signal + ? AbortSignal.any([options.signal, abortController.signal]) + : abortController.signal, + }); const tappedStream = eventStreamer.tapOutputIterable( runId, runnableStream @@ -927,6 +933,7 @@ export abstract class Runnable< // eslint-disable-next-line @typescript-eslint/no-unused-vars for await (const _ of tappedStream) { // Just iterate so that the callback handler picks up events + if (abortController.signal.aborted) break; } } finally { await eventStreamer.finish(); @@ -959,6 +966,7 @@ export abstract class Runnable< yield event; } } finally { + abortController.abort(); await runnableStreamConsumePromise; } } diff --git a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts index 10c0e035c955..7b59e3597aa8 100644 --- a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts +++ b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts @@ -2263,3 +2263,22 @@ test("Runnable streamEvents method should respect passed signal", async () => { } }).rejects.toThrowError(); }); + +test("streamEvents method handles errors", async () => { + let caughtError: unknown; + const model = new FakeListChatModel({ + responses: ["abc"], + }); + + try { + for await (const _ of model.streamEvents("Hello! Tell me about yourself.", { + version: "v2", + })) { + throw new Error("should catch this error"); + } + } catch (e) { + caughtError = e; + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((caughtError as any)?.message).toEqual("should catch this error"); +});