All files / kernel-browser-runtime/src/internal-comms internal-connections.ts

100% Statements 50/50
100% Branches 12/12
100% Functions 13/13
100% Lines 50/50

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195                      3x                                                                     3x         2x 2x 2x   2x       2x   2x             2x       2x 1x 1x     1x     2x     3x     11x 11x       4x       11x 2x         11x                                                         3x           10x     10x 8x 8x   2x 2x 2x       10x 10x 13x 1x     1x         12x 12x 1x 1x   11x   11x 11x   11x 5x       5x 1x 5x 4x         2x           2x 2x        
import { stringify } from '@metamask/kernel-utils';
import type { JsonRpcCall } from '@metamask/kernel-utils';
import type { Logger } from '@metamask/logger';
import { PostMessageDuplexStream } from '@metamask/streams/browser';
import { isJsonRpcRequest, isJsonRpcResponse } from '@metamask/utils';
import type { JsonRpcResponse } from '@metamask/utils';
import { nanoid } from 'nanoid';
 
import { isCommsControlMessage } from './comms-control-message.ts';
import type { CommsControlMessage } from './comms-control-message.ts';
 
/**
 * Default name of the `BroadcastChannel` over which internal processes announce
 * themselves to the kernel. Both {@link connectToKernel} and
 * {@link receiveInternalConnections} fall back to this value when no
 * `controlChannelName` option is supplied.
 */
export const COMMS_CONTROL_CHANNEL_NAME = 'comms-control';
 
/**
 * The kernel-side stream for a single internal process. In this file it is
 * created with incoming messages validated as JSON-RPC requests, and JSON-RPC
 * responses are written back through it.
 */
export type KernelRpcStream = PostMessageDuplexStream<
  JsonRpcCall,
  JsonRpcResponse
>;
 
/**
 * The internal-process-side stream returned by {@link connectToKernel}. It is the
 * mirror image of {@link KernelRpcStream}: incoming messages are validated as
 * JSON-RPC responses, and the type of written messages is `JsonRpcCall`.
 */
export type KernelRpcReplyStream = PostMessageDuplexStream<
  JsonRpcResponse,
  JsonRpcCall
>;
 
/**
 * Callback invoked for every request received from an internal process.
 * When it resolves to a response, {@link receiveInternalConnections} writes that
 * response back to the process; when it resolves to `undefined`, nothing is written.
 */
type HandleInternalMessage = (
  request: JsonRpcCall,
) => Promise<JsonRpcResponse | void>;
 
/**
 * Options accepted by {@link connectToKernel}.
 */
type Options = {
  // Prefix of the generated per-connection channel name.
  label: string;
  // Logger used to report channel errors.
  logger: Logger;
  // Name of the control channel; defaults to COMMS_CONTROL_CHANNEL_NAME.
  controlChannelName?: string;
};
 
/**
 * Opens a dedicated RPC connection from an internal process (for example a UI
 * instance) to the kernel.
 *
 * A uniquely named `BroadcastChannel` is created for this connection, its name is
 * announced to the kernel via an `init` message on the control channel, and a
 * duplex stream is then established over the dedicated channel. Call this exactly
 * once per internal process, during initialization, and only after the kernel has
 * called {@link receiveInternalConnections}.
 *
 * @param options - The options for the connection.
 * @param options.label - The label of the internal process. It prefixes the name of
 * the dedicated channel, which is how the process is identified in the logs.
 * @param options.logger - The logger instance.
 * @param options.controlChannelName - The name of the control channel. Must match
 * the name used by {@link receiveInternalConnections} on the other end.
 * @returns The kernel control reply stream.
 */
export const connectToKernel = async ({
  label,
  logger,
  controlChannelName = COMMS_CONTROL_CHANNEL_NAME,
}: Options): Promise<KernelRpcReplyStream> => {
  const controlChannel = new BroadcastChannel(controlChannelName);
  const dedicatedChannelName = `${label}-${nanoid()}`;
  // The dedicated channel must exist before the kernel is told about it.
  const dedicatedChannel = new BroadcastChannel(dedicatedChannelName);
  const closeDedicatedChannel = (): void => dedicatedChannel.close();

  // Announce the dedicated channel; the control channel is not needed afterwards.
  const initMessage = {
    method: 'init',
    params: { channelName: dedicatedChannelName },
  } satisfies CommsControlMessage;
  controlChannel.postMessage(initMessage);
  controlChannel.close();

  const replyStream = await PostMessageDuplexStream.make<
    JsonRpcResponse,
    JsonRpcCall
  >({
    validateInput: isJsonRpcResponse,
    messageTarget: dedicatedChannel,
    onEnd: closeDedicatedChannel,
  });

  // A message that cannot be deserialized is logged, fails the stream, and
  // closes the channel.
  dedicatedChannel.onmessageerror = ({ data }) => {
    logger.error(`Internal comms channel error: ${stringify(data)}`);
    const failure = new Error(stringify(data));
    replyStream.throw(failure).catch(/* istanbul ignore next */ () => undefined);
    closeDedicatedChannel();
  };

  return replyStream;
};
 
/**
 * Opens the kernel's end of a connection to an internal process.
 *
 * @param channelName - The name of the `BroadcastChannel` announced by the
 * internal process.
 * @returns A stream whose incoming messages are validated as JSON-RPC requests.
 * The underlying channel is closed when the stream ends.
 */
const connectToInternalProcess = async (
  channelName: string,
): Promise<KernelRpcStream> => {
  const processChannel = new BroadcastChannel(channelName);

  const rpcStream: KernelRpcStream = await PostMessageDuplexStream.make({
    validateInput: isJsonRpcRequest,
    messageTarget: processChannel,
    onEnd: () => processChannel.close(),
  });

  // A message that cannot be deserialized fails the stream.
  processChannel.onmessageerror = ({ data }) => {
    const failure = new Error(stringify(data));
    rpcStream.throw(failure).catch(/* istanbul ignore next */ () => undefined);
  };

  return rpcStream;
};
 
/**
 * Options accepted by {@link receiveInternalConnections}: everything in
 * {@link Options} except `label`, plus exactly one of `handler` or
 * `handlerPromise`. The `never`-typed counterpart in each variant makes the
 * two properties mutually exclusive at the type level.
 */
type ReceiveConnectionsOptions = Omit<Options, 'label'> &
  (
    | {
        // The handler is available immediately.
        handler: HandleInternalMessage;
        handlerPromise?: never;
      }
    | {
        handler?: never;
        // The handler becomes available once this promise resolves.
        handlerPromise: Promise<HandleInternalMessage>;
      }
  );
 
/**
 * Listens for connections between the kernel and an internal process, e.g. a UI instance.
 * Should be called exactly once in the kernel, during initialization, before any internal
 * processes have attempted to connect.
 *
 * For every valid `init` message received on the control channel, a stream is opened
 * on the announced channel and each incoming request is passed to the handler. A
 * defined return value from the handler is written back to the process as the reply.
 * A channel name is rejected while a connection with the same name is still active.
 *
 * If `handlerPromise` rejects, the failure is logged once via `logger.error`, and
 * every message that subsequently needs the handler fails its connection.
 *
 * @param options - The options for the connection.
 * @param options.handler - The function to handle internal messages. Mutually exclusive
 * with `handlerPromise`.
 * @param options.handlerPromise - A promise that resolves to the handler function.
 * Mutually exclusive with `handler`.
 * @param options.logger - The logger instance.
 * @param options.controlChannelName - The name of the control channel. Must match
 * the name used by {@link connectToKernel} on the other end.
 */
export const receiveInternalConnections = ({
  handler: directHandler,
  handlerPromise,
  logger,
  controlChannelName = COMMS_CONTROL_CHANNEL_NAME,
}: ReceiveConnectionsOptions): void => {
  // `handler` is the synchronously available handler (once known), so that
  // messages do not have to await `handlerReady` after it has resolved.
  let handler: HandleInternalMessage | null = null;
  let handlerReady: Promise<HandleInternalMessage>;

  if (handlerPromise === undefined) {
    handler = directHandler;
    handlerReady = Promise.resolve(directHandler);
  } else {
    handlerReady = handlerPromise.then((resolvedHandler) => {
      handler = resolvedHandler;
      return resolvedHandler;
    });
    // `.then()` returns a new promise. Without a rejection handler of its own,
    // it would raise an unhandled rejection whenever `handlerPromise` rejects
    // before any message has awaited it. Report the failure here; later
    // `await handlerReady` calls still reject as before.
    handlerReady.catch((error) => {
      logger.error('Failed to resolve internal message handler:', error);
    });
  }

  // Names of the channels with a currently active connection.
  const seenChannels = new Set<string>();
  new BroadcastChannel(controlChannelName).onmessage = (event) => {
    if (!isCommsControlMessage(event.data)) {
      logger.error(
        `Received invalid internal comms control message: ${stringify(event.data)}`,
      );
      return;
    }

    const {
      params: { channelName },
    } = event.data;
    if (seenChannels.has(channelName)) {
      logger.error(`Already connected to internal process "${channelName}"`);
      return;
    }
    seenChannels.add(channelName);

    logger.debug(`Connecting to internal process "${channelName}"`);
    connectToInternalProcess(channelName)
      .then(async (kernelRpcStream) => {
        return kernelRpcStream.drain(async (message) => {
          logger.debug(
            `Received message from internal process "${channelName}": ${JSON.stringify(message)}`,
          );

          // Only wait for the handler if it has not been resolved yet.
          const messageHandler = handler ?? (await handlerReady);
          const reply = await messageHandler(message);
          if (reply !== undefined) {
            await kernelRpcStream.write(reply);
          }
        });
      })
      .catch((error) => {
        logger.error(
          `Error handling message from internal process "${channelName}":`,
          error,
        );
      })
      .finally(() => {
        logger.debug(`Closed connection to internal process "${channelName}"`);
        // Allow the same channel name to connect again later.
        seenChannels.delete(channelName);
      });
  };
};