diff --git a/lib/internal/streams/iter/classic.js b/lib/internal/streams/iter/classic.js index fd5f811ea52d97..73c6bc48b954c8 100644 --- a/lib/internal/streams/iter/classic.js +++ b/lib/internal/streams/iter/classic.js @@ -57,6 +57,7 @@ const { const { toAsyncStreamable: kToAsyncStreamable, kValidatedSource, + kSyncWriteAccepted, drainableProtocol, } = require('internal/streams/iter/types'); @@ -764,13 +765,41 @@ function toWritable(writer) { const hasEndSync = hasEnd && typeof writer.endSync === 'function'; const hasFail = typeof writer.fail === 'function'; + const hasSyncWriteAccepted = + typeof writer[kSyncWriteAccepted] === 'function'; - // Try-sync-first pattern: attempt the synchronous method and - // fall back to the async method if it returns false (indicating - // the sync path was not accepted) or throws. When the sync path - // succeeds, the callback is deferred via queueMicrotask to - // preserve the async resolution contract that Writable internals - // expect from _write/_writev/_final callbacks. + function syncWriteAccepted() { + return hasSyncWriteAccepted && writer[kSyncWriteAccepted](); + } + + function finishAfterSyncBackpressure(cb) { + let ondrain; + try { + if (typeof writer[drainableProtocol] === 'function') { + ondrain = writer[drainableProtocol](); + } + } catch (err) { + cb(err); + return; + } + if (ondrain !== null && ondrain !== undefined) { + PromisePrototypeThen(ondrain, (drained) => { + if (drained === false) { + cb(new ERR_INVALID_STATE.TypeError('Stream closed by consumer')); + return; + } + cb(); + }, cb); + return; + } + queueMicrotask(cb); + } + + // Try-sync-first pattern: attempt the synchronous method and fall back to the + // async method if it returns false without accepting the data, or if it + // throws. When the sync path succeeds, the callback is deferred via + // queueMicrotask to preserve the async resolution contract that Writable + // internals expect from _write/_writev/_final callbacks. function _write(chunk, encoding, cb) { const bytes = typeof chunk === 'string' ? @@ -781,6 +810,11 @@ function toWritable(writer) { queueMicrotask(cb); return; } + if (syncWriteAccepted()) { + // The chunk was accepted; false only signaled backpressure. + finishAfterSyncBackpressure(cb); + return; + } } catch { // Sync path threw -- fall through to async. } @@ -805,6 +839,11 @@ function toWritable(writer) { queueMicrotask(cb); return; } + if (syncWriteAccepted()) { + // The chunks were accepted; false only signaled backpressure. + finishAfterSyncBackpressure(cb); + return; + } } catch { // Sync path threw -- fall through to async. } diff --git a/lib/internal/streams/iter/push.js b/lib/internal/streams/iter/push.js index c5b12663f83c24..1c367ff02bae71 100644 --- a/lib/internal/streams/iter/push.js +++ b/lib/internal/streams/iter/push.js @@ -32,6 +32,7 @@ const { const { drainableProtocol, + kSyncWriteAccepted, kSyncWriteAcceptedOnFalse, } = require('internal/streams/iter/types'); @@ -545,11 +546,16 @@ class PushQueue { class PushWriter { #queue; + #syncWriteAccepted = false; constructor(queue) { this.#queue = queue; } + [kSyncWriteAccepted]() { + return this.#syncWriteAccepted; + } + [drainableProtocol]() { const desired = this.desiredSize; if (desired === null) return null; @@ -589,6 +595,7 @@ class PushWriter { } writeSync(chunk) { + this.#syncWriteAccepted = false; const bytes = toUint8Array(chunk); const result = this.#queue.writeSync([bytes]); if (!result && this.#queue.backpressurePolicy === 'block' && @@ -596,12 +603,15 @@ class PushWriter { // Block policy: force-enqueue and return false as backpressure signal. // Data IS accepted; false tells caller to slow down. this.#queue.forceEnqueue([bytes]); + this.#syncWriteAccepted = true; return false; } + this.#syncWriteAccepted = result; return result; } writevSync(chunks) { + this.#syncWriteAccepted = false; if (!ArrayIsArray(chunks)) { throw new ERR_INVALID_ARG_TYPE('chunks', 'Array', chunks); } @@ -610,8 +620,10 @@ class PushWriter { if (!result && this.#queue.backpressurePolicy === 'block' && this.#queue.desiredSize === 0) { this.#queue.forceEnqueue(bytes); + this.#syncWriteAccepted = true; return false; } + this.#syncWriteAccepted = result; return result; } diff --git a/lib/internal/streams/iter/types.js b/lib/internal/streams/iter/types.js index 71112b1515c081..e72972d91ad243 100644 --- a/lib/internal/streams/iter/types.js +++ b/lib/internal/streams/iter/types.js @@ -64,11 +64,24 @@ const kValidatedTransform = Symbol('kValidatedTransform'); */ const kValidatedSource = Symbol('kValidatedSource'); +/** + * Internal sentinel for writers whose sync write methods can return false + * after accepting data as a backpressure signal. + */ +const kSyncWriteAccepted = Symbol('kSyncWriteAccepted'); + +/** + * Internal sentinel for writers whose sync write methods may return false + * after accepting data when backpressure is applied. Such writers must expose + * desiredSize so callers can distinguish accepted backpressure from a sync + * write that was not performed. + */ const kSyncWriteAcceptedOnFalse = Symbol('kSyncWriteAcceptedOnFalse'); module.exports = { broadcastProtocol, drainableProtocol, + kSyncWriteAccepted, kSyncWriteAcceptedOnFalse, kValidatedSource, kValidatedTransform, diff --git a/test/parallel/test-stream-iter-writable-from.js b/test/parallel/test-stream-iter-writable-from.js index fd922c5cf99537..51dc465b45d148 100644 --- a/test/parallel/test-stream-iter-writable-from.js +++ b/test/parallel/test-stream-iter-writable-from.js @@ -335,6 +335,53 @@ async function testRoundTrip() { assert.strictEqual(result, data); } +// ============================================================================= +// PushWriter writeSync false accepted as backpressure is not retried +// ============================================================================= + +async function testPushWriterBlockBackpressureNoDuplicate() { + const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' }); + const writable = toWritable(writer); + + await new Promise((resolve, reject) => { + writable.write('a', (err) => { + if (err) reject(err); + else resolve(); + }); + }); + + writable.write('b'); + writable.end(); + + const result = await text(readable); + assert.strictEqual(result, 'ab'); +} + +// ============================================================================= +// PushWriter writevSync false accepted as backpressure is not retried +// ============================================================================= + +async function testPushWriterBlockBackpressureWritevNoDuplicate() { + const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' }); + const writable = toWritable(writer); + + await new Promise((resolve, reject) => { + writable.write('a', (err) => { + if (err) reject(err); + else resolve(); + }); + }); + + writable.cork(); + writable.write('b'); + writable.write('c'); + writable.uncork(); + writable.end(); + + const result = await text(readable); + assert.strictEqual(result, 'abc'); +} + // ============================================================================= // Multiple sequential writes // ============================================================================= @@ -590,6 +637,8 @@ Promise.all([ testWriteThrowsSyncPropagation(), testEndThrowsSyncPropagation(), testRoundTrip(), + testPushWriterBlockBackpressureNoDuplicate(), + testPushWriterBlockBackpressureWritevNoDuplicate(), testSequentialWrites(), testSyncCallbackDeferred(), testMinimalWriter(),