All files / streams/src/node NodeWorkerStream.ts

100% Statements 22/22
100% Branches 2/2
100% Functions 11/11
100% Lines 21/21

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143                                                                                      135x   22x     135x 135x 1934x   135x     68x                                           133x   780x   21x     133x     68x                                           130x       20x     130x     20x     130x                           7x 7x 7x     68x  
/**
 * @module Node Worker streams
 */
 
import {
  BaseDuplexStream,
  makeDuplexStreamInputValidator,
} from '../BaseDuplexStream.ts';
import type {
  BaseReaderArgs,
  BaseWriterArgs,
  ValidateInput,
} from '../BaseStream.ts';
import { BaseReader, BaseWriter } from '../BaseStream.ts';
import type { Dispatchable } from '../utils.ts';
 
/**
 * A listener for the `'message'` event of a {@link NodePort}.
 * The message is typed as `unknown`; it is passed along untyped.
 */
export type OnMessage = (message: unknown) => void;
 
/**
 * The minimal port surface used by the streams in this module: a way to
 * subscribe to `'message'` events and a way to post messages.
 *
 * NOTE(review): presumably satisfied by Node's `worker_threads` message ports
 * and workers — confirm against callers.
 */
export type NodePort = {
  // Registers a listener for incoming messages.
  on: (event: 'message', listener: OnMessage) => void;
  // Sends a message through the port.
  postMessage: (message: unknown) => void;
};
 
/**
 * A readable stream over a {@link NodePort}.
 *
 * Every `'message'` event emitted by the port is fed into the reader's input
 * handler; if handling a message rejects, the error is forwarded to the
 * reader via `throw()`.
 *
 * @see
 * - {@link NodeWorkerWriter} for the corresponding writable stream.
 * - The module-level documentation for more details.
 */
export class NodeWorkerReader<Read> extends BaseReader<Read> {
  /**
   * Constructs a new {@link NodeWorkerReader}.
   *
   * @param port - The node worker port to read from.
   * @param options - Options bag for configuring the reader.
   * @param options.name - The name of the stream, for logging purposes.
   * @param options.validateInput - A function that validates input from the transport.
   * @param options.onEnd - A function that is called when the stream ends.
   */
  constructor(
    port: NodePort,
    { name, validateInput, onEnd }: BaseReaderArgs<Read> = {},
  ) {
    super({
      // Forward the name instead of dropping it, mirroring NodeWorkerWriter;
      // NodeWorkerDuplexStream supplies one for its reader.
      name,
      validateInput,
      onEnd: async () => await onEnd?.(),
    });

    const receiveInput = super.getReceiveInput();
    port.on('message', (data) => {
      // The listener cannot be awaited by the port, so a failed receive is
      // routed into the stream through `throw()` rather than left unhandled.
      receiveInput(data).catch(async (error) => this.throw(error));
    });
    harden(this);
  }
}
harden(NodeWorkerReader);
 
/**
 * A writable stream over a {@link NodePort}.
 *
 * Each dispatched value is handed to the port's `postMessage`.
 *
 * @see
 * - {@link NodeWorkerReader} for the corresponding readable stream.
 * - The module-level documentation for more details.
 */
export class NodeWorkerWriter<Write> extends BaseWriter<Write> {
  /**
   * Constructs a new {@link NodeWorkerWriter}.
   *
   * @param port - The node worker port to write to.
   * @param options - Options bag for configuring the writer.
   * @param options.name - The name of the stream, for logging purposes.
   * @param options.onEnd - A function that is called when the stream ends.
   */
  constructor(
    port: NodePort,
    options: Omit<BaseWriterArgs<Write>, 'onDispatch'> = {},
  ) {
    const { name, onEnd: notifyEnd } = options;
    super({
      name,
      // Writing a value means posting it on the port.
      onDispatch: (value: Dispatchable<Write>) => port.postMessage(value),
      onEnd: async () => {
        await notifyEnd?.();
      },
    });
    harden(this);
  }
}
harden(NodeWorkerWriter);
 
/**
 * A duplex stream over a Node worker port.
 *
 * Pairs a {@link NodeWorkerReader} with a {@link NodeWorkerWriter} on the same
 * {@link NodePort}. When either half ends, `return()` is called on the other.
 */
export class NodeWorkerDuplexStream<
  Read,
  Write = Read,
> extends BaseDuplexStream<
  Read,
  NodeWorkerReader<Read>,
  Write,
  NodeWorkerWriter<Write>
> {
  /**
   * Constructs a new {@link NodeWorkerDuplexStream}.
   *
   * @param port - The node worker port for bidirectional communication.
   * @param validateInput - A function that validates input from the transport.
   */
  constructor(port: NodePort, validateInput?: ValidateInput<Read>) {
    const streamName = 'NodeWorkerDuplexStream';

    // Declared before the reader because the reader's onEnd closes over it;
    // it is assigned once the reader exists.
    let writable: NodeWorkerWriter<Write>; // eslint-disable-line prefer-const

    const endWritable = async (): Promise<void> => {
      await writable.return();
    };
    const readable = new NodeWorkerReader<Read>(port, {
      name: streamName,
      validateInput: makeDuplexStreamInputValidator(validateInput),
      onEnd: endWritable,
    });

    const endReadable = async (): Promise<void> => {
      await readable.return();
    };
    writable = new NodeWorkerWriter<Write>(port, {
      name: streamName,
      onEnd: endReadable,
    });

    super(readable, writable);
  }

  /**
   * Creates and synchronizes a new {@link NodeWorkerDuplexStream}.
   *
   * @param port - The node worker port for bidirectional communication.
   * @param validateInput - A function that validates input from the transport.
   * @returns A synchronized duplex stream.
   */
  static async make<Read, Write = Read>(
    port: NodePort,
    validateInput?: ValidateInput<Read>,
  ): Promise<NodeWorkerDuplexStream<Read, Write>> {
    const duplex = new NodeWorkerDuplexStream<Read, Write>(
      port,
      validateInput,
    );
    // Only hand the stream out once synchronization has completed.
    await duplex.synchronize();
    return duplex;
  }
}
harden(NodeWorkerDuplexStream);