diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index 9b3ccebff9ac89..50ad88c4454881 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -92,7 +92,7 @@ class BroadcastImpl { #options; #writer = null; #cachedMinCursor = 0; - #minCursorDirty = false; + #cachedMinCursorConsumers = 0; constructor(options) { this.#options = options; @@ -150,13 +150,13 @@ class BroadcastImpl { }; this.#consumers.add(state); - // New consumer starts at buffer start; recalculate min cursor - // since this consumer may now be the slowest. if (this.#consumers.size === 1) { this.#cachedMinCursor = state.cursor; - this.#minCursorDirty = false; + this.#cachedMinCursorConsumers = 1; + } else if (state.cursor === this.#cachedMinCursor) { + this.#cachedMinCursorConsumers++; } else { - this.#minCursorDirty = true; + this.#recomputeMinCursor(); } const self = this; @@ -167,9 +167,9 @@ class BroadcastImpl { state.detached = true; state.resolve = null; state.reject = null; - self.#consumers.delete(state); - self.#minCursorDirty = true; - self.#tryTrimBuffer(); + if (self.#deleteConsumer(state)) { + self.#tryTrimBuffer(); + } } return { @@ -186,19 +186,19 @@ class BroadcastImpl { const bufferIndex = state.cursor - self.#bufferStart; if (bufferIndex < self.#buffer.length) { const chunk = self.#buffer.get(bufferIndex); - // If this consumer was at the min cursor, mark dirty - if (state.cursor <= self.#cachedMinCursor) { - self.#minCursorDirty = true; - } + const cursor = state.cursor; state.cursor++; - self.#tryTrimBuffer(); + if (cursor === self.#cachedMinCursor && + --self.#cachedMinCursorConsumers === 0) { + self.#tryTrimBuffer(); + } return PromiseResolve( { __proto__: null, done: false, value: chunk }); } if (self.#error) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); return PromiseReject(self.#error); } @@ -253,6 +253,7 @@ class BroadcastImpl { consumer.detached = true; } this.#consumers.clear(); + this.#cachedMinCursorConsumers = 0; } [SymbolDispose]() { @@ -274,9 +275,11 @@ class BroadcastImpl { this.#bufferStart++; for (const consumer of this.#consumers) { if (consumer.cursor < this.#bufferStart) { + this.#deleteConsumerFromMin(consumer); consumer.cursor = this.#bufferStart; } } + this.#recomputeMinCursor(); break; case 'drop-newest': return true; @@ -297,7 +300,12 @@ class BroadcastImpl { const bufferIndex = consumer.cursor - this.#bufferStart; if (bufferIndex < this.#buffer.length) { const chunk = this.#buffer.get(bufferIndex); + const cursor = consumer.cursor; consumer.cursor++; + if (cursor === this.#cachedMinCursor && + --this.#cachedMinCursorConsumers === 0) { + this.#tryTrimBuffer(); + } consumer.resolve({ __proto__: null, done: false, value: chunk }); } else { consumer.resolve({ __proto__: null, done: true, value: undefined }); @@ -323,6 +331,7 @@ class BroadcastImpl { consumer.detached = true; } this.#consumers.clear(); + this.#cachedMinCursorConsumers = 0; } [kGetDesiredSize]() { @@ -343,14 +352,14 @@ class BroadcastImpl { // Private methods #recomputeMinCursor() { - const { minCursor } = getMinCursor( + const { minCursor, minCursorConsumers } = getMinCursor( this.#consumers, this.#bufferStart + this.#buffer.length); this.#cachedMinCursor = minCursor; - this.#minCursorDirty = false; + this.#cachedMinCursorConsumers = minCursorConsumers; } #tryTrimBuffer() { - if (this.#minCursorDirty) { + if (this.#cachedMinCursorConsumers === 0) { this.#recomputeMinCursor(); } const trimCount = this.#cachedMinCursor - this.#bufferStart; @@ -377,10 +386,12 @@ class BroadcastImpl { const bufferIndex = consumer.cursor - this.#bufferStart; if (bufferIndex < this.#buffer.length) { const chunk = this.#buffer.get(bufferIndex); - if (consumer.cursor <= this.#cachedMinCursor) { - this.#minCursorDirty = true; - } + const cursor = consumer.cursor; consumer.cursor++; + if (cursor === this.#cachedMinCursor && + --this.#cachedMinCursorConsumers === 0) { + this.#tryTrimBuffer(); + } const resolve = consumer.resolve; consumer.resolve = null; consumer.reject = null; @@ -392,6 +403,21 @@ class BroadcastImpl { } } } + + #deleteConsumerFromMin(consumer) { + if (consumer.cursor === this.#cachedMinCursor) { + this.#cachedMinCursorConsumers--; + return this.#cachedMinCursorConsumers === 0; + } + return false; + } + + #deleteConsumer(consumer) { + if (this.#consumers.delete(consumer)) { + return this.#deleteConsumerFromMin(consumer); + } + return false; + } } // ============================================================================= diff --git a/test/parallel/test-stream-iter-broadcast-coverage.js b/test/parallel/test-stream-iter-broadcast-coverage.js index bc86f44867dcc5..7aa71062ab197b 100644 --- a/test/parallel/test-stream-iter-broadcast-coverage.js +++ b/test/parallel/test-stream-iter-broadcast-coverage.js @@ -96,6 +96,33 @@ async function testRingbufferGrow() { } } +// Multiple consumers at the minimum cursor should trim only after the last +// one advances or detaches. +async function testFanOutMinCursorTrimming() { + const { writer, broadcast: bc } = broadcast({ highWaterMark: 4 }); + const iter1 = bc.push()[Symbol.asyncIterator](); + const iter2 = bc.push()[Symbol.asyncIterator](); + + writer.writeSync(new Uint8Array([1])); + writer.writeSync(new Uint8Array([2])); + assert.strictEqual(bc.bufferSize, 2); + + assert.strictEqual((await iter1.next()).done, false); + assert.strictEqual(bc.bufferSize, 2); + + assert.strictEqual((await iter2.next()).done, false); + assert.strictEqual(bc.bufferSize, 1); + + await iter1.return(); + assert.strictEqual(bc.bufferSize, 1); + + assert.strictEqual((await iter2.next()).done, false); + assert.strictEqual(bc.bufferSize, 0); + + writer.endSync(); + assert.strictEqual((await iter2.next()).done, true); +} + // Broadcast drainableProtocol after close returns null async function testDrainableAfterClose() { const { drainableProtocol } = require('stream/iter'); @@ -111,5 +138,6 @@ Promise.all([ testBroadcastFromSyncIterable(), testBroadcastFromSyncIterableStrings(), testRingbufferGrow(), + testFanOutMinCursorTrimming(), testDrainableAfterClose(), ]).then(common.mustCall());