Skip to content

Commit 79983d5

Browse files
committed
remove extra ticks from executeStreamAsyncIterator
1 parent f965bd5 commit 79983d5

File tree

2 files changed

+121
-141
lines changed

2 files changed

+121
-141
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1730,11 +1730,6 @@ describe('Execute: stream directive', () => {
17301730
items: [{ id: '1', name: 'Luke' }],
17311731
path: ['nestedObject', 'nestedFriendList', 0],
17321732
},
1733-
],
1734-
hasNext: true,
1735-
},
1736-
{
1737-
incremental: [
17381733
{
17391734
items: [{ id: '2', name: 'Han' }],
17401735
path: ['nestedObject', 'nestedFriendList', 1],
@@ -1793,15 +1788,6 @@ describe('Execute: stream directive', () => {
17931788
data: { scalarField: 'slow', nestedFriendList: [] },
17941789
path: ['nestedObject'],
17951790
},
1796-
],
1797-
hasNext: true,
1798-
},
1799-
done: false,
1800-
});
1801-
const result3 = await iterator.next();
1802-
expectJSON(result3).toDeepEqual({
1803-
value: {
1804-
incremental: [
18051791
{
18061792
items: [{ name: 'Luke' }],
18071793
path: ['nestedObject', 'nestedFriendList', 0],
@@ -1811,8 +1797,8 @@ describe('Execute: stream directive', () => {
18111797
},
18121798
done: false,
18131799
});
1814-
const result4 = await iterator.next();
1815-
expectJSON(result4).toDeepEqual({
1800+
const result3 = await iterator.next();
1801+
expectJSON(result3).toDeepEqual({
18161802
value: {
18171803
incremental: [
18181804
{
@@ -1824,8 +1810,8 @@ describe('Execute: stream directive', () => {
18241810
},
18251811
done: false,
18261812
});
1827-
const result5 = await iterator.next();
1828-
expectJSON(result5).toDeepEqual({
1813+
const result4 = await iterator.next();
1814+
expectJSON(result4).toDeepEqual({
18291815
value: undefined,
18301816
done: true,
18311817
});

src/execution/execute.ts

Lines changed: 117 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -1347,12 +1347,12 @@ async function completeAsyncIteratorValue(
13471347
index,
13481348
iterator,
13491349
exeContext,
1350+
itemType,
13501351
getStreamedFieldGroup(fieldGroup, streamUsage),
13511352
info,
1352-
itemType,
13531353
path,
1354-
streamContext,
13551354
deferMap,
1355+
streamContext,
13561356
parentRecords,
13571357
);
13581358
break;
@@ -2280,99 +2280,17 @@ function executeStreamField(
22802280
return currentParents;
22812281
}
22822282

2283-
async function executeStreamAsyncIteratorItem(
2284-
iterator: AsyncIterator<unknown>,
2285-
exeContext: ExecutionContext,
2286-
fieldGroup: FieldGroup,
2287-
info: GraphQLResolveInfo,
2288-
itemType: GraphQLOutputType,
2289-
streamRecord: StreamRecord,
2290-
itemPath: Path<FieldGroup>,
2291-
deferMap: Map<DeferUsage, DeferredFragmentRecord>,
2292-
parentRecords: Array<AsyncPayloadRecord>,
2293-
): Promise<IteratorResult<unknown>> {
2294-
let item;
2295-
try {
2296-
const iteration = await iterator.next();
2297-
if (!exeContext.streams.has(streamRecord.streamContext)) {
2298-
streamRecord.remove();
2299-
return { done: true, value: undefined };
2300-
}
2301-
const { value, done } = iteration;
2302-
if (done) {
2303-
streamRecord.remove();
2304-
return { done, value: undefined };
2305-
}
2306-
item = value;
2307-
} catch (rawError) {
2308-
handleFieldError(
2309-
rawError,
2310-
exeContext,
2311-
itemType,
2312-
fieldGroup,
2313-
itemPath,
2314-
deferMap,
2315-
streamRecord,
2316-
);
2317-
// don't continue if iterator throws
2318-
return { done: true, value: null };
2319-
}
2320-
let completedItem;
2321-
try {
2322-
completedItem = completeValue(
2323-
exeContext,
2324-
itemType,
2325-
fieldGroup,
2326-
info,
2327-
itemPath,
2328-
item,
2329-
deferMap,
2330-
streamRecord,
2331-
parentRecords,
2332-
);
2333-
2334-
if (isPromise(completedItem)) {
2335-
completedItem = completedItem.then(undefined, (rawError) => {
2336-
handleFieldError(
2337-
rawError,
2338-
exeContext,
2339-
itemType,
2340-
fieldGroup,
2341-
itemPath,
2342-
deferMap,
2343-
streamRecord,
2344-
);
2345-
filterSubsequentPayloads(exeContext, itemPath, parentRecords);
2346-
return null;
2347-
});
2348-
}
2349-
return { done: false, value: completedItem };
2350-
} catch (rawError) {
2351-
handleFieldError(
2352-
rawError,
2353-
exeContext,
2354-
itemType,
2355-
fieldGroup,
2356-
itemPath,
2357-
deferMap,
2358-
streamRecord,
2359-
);
2360-
filterSubsequentPayloads(exeContext, itemPath, parentRecords);
2361-
return { done: false, value: null };
2362-
}
2363-
}
2364-
23652283
async function executeStreamAsyncIterator(
23662284
initialIndex: number,
23672285
iterator: AsyncIterator<unknown>,
23682286
exeContext: ExecutionContext,
2287+
itemType: GraphQLOutputType,
23692288
fieldGroup: FieldGroup,
23702289
info: GraphQLResolveInfo,
2371-
itemType: GraphQLOutputType,
23722290
path: Path<FieldGroup>,
2373-
streamContext: StreamContext,
23742291
deferMap: Map<DeferUsage, DeferredFragmentRecord>,
2375-
parents?: Array<AsyncPayloadRecord> | undefined,
2292+
streamContext: StreamContext,
2293+
parents: Array<AsyncPayloadRecord> | undefined,
23762294
): Promise<void> {
23772295
let index = initialIndex;
23782296
let currentParents = parents;
@@ -2387,55 +2305,135 @@ async function executeStreamAsyncIterator(
23872305
});
23882306
currentParents = [streamRecord];
23892307

2390-
let iteration;
2308+
currentParents = [streamRecord];
2309+
2310+
let item;
23912311
try {
23922312
// eslint-disable-next-line no-await-in-loop
2393-
iteration = await executeStreamAsyncIteratorItem(
2394-
iterator,
2313+
const iteration = await iterator.next();
2314+
if (iteration.done) {
2315+
streamRecord.remove();
2316+
return;
2317+
}
2318+
item = iteration.value;
2319+
} catch (rawError) {
2320+
handleFieldError(
2321+
rawError,
23952322
exeContext,
2396-
fieldGroup,
2397-
info,
23982323
itemType,
2399-
streamRecord,
2324+
fieldGroup,
24002325
itemPath,
24012326
deferMap,
2402-
currentParents,
2327+
streamRecord,
24032328
);
2329+
// don't continue if iterator throws
2330+
streamRecord.addItems([null]);
2331+
exeContext.streams.delete(streamContext);
2332+
return;
2333+
}
2334+
2335+
try {
2336+
let completedItem;
2337+
try {
2338+
completedItem = completeValue(
2339+
exeContext,
2340+
itemType,
2341+
fieldGroup,
2342+
info,
2343+
itemPath,
2344+
item,
2345+
deferMap,
2346+
streamRecord,
2347+
currentParents,
2348+
);
2349+
2350+
if (isPromise(completedItem)) {
2351+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
2352+
handlePromisedStreamResult(
2353+
completedItem,
2354+
streamRecord,
2355+
exeContext,
2356+
itemType,
2357+
fieldGroup,
2358+
path,
2359+
itemPath,
2360+
deferMap,
2361+
streamContext,
2362+
);
2363+
} else {
2364+
streamRecord.addItems([completedItem]);
2365+
}
2366+
} catch (rawError) {
2367+
handleFieldError(
2368+
rawError,
2369+
exeContext,
2370+
itemType,
2371+
fieldGroup,
2372+
itemPath,
2373+
deferMap,
2374+
streamRecord,
2375+
);
2376+
filterSubsequentPayloads(exeContext, itemPath, currentParents);
2377+
streamRecord.addItems([null]);
2378+
}
24042379
} catch (error) {
2405-
streamRecord.errors.push(error);
2380+
if (fieldGroup.inInitialResult) {
2381+
streamRecord.errors.push(error);
2382+
}
2383+
returnStreamIteratorIgnoringError(streamContext);
2384+
exeContext.streams.delete(streamContext);
24062385
filterSubsequentPayloads(exeContext, path, currentParents);
24072386
streamRecord.addItems(null);
2408-
// entire stream has errored and bubbled upwards
2409-
if (iterator?.return) {
2410-
iterator.return().catch(() => {
2411-
// ignore errors
2412-
});
2413-
}
24142387
return;
24152388
}
24162389

2417-
const { done, value: completedItem } = iteration;
2418-
2419-
let completedItems: PromiseOrValue<Array<unknown> | null>;
2420-
if (isPromise(completedItem)) {
2421-
completedItems = completedItem.then(
2422-
(value) => [value],
2423-
(error) => {
2424-
streamRecord.errors.push(error);
2425-
filterSubsequentPayloads(exeContext, path, [streamRecord]);
2426-
return null;
2427-
},
2428-
);
2429-
} else {
2430-
completedItems = [completedItem];
2390+
if (!exeContext.streams.has(streamContext)) {
2391+
// stream was filtered
2392+
returnStreamIteratorIgnoringError(streamContext);
2393+
break;
24312394
}
24322395

2433-
streamRecord.addItems(completedItems);
2396+
index++;
2397+
}
2398+
}
24342399

2435-
if (done) {
2436-
break;
2400+
async function handlePromisedStreamResult(
2401+
result: Promise<unknown>,
2402+
streamRecord: StreamRecord,
2403+
exeContext: ExecutionContext,
2404+
itemType: GraphQLOutputType,
2405+
fieldGroup: FieldGroup,
2406+
path: Path<FieldGroup>,
2407+
itemPath: Path<FieldGroup>,
2408+
deferMap: Map<DeferUsage, DeferredFragmentRecord>,
2409+
streamContext: StreamContext,
2410+
): Promise<void> {
2411+
try {
2412+
let resolved;
2413+
try {
2414+
resolved = await result;
2415+
} catch (rawError) {
2416+
handleFieldError(
2417+
rawError,
2418+
exeContext,
2419+
itemType,
2420+
fieldGroup,
2421+
itemPath,
2422+
deferMap,
2423+
streamRecord,
2424+
);
2425+
filterSubsequentPayloads(exeContext, itemPath, [streamRecord]);
2426+
resolved = null;
24372427
}
2438-
index++;
2428+
streamRecord.addItems([resolved]);
2429+
} catch (error) {
2430+
if (fieldGroup.inInitialResult) {
2431+
streamRecord.errors.push(error);
2432+
}
2433+
returnStreamIteratorIgnoringError(streamContext);
2434+
exeContext.streams.delete(streamContext);
2435+
filterSubsequentPayloads(exeContext, path, [streamRecord]);
2436+
streamRecord.addItems(null);
24392437
}
24402438
}
24412439

@@ -2533,10 +2531,6 @@ function yieldSubsequentPayloads(
25332531
const incremental = getCompletedIncrementalResults(exeContext);
25342532
const hasNext = exeContext.subsequentPayloads.size > 0;
25352533

2536-
if (!incremental.length && hasNext) {
2537-
return next();
2538-
}
2539-
25402534
if (!hasNext) {
25412535
isDone = true;
25422536
}

0 commit comments

Comments
 (0)