/**
* This module provides a pair of classes for creating readable and writable streams
* over a [MessagePort](https://developer.mozilla.org/en-US/docs/Web/API/MessagePort).
* The classes are naive passthrough mechanisms for data that assume exclusive access
* to their ports. The lifetime of the underlying message port is expected to be
* coextensive with "the other side".
*
* At the time of writing, there is no ergonomic way to detect the closure of a port. For
* this reason, ports have to be ended manually via `.return()` or `.throw()`. Ending a
* {@link MessagePortWriter} will end any {@link MessagePortReader} reading from the
* remote port and close the entangled ports, but it will not affect any other streams
* connected to the remote or local port, which must also be ended manually. Use
* {@link MessagePortDuplexStream} to create a duplex stream over a single port.
*
* Regarding limitations around detecting `MessagePort` closure, see:
* - https://github.com/fergald/explainer-messageport-close
* - https://github.com/whatwg/html/issues/10201
*
* @module MessagePort streams
*/
import type { OnMessage } from './utils.ts';
import {
BaseDuplexStream,
makeDuplexStreamInputValidator,
} from '../BaseDuplexStream.ts';
import type {
BaseReaderArgs,
BaseWriterArgs,
ValidateInput,
} from '../BaseStream.ts';
import { BaseReader, BaseWriter } from '../BaseStream.ts';
import type { Dispatchable } from '../utils.ts';
/**
 * A readable stream over a {@link MessagePort}.
 *
 * This class is a naive passthrough mechanism for data over a pair of linked message
 * ports. It listens for `message` events on its port and hands the `data` of each
 * event to the input receiver obtained from {@link BaseReader}. Message events that
 * contain ports are ignored; all other events are expected to carry
 * {@link Dispatchable} values.
 *
 * @see
 * - {@link MessagePortWriter} for the corresponding writable stream.
 * - The module-level documentation for more details.
 */
export class MessagePortReader<Read> extends BaseReader<Read> {
  /**
   * Constructs a new {@link MessagePortReader}.
   *
   * Registers a `message` listener on the port and starts the port. The end
   * handler given to {@link BaseReader} removes that listener and closes the
   * port before invoking the caller-supplied `onEnd`.
   *
   * @param port - The message port to read from.
   * @param options - Options bag for configuring the reader.
   * @param options.name - The name of the stream, forwarded to {@link BaseReader}
   * (presumably for logging purposes, as with {@link MessagePortWriter}).
   * @param options.validateInput - A function that validates input from the transport.
   * @param options.onEnd - A function that is called when the stream ends.
   */
  constructor(
    port: MessagePort,
    { name, validateInput, onEnd }: BaseReaderArgs<Read> = {},
  ) {
    super({
      // Forward the name instead of dropping it, so that callers such as
      // MessagePortDuplexStream, which pass one, actually have it applied.
      name,
      validateInput,
      onEnd: async (error) => {
        // `onMessage` is declared below because it depends on the receiver that
        // is only available after `super()`; this handler runs later, once the
        // stream ends, by which time the binding has been initialized.
        // eslint-disable-next-line @typescript-eslint/no-use-before-define
        port.removeEventListener('message', onMessage);
        port.close();
        await onEnd?.(error);
      },
    });

    const receiveInput = super.getReceiveInput();

    const onMessage: OnMessage = (messageEvent) => {
      // Events that transfer ports are not stream traffic; skip them.
      if (messageEvent.ports.length > 0) {
        return;
      }

      // A rejection while receiving input ends the stream with that error.
      receiveInput(messageEvent.data).catch(async (error) => this.throw(error));
    };

    port.addEventListener('message', onMessage);
    // Ports listened to via `addEventListener` must be started explicitly.
    port.start();

    harden(this);
  }
}
harden(MessagePortReader);
/**
 * A writable stream over a {@link MessagePort}.
 *
 * Every value dispatched by the underlying {@link BaseWriter} is posted to the
 * port via `postMessage()`. The end handler closes the port before invoking the
 * caller-supplied `onEnd`.
 *
 * @see
 * - {@link MessagePortReader} for the corresponding readable stream.
 * - The module-level documentation for more details.
 */
export class MessagePortWriter<Write> extends BaseWriter<Write> {
  /**
   * Constructs a new {@link MessagePortWriter} and starts the given port.
   *
   * @param port - The message port to write to.
   * @param options - Options bag for configuring the writer.
   * @param options.name - The name of the stream, for logging purposes.
   * @param options.onEnd - A function that is called when the stream ends.
   */
  constructor(
    port: MessagePort,
    {
      name,
      onEnd: notifyEnd,
    }: Omit<BaseWriterArgs<Write>, 'onDispatch'> = {},
  ) {
    super({
      name,
      // Dispatching a value simply posts it to the port.
      onDispatch: (value: Dispatchable<Write>) => {
        port.postMessage(value);
      },
      // Close the port first, then let the caller react to the stream ending.
      onEnd: async (error) => {
        port.close();
        await notifyEnd?.(error);
      },
    });

    port.start();

    harden(this);
  }
}
harden(MessagePortWriter);
/**
 * A duplex stream over a {@link MessagePort}.
 *
 * Pairs a {@link MessagePortReader} and a {@link MessagePortWriter} on the same
 * port. Their end handlers are cross-wired: when the reader ends it calls
 * `.return()` on the writer, and vice versa.
 */
export class MessagePortDuplexStream<
  Read,
  Write = Read,
> extends BaseDuplexStream<
  Read,
  MessagePortReader<Read>,
  Write,
  MessagePortWriter<Write>
> {
  /**
   * Constructs a new {@link MessagePortDuplexStream}.
   *
   * @param port - The message port to use for bidirectional communication.
   * @param validateInput - A function that validates input from the transport.
   */
  constructor(port: MessagePort, validateInput?: ValidateInput<Read>) {
    const streamName = 'MessagePortDuplexStream';

    // Declared ahead of its assignment so that the reader's end handler can
    // capture it; it is only dereferenced once the reader ends.
    let writeHalf: MessagePortWriter<Write>; // eslint-disable-line prefer-const

    const readHalf = new MessagePortReader<Read>(port, {
      name: streamName,
      validateInput: makeDuplexStreamInputValidator(validateInput),
      onEnd: async () => {
        await writeHalf.return();
      },
    });

    writeHalf = new MessagePortWriter<Write>(port, {
      name: streamName,
      onEnd: async () => {
        await readHalf.return();
      },
    });

    super(readHalf, writeHalf);
  }

  /**
   * Creates a new {@link MessagePortDuplexStream} and awaits its
   * `synchronize()` before returning it.
   *
   * @param port - The message port to use for bidirectional communication.
   * @param validateInput - A function that validates input from the transport.
   * @returns A synchronized duplex stream.
   */
  static async make<Read, Write = Read>(
    port: MessagePort,
    validateInput?: ValidateInput<Read>,
  ): Promise<MessagePortDuplexStream<Read, Write>> {
    const duplex = new MessagePortDuplexStream<Read, Write>(
      port,
      validateInput,
    );
    await duplex.synchronize();
    return duplex;
  }
}
harden(MessagePortDuplexStream);
|