import { QueryKey } from "@tanstack/react-query";
import { InferResult, sql } from "kysely";

import {
  executeSqlV2,
  IPostgresInterval,
  queryBuilder,
} from "~/api/materialize";

/**
 * Allow-list of catalog object type names.
 *
 * Each element is narrowed with `as const` so the array's element type is the
 * union of these literals rather than `string`, while the array itself stays
 * a plain mutable array.
 *
 * NOTE(review): nothing in this file reads the list; presumably it enumerates
 * the object types lag information is displayed for. Confirm against callers.
 */
export const ALLOWED_OBJECT_TYPES = [
  "index" as const,
  "materialized-view" as const,
  "sink" as const,
  "source" as const,
  "table" as const,
];

/** Union of the string literals contained in `ALLOWED_OBJECT_TYPES`. */
export type AllowedObjects = (typeof ALLOWED_OBJECT_TYPES)[number];

/**
 * Parameters shared by the lag queries and fetchers in this module.
 */
export type MaterializationLagParams = {
  /**
   * Ids of the objects to compute lag for; matched against the `id` column of
   * `mz_objects`. The fetch helpers return `null` without querying when this
   * is missing or empty.
   */
  objectIds?: string[];
};

/**
 * Lag threshold in seconds. The lag query flags an object (`isOutdated`) or
 * its slowest local input (`isSlowestLocalInputOutdated`) once the respective
 * lag from "now" is greater than or equal to this many seconds.
 */
export const OUTDATED_THRESHOLD_SECONDS = 10;

/**
 * Builds the query that calculates the "lag" of each target object, i.e. the
 * time difference between the object's write frontier and (a) the current
 * time and (b) the frontiers of its upstream dependencies.
 *
 * The `materializationLag` CTE starts from the requested rows of `mz_objects`
 * and:
 *  - inner joins `mz_frontiers`, so objects without a frontier row are dropped;
 *  - left joins `mz_hydration_statuses` and `mz_materialization_lag`, so their
 *    columns are null when no matching row exists.
 *
 * The outer select returns every CTE column plus two booleans that compare the
 * computed lags against `OUTDATED_THRESHOLD_SECONDS`.
 *
 * @param objectIds Ids of the objects to report on (`mz_objects.id`).
 * @param mzNow Optional fixed "now" as an mz_timestamp number. Any finite
 *   number, including `0`, is inlined into the SQL; when omitted (or not a
 *   finite number) the query uses `mz_now()` instead.
 * @param mzObjects, mzMaterializationLag, mzFrontiers, mzHydrationStatuses
 *   Names of the relations to read from. NOTE(review): the types only admit the
 *   default names; presumably these exist so tests can substitute relations.
 * @returns The un-compiled Kysely query builder; callers compile it or alias
 *   it as a subquery.
 */
export const buildMaterializationLagQuery = ({
  objectIds,
  mzNow,
  mzObjects = "mz_objects",
  mzMaterializationLag = "mz_materialization_lag",
  mzFrontiers = "mz_frontiers",
  mzHydrationStatuses = "mz_hydration_statuses",
}: Required<MaterializationLagParams> & {
  mzNow?: number;
  mzObjects?: "mz_objects";
  mzMaterializationLag?: "mz_materialization_lag";
  mzFrontiers?: "mz_frontiers";
  mzHydrationStatuses?: "mz_hydration_statuses";
}) => {
  // `sql.raw` splices text into the statement unescaped, so only a genuine
  // finite number is ever interpolated. A plain truthiness check would also
  // discard an explicit timestamp of 0 and silently fall back to mz_now().
  const mzNowExpr = sql.raw(
    Number.isFinite(mzNow) ? `${mzNow}::mz_timestamp` : "mz_now()",
  );
  const query = queryBuilder
    .with("materializationLag", (qb) =>
      qb
        .selectFrom(
          qb
            .selectFrom(mzObjects)
            .selectAll()
            .where("id", "in", objectIds)
            .as("objects"),
        )
        .innerJoin(
          qb.selectFrom(mzFrontiers).selectAll().as("frontiers"),
          "frontiers.object_id",
          "objects.id",
        )
        .leftJoin(
          qb.selectFrom(mzHydrationStatuses).selectAll().as("hs"),
          "hs.object_id",
          "objects.id",
        )
        .leftJoin(
          qb.selectFrom(mzMaterializationLag).selectAll().as("ml"),
          "ml.object_id",
          "objects.id",
        )
        .select((eb) => [
          "objects.id as targetObjectId",
          "objects.type",
          "hs.hydrated",
          // We convert mz timestamps to timestamps since there are no
          // arithmetic operators for the mz_timestamp type.
          // lagFromMzNow = now - write frontier, clamped to be non-negative.
          eb
            .val(
              sql`
              greatest(
                to_timestamp(${mzNowExpr}::text::double / 1000) -
                to_timestamp(${eb.ref("frontiers.write_frontier")}::text::double / 1000),
                INTERVAL '0'
              )`,
            )
            .$castTo<IPostgresInterval>()
            .as("lagFromMzNow"),
          // now - (write frontier + local lag), clamped to be non-negative.
          // NOTE(review): presumably write frontier + local lag is the frontier
          // of the slowest local input, making this that input's lag from now.
          eb
            .val(
              sql`
              greatest(
                to_timestamp(${mzNowExpr}::text::double / 1000)
                - (
                  to_timestamp(${eb.ref("frontiers.write_frontier")}::text::double / 1000) +
                  ${eb.ref("ml.local_lag")}
                ),
                INTERVAL '0'
              )`,
            )
            .$castTo<IPostgresInterval>()
            .as("greatestLagFromMzNowOfLocalInputs"),
          "ml.global_lag as globalLag",
          "ml.slowest_global_input_id as slowestRootObjectId",
          "ml.slowest_local_input_id as slowestLocalObjectId",
          "ml.local_lag as localLag",
        ]),
    )
    .selectFrom("materializationLag")
    .selectAll()
    .select((eb) => [
      // The threshold is written into the interval literal via `sql.lit`
      // because it sits inside a quoted SQL string.
      eb(
        "lagFromMzNow",
        ">=",
        sql<IPostgresInterval>`INTERVAL '${sql.lit(OUTDATED_THRESHOLD_SECONDS)} seconds'`,
      ).as("isOutdated"),
      // We need to know if an object's slowest local input is outdated to
      // differentiate whether the object is lagging or waiting.
      eb(
        "greatestLagFromMzNowOfLocalInputs",
        ">=",
        sql<IPostgresInterval>`INTERVAL '${sql.lit(OUTDATED_THRESHOLD_SECONDS)} seconds'`,
      ).as("isSlowestLocalInputOutdated"),
    ]);

  return query;
};

/**
 * Row array type produced by the query that `buildMaterializationLagQuery`
 * returns, inferred from the query builder itself.
 */
export type LagResults = InferResult<
  ReturnType<typeof buildMaterializationLagQuery>
>;
// A single row of the lag query result.
type LagResult = LagResults[0];

/**
 * Lag details for a single object: selected columns of a lag query row plus a
 * derived `lagStatus`.
 */
export type LagInfo = {
  /**
   * We define 'outdated' as an object being at least 10 seconds
   * (`OUTDATED_THRESHOLD_SECONDS`) behind the timestamp oracle. 10 seconds is
   * arbitrary, but a good guess as to when things have gone awry for an
   * object. In the future we'd like to use lagFromMzNow as the globalLag. We
   * can't do so right now because we can't guarantee the freshness of this
   * lag. This should be resolved by
   * https://github.com/MaterializeInc/materialize/issues/26748.
   */
  isOutdated: LagResult["isOutdated"];
  /** Non-negative interval between "now" and the object's write frontier. */
  lagFromMzNow: LagResult["lagFromMzNow"];
  /** Status classification; presumably the output of `calculateLagStatus`. */
  lagStatus: LagStatus;
  /** `hydrated` column of `mz_hydration_statuses` (left joined in the query). */
  hydrated: LagResult["hydrated"];
  /** `local_lag` column of `mz_materialization_lag`. */
  localLag: LagResult["localLag"];
  /** `global_lag` column of `mz_materialization_lag`. */
  globalLag: LagResult["globalLag"];
  /** `slowest_local_input_id` column of `mz_materialization_lag`. */
  slowestLocalObjectId: LagResult["slowestLocalObjectId"];
  /** `slowest_global_input_id` column of `mz_materialization_lag`. */
  slowestRootObjectId: LagResult["slowestRootObjectId"];
};

/** Lag details by string key; presumably keyed by object id (confirm with callers). */
export type LagMap = Map<string, LagInfo>;

/**
 * Status assigned by `calculateLagStatus`:
 *  - `NOT_HYDRATED`: the row's `hydrated` value is exactly `false`.
 *  - `LAGGING`: both global and local lag are non-zero, or the object is
 *    outdated while its slowest local input is not.
 *  - `WAITING`: global lag is non-zero but local lag is not, or both the object
 *    and its slowest local input are outdated.
 *  - `UP_TO_DATE`: none of the above.
 */
export type LagStatus = "UP_TO_DATE" | "LAGGING" | "WAITING" | "NOT_HYDRATED";

/**
 * Reports whether an interval has any positive component among years, months,
 * days, hours, minutes and seconds.
 *
 * A missing lag value means the object has processed all its input data, so it
 * is treated as not lagging.
 *
 * NOTE(review): sub-second precision is not examined here (no `milliseconds`
 * check); confirm that sub-second lag is meant to count as "not lagging".
 */
function isLagging(lag: IPostgresInterval | null) {
  if (!lag) {
    return false;
  }

  const components = [
    lag.years,
    lag.months,
    lag.days,
    lag.hours,
    lag.minutes,
    lag.seconds,
  ];
  // Absent components count as zero.
  return components.some((component) => (component ?? 0) > 0);
}

/**
 * Classifies a lag query row into a `LagStatus`.
 *
 * Precedence, highest first:
 *  1. `hydrated === false` yields `NOT_HYDRATED`.
 *  2. A non-zero global lag yields `LAGGING` when the local lag is also
 *     non-zero, otherwise `WAITING`.
 *  3. An outdated object yields `WAITING` when its slowest local input is
 *     outdated too, otherwise `LAGGING`.
 *  4. Anything else is `UP_TO_DATE`.
 */
export function calculateLagStatus({
  localLag,
  globalLag,
  hydrated,
  isOutdated,
  isSlowestLocalInputOutdated,
}: LagResult): LagStatus {
  if (hydrated === false) {
    return "NOT_HYDRATED";
  }

  if (isLagging(globalLag)) {
    // Behind its root inputs: lagging if also behind its direct inputs,
    // otherwise waiting.
    return isLagging(localLag) ? "LAGGING" : "WAITING";
  }

  // Outdatedness is consulted last since the data used to calculate it is
  // approximate.
  if (!isOutdated) {
    return "UP_TO_DATE";
  }

  return isSlowestLocalInputOutdated ? "WAITING" : "LAGGING";
}

/**
 * Compiles the materialization lag query for the given objects and runs it
 * through `executeSqlV2`.
 *
 * @param objectIds Ids of the objects to report on.
 * @param queryKey Query key forwarded to `executeSqlV2`.
 * @param requestOptions Optional request options forwarded to `executeSqlV2`.
 * @returns `null` when `objectIds` is missing or empty (no query is issued),
 *   otherwise whatever `executeSqlV2` returns.
 */
export async function fetchMaterializationLag(
  { objectIds }: MaterializationLagParams,
  queryKey: QueryKey,
  requestOptions?: RequestInit,
) {
  if (!objectIds?.length) {
    return null;
  }

  const queries = buildMaterializationLagQuery({ objectIds }).compile();
  return executeSqlV2({ queries, queryKey, requestOptions });
}

/**
 * Builds a query on top of `buildMaterializationLagQuery` that reports, for
 * each of the given objects, whether it is outdated and whether it is
 * hydrated, together with its object type.
 *
 * Additional columns come from left joins, so they are null when no matching
 * row exists:
 *  - `sourceType`: the `type` of the matching `mz_sources` row;
 *  - `clusterId` / `clusterName`: the `mz_clusters` row whose id equals the
 *    cluster id of the matching index or, failing that, materialized view.
 *
 * @param objectIds Ids of the objects to report on.
 * @returns The un-compiled Kysely query builder.
 */
export const buildBlockedDependenciesQuery = ({
  objectIds,
}: Required<MaterializationLagParams>) => {
  const lagSubquery = buildMaterializationLagQuery({ objectIds }).as(
    "materializationLag",
  );

  // Attach the catalog rows that may describe each target object.
  const lagWithObjectDetails = queryBuilder
    .selectFrom(lagSubquery)
    .leftJoin(
      "mz_sources as sources",
      "materializationLag.targetObjectId",
      "sources.id",
    )
    .leftJoin(
      "mz_indexes as indexes",
      "indexes.id",
      "materializationLag.targetObjectId",
    )
    .leftJoin(
      "mz_materialized_views as mvs",
      "mvs.id",
      "materializationLag.targetObjectId",
    );

  // Resolve the cluster through whichever of index / materialized view matched.
  const lagWithClusters = lagWithObjectDetails.leftJoin(
    "mz_clusters as clusters",
    (joinBuilder) =>
      joinBuilder.on((expr) =>
        expr(
          "clusters.id",
          "=",
          expr.fn.coalesce("indexes.cluster_id", "mvs.cluster_id"),
        ),
      ),
  );

  return lagWithClusters.select([
    "materializationLag.targetObjectId",
    "materializationLag.isOutdated",
    "materializationLag.hydrated",
    "materializationLag.type",
    "sources.type as sourceType",
    "clusters.id as clusterId",
    "clusters.name as clusterName",
  ]);
};

// Row array type produced by the query that `buildBlockedDependenciesQuery`
// returns, inferred from the query builder itself.
type BlockedDependenciesResults = InferResult<
  ReturnType<typeof buildBlockedDependenciesQuery>
>;

/** A single row of the blocked dependencies query result. */
export type BlockedDependencyInfo = BlockedDependenciesResults[0];

/**
 * Compiles the blocked dependencies query for the given objects and runs it
 * through `executeSqlV2`.
 *
 * @param objectIds Ids of the objects to report on.
 * @param queryKey Query key forwarded to `executeSqlV2`.
 * @param requestOptions Optional request options forwarded to `executeSqlV2`.
 * @returns `null` when `objectIds` is missing or empty (no query is issued),
 *   otherwise whatever `executeSqlV2` returns.
 */
export async function fetchBlockedDependencies(
  { objectIds }: MaterializationLagParams,
  queryKey: QueryKey,
  requestOptions?: RequestInit,
) {
  if (!objectIds?.length) {
    return null;
  }

  const queries = buildBlockedDependenciesQuery({ objectIds }).compile();
  return executeSqlV2({ queries, queryKey, requestOptions });
}
