All files / nodejs/src/io socket-channel.ts

90.58% Statements 77/85
84.61% Branches 22/26
94.73% Functions 18/19
91.35% Lines 74/81

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192                                              15x 15x 15x 15x 15x 15x               8x 8x 2x   6x               17x 1x 1x                   7x 7x 7x 8x 8x 8x 8x                   12x 10x     2x   2x       2x 2x     15x 11x 1x               1x 1x         10x   10x 10x 10x   10x 10x 10x 10x       15x 15x         15x 15x 15x 15x 15x       15x   9x 1x   8x 8x 5x     3x 3x         3x 1x   2x 1x   1x 1x 1x 1x     1x             15x     15x 15x 15x 8x 8x   15x 15x     15x 15x             15x    
import type { IOChannel } from '@metamask/ocap-kernel';
import fs from 'node:fs/promises';
import * as net from 'node:net';
import { StringDecoder } from 'node:string_decoder';
 
type PendingReader = {
  resolve: (value: string | null) => void;
};
 
/**
 * Create an IOChannel backed by a Unix domain socket.
 *
 * Creates a `net.Server` listening on the configured socket path.
 * Accepts one connection at a time. Lines are `\n`-delimited.
 *
 * @param name - The channel name (for diagnostics).
 * @param socketPath - The file path for the Unix domain socket.
 * @returns A promise for the IOChannel, resolved once the server is listening.
 */
export async function makeSocketIOChannel(
  name: string,
  socketPath: string,
): Promise<IOChannel> {
  const lineQueue: string[] = [];
  const readerQueue: PendingReader[] = [];
  let currentSocket: net.Socket | null = null;
  let decoder = new StringDecoder('utf8');
  let buffer = '';
  let closed = false;
 
  /**
   * Deliver a line to a pending reader or enqueue it.
   *
   * @param line - The line to deliver.
   */
  function deliverLine(line: string): void {
    const reader = readerQueue.shift();
    if (reader) {
      reader.resolve(line);
    } else {
      lineQueue.push(line);
    }
  }
 
  /**
   * Handle the end of the input stream.
   */
  function deliverEOF(): void {
    while (readerQueue.length > 0) {
      const reader = readerQueue.shift();
      reader?.resolve(null);
    }
  }
 
  /**
   * Handle incoming data by splitting on newlines.
   *
   * @param data - The raw data buffer from the socket.
   */
  function handleData(data: Buffer): void {
    buffer += decoder.write(data);
    let newlineIndex = buffer.indexOf('\n');
    while (newlineIndex !== -1) {
      const line = buffer.slice(0, newlineIndex);
      buffer = buffer.slice(newlineIndex + 1);
      deliverLine(line);
      newlineIndex = buffer.indexOf('\n');
    }
  }
 
  /**
   * Handle the channel disconnecting.
   *
   * @param socket - The socket that disconnected.
   */
  function handleDisconnect(socket: net.Socket): void {
    if (currentSocket !== socket) {
      return;
    }
    // Flush any incomplete multi-byte sequence from the decoder
    buffer += decoder.end();
    // Deliver any remaining buffered data as a final line
    Iif (buffer.length > 0) {
      deliverLine(buffer);
      buffer = '';
    }
    currentSocket = null;
    deliverEOF();
  }
 
  const server = net.createServer((socket) => {
    if (currentSocket) {
      Iif (currentSocket.readableEnded || currentSocket.destroyed) {
        // Old connection is dead but events haven't been fully processed;
        // clean it up and accept the new connection.
        currentSocket.removeAllListeners();
        currentSocket.destroy();
        currentSocket = null;
      } else {
        // Existing active client — reject the new connection
        socket.destroy();
        return;
      }
    }
    // Drain stale data from any previous connection, but keep pending
    // readers alive so they can receive data from the new connection.
    lineQueue.length = 0;
 
    currentSocket = socket;
    decoder = new StringDecoder('utf8');
    buffer = '';
 
    socket.on('data', handleData);
    socket.on('end', () => handleDisconnect(socket));
    socket.on('error', () => handleDisconnect(socket));
    socket.on('close', () => handleDisconnect(socket));
  });
 
  // Remove stale socket file if it exists
  try {
    await fs.unlink(socketPath);
  } catch {
    // Ignore if it doesn't exist
  }
 
  await new Promise<void>((resolve, reject) => {
    server.on('error', reject);
    server.listen(socketPath, () => {
      server.removeListener('error', reject);
      resolve();
    });
  });
 
  const channel: IOChannel = {
    async read(): Promise<string | null> {
      if (closed) {
        return null;
      }
      const queued = lineQueue.shift();
      if (queued !== undefined) {
        return queued;
      }
      // Block until data arrives (from a current or future client connection)
      return new Promise<string | null>((resolve) => {
        readerQueue.push({ resolve });
      });
    },
 
    async write(data: string): Promise<void> {
      if (closed) {
        throw new Error(`IO channel "${name}" is closed`);
      }
      if (!currentSocket) {
        throw new Error(`IO channel "${name}" has no connected client`);
      }
      const socket = currentSocket;
      return new Promise<void>((resolve, reject) => {
        socket.write(`${data}\n`, (error) => {
          Iif (error) {
            reject(error);
          } else {
            resolve();
          }
        });
      });
    },
 
    async close(): Promise<void> {
      Iif (closed) {
        return;
      }
      closed = true;
      deliverEOF();
      if (currentSocket) {
        currentSocket.destroy();
        currentSocket = null;
      }
      await new Promise<void>((resolve) => {
        server.close(() => resolve());
      });
      // Clean up socket file
      try {
        await fs.unlink(socketPath);
      } catch {
        // Ignore
      }
    },
  };
 
  return channel;
}