import { PrimitiveAtom, useSetAtom } from "jotai";
import { RawBuilder } from "kysely";
import React from "react";

import { useAutomaticallyConnectSocket } from "~/hooks/useAutomaticallyConnectSocket";
import { getStore } from "~/jotai";
import { useCurrentEnvironmentHttpAddress } from "~/store/environments";

import { queryBuilder } from "./db";
import {
  RawSubscribe,
  SelectFunction,
  SubscribeError,
  SubscribeRow,
  SubscribeState,
} from "./RawSubscribe";
import { UpsertKeyFunction, UpsertSubscribe } from "./UpsertSubscribe";

export { buildSubscribeQuery } from "~/api/materialize/buildSubscribeQuery";

/**
 * Signature of a callback that receives one `SubscribeRow<T>` and returns a value
 * of type `R`.
 */
export type HandleRowCallback<T, R> = (result: SubscribeRow<T>) => R;

/**
 * Options accepted by the subscribe hooks in this module.
 *
 * `T` is the row type produced by the query, `R` is the type of the values the hook
 * exposes to its caller.
 */
export type UseSubscribeOptions<T extends object, R> = {
  /**
   * The query to run. The hooks compile it into the request they send; while it is
   * `undefined` no request is built.
   */
  subscribe: RawBuilder<T> | undefined;
  /**
   * Forwarded unchanged to the underlying subscribe instance's constructor.
   * NOTE(review): presumably closes the socket once the subscribe finishes — confirm
   * against `RawSubscribe`.
   */
  closeSocketOnComplete?: boolean;
  /** Passed to the underlying subscribe instance as the `cluster` session variable. */
  clusterName?: string;
  /**
   * Forwarded to the underlying subscribe instance.
   * NOTE(review): presumably converts each row into an `R` — confirm against
   * `SelectFunction`.
   */
  select?: SelectFunction<T, R>;
};

/**
 * Value returned by `useSubscribe` and `useRawSubscribe`: the current subscribe
 * snapshot plus the controls exposed by the underlying subscribe instance.
 */
export type UseSubscribeReturn<T> = {
  /** disconnects the socket but keeps state */
  disconnect: () => void;
  /** Clears all state but does not disconnect */
  reset: () => void;
  /** The `data` field of the latest snapshot read from the subscribe instance. */
  data: T[];
  /** `true` whenever `error` is set; computed as `Boolean(error)`. */
  isError: boolean;
  /**
   * The `snapshotComplete` field of the latest snapshot.
   * NOTE(review): presumably flips to `true` once the initial snapshot has been fully
   * received — confirm against `RawSubscribe`.
   */
  snapshotComplete: boolean;
  /** The `error` field of the latest snapshot, or `undefined` when there is none. */
  error: SubscribeError | undefined;
};

/**
 * Executes a subscribe query and handles state internally. All updates are reduced to
 * the current set of values at latest closed timestamp.
 *
 * Note that the subscribe statement must have WITH (PROGRESS) and ENVELOPE UPSERT.
 *
 * The `select` and `upsertKey` functions are not expected to be stable, and new
 * function instances for these options will not restart the subscribe. On each render,
 * the latest function references are written onto the underlying `UpsertSubscribe`.
 *
 * @param params - The shared subscribe options plus `upsertKey`, which is handed to
 *   `UpsertSubscribe` together with `select`.
 * @returns The current snapshot (`data`, `error`, `snapshotComplete`), an `isError`
 *   flag derived from `error`, and the instance's `disconnect` / `reset` controls.
 */
export function useSubscribe<T extends object, R = SubscribeRow<T>>({
  upsertKey,
  subscribe,
  ...options
}: UseSubscribeOptions<T, R> & {
  upsertKey: UpsertKeyFunction<T>;
}): UseSubscribeReturn<R> {
  const httpAddress = useCurrentEnvironmentHttpAddress();
  const request = useSubscribeRequest(subscribe);
  // `useRef` keeps only the value passed on the first render, so the constructor
  // arguments below describe the initial configuration only.
  const subscribeRef = React.useRef(
    new UpsertSubscribe<T, R>({
      request,
      httpAddress,
      upsertKey: upsertKey,
      select: options.select,
      sessionVariables: {
        cluster: options.clusterName,
      },
      closeSocketOnComplete: options.closeSocketOnComplete,
    }),
  );
  // Refresh the caller-supplied functions on every render, as
  // `useGlobalUpsertSubscribe` does. Without this the instance would keep calling the
  // closures captured on the first render.
  subscribeRef.current.upsertKey = upsertKey;
  subscribeRef.current.select = options.select;
  useAutomaticallyConnectSocket<T, R>({
    socket: subscribeRef.current.socket,
    subscribe: subscribeRef.current,
    request,
  });

  // Re-render whenever the subscribe instance reports a change.
  const { data, error, snapshotComplete } = React.useSyncExternalStore(
    subscribeRef.current.onChange,
    subscribeRef.current.getSnapshot,
  );

  return {
    data,
    disconnect: subscribeRef.current.disconnect,
    reset: subscribeRef.current.reset,
    error,
    isError: Boolean(error),
    snapshotComplete,
  };
}

/**
 * Runs a subscribe query and mirrors its state into the supplied jotai atom instead of
 * component-local state. All updates are reduced to the current set of values at
 * latest closed timestamp.
 *
 * Note that the subscribe statement must have WITH (PROGRESS) and ENVELOPE UPSERT.
 *
 * `select` and `upsertKey` do not need to be stable: a new function instance never
 * restarts the subscribe, and every render writes the latest references onto the
 * underlying `UpsertSubscribe`.
 *
 * @param params - The shared subscribe options plus `upsertKey` and the `atom` that
 *   receives each new snapshot.
 * @returns The ref holding the `UpsertSubscribe` instance created on the first render.
 */
export function useGlobalUpsertSubscribe<
  T extends object,
  R = SubscribeRow<T>,
>({
  subscribe,
  upsertKey,
  ...options
}: UseSubscribeOptions<T, R> & {
  upsertKey: UpsertKeyFunction<T>;
  atom: PrimitiveAtom<SubscribeState<R>>;
}): { subscribeRef: React.MutableRefObject<UpsertSubscribe<T, R>> } {
  const { atom, clusterName, closeSocketOnComplete, select } = options;

  const writeAtom = useSetAtom(atom);
  const environmentAddress = useCurrentEnvironmentHttpAddress();
  const compiledRequest = useSubscribeRequest(subscribe);

  // Only the instance built during the first render is retained by the ref.
  const subscribeRef = React.useRef(
    new UpsertSubscribe<T, R>({
      request: compiledRequest,
      httpAddress: environmentAddress,
      upsertKey,
      sessionVariables: { cluster: clusterName },
      closeSocketOnComplete,
      select,
    }),
  );

  // Keep the instance pointed at this render's callbacks.
  subscribeRef.current.upsertKey = upsertKey;
  subscribeRef.current.select = select;

  useAutomaticallyConnectSocket<T, R>({
    socket: subscribeRef.current.socket,
    subscribe: subscribeRef.current,
    request: compiledRequest,
  });

  React.useEffect(
    // `onChange` hands back its own unsubscribe function, which doubles as the
    // effect cleanup.
    () =>
      subscribeRef.current.onChange(() => {
        const latest = subscribeRef.current.getSnapshot();
        // Skip the write when the atom already holds this exact snapshot object.
        if (getStore().get(atom) !== latest) {
          writeAtom(latest);
        }
      }),
    [atom, writeAtom],
  );

  return { subscribeRef };
}

/**
 * Runs a subscribe query through `RawSubscribe` and keeps the resulting state inside
 * the hook. The raw updates are flushed for each closed timestamp.
 *
 * Note that the subscribe statement must have WITH (PROGRESS) and ENVELOPE UPSERT.
 *
 * `select` is handed to `RawSubscribe` when the instance is constructed on the first
 * render, and a new function instance does not restart the subscribe.
 * NOTE(review): unlike `useGlobalUpsertSubscribe`, this hook does not reassign
 * `select` on later renders — confirm whether that is intended.
 *
 * @param params - The shared subscribe options.
 * @returns The current snapshot (`data`, `error`, `snapshotComplete`), an `isError`
 *   flag derived from `error`, and the instance's `disconnect` / `reset` controls.
 */
export function useRawSubscribe<T extends object, R = SubscribeRow<T>>({
  subscribe,
  ...options
}: UseSubscribeOptions<T, R>): UseSubscribeReturn<R> {
  const { clusterName, closeSocketOnComplete, select } = options;

  const environmentAddress = useCurrentEnvironmentHttpAddress();
  const compiledRequest = useSubscribeRequest(subscribe);

  // Only the instance built during the first render is retained by the ref.
  const subscribeRef = React.useRef(
    new RawSubscribe<T, R>({
      request: compiledRequest,
      httpAddress: environmentAddress,
      sessionVariables: { cluster: clusterName },
      closeSocketOnComplete,
      select,
    }),
  );
  const rawSubscribe = subscribeRef.current;

  useAutomaticallyConnectSocket<T, R>({
    socket: rawSubscribe.socket,
    subscribe: rawSubscribe,
    request: compiledRequest,
    sessionVariables: { cluster: clusterName },
  });

  // Re-render whenever the subscribe instance reports a change.
  const snapshot = React.useSyncExternalStore(
    rawSubscribe.onChange,
    rawSubscribe.getSnapshot,
  );

  return {
    data: snapshot.data,
    disconnect: rawSubscribe.disconnect,
    reset: rawSubscribe.reset,
    error: snapshot.error,
    isError: Boolean(snapshot.error),
    snapshotComplete: snapshot.snapshotComplete,
  };
}

/**
 * Compiles the given query with the module's `queryBuilder` and wraps the resulting
 * SQL and parameters in a single-query request object. The result is memoized on the
 * identity of `subscribe`.
 *
 * @param subscribe - The query to compile, or `undefined` when there is nothing to run.
 * @returns The request object, or `undefined` when `subscribe` is falsy.
 */
function useSubscribeRequest(subscribe: RawBuilder<unknown> | undefined) {
  return React.useMemo(() => {
    if (subscribe) {
      const { sql, parameters } = subscribe.compile(queryBuilder);
      return {
        queries: [{ query: sql, params: parameters as string[] }],
      };
    }

    return undefined;
  }, [subscribe]);
}
