Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/internal/streams/iter/broadcast.js
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ function broadcast(options = { __proto__: null }) {
broadcastImpl.setWriter(writer);

if (signal) {
onSignalAbort(signal, () => broadcastImpl.cancel());
onSignalAbort(signal, () => broadcastImpl.cancel(signal.reason));
}

return { __proto__: null, writer, broadcast: broadcastImpl };
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/streams/iter/share.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class ShareImpl {
// cursor must re-pull rather than terminating prematurely.
for (;;) {
if (state.detached) {
if (self.#sourceError) throw self.#sourceError;
return { __proto__: null, done: true, value: undefined };
}

Expand Down Expand Up @@ -657,7 +658,7 @@ function share(source, options = { __proto__: null }) {
const shareImpl = new ShareImpl(normalized, opts);

if (signal) {
onSignalAbort(signal, () => shareImpl.cancel());
onSignalAbort(signal, () => shareImpl.cancel(signal.reason));
}

return shareImpl;
Expand Down
21 changes: 10 additions & 11 deletions test/parallel/test-stream-iter-broadcast-from.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,13 @@ async function testBroadcastFromMultipleConsumers() {
async function testAbortSignal() {
const ac = new AbortController();
const { broadcast: bc } = broadcast({ signal: ac.signal });
const consumer = bc.push();
const iter = bc.push()[Symbol.asyncIterator]();
const read = iter.next();
const rejected = assert.rejects(read, { name: 'AbortError' });

ac.abort();

const batches = [];
for await (const batch of consumer) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
await rejected;
}

async function testAlreadyAbortedSignal() {
Expand All @@ -84,11 +82,12 @@ async function testAlreadyAbortedSignal() {
const { broadcast: bc } = broadcast({ signal: ac.signal });
const consumer = bc.push();

const batches = [];
for await (const batch of consumer) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
await assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of consumer) {
assert.fail('Should not reach here');
}
}, { name: 'AbortError' });
}

// =============================================================================
Expand Down
72 changes: 60 additions & 12 deletions test/parallel/test-stream-iter-share-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,62 @@ async function testShareCancelWithReason() {

async function testShareAbortSignal() {
const ac = new AbortController();
const shared = share(from('data'), { signal: ac.signal });
const consumer = shared.pull();
const enc = new TextEncoder();
async function* source() {
yield [enc.encode('a')];
yield [enc.encode('b')];
}
const shared = share(source(), {
highWaterMark: 1,
backpressure: 'block',
signal: ac.signal,
});
const fast = shared.pull()[Symbol.asyncIterator]();
shared.pull();

await fast.next();
const read = fast.next();
const rejected = assert.rejects(read, { name: 'AbortError' });
ac.abort();

await rejected;
}

async function testShareAbortSignalWhileSourcePullPending() {
const ac = new AbortController();
let resume;
let sourceStarted;
const sourceStartedPromise = new Promise((resolve) => {
sourceStarted = resolve;
});
const source = {
__proto__: null,
[Symbol.asyncIterator]() {
return {
__proto__: null,
async next() {
await new Promise((resolve) => {
resume = resolve;
sourceStarted();
});
return { __proto__: null, done: true, value: undefined };
},
};
},
};
const shared = share(source, { signal: ac.signal });
const iter1 = shared.pull()[Symbol.asyncIterator]();
const iter2 = shared.pull()[Symbol.asyncIterator]();
const read1 = iter1.next();
const read2 = iter2.next();
const rejected1 = assert.rejects(read1, { name: 'AbortError' });
const rejected2 = assert.rejects(read2, { name: 'AbortError' });

await sourceStartedPromise;
ac.abort();
resume();

const batches = [];
for await (const batch of consumer) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
await Promise.all([rejected1, rejected2]);
}

async function testShareAlreadyAborted() {
Expand All @@ -153,11 +199,12 @@ async function testShareAlreadyAborted() {
const shared = share(from('data'), { signal: ac.signal });
const consumer = shared.pull();

const batches = [];
for await (const batch of consumer) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
await assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of consumer) {
assert.fail('Should not reach here');
}
}, { name: 'AbortError' });
}

// =============================================================================
Expand Down Expand Up @@ -273,6 +320,7 @@ Promise.all([
testShareCancelMidIteration(),
testShareCancelWithReason(),
testShareAbortSignal(),
testShareAbortSignalWhileSourcePullPending(),
testShareAlreadyAborted(),
testShareSourceError(),
testShareLateJoiningConsumer(),
Expand Down
Loading