Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | 135x 22x 135x 135x 1934x 135x 68x 133x 780x 21x 133x 68x 130x 20x 130x 20x 130x 7x 7x 7x 68x | /**
* @module Node Worker streams
*/
import {
BaseDuplexStream,
makeDuplexStreamInputValidator,
} from '../BaseDuplexStream.ts';
import type {
BaseReaderArgs,
BaseWriterArgs,
ValidateInput,
} from '../BaseStream.ts';
import { BaseReader, BaseWriter } from '../BaseStream.ts';
import type { Dispatchable } from '../utils.ts';
export type OnMessage = (message: unknown) => void;
export type NodePort = {
on: (event: 'message', listener: OnMessage) => void;
postMessage: (message: unknown) => void;
};
/**
* A readable stream over a {@link NodePort}.
*
* @see
* - {@link NodeWorkerWriter} for the corresponding writable stream.
* - The module-level documentation for more details.
*/
export class NodeWorkerReader<Read> extends BaseReader<Read> {
/**
* Constructs a new {@link NodeWorkerReader}.
*
* @param port - The node worker port to read from.
* @param options - Options bag for configuring the reader.
* @param options.validateInput - A function that validates input from the transport.
* @param options.onEnd - A function that is called when the stream ends.
*/
constructor(
port: NodePort,
{ validateInput, onEnd }: BaseReaderArgs<Read> = {},
) {
super({
validateInput,
onEnd: async () => await onEnd?.(),
});
const receiveInput = super.getReceiveInput();
port.on('message', (data) => {
receiveInput(data).catch(async (error) => this.throw(error));
});
harden(this);
}
}
harden(NodeWorkerReader);
/**
* A writable stream over a {@link NodeWorker}.
*
* @see
* - {@link NodeWorkerReader} for the corresponding readable stream.
* - The module-level documentation for more details.
*/
export class NodeWorkerWriter<Write> extends BaseWriter<Write> {
/**
* Constructs a new {@link NodeWorkerWriter}.
*
* @param port - The node worker port to write to.
* @param options - Options bag for configuring the writer.
* @param options.name - The name of the stream, for logging purposes.
* @param options.onEnd - A function that is called when the stream ends.
*/
constructor(
port: NodePort,
{ name, onEnd }: Omit<BaseWriterArgs<Write>, 'onDispatch'> = {},
) {
super({
name,
onDispatch: (value: Dispatchable<Write>) => port.postMessage(value),
onEnd: async () => {
await onEnd?.();
},
});
harden(this);
}
}
harden(NodeWorkerWriter);
/**
* A duplex stream over a Node worker port.
*/
export class NodeWorkerDuplexStream<
Read,
Write = Read,
> extends BaseDuplexStream<
Read,
NodeWorkerReader<Read>,
Write,
NodeWorkerWriter<Write>
> {
/**
* Constructs a new {@link NodeWorkerDuplexStream}.
*
* @param port - The node worker port for bidirectional communication.
* @param validateInput - A function that validates input from the transport.
*/
constructor(port: NodePort, validateInput?: ValidateInput<Read>) {
let writer: NodeWorkerWriter<Write>; // eslint-disable-line prefer-const
const reader = new NodeWorkerReader<Read>(port, {
name: 'NodeWorkerDuplexStream',
validateInput: makeDuplexStreamInputValidator(validateInput),
onEnd: async () => {
await writer.return();
},
});
writer = new NodeWorkerWriter<Write>(port, {
name: 'NodeWorkerDuplexStream',
onEnd: async () => {
await reader.return();
},
});
super(reader, writer);
}
/**
* Creates and synchronizes a new {@link NodeWorkerDuplexStream}.
*
* @param port - The node worker port for bidirectional communication.
* @param validateInput - A function that validates input from the transport.
* @returns A synchronized duplex stream.
*/
static async make<Read, Write = Read>(
port: NodePort,
validateInput?: ValidateInput<Read>,
): Promise<NodeWorkerDuplexStream<Read, Write>> {
const stream = new NodeWorkerDuplexStream<Read, Write>(port, validateInput);
await stream.synchronize();
return stream;
}
}
harden(NodeWorkerDuplexStream);
|