All files / kernel-ui/src/services stream.ts

0% Statements 0/16
0% Branches 0/4
0% Functions 0/6
0% Lines 0/16

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68                                                                                                                                       
import {
  rpcMethodSpecs,
  connectToKernel,
} from '@metamask/kernel-browser-runtime';
import type { KernelControlMethod } from '@metamask/kernel-browser-runtime';
import { RpcClient } from '@metamask/kernel-rpc-methods';
import type {
  ExtractParams,
  ExtractResult,
} from '@metamask/kernel-rpc-methods';
 
import { logger } from './logger.ts';
 
export type CallKernelMethod = <Method extends KernelControlMethod>(command: {
  method: Method;
  params: ExtractParams<Method, typeof rpcMethodSpecs>;
}) => Promise<ExtractResult<Method, typeof rpcMethodSpecs>>;
 
/**
 * Setup the stream for sending and receiving messages.
 *
 * @returns A function for sending messages.
 */
export async function setupStream(): Promise<{
  callKernelMethod: CallKernelMethod;
}> {
  const kernelStream = await connectToKernel({ label: 'ui-instance', logger });
 
  const rpcClient = new RpcClient(
    rpcMethodSpecs,
    async (request) => {
      await kernelStream.write(request);
    },
    'panel',
  );
 
  const cleanup = (): void => {
    rpcClient.rejectAll(new Error('Stream disconnected'));
    // Explicitly _do not_ return the stream, as the connection will be
    // re-established when the panel is reloaded. If we return the stream,
    // the remote end will be closed and the connection irrevocably lost.
  };
 
  window.addEventListener('unload', cleanup);
 
  kernelStream
    .drain(async (response) => {
      if (typeof response.id !== 'string') {
        throw new Error('Invalid response id');
      }
 
      rpcClient.handleResponse(response.id, response);
    })
    .catch((error) => {
      logger.error('error draining kernel stream', error);
    })
    .finally(cleanup);
 
  const callKernelMethod: CallKernelMethod = async (payload) => {
    if (payload.method !== 'getStatus') {
      logger.log('sending message', payload);
    }
    return await rpcClient.call(payload.method, payload.params);
  };
 
  return { callKernelMethod };
}