import { captureException } from "@sentry/react";

import { snakeToCamelCase } from "~/util";

import {
  convertResultValue,
  mapColumnToColumnMetadata,
  mapRowToObject,
} from ".";
import {
  AuthOptions,
  MaterializeWebsocket,
  WebSocketResult,
} from "./MaterializeWebsocket";
import { ColumnMetadata, ErrorCode, SqlRequest } from "./types";

export enum SUBSCRIBE_ERROR_CODE {
  CONNECTION_CLOSED = "MZC001",
  KEY_VIOLATION = "MZC002",
  INVALID_STATE = "MZC003",
}

export interface SubscribeError {
  code: ErrorCode | string;
  message: string;
}

export interface SubscribeState<T> {
  /** The current values at the most recent closed timestamp */
  data: T[];
  snapshotComplete: boolean;
  error: SubscribeError | undefined;
}

export interface RawSubscribeOptions<T extends object, R> {
  httpAddress: string;
  request?: SqlRequest;
  closeSocketOnComplete?: boolean;
  authOptions?: AuthOptions;
  flushInterval?: number;
  select?: SelectFunction<T, R>;
}

export type SelectFunction<T extends object, R> = (row: SubscribeRow<T>) => R;

/**
 * Stateful object that executes Materialize SUBSCRIBE over a websocket. Results from the
 * socket are returned in raw form, meaning each progress message potentially causes a
 * render.
 *
 * Note that the subscribe statement must include WITH (PROGRESS) and ENVELOPE UPSERT.
 */
export class RawSubscribe<T extends object, R = SubscribeRow<T>> {
  socket: MaterializeWebsocket;
  select?: SelectFunction<T, R>;
  protected sqlRequest: SqlRequest | undefined;
  protected listeners = new Set<() => void>();
  protected columns: ColumnMetadata[] = [];
  protected closeSocketOnComplete: boolean = false;
  protected querySent: boolean = false;
  protected currentTimestamp: number | undefined;
  /** Holds completed timestamp messages, waiting for the flush interval */
  protected closedTimestampBuffer: SubscribeRow<T>[] = [];
  /** Holds messages from the socket until the timestamp is closed */
  protected currentTimestampBuffer = new Map<number, Array<SubscribeRow<T>>>();
  /** The current state exposed to callers */
  protected currentState: SubscribeState<R> = {
    data: [],
    snapshotComplete: false,
    error: undefined,
  };
  protected flushIntervalHandle: NodeJS.Timeout | undefined;
  protected flushInterval: number = 16;

  constructor(options: RawSubscribeOptions<T, R>) {
    this.socket = new MaterializeWebsocket({
      httpAddress: options.httpAddress,
      authOptions: options.authOptions,
      onReadyForQuery: this.onReadyForQuery,
      onMessage: this.onMessage,
      onClose: this.onClose,
      onOpen: this.onOpen,
    });
    this.closeSocketOnComplete = options.closeSocketOnComplete ?? false;
    this.sqlRequest = options.request;
    this.flushInterval = options.flushInterval ?? this.flushInterval;
    this.select = options.select;
  }

  connect = (
    request?: SqlRequest,
    httpAddress?: string,
    authOptions?: AuthOptions,
  ) => {
    this.disconnect();
    this.sqlRequest = request;
    this.socket.connect(httpAddress, authOptions);
    this.flushIntervalHandle = setInterval(
      this.flushSocketBuffer,
      this.flushInterval,
    );
  };

  disconnect = () => {
    clearInterval(this.flushIntervalHandle);
    this.socket.disconnect();
  };

  reset = () => {
    this.columns = [];
    this.querySent = false;
    this.closedTimestampBuffer = [];
    this.currentTimestamp = undefined;
    this.currentTimestampBuffer = new Map();
    this.setState({
      data: [],
      snapshotComplete: false,
      error: undefined,
    });
  };

  onOpen = () => {
    // We wait until the socket opens successfully to reset state so that we can continue
    // to show stale data if the socket is closed unexpectedly.
    this.reset();
  };

  onChange = (callback: () => void) => {
    this.listeners.add(callback);
    return () => {
      this.listeners.delete(callback);
    };
  };

  /**
   * Returns the current state of the subscribe. The name refers to the React concept,
   * not the Materialize one.
   */
  getSnapshot = () => {
    return this.currentState;
  };

  private setState(update: Partial<SubscribeState<R>>) {
    this.currentState = {
      ...this.currentState,
      ...update,
    };
    for (const callback of this.listeners) {
      callback();
    }
  }

  protected onReadyForQuery = () => {
    if (this.querySent && this.closeSocketOnComplete) {
      this.socket.disconnect();
      return;
    }
    if (this.sqlRequest) {
      this.socket.send(this.sqlRequest);
      this.querySent = true;
    }
  };

  protected onClose = (event: CloseEvent) => {
    this.setState({
      error: {
        code: SUBSCRIBE_ERROR_CODE.CONNECTION_CLOSED,
        message: `Socket closed unexpectedly, code: ${event.code}`,
      },
    });
  };

  protected onMessage = (message: WebSocketResult) => {
    if (message.type === "Error") {
      captureException(
        new Error(`Subscribe error: ${JSON.stringify(message.payload)}`),
      );
      this.setState({
        error: {
          code: message.payload.code,
          message: message.payload.message ?? "Unknown error",
        },
      });
    }
    if (message.type === "Rows") {
      this.columns = message.payload.columns.map(mapColumnToColumnMetadata);
    }
    if (message.type === "Row") {
      this.onRow(message.payload);
    }
  };

  protected onRow = (payload: unknown[]) => {
    // If querySent is false, it means we are still getting results from a previous
    // query, ignore the data.
    if (!this.querySent) return;

    const meta = extractSubscribeMetadata(payload, this.columns);
    if (this.currentTimestamp && meta.mzTimestamp > this.currentTimestamp) {
      // this timestamp is complete, flush it
      const updates = this.currentTimestampBuffer.get(this.currentTimestamp);
      if (updates) {
        this.closedTimestampBuffer.push(...updates);
        this.currentTimestampBuffer.delete(this.currentTimestamp);
      }
    }

    // Once we've received a second progress message, we know we've received
    // the initial snapshot.
    if (
      meta.mzProgressed &&
      this.currentTimestamp &&
      !this.currentState.snapshotComplete
    ) {
      // Eagerly flush the buffer to make the snapshot available.
      this.flushSocketBuffer();
      this.setState({
        snapshotComplete: true,
      });
    }
    // Track the new currently open timestamp.
    this.currentTimestamp = meta.mzTimestamp;
    const row = mapRowToObject<T>(payload, this.columns, [
      "mz_timestamp",
      "mz_state",
      "mz_progressed",
    ]);
    const updates = this.currentTimestampBuffer.get(meta.mzTimestamp) ?? [];
    updates.push({
      ...meta,
      data: row,
    });
    this.currentTimestampBuffer.set(meta.mzTimestamp, updates);
  };

  protected mapRowsToResult = (rows: IterableIterator<SubscribeRow<T>>) => {
    if (!this.select) {
      return Array.from(rows);
    }
    const result = [];
    for (const row of rows) {
      result.push(this.select(row));
    }
    return result;
  };

  protected flushSocketBuffer = () => {
    if (this.closedTimestampBuffer.length === 0) return;

    this.setState({
      data: [
        ...this.currentState.data,
        ...this.mapRowsToResult(this.closedTimestampBuffer.values()),
      ] as R[],
    });
    this.closedTimestampBuffer = [];
    for (const callback of this.listeners) {
      callback();
    }
  };
}

export function extractSubscribeMetadata(
  row: unknown[],
  columns: ColumnMetadata[],
) {
  const result: Record<string, unknown> = {};
  for (let i = 0; i < columns.length; i++) {
    const col = columns[i];
    if (SUBSCRIBE_METADATA_COLUMNS[col.name]) {
      result[snakeToCamelCase(col.name)] = convertResultValue(row[i], col);
    }
  }
  return result as SubscribeMetadata;
}

export type SubscribeMetadata = {
  mzTimestamp: number;
  /** when mzProgressed is true, mzState is null */
  mzState: null | "upsert" | "delete" | "key_violation";
  mzProgressed?: boolean;
};

export interface SubscribeRow<T> extends SubscribeMetadata {
  data: T;
}

// Copied from https://materialize.com/docs/sql/subscribe/#output
export const SUBSCRIBE_METADATA_COLUMNS: { [columnName: string]: string } = {
  mz_timestamp: "mz_timestamp",
  mz_progressed: "mz_progressed",
  mz_diff: "mz_diff",
  mz_state: "mz_state",
};
