| 'use strict'; |
| |
| const common = require('../common'); |
| const assert = require('assert'); |
| const { Readable, Writable, Transform, pipeline, PassThrough } = require('stream'); |
| const { pipeline: pipelinePromise } = require('stream/promises'); |
| const { ReadableStream, WritableStream, TransformStream } = require('stream/web'); |
| const http = require('http'); |
| |
| { |
| const values = []; |
| let c; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| const ws = new WritableStream({ |
| write(chunk) { |
| values.push(chunk); |
| } |
| }); |
| |
| pipeline(rs, ws, common.mustSucceed(() => { |
| assert.deepStrictEqual(values, ['hello', 'world']); |
| })); |
| |
| c.enqueue('hello'); |
| c.enqueue('world'); |
| c.close(); |
| } |
| |
| { |
| let c; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| |
| const ws = new WritableStream({ |
| write() { } |
| }); |
| |
| pipeline(rs, ws, common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'kaboom'); |
| })); |
| |
| c.error(new Error('kaboom')); |
| } |
| |
| { |
| let c; |
| const values = []; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| |
| const ts = new TransformStream({ |
| transform(chunk, controller) { |
| controller.enqueue(chunk?.toString().toUpperCase()); |
| } |
| }); |
| |
| const ws = new WritableStream({ |
| write(chunk) { |
| values.push(chunk?.toString()); |
| } |
| }); |
| |
| pipeline(rs, ts, ws, common.mustSucceed(() => { |
| assert.deepStrictEqual(values, ['HELLO', 'WORLD']); |
| })); |
| |
| c.enqueue('hello'); |
| c.enqueue('world'); |
| c.close(); |
| } |
| |
| { |
| function makeTransformStream() { |
| return new TransformStream({ |
| transform(chunk, controller) { |
| controller.enqueue(chunk?.toString()); |
| } |
| }); |
| } |
| |
| let c; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| |
| const ws = new WritableStream({ |
| write() { } |
| }); |
| |
| pipeline(rs, |
| makeTransformStream(), |
| makeTransformStream(), |
| makeTransformStream(), |
| makeTransformStream(), |
| ws, |
| common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'kaboom'); |
| })); |
| |
| c.error(new Error('kaboom')); |
| } |
| |
| { |
| const values = []; |
| |
| const r = new Readable({ |
| read() { } |
| }); |
| |
| const ws = new WritableStream({ |
| write(chunk) { |
| values.push(chunk?.toString()); |
| } |
| }); |
| |
| pipeline(r, ws, common.mustSucceed(() => { |
| assert.deepStrictEqual(values, ['helloworld']); |
| })); |
| |
| r.push('hello'); |
| r.push('world'); |
| r.push(null); |
| } |
| |
| { |
| const values = []; |
| let c; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| |
| const w = new Writable({ |
| write(chunk, encoding, callback) { |
| values.push(chunk?.toString()); |
| callback(); |
| } |
| }); |
| |
| pipeline(rs, w, common.mustSucceed(() => { |
| assert.deepStrictEqual(values, ['hello', 'world']); |
| })); |
| |
| c.enqueue('hello'); |
| c.enqueue('world'); |
| c.close(); |
| } |
| |
| { |
| const values = []; |
| let c; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| |
| const ws = new WritableStream({ |
| write(chunk) { |
| values.push(chunk?.toString()); |
| } |
| }); |
| |
| const t = new Transform({ |
| transform(chunk, encoding, callback) { |
| callback(null, chunk?.toString().toUpperCase()); |
| } |
| }); |
| |
| pipeline(rs, t, ws, common.mustSucceed(() => { |
| assert.deepStrictEqual(values, ['HELLOWORLD']); |
| })); |
| |
| c.enqueue('hello'); |
| c.enqueue('world'); |
| c.close(); |
| } |
| |
| { |
| const server = http.createServer((req, res) => { |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.enqueue('hello'); |
| controller.enqueue('world'); |
| controller.close(); |
| } |
| }); |
| pipeline(rs, res, common.mustSucceed(() => {})); |
| }); |
| |
| server.listen(0, common.mustCall(() => { |
| const req = http.request({ |
| port: server.address().port |
| }); |
| req.end(); |
| const values = []; |
| req.on('response', common.mustCall((res) => { |
| res.on('data', (chunk) => { |
| values.push(chunk?.toString()); |
| }); |
| res.on('end', common.mustCall(() => { |
| assert.deepStrictEqual(values, ['hello', 'world']); |
| server.close(); |
| })); |
| })); |
| })); |
| } |
| |
| { |
| const values = []; |
| const server = http.createServer(common.mustCall((req, res) => { |
| const ts = new TransformStream({ |
| transform(chunk, controller) { |
| controller.enqueue(chunk?.toString().toUpperCase()); |
| } |
| }); |
| pipeline(req, ts, res, common.mustSucceed()); |
| })); |
| |
| server.listen(0, common.mustCall(() => { |
| const req = http.request({ |
| port: server.address().port, |
| method: 'POST', |
| }); |
| |
| |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.enqueue('hello'); |
| controller.close(); |
| } |
| }); |
| |
| pipeline(rs, req, common.mustSucceed()); |
| |
| req.on('response', common.mustCall((res) => { |
| res.on('data', (chunk) => { |
| values.push(chunk?.toString()); |
| } |
| ); |
| res.on('end', common.mustCall(() => { |
| assert.deepStrictEqual(values, ['HELLO']); |
| server.close(); |
| })); |
| })); |
| })); |
| } |
| |
| { |
| const values = []; |
| let c; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| const ws = new WritableStream({ |
| write(chunk) { |
| values.push(chunk?.toString()); |
| } |
| }); |
| |
| pipelinePromise(rs, ws).then(common.mustCall(() => { |
| assert.deepStrictEqual(values, ['hello', 'world']); |
| })); |
| |
| c.enqueue('hello'); |
| c.enqueue('world'); |
| c.close(); |
| } |
| |
| { |
| let c; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| |
| const ws = new WritableStream({ |
| write() { } |
| }); |
| |
| pipelinePromise(rs, ws).then(common.mustNotCall()).catch(common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'kaboom'); |
| })); |
| |
| c.error(new Error('kaboom')); |
| } |
| |
| { |
| const values = []; |
| let c; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| |
| pipeline(rs, async function(source) { |
| for await (const chunk of source) { |
| values.push(chunk?.toString()); |
| } |
| }, common.mustSucceed(() => { |
| assert.deepStrictEqual(values, ['hello', 'world']); |
| })); |
| |
| c.enqueue('hello'); |
| c.enqueue('world'); |
| c.close(); |
| } |
| |
| { |
| const rs = new ReadableStream({ |
| start() {} |
| }); |
| |
| pipeline(rs, async function(source) { |
| throw new Error('kaboom'); |
| }, common.mustCall((err) => { |
| assert.strictEqual(err?.message, 'kaboom'); |
| })); |
| } |
| |
| { |
| const values = []; |
| let c; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| |
| const ts = new TransformStream({ |
| transform(chunk, controller) { |
| controller.enqueue(chunk?.toString().toUpperCase()); |
| } |
| }); |
| |
| pipeline(rs, ts, async function(source) { |
| for await (const chunk of source) { |
| values.push(chunk?.toString()); |
| } |
| }, common.mustSucceed(() => { |
| assert.deepStrictEqual(values, ['HELLO', 'WORLD']); |
| })); |
| |
| c.enqueue('hello'); |
| c.enqueue('world'); |
| c.close(); |
| } |
| |
| { |
| const values = []; |
| let c; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| |
| const ws = new WritableStream({ |
| write(chunk) { |
| values.push(chunk?.toString()); |
| } |
| }); |
| |
| pipeline(rs, async function* (source) { |
| for await (const chunk of source) { |
| yield chunk?.toString().toUpperCase(); |
| } |
| }, ws, common.mustSucceed(() => { |
| assert.deepStrictEqual(values, ['HELLO', 'WORLD']); |
| })); |
| |
| c.enqueue('hello'); |
| c.enqueue('world'); |
| c.close(); |
| } |
| |
| { |
| let c; |
| const rs = new ReadableStream({ |
| start(controller) { |
| c = controller; |
| } |
| }); |
| |
| const ws = new WritableStream({ |
| write(chunk) { } |
| }, { highWaterMark: 0 }); |
| |
| pipeline(rs, ws, common.mustNotCall()); |
| |
| for (let i = 0; i < 10; i++) { |
| c.enqueue(`${i}`); |
| } |
| c.close(); |
| } |
| |
| { |
| const rs = new ReadableStream({ |
| start(controller) { |
| controller.close(); |
| } |
| }); |
| |
| pipeline(rs, new PassThrough(), common.mustSucceed()); |
| } |