blob: ab2c81304ec2ac98605c1cee7092436029a6f108 [file] [edit]
// Flags: --experimental-stream-iter
'use strict';
const common = require('../common');
const assert = require('assert');
const { broadcast, text } = require('stream/iter');
// =============================================================================
// Basic broadcast
// =============================================================================
async function testBasicBroadcast() {
const { writer, broadcast: bc } = broadcast();
// Create two consumers
const consumer1 = bc.push();
const consumer2 = bc.push();
assert.strictEqual(bc.consumerCount, 2);
await writer.write('hello');
await writer.end();
const [data1, data2] = await Promise.all([
text(consumer1),
text(consumer2),
]);
assert.strictEqual(data1, 'hello');
assert.strictEqual(data2, 'hello');
}
async function testMultipleWrites() {
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
const consumer = bc.push();
await writer.write('a');
await writer.write('b');
await writer.write('c');
await writer.end();
const data = await text(consumer);
assert.strictEqual(data, 'abc');
}
async function testConsumerCount() {
const { broadcast: bc } = broadcast();
assert.strictEqual(bc.consumerCount, 0);
const c1 = bc.push();
assert.strictEqual(bc.consumerCount, 1);
bc.push();
assert.strictEqual(bc.consumerCount, 2);
bc.cancel();
// After cancel, consumer count drops to 0
assert.strictEqual(bc.consumerCount, 0);
// Consumers are detached and yield nothing
const batches = [];
for await (const batch of c1) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
}
// =============================================================================
// Writer methods
// =============================================================================
async function testWriteSync() {
const { writer, broadcast: bc } = broadcast({ highWaterMark: 2 });
const consumer = bc.push();
assert.strictEqual(writer.writeSync('a'), true);
assert.strictEqual(writer.writeSync('b'), true);
// Buffer full (highWaterMark=2, strict policy)
assert.strictEqual(writer.writeSync('c'), false);
writer.endSync();
const data = await text(consumer);
assert.strictEqual(data, 'ab');
}
async function testWritevSync() {
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
const consumer = bc.push();
assert.strictEqual(writer.writevSync(['hello', ' ', 'world']), true);
writer.endSync();
const data = await text(consumer);
assert.strictEqual(data, 'hello world');
}
async function testWriterEnd() {
const { writer, broadcast: bc } = broadcast();
const consumer = bc.push();
await writer.write('data');
const totalBytes = await writer.end();
assert.strictEqual(totalBytes, 4); // 'data' = 4 UTF-8 bytes
const data = await text(consumer);
assert.strictEqual(data, 'data');
}
async function testWriterFail() {
const { writer, broadcast: bc } = broadcast();
const consumer = bc.push();
writer.fail(new Error('test error'));
await assert.rejects(
async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of consumer) {
assert.fail('Should not reach here');
}
},
{ message: 'test error' },
);
}
// =============================================================================
// Cancel
// =============================================================================
async function testCancelWithoutReason() {
const { broadcast: bc } = broadcast();
const consumer = bc.push();
bc.cancel();
const batches = [];
for await (const batch of consumer) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
}
async function testCancelWithReason() {
const { broadcast: bc } = broadcast();
// Start a consumer that is waiting for data (promise pending)
const consumer = bc.push();
const resultPromise = text(consumer).catch((err) => err);
// Give the consumer time to enter the waiting state
await new Promise((resolve) => setImmediate(resolve));
bc.cancel(new Error('cancelled'));
const result = await resultPromise;
assert.ok(result instanceof Error);
assert.strictEqual(result.message, 'cancelled');
}
// =============================================================================
// Writer fail detaches consumers
// =============================================================================
async function testFailDetachesConsumers() {
const { writer, broadcast: bc } = broadcast();
const consumer1 = bc.push();
const consumer2 = bc.push();
assert.strictEqual(bc.consumerCount, 2);
// Write some data, then fail the writer
await writer.write('data');
await writer.fail(new Error('writer failed'));
// After fail, consumers are detached
assert.strictEqual(bc.consumerCount, 0);
// Both consumers should see the error
await assert.rejects(
async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of consumer1) {
assert.fail('Should not reach here');
}
},
{ message: 'writer failed' },
);
await assert.rejects(
async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of consumer2) {
assert.fail('Should not reach here');
}
},
{ message: 'writer failed' },
);
}
// =============================================================================
// Writer fail idempotent
// =============================================================================
async function testWriterFailIdempotent() {
const { writer, broadcast: bc } = broadcast();
const consumer = bc.push();
writer.writeSync('hello');
writer.fail(new Error('fail!'));
// Second call is a no-op (already errored)
writer.fail(new Error('fail2'));
await assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of consumer) { /* consume */ }
}, { message: 'fail!' });
}
// cancel() with falsy reason (0, "", false) should still treat as error
async function testCancelWithFalsyReason() {
const { broadcast: bc } = broadcast();
const consumer = bc.push();
const resultPromise = text(consumer).catch((err) => err);
await new Promise((resolve) => setImmediate(resolve));
bc.cancel(0);
const result = await resultPromise;
assert.strictEqual(result, 0);
}
// Late-joining consumer should read from oldest buffered entry
async function testLateJoinerSeesBufferedData() {
const { writer, broadcast: bc } = broadcast({ highWaterMark: 16 });
// Write data before any consumer joins
writer.writeSync('before-join');
writer.endSync();
// Consumer joins after data is written
const consumer = bc.push();
const result = await text(consumer);
assert.strictEqual(result, 'before-join');
}
Promise.all([
testBasicBroadcast(),
testMultipleWrites(),
testConsumerCount(),
testWriteSync(),
testWritevSync(),
testWriterEnd(),
testWriterFail(),
testCancelWithoutReason(),
testCancelWithReason(),
testCancelWithFalsyReason(),
testFailDetachesConsumers(),
testWriterFailIdempotent(),
testLateJoinerSeesBufferedData(),
]).then(common.mustCall());