All files / kernel-language-model-service/src/test-utils/queue utils.ts

100% Statements 25/25
100% Branches 6/6
100% Functions 10/10
100% Lines 25/25

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99            1x     4x   3x                       1x   5x     5x 5x   5x 1x     4x 4x 5x 5x 5x     4x                                           1x     3x 3x   3x 6x 2x   4x       2x                   1x   2x                
/**
 * Coerce a plain array or an async iterable into an async iterable.
 *
 * Arrays are wrapped in an async generator that yields their elements in
 * order; anything that is not an array is handed back unchanged.
 *
 * @param value - An array of items, or an existing async iterable of items.
 * @returns An async iterable over the same items.
 */
export const normalizeToAsyncIterable = <Type>(
  value: Type[] | AsyncIterable<Type>,
): AsyncIterable<Type> => {
  // Already async-iterable: pass the very same object through.
  if (!Array.isArray(value)) {
    return value;
  }

  const items = value;
  const fromArray = async function* () {
    // The array is read lazily, when the consumer starts iterating.
    yield* items;
  };
  return fromArray();
};
 
/**
 * Map an async iterable to a new async iterable.
 * The mapper receives both the value and whether it's the last item.
 *
 * To know whether an item is the last one, the source is always read one
 * item ahead of what has been yielded. An empty source produces an empty
 * result and the mapper is never called.
 *
 * If the consumer stops early (e.g. `break`, or calling `return()` /
 * `throw()` on the result) or the mapper throws while the source is still
 * mid-iteration, the source iterator's `return()` is invoked (when it has
 * one) so the source can release its resources.
 *
 * @param iterable - The iterable to map.
 * @param mapper - The mapper function that receives (value, done).
 * @returns The mapped iterable.
 */
export const mapAsyncIterable = <Type, Result>(
  iterable: AsyncIterable<Type>,
  mapper: (value: Type, done: boolean) => Result,
): AsyncIterable<Result> =>
  (async function* () {
    const iterator = iterable[Symbol.asyncIterator]();

    // True only while the source has produced an item and has not yet
    // reported completion or thrown, i.e. while it may still need closing.
    let sourceOpen = false;
    const pull = async (): Promise<IteratorResult<Type>> => {
      // Cleared first so a rejected `next()` leaves the source marked closed.
      sourceOpen = false;
      const result = await iterator.next();
      sourceOpen = !result.done;
      return result;
    };

    try {
      let current = await pull();

      if (current.done) {
        return;
      }

      // Look one item ahead so the final item can be flagged as the last.
      let next = await pull();
      while (!next.done) {
        yield mapper(current.value, false);
        current = next;
        next = await pull();
      }

      yield mapper(current.value, true);
    } finally {
      // Manual `next()` calls bypass the automatic closing that `for await`
      // performs, so close the source explicitly on early exit.
      if (sourceOpen) {
        await iterator.return?.();
      }
    }
  })();
 
/**
 * A stream of responses paired with a function for aborting it.
 *
 * @template Response - The type of the items produced by the stream.
 */
export type StreamWithAbort<Response> = {
  // The items to consume, as an async iterable.
  stream: AsyncIterable<Response>;
  // Requests that the stream stop; the returned promise resolves with no value.
  abort: () => Promise<void>;
};
 
/**
 * Make an async iterable abortable.
 *
 * The returned stream forwards the items of `iterable` until `abort` is
 * called. Once aborted, the stream ends without requesting any further item
 * from the source. The abort flag is only checked between items: an abort
 * issued while the stream is already waiting on the source takes effect when
 * that pending item arrives (the item is discarded); it does not interrupt
 * the wait.
 *
 * @param iterable - The iterable to make abortable.
 * @returns An object holding the abortable `stream` and its `abort` function.
 */
export const makeAbortableAsyncIterable = <Type>(
  iterable: AsyncIterable<Type>,
): StreamWithAbort<Type> => {
  let didAbort = false;
  return {
    stream: (async function* () {
      // Aborted before consumption began: never touch the source at all.
      if (didAbort) {
        return;
      }
      for await (const value of iterable) {
        // Abort arrived while this item was being fetched: drop it.
        if (didAbort) {
          break;
        }
        yield value;
        // Abort arrived while the consumer held the item: stop now rather
        // than pulling (and waiting for) another item only to discard it.
        if (didAbort) {
          break;
        }
      }
    })(),
    abort: async () => {
      didAbort = true;
    },
  };
};
 
/**
 * Build a stream-with-abort pair whose stream finishes immediately without
 * producing any item and whose abort function does nothing.
 *
 * @returns A stream with abort.
 */
export const makeEmptyStreamWithAbort = <
  Response,
>(): StreamWithAbort<Response> => {
  const emptyStream = (async function* (): AsyncGenerator<
    Response,
    void,
    undefined
  > {
    // Empty stream
  })();

  const noopAbort = async (): Promise<void> => {
    // No-op abort
  };

  return { stream: emptyStream, abort: noopAbort };
};