Skip to content

Commit

Permalink
Fix stream events bug when errors are thrown too quickly during itera…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
jacoblee93 committed Jan 29, 2025
1 parent bdab4a3 commit c4df945
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
10 changes: 9 additions & 1 deletion langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -914,19 +914,26 @@ export abstract class Runnable<
// eslint-disable-next-line no-param-reassign
config.callbacks = copiedCallbacks;
}
const abortController = new AbortController();
// Call the runnable in streaming mode,
// add each chunk to the output stream
const outerThis = this;
async function consumeRunnableStream() {
try {
const runnableStream = await outerThis.stream(input, config);
const runnableStream = await outerThis.stream(input, {
...config,
signal: options?.signal
? AbortSignal.any([options.signal, abortController.signal])
: abortController.signal,
});
const tappedStream = eventStreamer.tapOutputIterable(
runId,
runnableStream
);
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of tappedStream) {
// Just iterate so that the callback handler picks up events
if (abortController.signal.aborted) break;
}
} finally {
await eventStreamer.finish();
Expand Down Expand Up @@ -959,6 +966,7 @@ export abstract class Runnable<
yield event;
}
} finally {
abortController.abort();
await runnableStreamConsumePromise;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2263,3 +2263,22 @@ test("Runnable streamEvents method should respect passed signal", async () => {
}
}).rejects.toThrowError();
});

test("streamEvents method handles errors", async () => {
let caughtError: unknown;
const model = new FakeListChatModel({
responses: ["abc"],
});

try {
for await (const _ of model.streamEvents("Hello! Tell me about yourself.", {

Check failure on line 2274 in langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts

View workflow job for this annotation

GitHub Actions / Check linting

Invalid loop. Its body allows only one iteration
version: "v2",
})) {
throw new Error("should catch this error");
}
} catch (e) {
caughtError = e;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
expect((caughtError as any)?.message).toEqual("should catch this error");
});

0 comments on commit c4df945

Please sign in to comment.