| 'use strict'; |
| |
| const common = require('../common'); |
| const assert = require('assert'); |
| const { ReadableStream, WritableStream } = require('stream/web'); |
| const { finished } = require('stream'); |
| const { finished: finishedPromise } = require('stream/promises'); |
| |
| { |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.enqueue('asd'); |
| controller.close(); |
| }, |
| }); |
| finished(rs, common.mustSucceed()); |
| async function test() { |
| const values = []; |
| for await (const chunk of rs) { |
| values.push(chunk); |
| } |
| assert.deepStrictEqual(values, ['asd']); |
| } |
| test(); |
| } |
| |
| { |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.error(new Error('asd')); |
| } |
| }); |
| |
| finished(rs, common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'asd'); |
| })); |
| } |
| |
| { |
| const rs = new ReadableStream({ |
| async start(controller) { |
| throw new Error('asd'); |
| } |
| }); |
| |
| finished(rs, common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'asd'); |
| })); |
| } |
| |
| { |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.enqueue('asd'); |
| controller.close(); |
| } |
| }); |
| |
| async function test() { |
| const values = []; |
| for await (const chunk of rs) { |
| values.push(chunk); |
| } |
| assert.deepStrictEqual(values, ['asd']); |
| } |
| |
| finishedPromise(rs).then(common.mustSucceed()); |
| |
| test(); |
| } |
| |
| { |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.error(new Error('asd')); |
| } |
| }); |
| |
| finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'asd'); |
| })); |
| } |
| |
| { |
| const rs = new ReadableStream({ |
| async start(controller) { |
| throw new Error('asd'); |
| } |
| }); |
| |
| finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'asd'); |
| })); |
| } |
| |
| { |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.enqueue('asd'); |
| controller.close(); |
| } |
| }); |
| |
| const { 0: s1, 1: s2 } = rs.tee(); |
| |
| finished(s1, common.mustSucceed()); |
| finished(s2, common.mustSucceed()); |
| |
| async function test(stream) { |
| const values = []; |
| for await (const chunk of stream) { |
| values.push(chunk); |
| } |
| assert.deepStrictEqual(values, ['asd']); |
| } |
| |
| Promise.all([ |
| test(s1), |
| test(s2), |
| ]).then(common.mustCall()); |
| } |
| |
| { |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.error(new Error('asd')); |
| } |
| }); |
| |
| const { 0: s1, 1: s2 } = rs.tee(); |
| |
| finished(s1, common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'asd'); |
| })); |
| |
| finished(s2, common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'asd'); |
| })); |
| } |
| |
| { |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.enqueue('asd'); |
| controller.close(); |
| } |
| }); |
| |
| finished(rs, common.mustSucceed()); |
| |
| rs.cancel(); |
| } |
| |
| { |
| let str = ''; |
| const ws = new WritableStream({ |
| write(chunk) { |
| str += chunk; |
| } |
| }); |
| |
| finished(ws, common.mustSucceed(() => { |
| assert.strictEqual(str, 'asd'); |
| })); |
| |
| const writer = ws.getWriter(); |
| writer.write('asd'); |
| writer.close(); |
| } |
| |
| { |
| const ws = new WritableStream({ |
| async write(chunk) { |
| throw new Error('asd'); |
| } |
| }); |
| |
| finished(ws, common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'asd'); |
| })); |
| |
| const writer = ws.getWriter(); |
| writer.write('asd').catch((err) => { |
| assert.strictEqual(err?.message, 'asd'); |
| }); |
| } |
| |
| { |
| let str = ''; |
| const ws = new WritableStream({ |
| write(chunk) { |
| str += chunk; |
| } |
| }); |
| |
| finishedPromise(ws).then(common.mustSucceed(() => { |
| assert.strictEqual(str, 'asd'); |
| })); |
| |
| const writer = ws.getWriter(); |
| writer.write('asd'); |
| writer.close(); |
| } |
| |
| { |
| const ws = new WritableStream({ |
| write(chunk) { } |
| }); |
| finished(ws, common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'asd'); |
| })); |
| |
| const writer = ws.getWriter(); |
| writer.abort(new Error('asd')); |
| } |
| |
| { |
| const ws = new WritableStream({ |
| async write(chunk) { |
| throw new Error('asd'); |
| } |
| }); |
| |
| finishedPromise(ws).then(common.mustNotCall()).catch(common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'asd'); |
| })); |
| |
| const writer = ws.getWriter(); |
| writer.write('asd').catch((err) => { |
| assert.strictEqual(err?.message, 'asd'); |
| }); |
| } |
| |
| { |
| // Check pre-cancelled |
| const signal = new EventTarget(); |
| signal.aborted = true; |
| |
| const rs = new ReadableStream({ |
| start() {} |
| }); |
| finished(rs, { signal }, common.mustCall((err) => { |
| assert.strictEqual(err.name, 'AbortError'); |
| })); |
| } |
| |
| { |
| // Check cancelled before the stream ends sync. |
| const ac = new AbortController(); |
| const { signal } = ac; |
| |
| const rs = new ReadableStream({ |
| start() {} |
| }); |
| finished(rs, { signal }, common.mustCall((err) => { |
| assert.strictEqual(err.name, 'AbortError'); |
| })); |
| |
| ac.abort(); |
| } |
| |
| { |
| // Check cancelled before the stream ends async. |
| const ac = new AbortController(); |
| const { signal } = ac; |
| |
| const rs = new ReadableStream({ |
| start() {} |
| }); |
| setTimeout(() => ac.abort(), 1); |
| finished(rs, { signal }, common.mustCall((err) => { |
| assert.strictEqual(err.name, 'AbortError'); |
| })); |
| } |
| |
| { |
| // Check cancelled after doesn't throw. |
| const ac = new AbortController(); |
| const { signal } = ac; |
| |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.enqueue('asd'); |
| controller.close(); |
| } |
| }); |
| finished(rs, { signal }, common.mustSucceed()); |
| |
| rs.getReader().read().then(common.mustCall((chunk) => { |
| assert.strictEqual(chunk.value, 'asd'); |
| setImmediate(() => ac.abort()); |
| })); |
| } |
| |
| { |
| // Promisified abort works |
| async function run() { |
| const ac = new AbortController(); |
| const { signal } = ac; |
| const rs = new ReadableStream({ |
| start() {} |
| }); |
| setImmediate(() => ac.abort()); |
| await finishedPromise(rs, { signal }); |
| } |
| |
| assert.rejects(run, { name: 'AbortError' }).then(common.mustCall()); |
| } |
| |
| { |
| // Promisified pre-aborted works |
| async function run() { |
| const signal = new EventTarget(); |
| signal.aborted = true; |
| const rs = new ReadableStream({ |
| start() {} |
| }); |
| await finishedPromise(rs, { signal }); |
| } |
| |
| assert.rejects(run, { name: 'AbortError' }).then(common.mustCall()); |
| } |