diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index 7b6fc3525d122f..8cbbf0bb501dc1 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -720,7 +720,7 @@ function broadcast(options = { __proto__: null }) { broadcastImpl.setWriter(writer); if (signal) { - onSignalAbort(signal, () => broadcastImpl.cancel()); + onSignalAbort(signal, () => broadcastImpl.cancel(signal.reason)); } return { __proto__: null, writer, broadcast: broadcastImpl }; diff --git a/lib/internal/streams/iter/share.js b/lib/internal/streams/iter/share.js index 0160bc7eace009..9cf169f8a5f3c1 100644 --- a/lib/internal/streams/iter/share.js +++ b/lib/internal/streams/iter/share.js @@ -144,6 +144,7 @@ class ShareImpl { // cursor must re-pull rather than terminating prematurely. for (;;) { if (state.detached) { + if (self.#sourceError) throw self.#sourceError; return { __proto__: null, done: true, value: undefined }; } @@ -657,7 +658,7 @@ function share(source, options = { __proto__: null }) { const shareImpl = new ShareImpl(normalized, opts); if (signal) { - onSignalAbort(signal, () => shareImpl.cancel()); + onSignalAbort(signal, () => shareImpl.cancel(signal.reason)); } return shareImpl; diff --git a/test/parallel/test-stream-iter-broadcast-from.js b/test/parallel/test-stream-iter-broadcast-from.js index 2f17b1a7de92fa..200d0a7022d09c 100644 --- a/test/parallel/test-stream-iter-broadcast-from.js +++ b/test/parallel/test-stream-iter-broadcast-from.js @@ -66,15 +66,13 @@ async function testBroadcastFromMultipleConsumers() { async function testAbortSignal() { const ac = new AbortController(); const { broadcast: bc } = broadcast({ signal: ac.signal }); - const consumer = bc.push(); + const iter = bc.push()[Symbol.asyncIterator](); + const read = iter.next(); + const rejected = assert.rejects(read, { name: 'AbortError' }); ac.abort(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await rejected; } async function testAlreadyAbortedSignal() { @@ -84,11 +82,12 @@ async function testAlreadyAbortedSignal() { const { broadcast: bc } = broadcast({ signal: ac.signal }); const consumer = bc.push(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of consumer) { + assert.fail('Should not reach here'); + } + }, { name: 'AbortError' }); } // ============================================================================= diff --git a/test/parallel/test-stream-iter-share-async.js b/test/parallel/test-stream-iter-share-async.js index 86b0eb9b273a34..f7a53a063cb3e0 100644 --- a/test/parallel/test-stream-iter-share-async.js +++ b/test/parallel/test-stream-iter-share-async.js @@ -134,16 +134,62 @@ async function testShareCancelWithReason() { async function testShareAbortSignal() { const ac = new AbortController(); - const shared = share(from('data'), { signal: ac.signal }); - const consumer = shared.pull(); + const enc = new TextEncoder(); + async function* source() { + yield [enc.encode('a')]; + yield [enc.encode('b')]; + } + const shared = share(source(), { + highWaterMark: 1, + backpressure: 'block', + signal: ac.signal, + }); + const fast = shared.pull()[Symbol.asyncIterator](); + shared.pull(); + + await fast.next(); + const read = fast.next(); + const rejected = assert.rejects(read, { name: 'AbortError' }); + ac.abort(); + + await rejected; +} +async function testShareAbortSignalWhileSourcePullPending() { + const ac = new AbortController(); + let resume; + let sourceStarted; + const sourceStartedPromise = new Promise((resolve) => { + sourceStarted = resolve; + }); + const source = { + __proto__: null, + [Symbol.asyncIterator]() { + return { + __proto__: null, + async next() { + await new Promise((resolve) => { + resume = resolve; + sourceStarted(); + }); + return { __proto__: null, done: true, value: undefined }; + }, + }; + }, + }; + const shared = share(source, { signal: ac.signal }); + const iter1 = shared.pull()[Symbol.asyncIterator](); + const iter2 = shared.pull()[Symbol.asyncIterator](); + const read1 = iter1.next(); + const read2 = iter2.next(); + const rejected1 = assert.rejects(read1, { name: 'AbortError' }); + const rejected2 = assert.rejects(read2, { name: 'AbortError' }); + + await sourceStartedPromise; ac.abort(); + resume(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await Promise.all([rejected1, rejected2]); } async function testShareAlreadyAborted() { @@ -153,11 +199,12 @@ async function testShareAlreadyAborted() { const shared = share(from('data'), { signal: ac.signal }); const consumer = shared.pull(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of consumer) { + assert.fail('Should not reach here'); + } + }, { name: 'AbortError' }); } // ============================================================================= @@ -273,6 +320,7 @@ Promise.all([ testShareCancelMidIteration(), testShareCancelWithReason(), testShareAbortSignal(), + testShareAbortSignalWhileSourcePullPending(), testShareAlreadyAborted(), testShareSourceError(), testShareLateJoiningConsumer(),