All files / kernel-agents/src utils.ts

96.42% Statements 27/28
80% Branches 8/10
100% Functions 5/5
96.29% Lines 26/27

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95                          4x       5x 5x   5x                         4x                 6x 7x 7x 7x 7x 5x 5x     1x                         4x     3x   15x     15x 15x 17x 17x   5x 2x   3x     1x       2x 2x            
import type { Logger } from '@metamask/logger';
 
import type { SampleCollector } from './types.ts';
 
export { ifDefined } from '@metamask/kernel-utils';
 
/**
 * Run an asynchronous operation and always invoke the abort callback
 * afterwards, whether the operation resolved or rejected.
 *
 * The abort callback is awaited before this function settles; if it rejects,
 * that rejection is what the caller observes.
 *
 * @param abort - Cleanup callback invoked once the operation has settled.
 * @param func - The asynchronous operation to run.
 * @returns The value the operation resolved with.
 */
export const withAbort = async <Result>(
  abort: () => Promise<void>,
  func: () => Promise<Result>,
): Promise<Result> => {
  let outcome: Result;
  try {
    outcome = await func();
  } finally {
    // Runs on both the success and the failure path.
    await abort();
  }
  return outcome;
};
 
/**
 * Consume a stream of response chunks, feeding each chunk's text to an
 * incremental parser until the parser produces a non-null result.
 *
 * Iteration stops as soon as the parser yields a result; any remaining chunks
 * are not read.
 *
 * @param args - The arguments to gather the streaming response.
 * @param args.stream - The async iterable of chunks; each chunk carries its
 * text in `response`.
 * @param args.parse - The incremental parser, called once per chunk with that
 * chunk's text. Returning `null` means "no result yet".
 * @param args.logger - Optional logger; receives every delta and the final
 * parsed value via `info`.
 * @returns The first non-null value returned by the parser.
 * @throws An error if the stream is exhausted before the parser returns a
 * non-null value.
 */
export const gatherStreamingResponse = async <Result>({
  stream,
  parse,
  logger,
}: {
  stream: AsyncIterable<{ response: string }>;
  parse: SampleCollector<Result>;
  logger?: Logger;
}): Promise<Result> => {
  for await (const chunk of stream) {
    const text = chunk.response;
    logger?.info('delta:', text);
    const outcome = parse(text);
    if (outcome === null) {
      // Parser needs more input; keep reading.
      continue;
    }
    logger?.info('parsed:', outcome);
    return outcome;
  }
  throw new Error('Stream ended without a parse event');
};
 
/**
 * Run a function, re-running it when it fails with a retryable error.
 *
 * When `maxRetries` is less than 1, `func` is called exactly once and any
 * error it throws propagates unchanged; `isRetryable` is not consulted.
 * Otherwise up to `maxRetries` attempts are made in total (the first call
 * counts against the budget), stopping at the first attempt that succeeds.
 *
 * @param func - The function to retry.
 * @param maxRetries - The attempt budget. Defaults to 0, meaning a single
 * attempt with no retry handling.
 * @param isRetryable - A function that determines if an error should be retried. Defaults to always retrying.
 * @returns The result of the first successful call to `func`.
 * @throws An error if the function fails on every attempt. Its message lists
 * each failure in order and its `cause` is the array of collected errors.
 * @throws The original error, immediately, if the function throws an error
 * that is not retryable.
 */
export const withRetries = async <Action, Observation>(
  func: () => Promise<[Action, Observation | null]>,
  maxRetries: number = 0,
  isRetryable: (error: unknown) => boolean = () => true,
): Promise<[Action, Observation | null]> => {
  // No budget: make a single plain attempt and let any failure surface as-is.
  if (maxRetries < 1) {
    return await func();
  }
  const errors: unknown[] = [];
  for (let retry = 0; retry < maxRetries; retry++) {
    try {
      return await func();
    } catch (error) {
      if (!isRetryable(error)) {
        throw error;
      }
      // Remember the failure so it can be reported if the budget runs out.
      errors.push(error);
    }
  }
  throw new Error(
    [
      `Exceeded retry budget of ${maxRetries}.`,
      ...errors.map((error, index) => {
        const message = error instanceof Error ? error.message : String(error);
        return `  ${index + 1}: ${message}`;
      }),
    ].join('\n'),
    { cause: errors },
  );
};