/**
 * @module Node Worker streams
 */
import {
BaseDuplexStream,
makeDuplexStreamInputValidator,
} from '../BaseDuplexStream.ts';
import type {
BaseReaderArgs,
BaseWriterArgs,
ValidateInput,
} from '../BaseStream.ts';
import { BaseReader, BaseWriter } from '../BaseStream.ts';
import type { Dispatchable } from '../utils.ts';
/**
 * Listener signature for messages arriving on a {@link NodePort}. The payload
 * is typed as `unknown`; nothing is assumed about its shape here.
 */
export type OnMessage = (message: unknown) => void;
/**
 * The minimal port surface the streams in this module rely on: subscribing to
 * `'message'` events and posting messages.
 *
 * NOTE(review): presumably satisfied by `node:worker_threads` ports/workers —
 * confirm against callers.
 */
export type NodePort = {
  // Registers a listener for incoming `'message'` events.
  on: (event: 'message', listener: OnMessage) => void;
  // Sends a message through the port.
  postMessage: (message: unknown) => void;
};
/**
 * A readable stream over a {@link NodePort}.
 *
 * Each `'message'` event emitted by the port is passed to the receive-input
 * function obtained from the base reader; if that call rejects, the error is
 * routed to this reader's `throw()`.
 *
 * @see
 * - {@link NodeWorkerWriter} for the corresponding writable stream.
 * - The module-level documentation for more details.
 */
export class NodeWorkerReader<Read> extends BaseReader<Read> {
  /**
   * Constructs a new {@link NodeWorkerReader}.
   *
   * @param port - The node worker port to read from.
   * @param options - Options bag for configuring the reader.
   * @param options.name - The name of the stream, forwarded to the base reader.
   * @param options.validateInput - A function that validates input from the transport.
   * @param options.onEnd - A function that is called when the stream ends.
   */
  constructor(
    port: NodePort,
    { name, validateInput, onEnd }: BaseReaderArgs<Read> = {},
  ) {
    super({
      // Forward `name` instead of silently discarding it, matching how
      // NodeWorkerWriter treats its own `name` option.
      // NOTE(review): assumes BaseReader's constructor accepts `name`, as the
      // `name` passed to this class by NodeWorkerDuplexStream implies — confirm
      // against BaseStream.ts.
      name,
      validateInput,
      // Wrapped so the caller's callback is optional and always awaited.
      onEnd: async () => await onEnd?.(),
    });
    const receiveInput = super.getReceiveInput();
    port.on('message', (data) => {
      // The listener is synchronous, so failures of the async receive path are
      // handled here rather than left as unhandled rejections.
      receiveInput(data).catch(async (error) => this.throw(error));
    });
    harden(this);
  }
}
harden(NodeWorkerReader);
/**
 * A writable stream over a {@link NodePort}.
 *
 * Every dispatched value is sent with the port's `postMessage`.
 *
 * @see
 * - {@link NodeWorkerReader} for the corresponding readable stream.
 * - The module-level documentation for more details.
 */
export class NodeWorkerWriter<Write> extends BaseWriter<Write> {
  /**
   * Constructs a new {@link NodeWorkerWriter}.
   *
   * @param port - The node worker port to write to.
   * @param options - Options bag for configuring the writer.
   * @param options.name - The name of the stream, for logging purposes.
   * @param options.onEnd - A function that is called when the stream ends.
   */
  constructor(
    port: NodePort,
    { name, onEnd }: Omit<BaseWriterArgs<Write>, 'onDispatch'> = {},
  ) {
    // Sends each dispatched value through the port.
    const dispatchToPort = (value: Dispatchable<Write>) =>
      port.postMessage(value);
    // Awaits the caller's optional end callback and discards its result.
    const notifyEnd = async () => {
      await onEnd?.();
    };
    super({ name, onDispatch: dispatchToPort, onEnd: notifyEnd });
    harden(this);
  }
}
harden(NodeWorkerWriter);
/**
 * A duplex stream over a Node worker port, pairing a {@link NodeWorkerReader}
 * with a {@link NodeWorkerWriter} that share the same {@link NodePort}.
 */
export class NodeWorkerDuplexStream<
  Read,
  Write = Read,
> extends BaseDuplexStream<
  Read,
  NodeWorkerReader<Read>,
  Write,
  NodeWorkerWriter<Write>
> {
  /**
   * Constructs a new {@link NodeWorkerDuplexStream}.
   *
   * The two halves are linked: when the reader ends it calls `return()` on the
   * writer, and when the writer ends it calls `return()` on the reader.
   *
   * @param port - The node worker port for bidirectional communication.
   * @param validateInput - A function that validates input from the transport.
   */
  constructor(port: NodePort, validateInput?: ValidateInput<Read>) {
    const streamName = 'NodeWorkerDuplexStream';

    // Declared ahead of its assignment so the reader's end handler can close
    // over it; it is assigned once the reader exists.
    let writer: NodeWorkerWriter<Write>; // eslint-disable-line prefer-const
    const endWriter = async () => {
      await writer.return();
    };

    const reader = new NodeWorkerReader<Read>(port, {
      name: streamName,
      validateInput: makeDuplexStreamInputValidator(validateInput),
      onEnd: endWriter,
    });
    const endReader = async () => {
      await reader.return();
    };

    writer = new NodeWorkerWriter<Write>(port, {
      name: streamName,
      onEnd: endReader,
    });

    super(reader, writer);
  }

  /**
   * Creates and synchronizes a new {@link NodeWorkerDuplexStream}.
   *
   * @param port - The node worker port for bidirectional communication.
   * @param validateInput - A function that validates input from the transport.
   * @returns A synchronized duplex stream.
   */
  static async make<Read, Write = Read>(
    port: NodePort,
    validateInput?: ValidateInput<Read>,
  ): Promise<NodeWorkerDuplexStream<Read, Write>> {
    const duplex = new NodeWorkerDuplexStream<Read, Write>(
      port,
      validateInput,
    );
    await duplex.synchronize();
    return duplex;
  }
}
harden(NodeWorkerDuplexStream);
|