| 'use strict'; |
| |
| const common = require('../common'); |
| const { |
| Stream, |
| Readable, |
| Transform, |
| PassThrough, |
| pipeline |
| } = require('stream'); |
| const assert = require('assert'); |
| const http = require('http'); |
| const fs = require('fs'); |
| |
| async function tests() { |
| { |
| // v1 stream |
| |
| const stream = new Stream(); |
| stream.destroy = common.mustCall(); |
| process.nextTick(() => { |
| stream.emit('data', 'hello'); |
| stream.emit('data', 'world'); |
| stream.emit('end'); |
| }); |
| |
| let res = ''; |
| stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator]; |
| for await (const d of stream) { |
| res += d; |
| } |
| assert.strictEqual(res, 'helloworld'); |
| } |
| |
| { |
| // v1 stream error |
| |
| const stream = new Stream(); |
| stream.close = common.mustCall(); |
| process.nextTick(() => { |
| stream.emit('data', 0); |
| stream.emit('data', 1); |
| stream.emit('error', new Error('asd')); |
| }); |
| |
| const iter = Readable.prototype[Symbol.asyncIterator].call(stream); |
| await iter.next() |
| .then(common.mustNotCall()) |
| .catch(common.mustCall((err) => { |
| assert.strictEqual(err.message, 'asd'); |
| })); |
| } |
| |
| { |
| // Non standard stream cleanup |
| |
| const readable = new Readable({ autoDestroy: false, read() {} }); |
| readable.push('asd'); |
| readable.push('asd'); |
| readable.destroy = null; |
| readable.close = common.mustCall(() => { |
| readable.emit('close'); |
| }); |
| |
| await (async () => { |
| for await (const d of readable) { |
| return; |
| } |
| })(); |
| } |
| |
| { |
| const readable = new Readable({ objectMode: true, read() {} }); |
| readable.push(0); |
| readable.push(1); |
| readable.push(null); |
| |
| const iter = readable[Symbol.asyncIterator](); |
| assert.strictEqual((await iter.next()).value, 0); |
| for await (const d of iter) { |
| assert.strictEqual(d, 1); |
| } |
| } |
| |
| { |
| console.log('read without for..await'); |
| const max = 5; |
| const readable = new Readable({ |
| objectMode: true, |
| read() {} |
| }); |
| |
| const iter = readable[Symbol.asyncIterator](); |
| assert.strictEqual(iter.stream, readable); |
| const values = []; |
| for (let i = 0; i < max; i++) { |
| values.push(iter.next()); |
| } |
| Promise.all(values).then(common.mustCall((values) => { |
| values.forEach(common.mustCall( |
| (item, i) => assert.strictEqual(item.value, 'hello-' + i), 5)); |
| })); |
| |
| readable.push('hello-0'); |
| readable.push('hello-1'); |
| readable.push('hello-2'); |
| readable.push('hello-3'); |
| readable.push('hello-4'); |
| readable.push(null); |
| |
| const last = await iter.next(); |
| assert.strictEqual(last.done, true); |
| } |
| |
| { |
| console.log('read without for..await deferred'); |
| const readable = new Readable({ |
| objectMode: true, |
| read() {} |
| }); |
| |
| const iter = readable[Symbol.asyncIterator](); |
| assert.strictEqual(iter.stream, readable); |
| let values = []; |
| for (let i = 0; i < 3; i++) { |
| values.push(iter.next()); |
| } |
| |
| readable.push('hello-0'); |
| readable.push('hello-1'); |
| readable.push('hello-2'); |
| |
| let k = 0; |
| const results1 = await Promise.all(values); |
| results1.forEach(common.mustCall( |
| (item) => assert.strictEqual(item.value, 'hello-' + k++), 3)); |
| |
| values = []; |
| for (let i = 0; i < 2; i++) { |
| values.push(iter.next()); |
| } |
| |
| readable.push('hello-3'); |
| readable.push('hello-4'); |
| readable.push(null); |
| |
| const results2 = await Promise.all(values); |
| results2.forEach(common.mustCall( |
| (item) => assert.strictEqual(item.value, 'hello-' + k++), 2)); |
| |
| const last = await iter.next(); |
| assert.strictEqual(last.done, true); |
| } |
| |
| { |
| console.log('read without for..await with errors'); |
| const max = 3; |
| const readable = new Readable({ |
| objectMode: true, |
| read() {} |
| }); |
| |
| const iter = readable[Symbol.asyncIterator](); |
| assert.strictEqual(iter.stream, readable); |
| const values = []; |
| const errors = []; |
| let i; |
| for (i = 0; i < max; i++) { |
| values.push(iter.next()); |
| } |
| for (i = 0; i < 2; i++) { |
| errors.push(iter.next()); |
| } |
| |
| readable.push('hello-0'); |
| readable.push('hello-1'); |
| readable.push('hello-2'); |
| |
| const resolved = await Promise.all(values); |
| |
| resolved.forEach(common.mustCall( |
| (item, i) => assert.strictEqual(item.value, 'hello-' + i), max)); |
| |
| errors.slice(0, 1).forEach((promise) => { |
| promise.catch(common.mustCall((err) => { |
| assert.strictEqual(err.message, 'kaboom'); |
| })); |
| }); |
| |
| errors.slice(1).forEach((promise) => { |
| promise.then(common.mustCall(({ done, value }) => { |
| assert.strictEqual(done, true); |
| assert.strictEqual(value, undefined); |
| })); |
| }); |
| |
| readable.destroy(new Error('kaboom')); |
| } |
| |
| { |
| console.log('call next() after error'); |
| const readable = new Readable({ |
| read() {} |
| }); |
| const iterator = readable[Symbol.asyncIterator](); |
| |
| const err = new Error('kaboom'); |
| readable.destroy(err); |
| await assert.rejects(iterator.next.bind(iterator), err); |
| } |
| |
| { |
| console.log('read object mode'); |
| const max = 42; |
| let readed = 0; |
| let received = 0; |
| const readable = new Readable({ |
| objectMode: true, |
| read() { |
| this.push('hello'); |
| if (++readed === max) { |
| this.push(null); |
| } |
| } |
| }); |
| |
| for await (const k of readable) { |
| received++; |
| assert.strictEqual(k, 'hello'); |
| } |
| |
| assert.strictEqual(readed, received); |
| } |
| |
| { |
| console.log('destroy sync'); |
| const readable = new Readable({ |
| objectMode: true, |
| read() { |
| this.destroy(new Error('kaboom from read')); |
| } |
| }); |
| |
| let err; |
| try { |
| // eslint-disable-next-line no-unused-vars, no-empty |
| for await (const k of readable) { } |
| } catch (e) { |
| err = e; |
| } |
| assert.strictEqual(err.message, 'kaboom from read'); |
| } |
| |
| { |
| console.log('destroy async'); |
| const readable = new Readable({ |
| objectMode: true, |
| read() { |
| if (!this.pushed) { |
| this.push('hello'); |
| this.pushed = true; |
| |
| setImmediate(() => { |
| this.destroy(new Error('kaboom')); |
| }); |
| } |
| } |
| }); |
| |
| let received = 0; |
| |
| let err = null; |
| try { |
| // eslint-disable-next-line no-unused-vars |
| for await (const k of readable) { |
| received++; |
| } |
| } catch (e) { |
| err = e; |
| } |
| |
| assert.strictEqual(err.message, 'kaboom'); |
| assert.strictEqual(received, 1); |
| } |
| |
| { |
| console.log('destroyed by throw'); |
| const readable = new Readable({ |
| objectMode: true, |
| read() { |
| this.push('hello'); |
| } |
| }); |
| |
| let err = null; |
| try { |
| for await (const k of readable) { |
| assert.strictEqual(k, 'hello'); |
| throw new Error('kaboom'); |
| } |
| } catch (e) { |
| err = e; |
| } |
| |
| assert.strictEqual(err.message, 'kaboom'); |
| assert.strictEqual(readable.destroyed, true); |
| } |
| |
| { |
| console.log('destroyed sync after push'); |
| const readable = new Readable({ |
| objectMode: true, |
| read() { |
| this.push('hello'); |
| this.destroy(new Error('kaboom')); |
| } |
| }); |
| |
| let received = 0; |
| |
| let err = null; |
| try { |
| for await (const k of readable) { |
| assert.strictEqual(k, 'hello'); |
| received++; |
| } |
| } catch (e) { |
| err = e; |
| } |
| |
| assert.strictEqual(err.message, 'kaboom'); |
| assert.strictEqual(received, 1); |
| } |
| |
| { |
| console.log('destroyed will not deadlock'); |
| const readable = new Readable(); |
| readable.destroy(); |
| process.nextTick(async () => { |
| readable.on('close', common.mustNotCall()); |
| let received = 0; |
| let err = null; |
| try { |
| for await (const k of readable) { |
| // Just make linting pass. This should never run. |
| assert.strictEqual(k, 'hello'); |
| received++; |
| } |
| } catch (_err) { |
| err = _err; |
| } |
| assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); |
| assert.strictEqual(received, 0); |
| }); |
| } |
| |
| { |
| console.log('push async'); |
| const max = 42; |
| let readed = 0; |
| let received = 0; |
| const readable = new Readable({ |
| objectMode: true, |
| read() { |
| setImmediate(() => { |
| this.push('hello'); |
| if (++readed === max) { |
| this.push(null); |
| } |
| }); |
| } |
| }); |
| |
| for await (const k of readable) { |
| received++; |
| assert.strictEqual(k, 'hello'); |
| } |
| |
| assert.strictEqual(readed, received); |
| } |
| |
| { |
| console.log('push binary async'); |
| const max = 42; |
| let readed = 0; |
| const readable = new Readable({ |
| read() { |
| setImmediate(() => { |
| this.push('hello'); |
| if (++readed === max) { |
| this.push(null); |
| } |
| }); |
| } |
| }); |
| |
| let expected = ''; |
| readable.setEncoding('utf8'); |
| readable.pause(); |
| readable.on('data', (chunk) => { |
| expected += chunk; |
| }); |
| |
| let data = ''; |
| for await (const k of readable) { |
| data += k; |
| } |
| |
| assert.strictEqual(data, expected); |
| } |
| |
| { |
| console.log('.next() on destroyed stream'); |
| const readable = new Readable({ |
| read() { |
| // no-op |
| } |
| }); |
| |
| readable.destroy(); |
| |
| const it = await readable[Symbol.asyncIterator](); |
| const next = it.next(); |
| next |
| .then(common.mustNotCall()) |
| .catch(common.mustCall((err) => { |
| assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); |
| })); |
| } |
| |
| { |
| console.log('.next() on pipelined stream'); |
| const readable = new Readable({ |
| read() { |
| // no-op |
| } |
| }); |
| |
| const passthrough = new PassThrough(); |
| const err = new Error('kaboom'); |
| pipeline(readable, passthrough, common.mustCall((e) => { |
| assert.strictEqual(e, err); |
| })); |
| readable.destroy(err); |
| await assert.rejects( |
| readable[Symbol.asyncIterator]().next(), |
| (e) => { |
| assert.strictEqual(e, err); |
| return true; |
| } |
| ); |
| } |
| |
| { |
| console.log('iterating on an ended stream completes'); |
| const r = new Readable({ |
| objectMode: true, |
| read() { |
| this.push('asdf'); |
| this.push('hehe'); |
| this.push(null); |
| } |
| }); |
| // eslint-disable-next-line no-unused-vars, no-empty |
| for await (const a of r) { } |
| // eslint-disable-next-line no-unused-vars, no-empty |
| for await (const b of r) { } |
| } |
| |
| { |
| console.log('destroy mid-stream errors'); |
| const r = new Readable({ |
| objectMode: true, |
| read() { |
| this.push('asdf'); |
| this.push('hehe'); |
| } |
| }); |
| |
| let err = null; |
| try { |
| // eslint-disable-next-line no-unused-vars |
| for await (const a of r) { |
| r.destroy(null); |
| } |
| } catch (_err) { |
| err = _err; |
| } |
| assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); |
| } |
| |
| { |
| console.log('readable side of a transform stream pushes null'); |
| const transform = new Transform({ |
| objectMode: true, |
| transform: (chunk, enc, cb) => { cb(null, chunk); } |
| }); |
| transform.push(0); |
| transform.push(1); |
| process.nextTick(() => { |
| transform.push(null); |
| }); |
| |
| const mustReach = [ common.mustCall(), common.mustCall() ]; |
| |
| const iter = transform[Symbol.asyncIterator](); |
| assert.strictEqual((await iter.next()).value, 0); |
| |
| for await (const d of iter) { |
| assert.strictEqual(d, 1); |
| mustReach[0](); |
| } |
| mustReach[1](); |
| } |
| |
| { |
| console.log('all next promises must be resolved on end'); |
| const r = new Readable({ |
| objectMode: true, |
| read() { |
| } |
| }); |
| |
| const b = r[Symbol.asyncIterator](); |
| const c = b.next(); |
| const d = b.next(); |
| r.push(null); |
| assert.deepStrictEqual(await c, { done: true, value: undefined }); |
| assert.deepStrictEqual(await d, { done: true, value: undefined }); |
| } |
| |
| { |
| console.log('all next promises must be rejected on destroy'); |
| const r = new Readable({ |
| objectMode: true, |
| read() { |
| } |
| }); |
| |
| const b = r[Symbol.asyncIterator](); |
| const c = b.next(); |
| const d = b.next(); |
| r.destroy(); |
| c |
| .then(common.mustNotCall()) |
| .catch(common.mustCall((err) => { |
| assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); |
| })); |
| assert.deepStrictEqual(await d, { done: true, value: undefined }); |
| } |
| |
| { |
| console.log('all next promises must be resolved on destroy with error'); |
| const r = new Readable({ |
| objectMode: true, |
| read() { |
| } |
| }); |
| |
| const b = r[Symbol.asyncIterator](); |
| const c = b.next(); |
| const d = b.next(); |
| const err = new Error('kaboom'); |
| r.destroy(err); |
| |
| await Promise.all([(async () => { |
| let e; |
| try { |
| await c; |
| } catch (_e) { |
| e = _e; |
| } |
| assert.strictEqual(e, err); |
| })(), (async () => { |
| let e; |
| let x; |
| try { |
| x = await d; |
| } catch (_e) { |
| e = _e; |
| } |
| assert.strictEqual(e, undefined); |
| assert.strictEqual(x.done, true); |
| assert.strictEqual(x.value, undefined); |
| })()]); |
| } |
| |
| { |
| const _err = new Error('asd'); |
| const r = new Readable({ |
| read() { |
| }, |
| destroy(err, callback) { |
| setTimeout(() => callback(_err), 1); |
| } |
| }); |
| |
| r.destroy(); |
| const it = r[Symbol.asyncIterator](); |
| it.next().catch(common.mustCall((err) => { |
| assert.strictEqual(err, _err); |
| })); |
| } |
| |
| { |
| // Don't destroy if no auto destroy. |
| // https://github.com/nodejs/node/issues/35116 |
| |
| const r = new Readable({ |
| autoDestroy: false, |
| read() { |
| this.push('asd'); |
| this.push(null); |
| } |
| }); |
| |
| for await (const chunk of r) { } // eslint-disable-line no-unused-vars, no-empty |
| assert.strictEqual(r.destroyed, false); |
| } |
| |
| { |
| // Destroy if no auto destroy and premature break. |
| // https://github.com/nodejs/node/pull/35122/files#r485678318 |
| |
| const r = new Readable({ |
| autoDestroy: false, |
| read() { |
| this.push('asd'); |
| } |
| }); |
| |
| for await (const chunk of r) { // eslint-disable-line no-unused-vars |
| break; |
| } |
| assert.strictEqual(r.destroyed, true); |
| } |
| |
| { |
| // Don't destroy before 'end'. |
| |
| const r = new Readable({ |
| read() { |
| this.push('asd'); |
| this.push(null); |
| } |
| }).on('end', () => { |
| assert.strictEqual(r.destroyed, false); |
| }); |
| |
| for await (const chunk of r) { } // eslint-disable-line no-unused-vars, no-empty |
| assert.strictEqual(r.destroyed, true); |
| } |
| } |
| |
| { |
| // AsyncIterator return should end even when destroy |
| // does not implement the callback API. |
| |
| const r = new Readable({ |
| objectMode: true, |
| read() { |
| } |
| }); |
| |
| const originalDestroy = r.destroy; |
| r.destroy = (err) => { |
| originalDestroy.call(r, err); |
| }; |
| const it = r[Symbol.asyncIterator](); |
| const p = it.return(); |
| r.push(null); |
| p.then(common.mustCall()); |
| } |
| |
| |
| { |
| // AsyncIterator return should not error with |
| // premature close. |
| |
| const r = new Readable({ |
| objectMode: true, |
| read() { |
| } |
| }); |
| |
| const originalDestroy = r.destroy; |
| r.destroy = (err) => { |
| originalDestroy.call(r, err); |
| }; |
| const it = r[Symbol.asyncIterator](); |
| const p = it.return(); |
| r.emit('close'); |
| p.then(common.mustCall()).catch(common.mustNotCall()); |
| } |
| |
| { |
| // AsyncIterator should not finish correctly if destroyed. |
| |
| const r = new Readable({ |
| objectMode: true, |
| read() { |
| } |
| }); |
| |
| r.destroy(); |
| r.on('close', () => { |
| const it = r[Symbol.asyncIterator](); |
| const next = it.next(); |
| next |
| .then(common.mustNotCall()) |
| .catch(common.mustCall((err) => { |
| assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); |
| })); |
| }); |
| } |
| |
| { |
| // AsyncIterator should throw if prematurely closed |
| // before end has been emitted. |
| (async function() { |
| const readable = fs.createReadStream(__filename); |
| |
| try { |
| // eslint-disable-next-line no-unused-vars |
| for await (const chunk of readable) { |
| readable.close(); |
| } |
| |
| assert.fail('should have thrown'); |
| } catch (err) { |
| assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); |
| } |
| |
| assert.ok(readable.destroyed); |
| })().then(common.mustCall()); |
| } |
| |
| // AsyncIterator non-destroying iterator |
| { |
| function createReadable() { |
| return Readable.from((async function* () { |
| await Promise.resolve(); |
| yield 5; |
| await Promise.resolve(); |
| yield 7; |
| await Promise.resolve(); |
| })()); |
| } |
| |
| // Check explicit destroying on return |
| (async function() { |
| const readable = createReadable(); |
| for await (const chunk of readable.iterator({ destroyOnReturn: true })) { |
| assert.strictEqual(chunk, 5); |
| break; |
| } |
| |
| assert.ok(readable.destroyed); |
| })().then(common.mustCall()); |
| |
| // Check explicit non-destroy with return true |
| (async function() { |
| const readable = createReadable(); |
| const opts = { destroyOnReturn: false }; |
| for await (const chunk of readable.iterator(opts)) { |
| assert.strictEqual(chunk, 5); |
| break; |
| } |
| |
| assert.ok(!readable.destroyed); |
| |
| for await (const chunk of readable.iterator(opts)) { |
| assert.strictEqual(chunk, 7); |
| } |
| |
| assert.ok(readable.destroyed); |
| })().then(common.mustCall()); |
| |
| // Check non-object options. |
| { |
| const readable = createReadable(); |
| assert.throws( |
| () => readable.iterator(42), |
| { |
| code: 'ERR_INVALID_ARG_TYPE', |
| name: 'TypeError', |
| message: 'The "options" argument must be of type object. Received ' + |
| 'type number (42)', |
| } |
| ); |
| } |
| |
| // Check for dangling listeners |
| (async function() { |
| const readable = createReadable(); |
| const opts = { destroyOnReturn: false }; |
| while (readable.readable) { |
| // eslint-disable-next-line no-unused-vars |
| for await (const chunk of readable.iterator(opts)) { |
| break; |
| } |
| } |
| |
| assert.deepStrictEqual(readable.eventNames(), []); |
| })().then(common.mustCall()); |
| } |
| |
| { |
| let _req; |
| const server = http.createServer((request, response) => { |
| response.statusCode = 404; |
| response.write('never ends'); |
| }); |
| |
| server.listen(() => { |
| _req = http.request(`http://localhost:${server.address().port}`) |
| .on('response', common.mustCall(async (res) => { |
| setTimeout(() => { |
| _req.destroy(new Error('something happened')); |
| }, 100); |
| |
| res.on('error', common.mustCall()); |
| |
| let _err; |
| try { |
| // eslint-disable-next-line no-unused-vars, no-empty |
| for await (const chunk of res) { } |
| } catch (err) { |
| _err = err; |
| } |
| |
| assert.strictEqual(_err.code, 'ECONNRESET'); |
| server.close(); |
| })) |
| .on('error', common.mustCall()) |
| .end(); |
| }); |
| } |
| |
| { |
| async function getParsedBody(request) { |
| let body = ''; |
| |
| for await (const data of request) { |
| body += data; |
| } |
| |
| try { |
| return JSON.parse(body); |
| } catch { |
| return {}; |
| } |
| } |
| |
| const str = JSON.stringify({ asd: true }); |
| const server = http.createServer(async (request, response) => { |
| const body = await getParsedBody(request); |
| response.statusCode = 200; |
| assert.strictEqual(JSON.stringify(body), str); |
| response.end(JSON.stringify(body)); |
| }).listen(() => { |
| http |
| .request({ |
| method: 'POST', |
| hostname: 'localhost', |
| port: server.address().port, |
| }) |
| .end(str) |
| .on('response', async (res) => { |
| let body = ''; |
| for await (const chunk of res) { |
| body += chunk; |
| } |
| assert.strictEqual(body, str); |
| server.close(); |
| }); |
| }); |
| } |
| |
| // To avoid missing some tests if a promise does not resolve |
| tests().then(common.mustCall()); |