/**
 * Like simple event, but can be closed, and can be iterated over
 */
export interface Channel<ReadMessage, WriteMessage = ReadMessage> {
  /**
   * Adds a listener that will be invoked whenever an event is emitted.
   * Returns an unsubscribe function to remove the listener.
   */
  addListener(listener: (e: ReadMessage) => void): () => void;

  /**
   * Emits an event immediately to all current listeners.
   * Throws an error if the channel is closed.
   */
  emit(event: WriteMessage): void;

  /**
   * Returns a promise that resolves with the next event that satisfies the predicate.
   * If the channel is closed, the promise is rejected.
   */
  wait(predicate?: (e: ReadMessage) => boolean): Promise<ReadMessage>;

  /**
   * Closes the channel. After closing, no further listeners can be added,
   * and any attempts to emit events or wait for events will throw errors.
   */
  close(): void;

  signal: AbortSignal;

  /**
   * Returns an async iterator over the events.
   * The iteration stops when the channel is closed.
   */
  [Symbol.asyncIterator](): AsyncIterator<ReadMessage>;
}

export function createChannel<E>(): Channel<E> {
  let listeners: Array<(e: E) => void> = [];
  const controller = new AbortController();

  const id = Math.random().toString(36).substring(7);

  const channel: Channel<E> = {
    signal: controller.signal,
    addListener(listener) {
      if (controller.signal.aborted) {
        throw new ChannelError('ChannelClosed', 'Cannot add listener');
      }
      listeners.push(listener);
      // Return an unsubscribe function that removes this listener
      return () => {
        listeners = listeners.filter(l => l !== listener);
      };
    },

    emit(event) {
      if (controller.signal.aborted) {
        throw new ChannelError('ChannelClosed', 'Cannot emit');
      }
      // Copy the listeners array to avoid issues if listeners are removed during iteration
      for (const listener of [...listeners]) {
        listener(event);
      }
    },

    wait(predicate) {
      if (controller.signal.aborted) {
        return Promise.reject(
          new ChannelError('ChannelClosed', 'Cannot wait for a message')
        );
      }
      return new Promise((resolve, reject) => {
        // Create a temporary listener that checks the predicate.
        const handler = (event: E) => {
          if (predicate === undefined || predicate(event)) {
            unsubscribe();
            resolve(event);
          }
        };
        const unsubscribe = channel.addListener(handler);

        // Abort the promise if the channel is closed
        controller.signal.addEventListener(
          'abort',
          () => {
            unsubscribe();
            reject(
              new ChannelError('ChannelClosed', 'Cannot wait for a message')
            );
          },
          { once: true }
        );
      });
    },

    close() {
      if (controller.signal.aborted) {
        throw new ChannelError('ChannelClosed', 'Cannot close');
      }
      controller.abort(); // Marks the channel as closed
      listeners = [];
    },

    async *[Symbol.asyncIterator]() {
      while (!controller.signal.aborted) {
        try {
          // here - we should wait for abort event, abbort signal??
          const event = await this.wait(() => true);
          yield event;
        } catch (err) {
          if (err instanceof ChannelError && err.type === 'ChannelClosed') {
            return;
          } else {
            throw err;
          }
        }
      }
    }
  };
  return channel;
}

export type ChannelErrorType = 'ChannelClosed';

export class ChannelError extends Error {
  type: ChannelErrorType = 'ChannelClosed';
  constructor(type: ChannelError['type'], message?: string) {
    super(`${message} (${type})`);
  }
}
