All files / nodejs/src/daemon rpc-socket-server.ts

0% Statements 0/60
0% Branches 0/28
0% Functions 0/18
0% Lines 0/60

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
import { RpcService } from '@metamask/kernel-rpc-methods';
import type { KernelDatabase } from '@metamask/kernel-store';
import type { Kernel } from '@metamask/ocap-kernel';
import { rpcHandlers } from '@metamask/ocap-kernel/rpc';
import { unlink } from 'node:fs/promises';
import { createServer } from 'node:net';
import type { Server } from 'node:net';
 
/**
 * Handle returned by {@link startRpcSocketServer}.
 *
 * `close` asks the underlying socket server to close. The returned promise
 * resolves once the server's close callback fires without an error and
 * rejects with that error otherwise.
 */
export type RpcSocketServerHandle = {
  close: () => Promise<void>;
};
 
/**
 * Start a Unix socket server that processes JSON-RPC requests through RpcService.
 *
 * Each connection carries exactly one newline-delimited JSON-RPC request. The
 * request is dispatched through the kernel's RPC handlers, a single
 * newline-terminated JSON-RPC response is written back, and the server ends
 * its side of the connection. If any bytes follow the first newline in the
 * data received so far, the request is rejected with a `-32600` error instead
 * of being executed.
 *
 * The special `shutdown` method is intercepted before RPC dispatch and triggers
 * the provided {@link onShutdown} callback (if any) after responding to the client.
 *
 * @param options - Server options.
 * @param options.socketPath - The Unix socket path to listen on.
 * @param options.kernel - The kernel instance.
 * @param options.kernelDatabase - The kernel database instance.
 * @param options.onShutdown - Optional callback invoked when a `shutdown` RPC is received.
 * @returns A handle with a `close()` function for cleanup. The promise
 * rejects if the server cannot start listening on `socketPath`.
 */
export async function startRpcSocketServer({
  socketPath,
  kernel,
  kernelDatabase,
  onShutdown,
}: {
  socketPath: string;
  kernel: Kernel;
  kernelDatabase: KernelDatabase;
  onShutdown?: (() => Promise<void>) | undefined;
}): Promise<RpcSocketServerHandle> {
  const rpcService = new RpcService(rpcHandlers, {
    kernel,
    executeDBQuery: (sql: string) => kernelDatabase.executeQuery(sql),
  });

  const server = createServer((socket) => {
    // Decode at the stream level so that a multi-byte UTF-8 character split
    // across two chunks is reassembled instead of being replaced by U+FFFD
    // (which is what decoding each chunk independently would do).
    socket.setEncoding('utf8');

    let buffer = '';

    const onData = (chunk: Buffer | string): void => {
      // Everything already buffered is known to contain no newline, so only
      // the newly appended text has to be searched.
      const searchFrom = buffer.length;
      buffer += typeof chunk === 'string' ? chunk : chunk.toString('utf8');
      const idx = buffer.indexOf('\n', searchFrom);
      if (idx === -1) {
        return;
      }

      // One request per connection — stop listening for further data.
      socket.removeListener('data', onData);

      const line = buffer.slice(0, idx);
      const remaining = buffer.slice(idx + 1);
      buffer = '';

      if (remaining.length > 0) {
        const rejection = {
          jsonrpc: '2.0',
          id: null,
          error: {
            code: -32600,
            message: 'Only one request per connection is allowed',
          },
        };
        socket.end(`${JSON.stringify(rejection)}\n`);
        return;
      }

      handleRequest(rpcService, line, onShutdown)
        .then((response) => {
          socket.end(`${JSON.stringify(response)}\n`);
          return undefined;
        })
        .catch(() => {
          // Reached if the handler rejects or the response cannot be
          // serialized; the client still gets a well-formed error reply.
          const failure = {
            jsonrpc: '2.0',
            id: null,
            error: { code: -32603, message: 'Internal error' },
          };
          socket.end(`${JSON.stringify(failure)}\n`);
        });
    };
    socket.on('data', onData);

    socket.on('error', () => {
      // Ignore client socket errors (e.g. broken pipe from probe connections)
    });
  });

  await listen(server, socketPath);

  return {
    close: async () => {
      // Wrap the callback-style `server.close` so callers can await it.
      await new Promise<void>((resolve, reject) => {
        server.close((error) => {
          if (error) {
            reject(error);
          } else {
            resolve();
          }
        });
      });
    },
  };
}
 
/**
 * Entry point for one raw JSON-RPC request line; special-cases `shutdown`.
 *
 * The line is parsed only far enough to inspect its `method`. When that
 * method is `shutdown`, a success response is produced immediately and, if an
 * `onShutdown` callback was supplied, the callback is queued on a zero-delay
 * timer (it is not awaited). Every other input, including input that cannot
 * be parsed, is handed unchanged to {@link processRequest}.
 *
 * @param rpcService - The RPC service to execute methods against.
 * @param line - The raw JSON line from the socket.
 * @param onShutdown - Optional shutdown callback.
 * @returns A JSON-RPC response object.
 */
async function handleRequest(
  rpcService: RpcService<typeof rpcHandlers>,
  line: string,
  onShutdown?: () => Promise<void>,
): Promise<Record<string, unknown>> {
  let envelope: { id?: unknown; method?: string } | null;
  try {
    envelope = JSON.parse(line) as { id?: unknown; method?: string } | null;
  } catch {
    // Unparseable input is reported by processRequest, not here.
    envelope = null;
  }

  // Anything that is not a shutdown request goes through normal dispatch.
  if (envelope === null || envelope.method !== 'shutdown') {
    return processRequest(rpcService, line);
  }

  if (onShutdown) {
    // Defer so the response can be written to the client first.
    setTimeout(() => {
      onShutdown().catch(() => {
        // Best-effort shutdown: a rejected callback is deliberately dropped.
      });
    }, 0);
  }

  return {
    jsonrpc: '2.0',
    id: envelope.id ?? null,
    result: { status: 'shutting down' },
  };
}
 
/**
 * Process a single JSON-RPC request line and return a JSON-RPC response.
 *
 * Failures are mapped to JSON-RPC 2.0 error objects rather than thrown:
 * - `-32700` when `line` is not valid JSON (the parser message is attached
 *   as `error.data`);
 * - `-32600` when the payload has no string `method` (this includes a JSON
 *   `null`, primitives and arrays);
 * - the error's own numeric `code` when dispatch throws an error carrying
 *   one, otherwise `-32603`.
 *
 * @param rpcService - The RPC service to execute methods against.
 * @param line - The raw JSON line from the socket.
 * @returns A JSON-RPC response object.
 */
async function processRequest(
  rpcService: RpcService<typeof rpcHandlers>,
  line: string,
): Promise<Record<string, unknown>> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(line);
  } catch (error) {
    // Malformed JSON is a client error, not an internal one.
    return {
      jsonrpc: '2.0',
      id: null,
      error: {
        code: -32700,
        message: 'Parse error',
        data: error instanceof Error ? error.message : String(error),
      },
    };
  }

  // `JSON.parse` may legally yield `null` or a primitive. Treat those as an
  // empty request so they fail the `method` check below instead of throwing
  // a TypeError on property access.
  const request = (
    typeof parsed === 'object' && parsed !== null ? parsed : {}
  ) as {
    jsonrpc?: string;
    id?: unknown;
    method?: string;
    params?: unknown;
  };
  const id = request.id ?? null;

  const { method } = request;
  if (typeof method !== 'string') {
    return {
      jsonrpc: '2.0',
      id,
      error: { code: -32600, message: 'Invalid request: missing method' },
    };
  }

  // Default to empty array when no params provided (handlers expect validated params)
  const params = request.params ?? [];

  try {
    rpcService.assertHasMethod(method);
    const result = await rpcService.execute(method, params);

    // `undefined` is not representable in JSON; normalise it to `null`.
    return { jsonrpc: '2.0', id, result: result ?? null };
  } catch (error) {
    const code = isRpcError(error) ? error.code : -32603;
    const message = error instanceof Error ? error.message : 'Internal error';

    return { jsonrpc: '2.0', id, error: { code, message } };
  }
}
 
/**
 * Type guard for thrown values that carry a numeric `code` property.
 *
 * Values whose `code` is not a number (for example a string such as
 * `'ENOENT'`) and non-object values are rejected.
 *
 * @param error - The error to check.
 * @returns True if the error has a numeric code property.
 */
function isRpcError(error: unknown): error is { code: number } {
  if (typeof error !== 'object' || error === null) {
    return false;
  }

  // A missing property reads as `undefined`, which fails the check below.
  const { code } = error as { code?: unknown };
  return typeof code === 'number';
}
 
/**
 * Start listening on a Unix socket path.
 *
 * Any existing file at `socketPath` is removed first so that a stale socket
 * left by a previous run does not block the bind. Only a "file does not
 * exist" failure from that removal is ignored; any other failure is rethrown
 * so the real cause is reported instead of a later bind error.
 *
 * @param server - The net.Server instance.
 * @param socketPath - The Unix socket path.
 * @returns A promise that resolves once the server is listening and rejects
 * with the removal error or the server's `error` event.
 */
async function listen(server: Server, socketPath: string): Promise<void> {
  // Remove stale socket file from a previous run, if any.
  try {
    await unlink(socketPath);
  } catch (error) {
    const code =
      typeof error === 'object' && error !== null && 'code' in error
        ? (error as { code: unknown }).code
        : undefined;
    if (code !== 'ENOENT') {
      throw error;
    }
  }

  await new Promise<void>((resolve, reject) => {
    // Both listeners are one-shot and remove each other, so neither outcome
    // leaves a handler for this attempt attached to the server.
    const onError = (error: Error): void => {
      server.removeListener('listening', onListening);
      reject(error);
    };
    const onListening = (): void => {
      server.removeListener('error', onError);
      resolve();
    };
    server.once('error', onError);
    server.once('listening', onListening);
    server.listen(socketPath);
  });
}