diff --git a/lib/internal/streams/iter/consumers.js b/lib/internal/streams/iter/consumers.js index 442fe95b8e1b85..bcfe5d5ab29749 100644 --- a/lib/internal/streams/iter/consumers.js +++ b/lib/internal/streams/iter/consumers.js @@ -8,6 +8,7 @@ // ondrain() - backpressure drain utility const { + ArrayBufferIsView, ArrayBufferPrototypeGetByteLength, ArrayBufferPrototypeSlice, ArrayPrototypeMap, @@ -51,9 +52,12 @@ const { const { drainableProtocol, + toAsyncStreamable, + toStreamable, } = require('internal/streams/iter/types'); const { + isAnyArrayBuffer, isSharedArrayBuffer, } = require('internal/util/types'); @@ -65,8 +69,12 @@ function isMergeOptions(value) { return ( value !== null && typeof value === 'object' && + !ArrayBufferIsView(value) && + typeof value[toStreamable] !== 'function' && + typeof value[toAsyncStreamable] !== 'function' && !isAsyncIterable(value) && - !isSyncIterable(value) + !isSyncIterable(value) && + !isAnyArrayBuffer(value) ); } diff --git a/test/parallel/test-stream-iter-consumers-merge.js b/test/parallel/test-stream-iter-consumers-merge.js index ad047c0ffd7ed4..c5b18be042d874 100644 --- a/test/parallel/test-stream-iter-consumers-merge.js +++ b/test/parallel/test-stream-iter-consumers-merge.js @@ -9,6 +9,8 @@ const { push, merge, text, + toAsyncStreamable, + toStreamable, } = require('stream/iter'); // ============================================================================= @@ -162,6 +164,27 @@ async function testMergeStringSources() { assert.ok(combined.includes('world')); } +// merge() accepts object-like sources that are normalized via from() +async function testMergeObjectLikeSources() { + const arrayBuffer = new TextEncoder().encode('abc').buffer; + const dataView = new DataView(new TextEncoder().encode('def').buffer); + const streamable = { + [toStreamable]() { + return 'ghi'; + }, + }; + const asyncStreamable = { + [toAsyncStreamable]() { + return Promise.resolve('jkl'); + }, + }; + + assert.strictEqual(await text(merge(arrayBuffer)), 'abc'); + assert.strictEqual(await text(merge(dataView)), 'def'); + assert.strictEqual(await text(merge(streamable)), 'ghi'); + assert.strictEqual(await text(merge(asyncStreamable)), 'jkl'); +} + Promise.all([ testMergeTwoSources(), testMergeSingleSource(), @@ -172,4 +195,5 @@ Promise.all([ testMergeConsumerBreak(), testMergeSignalMidIteration(), testMergeStringSources(), + testMergeObjectLikeSources(), ]).then(common.mustCall());