import { differenceInMinutes, subMinutes } from "date-fns";
import { InferResult, sql } from "kysely";
import React, { useMemo } from "react";

import { escapedLiteral as lit, queryBuilder } from "~/api/materialize";
import { Cluster } from "~/api/materialize/cluster/clusterList";
import { buildClusterReplicaUtilizationTable } from "~/api/materialize/expressionBuilders";
import {
  buildSubscribeQuery,
  useRawSubscribe,
} from "~/api/materialize/useSubscribe";
import {
  DataPoint,
  OfflineEvent,
  ReplicaData,
} from "~/platform/clusters/ClusterOverview/types";
import { nowUTC } from "~/util";

/**
 * One utilization sample for a single cluster replica: a flattened
 * SUBSCRIBE row from `buildClusterUtilizationQuery` combined with the row's
 * subscribe timestamp.
 */
export interface ReplicaUtilization extends Record<string, unknown> {
  // Replica id (selected from `mz_cluster_replicas.id`).
  id: string;
  // Replica name (selected from `mz_cluster_replicas.name`).
  name: string;
  // Replica size (selected from `mz_cluster_replicas.size`); may be null.
  size: string | null;
  // Subscribe timestamp of the row; presumably epoch milliseconds — TODO confirm.
  timestamp: number;
  // CPU utilization percent from the utilization table; null when absent.
  cpuPercent: number | null;
  // Memory utilization percent from the utilization table; null when absent.
  memoryPercent: number | null;
  // Disk utilization percent from the utilization table; null when absent.
  diskPercent: number | null;
  // Status reason (selected from `mz_cluster_replica_statuses.reason`).
  offlineReason: string | null;
  // Replica status (selected from `mz_cluster_replica_statuses.status`);
  // consumers check for "not-ready" and "offline".
  status: string;
}

/**
 * Builds a Kysely query that returns one row per replica of `clusterId`.
 *
 * Each replica is joined with:
 * - its status row for process "0", and
 * - its row in the replica utilization table.
 *
 * The query selects the replica's identity, size, cpu/memory/disk percent,
 * status, and status reason. When `replicaId` is provided, the result is
 * restricted to that single replica.
 */
export function buildClusterUtilizationQuery({
  clusterId,
  replicaId,
  mzClusterReplicaUtilization = "mz_cluster_replica_utilization",
}: {
  clusterId: string;
  replicaId?: string;
  mzClusterReplicaUtilization?: "mz_cluster_replica_utilization";
}) {
  const utilizationTable = buildClusterReplicaUtilizationTable({
    mzClusterReplicaUtilization,
  }).as("cru");

  const clusterQuery = queryBuilder
    .selectFrom("mz_cluster_replicas as cr")
    .innerJoin("mz_cluster_replica_statuses as crs", (on) =>
      on.onRef("crs.replica_id", "=", "cr.id").on("crs.process_id", "=", "0"),
    )
    .innerJoin(utilizationTable, (on) =>
      on.onRef("cr.id", "=", "cru.replica_id"),
    )
    .where("cr.cluster_id", "=", clusterId)
    .select([
      "cr.id",
      "cr.name",
      "cr.size",
      "cru.cpu_percent as cpuPercent",
      "cru.memory_percent as memoryPercent",
      "cru.disk_percent as diskPercent",
      "crs.reason as offlineReason",
      "crs.status",
    ]);

  // An empty/undefined replicaId means "all replicas of the cluster".
  return replicaId
    ? clusterQuery.where("crs.replica_id", "=", replicaId)
    : clusterQuery;
}

/**
 * Array of result rows produced by `buildClusterUtilizationQuery`, inferred
 * by Kysely from the query's selected (and aliased) columns. Use
 * `ClusterUtilizationResult[0]` for the type of a single row.
 */
export type ClusterUtilizationResult = InferResult<
  ReturnType<typeof buildClusterUtilizationQuery>
>;

/**
 * Compiles a SUBSCRIBE over `buildClusterUtilizationQuery` with these options:
 * - progress messages enabled;
 * - streaming starts no earlier than `initialStartTime`;
 * - upsert semantics keyed on the replica `id`.
 *
 * @returns The compiled query (SQL text plus parameters) for `queryBuilder`.
 */
export function buildClusterUtilizationSubscribe({
  initialStartTime,
  clusterId,
  replicaId,
  mzClusterReplicaUtilization = "mz_cluster_replica_utilization",
}: {
  initialStartTime: Date;
  clusterId: string;
  replicaId?: string;
  mzClusterReplicaUtilization?: "mz_cluster_replica_utilization";
}) {
  const innerQuery = buildClusterUtilizationQuery({
    clusterId,
    replicaId,
    mzClusterReplicaUtilization,
  });
  // The timestamp is inlined as an escaped literal rather than a bind parameter.
  const asOfLiteral = lit(new Date(initialStartTime).toISOString());

  return sql`SUBSCRIBE (${innerQuery}) WITH (PROGRESS) AS OF AT LEAST TIMESTAMP ${asOfLiteral} ENVELOPE UPSERT (KEY (id))`.compile(
    queryBuilder,
  );
}

/**
 * Returns the larger of two nullable metric readings.
 *
 * A `null`/`undefined` reading means "no observation" and never replaces a
 * real value, so a genuine reading of `0` is preserved.
 */
function maxNullableMetric(
  current: number | null,
  candidate: number | null,
): number | null {
  if (candidate == null) return current;
  if (current == null) return candidate;
  return candidate > current ? candidate : current;
}

/**
 * Collapses every sample recorded for one replica inside one bucket into a
 * single DataPoint.
 *
 * The DataPoint holds the maximum of each metric over the bucket, plus each
 * "not-ready"/"offline" sample as an OfflineEvent. Offline events are in
 * reverse of the order the samples appear in `utilizations`.
 *
 * `utilizations` must be non-empty; `name` and `size` come from its first
 * element.
 */
function summarizeReplicaBucket(
  id: string,
  bucketStart: number,
  bucketSizeMs: number,
  utilizations: ReplicaUtilization[],
): DataPoint {
  let maxCpu: number | null = null;
  let maxMemory: number | null = null;
  let maxDisk: number | null = null;
  const offlineEvents: OfflineEvent[] = [];

  for (const value of utilizations) {
    maxCpu = maxNullableMetric(maxCpu, value.cpuPercent);
    maxMemory = maxNullableMetric(maxMemory, value.memoryPercent);
    // Disk must be compared against disk readings, not memory readings.
    maxDisk = maxNullableMetric(maxDisk, value.diskPercent);
    // We keep every not-ready/offline sample so the tooltip can show exact timestamps.
    if (value.status === "not-ready" || value.status === "offline") {
      offlineEvents.push({
        id,
        offlineReason: value.offlineReason,
        status: value.status,
        timestamp: value.timestamp,
      });
    }
  }

  return {
    id,
    name: utilizations[0].name,
    size: utilizations[0].size,
    bucketStart,
    bucketEnd: bucketStart + bucketSizeMs,
    cpuPercent: maxCpu,
    memoryPercent: maxMemory,
    diskPercent: maxDisk,
    offlineEvents: offlineEvents.reverse(),
  };
}

/**
 * Streams historical replica utilization for a cluster via SUBSCRIBE and
 * shapes it for the cluster overview graphs.
 *
 * This will be replaced by useReplicaUtilizationHistory once its data source,
 * mz_internal.mz_cluster_replica_metric_history, has 30 days of data.
 *
 * @param params.clusterId - Cluster to subscribe to. No subscribe query is
 *   built while this is undefined.
 * @param params.replicaId - Optional; restricts the query to one replica.
 * @param params.timePeriodMinutes - Width of the graphed window. The window
 *   ends at the newest sample or the hook's mount time, whichever is later.
 * @param params.bucketSizeMs - Width of each aggregation bucket.
 * @param params.cluster - `data.graphData` stays undefined until this is set.
 * @param options.subscribeEnabled - No subscribe query is built unless truthy.
 * @returns An object with these fields:
 *   - `reset` from the underlying subscription;
 *   - `isLoading`, true until graph data or a complete snapshot is available;
 *   - `isError`, truthy only when the socket errored before the snapshot
 *     completed;
 *   - `data`, holding the window bounds, recent offline events, and
 *     per-replica bucketed graph data.
 */
const useClusterUtilization = (
  {
    clusterId,
    replicaId,
    timePeriodMinutes,
    bucketSizeMs,
    cluster,
  }: {
    cluster?: Cluster;
    clusterId: string | undefined;
    replicaId?: string;
    timePeriodMinutes: number;
    bucketSizeMs: number;
  },
  { subscribeEnabled }: { subscribeEnabled?: boolean },
) => {
  // When we first set the time period, we pick the current time as the end time for the
  // graph, then work backwards to calculate the initialStartTime based on the selected
  // time period.
  const initialEndTime = React.useMemo(() => new Date().getTime(), []);

  const subscribe = React.useMemo(() => {
    if (!clusterId || !subscribeEnabled) return;

    return buildSubscribeQuery(
      buildClusterUtilizationQuery({
        clusterId,
        replicaId,
      }),
      {
        upsertKey: "id",
        asOfAtLeast: subMinutes(initialEndTime, timePeriodMinutes),
      },
    );
  }, [
    clusterId,
    initialEndTime,
    replicaId,
    timePeriodMinutes,
    subscribeEnabled,
  ]);

  const {
    reset,
    data: rawData,
    snapshotComplete,
    error,
  } = useRawSubscribe<ClusterUtilizationResult[0]>({ subscribe });

  // Memoized so the derived values below, and the returned `data` object, keep a
  // stable identity across renders when `rawData` has not changed.
  const rawReplicaUtilization = React.useMemo(
    () =>
      rawData
        .filter(
          // Keep upserts, plus rows with a null state so progress messages survive.
          // 'key_violation' and 'delete' rows carry null for every field except the
          // upsert key, so they are excluded from the graphs.
          (row) => row.mzState === "upsert" || row.mzState === null,
        )
        .map((row) => ({
          timestamp: row.mzTimestamp,
          ...row.data,
        })),
    [rawData],
  );

  // The graph's current end time. This is the newest row's timestamp once data
  // has advanced past the mount time; otherwise it is the mount time itself.
  const currentEndTime = React.useMemo(() => {
    const newest = rawReplicaUtilization[rawReplicaUtilization.length - 1];
    if (newest) {
      const newestTime = new Date(newest.timestamp);
      if (new Date(initialEndTime) < newestTime) {
        return newestTime;
      }
    }
    return new Date(initialEndTime);
  }, [rawReplicaUtilization, initialEndTime]);

  // This is the current start time of the graph
  const currentStartTime = React.useMemo(() => {
    return subMinutes(currentEndTime, timePeriodMinutes);
  }, [timePeriodMinutes, currentEndTime]);

  // Bucket start timestamps actually displayed on the graph. The list is ascending:
  // it begins at currentStartTime and keeps only starts before currentEndTime.
  const buckets = React.useMemo(() => {
    const startTimestamp = currentStartTime.getTime();
    const result = [];
    let currentBucket = startTimestamp;
    while (currentBucket < currentEndTime.getTime()) {
      result.push(currentBucket);
      currentBucket += bucketSizeMs;
    }
    return result;
  }, [bucketSizeMs, currentStartTime, currentEndTime]);

  type ReplicaId = string;
  type Timestamp = number;
  type ReplicaMap = Map<ReplicaId, ReplicaUtilization[]>;

  // We don't want to render every datapoint because it doesn't perform well on large
  // timespans so we bucket the data. We decide on the bucket size by the total timespan
  // we are displaying so that it scales nicely.
  const graphData = React.useMemo(() => {
    if (!cluster) return undefined;

    // First group the samples by bucket start, then by replica id.
    const bucketMap = new Map<Timestamp, ReplicaMap>();
    for (const datum of rawReplicaUtilization) {
      // Bucket ranges are inclusive on both ends. Because `find` returns the first
      // match, a sample exactly on a boundary lands in the earlier bucket.
      const bucket = buckets.find(
        (b) => datum.timestamp >= b && datum.timestamp <= b + bucketSizeMs,
      );
      // Skip samples outside the graphed window and rows with no replica id
      // (presumably the progress messages kept above).
      if (bucket === undefined || !datum.id) {
        continue;
      }

      let replicaMap = bucketMap.get(bucket);
      if (!replicaMap) {
        replicaMap = new Map<ReplicaId, ReplicaUtilization[]>();
        bucketMap.set(bucket, replicaMap);
      }
      const samples = replicaMap.get(datum.id);
      if (samples) {
        samples.push(datum);
      } else {
        replicaMap.set(datum.id, [datum]);
      }
    }

    // Reduce each (bucket, replica) group to one point on that replica's line.
    const chartLinesMap = new Map<ReplicaId, DataPoint[]>();
    for (const [bucket, replicaMap] of bucketMap) {
      for (const [id, utilizations] of replicaMap) {
        const lineData = chartLinesMap.get(id) ?? [];
        lineData.push(
          summarizeReplicaBucket(id, bucket, bucketSizeMs, utilizations),
        );
        chartLinesMap.set(id, lineData);
      }
    }

    const chartData: ReplicaData[] = [];
    for (const [id, data] of chartLinesMap) {
      chartData.push({ id, data });
    }
    return chartData;
  }, [bucketSizeMs, buckets, cluster, rawReplicaUtilization]);

  // Collect not-ready/offline events within the last `timePeriodMinutes`, measured
  // from now. OOM kills are excluded.
  const offlineEvents = React.useMemo(() => {
    const result: Array<OfflineEvent> = [];
    const now = nowUTC();
    for (const {
      id,
      status,
      offlineReason,
      timestamp,
    } of rawReplicaUtilization) {
      if (
        (status === "not-ready" || status === "offline") &&
        offlineReason !== "oom-killed" &&
        differenceInMinutes(now, timestamp) <= timePeriodMinutes
      ) {
        result.push({
          id,
          offlineReason,
          status,
          timestamp,
        });
      }
    }
    return result;
  }, [rawReplicaUtilization, timePeriodMinutes]);

  const data = useMemo(() => {
    return {
      startDate: currentStartTime,
      endDate: currentEndTime,
      offlineEvents,
      graphData,
    };
  }, [graphData, currentEndTime, currentStartTime, offlineEvents]);

  // Because sockets often disconnect for all sorts of reasons
  // we really only want to show an error if we actually failed to load data
  const showSocketError = error && !snapshotComplete;

  // The snapshot includes all the historical data, but we can start graphing it once we
  // have any data. In the case there is no data to show, snapshotComplete will be true,
  // even though data is empty.
  const graphDataLoaded =
    (graphData && graphData.length > 0) || snapshotComplete;

  return {
    reset,
    isLoading: !graphDataLoaded,
    isError: showSocketError,
    data,
  };
};

export default useClusterUtilization;
