| 'use strict'; |
| |
| const common = require('../common'); |
| const { |
| Readable, Transform, |
| } = require('stream'); |
| const assert = require('assert'); |
| |
| { |
| // with async generator |
| const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) { |
| let str = ''; |
| for await (const chunk of stream) { |
| str += chunk; |
| |
| if (str.length === 2) { |
| yield str; |
| str = ''; |
| } |
| } |
| }); |
| const result = ['ab', 'cd']; |
| (async () => { |
| for await (const item of stream) { |
| assert.strictEqual(item, result.shift()); |
| } |
| })().then(common.mustCall()); |
| } |
| |
| { |
| // With Transformer |
| const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({ |
| objectMode: true, |
| transform: common.mustCall((chunk, encoding, callback) => { |
| callback(null, chunk); |
| }, 4) |
| })); |
| const result = ['a', 'b', 'c', 'd']; |
| (async () => { |
| for await (const item of stream) { |
| assert.strictEqual(item, result.shift()); |
| } |
| })().then(common.mustCall()); |
| } |
| |
| { |
| // Throwing an error during `compose` (before waiting for data) |
| const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield |
| |
| throw new Error('boom'); |
| }); |
| |
| assert.rejects(async () => { |
| for await (const item of stream) { |
| assert.fail('should not reach here, got ' + item); |
| } |
| }, /boom/).then(common.mustCall()); |
| } |
| |
| { |
| // Throwing an error during `compose` (when waiting for data) |
| const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { |
| for await (const chunk of stream) { |
| if (chunk === 3) { |
| throw new Error('boom'); |
| } |
| yield chunk; |
| } |
| }); |
| |
| assert.rejects( |
| stream.toArray(), |
| /boom/, |
| ).then(common.mustCall()); |
| } |
| |
| { |
| // Throwing an error during `compose` (after finishing all readable data) |
| const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield |
| |
| // eslint-disable-next-line no-unused-vars,no-empty |
| for await (const chunk of stream) { |
| } |
| |
| throw new Error('boom'); |
| }); |
| assert.rejects( |
| stream.toArray(), |
| /boom/, |
| ).then(common.mustCall()); |
| } |
| |
| { |
| // AbortSignal |
| const ac = new AbortController(); |
| const stream = Readable.from([1, 2, 3, 4, 5]) |
| .compose(async function *(source) { |
| // Should not reach here |
| for await (const chunk of source) { |
| yield chunk; |
| } |
| }, { signal: ac.signal }); |
| |
| ac.abort(); |
| |
| assert.rejects(async () => { |
| for await (const item of stream) { |
| assert.fail('should not reach here, got ' + item); |
| } |
| }, { |
| name: 'AbortError', |
| }).then(common.mustCall()); |
| } |
| |
| { |
| assert.throws( |
| () => Readable.from(['a']).compose(Readable.from(['b'])), |
| { code: 'ERR_INVALID_ARG_VALUE' } |
| ); |
| } |
| |
| { |
| assert.throws( |
| () => Readable.from(['a']).compose(), |
| { code: 'ERR_INVALID_ARG_TYPE' } |
| ); |
| } |