import { captureException } from "@sentry/react";
import deepEqual from "fast-deep-equal";

import {
  RawSubscribe,
  RawSubscribeOptions,
  SelectFunction,
  SUBSCRIBE_ERROR_CODE,
  SubscribeRow,
} from "./RawSubscribe";

/**
 * Maps a row to the string key under which it is tracked by `UpsertSubscribe`.
 * Rows that produce the same key replace one another, and a delete removes the
 * entry stored under the key computed from the deleted row.
 */
export type UpsertKeyFunction<T> = (row: T) => string;

/**
 * Stateful object that executes a Materialize SUBSCRIBE over a websocket.
 * Results from the socket are reduced to the current set of rows as of the
 * latest closed timestamp, keyed by the provided `upsertKey` function.
 *
 * Note that the subscribe statement must include WITH (PROGRESS) and ENVELOPE UPSERT.
 */
export class UpsertSubscribe<
  T extends object,
  R = SubscribeRow<T>,
> extends RawSubscribe<T, R> {
  /**
   * Current set of rows, keyed by the result of `upsertKey`. An "upsert" change
   * inserts or replaces the entry for its key; a "delete" change removes it.
   */
  protected upsertBuffer = new Map<string, SubscribeRow<T>>();
  /** Derives the key that identifies a row in `upsertBuffer`. */
  upsertKey: UpsertKeyFunction<T>;

  /**
   * @param options Options forwarded to `RawSubscribe`, plus the `upsertKey`
   *   function used to key rows in the upsert buffer.
   */
  constructor(
    options: RawSubscribeOptions<T, R> & { upsertKey: UpsertKeyFunction<T> },
  ) {
    super(options);
    this.upsertKey = options.upsertKey;
  }

  /**
   * Replaces the key function used for subsequent changes. Entries already in
   * the buffer keep the keys they were stored under.
   */
  setUpsertKey(upsertKey: UpsertKeyFunction<T>) {
    this.upsertKey = upsertKey;
  }

  /**
   * Replaces the `select` function. This only stores the new function; the
   * currently published data is not recomputed here.
   */
  setSelect(select: SelectFunction<T, R> | undefined) {
    this.select = select;
  }

  /**
   * Applies every buffered change from `closedTimestampBuffer` to
   * `upsertBuffer`, publishes the resulting rows on `currentState.data`, and
   * notifies all listeners. Does nothing (and notifies nobody) when the buffer
   * is empty.
   */
  protected flushSocketBuffer = () => {
    if (this.closedTimestampBuffer.length === 0) return;

    for (const change of this.closedTimestampBuffer) {
      switch (change.mzState) {
        case "upsert":
          this.upsertBuffer.set(this.upsertKey(change.data), change);
          break;
        case "delete":
          this.upsertBuffer.delete(this.upsertKey(change.data));
          break;
        case "key_violation":
          // A key violation is a valid state, so report it under its own
          // message rather than as an invalid mz_state; this keeps it
          // distinguishable from the `default` branch in error reports.
          captureException(
            new Error(`Key violation: ${JSON.stringify(change)}`),
          );
          this.currentState = {
            ...this.currentState,
            error: {
              code: SUBSCRIBE_ERROR_CODE.KEY_VIOLATION,
              message: "Key violation",
            },
          };
          break;
        case null:
          // Progress message, which we can ignore here
          break;
        default:
          // Any state not handled above is unexpected: report it and surface
          // a generic error to consumers.
          captureException(
            new Error(`Invalid mz_state: ${JSON.stringify(change)}`),
          );
          this.currentState = {
            ...this.currentState,
            error: {
              code: SUBSCRIBE_ERROR_CODE.INVALID_STATE,
              message: "Internal error",
            },
          };
      }
    }
    // Processing continues after an error, so the remaining changes in this
    // batch are still applied before the buffer is cleared.
    this.closedTimestampBuffer = [];

    // `mapRowsToResult` is inherited; presumably it applies `select` to each
    // row — confirm against RawSubscribe.
    const newValues = this.mapRowsToResult(this.upsertBuffer.values());
    // Only swap in a new data array when the contents actually differ, so the
    // existing `currentState.data` reference is kept for equal results.
    if (!deepEqual(this.currentState.data, newValues)) {
      this.currentState = {
        ...this.currentState,
        data: newValues as R[],
      };
    }
    // Listeners are notified on every non-empty flush, whether or not the data
    // changed (other parts of the state, such as `error`, may have).
    for (const callback of this.listeners) {
      callback();
    }
  };
}
