import { useMemo } from "react";

import type { ClusterReplicaDetails } from "~/api/materialize/environment-overview/clusterReplicaDetails";
import {
  buildClusterStorageUtilizationSubscribeQuery,
  ClusterStorageUtilization,
} from "~/api/materialize/environment-overview/clusterStorageUtilization";
import { SubscribeRow } from "~/api/materialize/RawSubscribe";
import { useRawSubscribe } from "~/api/materialize/useSubscribe";

import { useClusterReplicaDetails } from "./queries";

// Console-specific categories of a replica, derived from the kinds of objects it hosts:
// - "compute": only compute objects (indexes and/or materialized views)
// - "storage": only storage objects (sources and/or sinks)
// - "hybrid": both compute and storage objects
// - "empty": neither
type ReplicaCategory = "compute" | "storage" | "hybrid" | "empty";

// Classification of a replica's peak storage utilization, listed from lowest utilization band
// to highest. See calculateUtilizationStatus for how a value is mapped to a status.
export type UtilizationStatus =
  | "overProvisioned"
  | "optimal"
  | "suboptimal"
  | "underProvisioned";

// Exclusive upper bounds of each utilization band. The bounds are compared in order against the
// peak storage utilization value, so each band starts where the previous one ends.
export type ThresholdPercentages = {
  // We call a replica overProvisioned if utilization < thresholdPercentages.overProvisioned
  overProvisioned: number;
  // We call a replica optimal if it's not overProvisioned and utilization < thresholdPercentages.optimal
  optimal: number;
  // We call a replica suboptimal if it's not optimal and utilization < thresholdPercentages.suboptimal,
  // and underProvisioned if utilization is at or above thresholdPercentages.suboptimal
  suboptimal: number;
};

/**
 * Classifies a replica by the kinds of objects it hosts.
 *
 * Sources and sinks count as storage objects; indexes and materialized views count as
 * compute objects. A missing (null/undefined) count is treated as zero.
 *
 * @returns "hybrid" when both kinds are present, "storage" or "compute" when only one kind
 * is present, and "empty" when neither is.
 */
function calculateReplicaCategory({
  numSources,
  numSinks,
  numIndexes,
  numMaterializedViews,
}: {
  numSources: ClusterReplicaDetails["numSources"];
  numSinks: ClusterReplicaDetails["numSinks"];
  numIndexes: ClusterReplicaDetails["numIndexes"];
  numMaterializedViews: ClusterReplicaDetails["numMaterializedViews"];
}) {
  const usesStorage = (numSources ?? 0) > 0 || (numSinks ?? 0) > 0;
  const usesCompute = (numIndexes ?? 0) > 0 || (numMaterializedViews ?? 0) > 0;

  if (usesStorage && usesCompute) {
    return "hybrid";
  }
  if (usesStorage) {
    return "storage";
  }
  if (usesCompute) {
    return "compute";
  }
  return "empty";
}

/**
 * Maps a peak storage utilization value onto a utilization status.
 *
 * Each threshold is an exclusive upper bound and the bands are checked from lowest to
 * highest; a value that is below none of the thresholds is "underProvisioned".
 *
 * @param thresholdPercentages - Upper bounds of the overProvisioned, optimal and suboptimal bands.
 * @param peakStorageUtilizationPercent - The value to classify, on the same scale as the thresholds.
 * @returns The status of the first band whose upper bound exceeds the value.
 */
export function calculateUtilizationStatus({
  thresholdPercentages,
  peakStorageUtilizationPercent,
}: {
  thresholdPercentages: ThresholdPercentages;
  peakStorageUtilizationPercent: number;
}): UtilizationStatus {
  const { overProvisioned, optimal, suboptimal } = thresholdPercentages;

  if (peakStorageUtilizationPercent < overProvisioned) {
    return "overProvisioned";
  }
  if (peakStorageUtilizationPercent < optimal) {
    return "optimal";
  }
  if (peakStorageUtilizationPercent < suboptimal) {
    return "suboptimal";
  }
  return "underProvisioned";
}

/**
 * Computes a replica's storage utilization and the thresholds used to classify it, based on
 * the replica's category.
 *
 * Despite the "Percent" naming, all returned values are fractions (e.g. 0.5 for 50%).
 *
 * - "hybrid" / "compute": (disk used + memory used) / total memory, divided by the largest value
 *   that ratio can reach ((total memory + total disk) / total memory) so the result fits in 0..1.
 *   The thresholds are scaled by the same factor.
 * - "storage" / "empty": the larger of memory used / total memory and disk used / total disk.
 *
 * Byte counts that are missing, non-numeric or not positive are treated as 0, and a ratio whose
 * capacity is 0 is skipped rather than divided. This keeps NaN/Infinity out of the result, e.g.
 * for a replica that reports no disk.
 *
 * @param params.replicaCategory - Decides which formula and thresholds apply.
 * @param params.memoryBytes - Memory currently used.
 * @param params.totalMemoryBytes - Memory capacity.
 * @param params.diskBytes - Disk currently used.
 * @param params.totalDiskBytes - Disk capacity.
 * @returns The utilization fraction and the thresholds to compare it against.
 */
function calculatePeakStorageUtilization(params: {
  replicaCategory: ReplicaCategory;
  memoryBytes: ClusterStorageUtilization["memoryBytes"];
  totalMemoryBytes: ClusterStorageUtilization["totalMemoryBytes"];
  diskBytes: ClusterStorageUtilization["diskBytes"];
  totalDiskBytes: ClusterStorageUtilization["totalDiskBytes"];
}) {
  const { replicaCategory } = params;

  // Converts a raw byte count to a number. This assumes real byte counts stay within
  // Number.MAX_SAFE_INTEGER (2^53 - 1) so no precision is lost — TODO confirm.
  // Anything that doesn't convert to a positive finite number is treated as 0.
  const toByteCount = (value: unknown): number => {
    const byteCount = Number(value);
    return Number.isFinite(byteCount) && byteCount > 0 ? byteCount : 0;
  };

  // Fraction of a capacity that is in use, or 0 when the capacity is unknown (0).
  const usedFraction = (usedBytes: number, capacityBytes: number): number =>
    capacityBytes > 0 ? usedBytes / capacityBytes : 0;

  const totalMemoryBytes = toByteCount(params.totalMemoryBytes);
  const totalDiskBytes = toByteCount(params.totalDiskBytes);
  const memoryBytes = toByteCount(params.memoryBytes);
  const diskBytes = toByteCount(params.diskBytes);

  let peakStorageUtilizationPercent = 0;

  // Neutral defaults: nothing is overProvisioned or underProvisioned until a branch below
  // sets real thresholds.
  const thresholdPercentages: ThresholdPercentages = {
    overProvisioned: Number.NEGATIVE_INFINITY,
    optimal: 1.0,
    suboptimal: Number.POSITIVE_INFINITY,
  };

  if (replicaCategory === "hybrid" || replicaCategory === "compute") {
    // For hybrid and compute clusters, we use the current disk and memory usage relative to the
    // total memory of a cluster. If over 100%, it means the replica is spilling to disk. Too much
    // spilling to disk is unwanted for compute clusters since it means performance degradation,
    // but it's acceptable for a cluster to spill to disk during hydration/rehydration.
    //
    // Without a known memory capacity the ratio is undefined, so we keep the zero utilization
    // and neutral thresholds from above instead of producing NaN.
    if (totalMemoryBytes > 0) {
      const maxUtilizationPercent =
        (totalMemoryBytes + totalDiskBytes) / totalMemoryBytes;

      // The raw ratio can exceed 1 (100%), so we normalize it by its maximum possible value
      // such that it fits on a scale from 0 to 1.
      peakStorageUtilizationPercent =
        (diskBytes + memoryBytes) / totalMemoryBytes / maxUtilizationPercent;

      // Below 40% of memory: we haven't spilled to disk and use little of the memory
      thresholdPercentages.overProvisioned = 0.4 / maxUtilizationPercent;
      // Below 90% of memory: we're close to spilling to disk but still only use memory
      thresholdPercentages.optimal = 0.9 / maxUtilizationPercent;
      // We use 180% as our "satisfactory" threshold through analyzing different customer environments
      thresholdPercentages.suboptimal = 1.8 / maxUtilizationPercent;
    }
  } else if (replicaCategory === "storage" || replicaCategory === "empty") {
    // For storage and empty clusters, we care about the max of memory or disk.
    peakStorageUtilizationPercent = Math.max(
      usedFraction(memoryBytes, totalMemoryBytes),
      usedFraction(diskBytes, totalDiskBytes),
    );

    thresholdPercentages.overProvisioned = 0.3;
    thresholdPercentages.optimal = 0.7;
    thresholdPercentages.suboptimal = 0.85;
  }

  return {
    peakStorageUtilizationPercent,
    thresholdPercentages,
  };
}

// Per-replica summary produced by buildClusterStorageUtilizationSummaryMap, keyed by replica ID.
// Each entry is a snapshot of the subscribe row that was last recorded for the replica (its
// highest-utilization row so far, or its most recent restart row).
type ClusterStorageUtilizationSummaryMap = Map<
  string,
  {
    // Utilization computed by calculatePeakStorageUtilization for the recorded row
    peakStorageUtilizationPercent: number;

    // Thresholds to classify peakStorageUtilizationPercent against; they depend on replicaCategory
    thresholdPercentages: ThresholdPercentages;

    replicaId: ClusterStorageUtilization["replicaId"];
    replicaName: ClusterStorageUtilization["replicaName"];
    clusterId: ClusterStorageUtilization["clusterId"];
    clusterName: ClusterReplicaDetails["clusterName"];
    // statusUpdatedAt of the latest "not-ready" row seen for the replica, if any
    lastRestartAt?: ClusterStorageUtilization["statusUpdatedAt"];
    numClusterReplicas: ClusterReplicaDetails["numClusterReplicas"];
    replicaCategory: ReplicaCategory;

    // Raw usage and capacity values of the recorded row
    memoryBytes: ClusterStorageUtilization["memoryBytes"];
    diskBytes: ClusterStorageUtilization["diskBytes"];
    totalMemoryBytes: ClusterStorageUtilization["totalMemoryBytes"];
    totalDiskBytes: ClusterStorageUtilization["totalDiskBytes"];
    // The recorded row's mzTimestamp, as a Date
    occurredAt: Date;
  }
>;
/**
 * Reduces a subscribe log of storage utilization rows to one summary entry per replica.
 *
 * Rows are processed in log order (expected to be oldest to newest). Progress rows, non-upsert
 * rows, and rows for replicas missing from `clusterReplicaDetailsByReplicaId` are ignored. For
 * the remaining rows, a replica's entry is (re)written when:
 * - the replica has no entry yet,
 * - the row's utilization is higher than the recorded one, or
 * - the row reports a restart (status "not-ready"), which also updates `lastRestartAt`.
 *
 * @param clusterStorageUtilizationSubscribeLog - A historical subscribe upsert log of replicas and their storage information over time
 * @param clusterReplicaDetailsByReplicaId - A map of currently existing replica IDs to their details
 *
 * @returns - A map of replica IDs to their details and their 'peak storage metric' over time. The peak storage
 * metric depends on whether it's a cluster with just sources/sinks, just compute objects, or a mix.
 */
function buildClusterStorageUtilizationSummaryMap(
  clusterStorageUtilizationSubscribeLog: SubscribeRow<ClusterStorageUtilization>[],
  clusterReplicaDetailsByReplicaId: Map<string, ClusterReplicaDetails>,
): ClusterStorageUtilizationSummaryMap {
  const summaryByReplicaId: ClusterStorageUtilizationSummaryMap = new Map();

  // The log is ordered from oldest to newest
  for (const row of clusterStorageUtilizationSubscribeLog) {
    if (row.mzProgressed || row.mzState !== "upsert") {
      continue;
    }

    const utilization = row.data;
    const { replicaId, memoryBytes, diskBytes, totalMemoryBytes, totalDiskBytes } =
      utilization;

    const details = clusterReplicaDetailsByReplicaId.get(replicaId);
    if (details === undefined) {
      // No details for this replica means it doesn't currently exist
      continue;
    }

    const replicaCategory = calculateReplicaCategory({
      numSinks: details.numSinks,
      numSources: details.numSources,
      numIndexes: details.numIndexes,
      numMaterializedViews: details.numMaterializedViews,
    });

    const { thresholdPercentages, peakStorageUtilizationPercent } =
      calculatePeakStorageUtilization({
        replicaCategory,
        totalDiskBytes,
        totalMemoryBytes,
        memoryBytes,
        diskBytes,
      });

    const previousEntry = summaryByReplicaId.get(replicaId);

    const exceedsRecordedPeak =
      peakStorageUtilizationPercent >
      (previousEntry?.peakStorageUtilizationPercent ?? -1);
    const restarted = utilization.status === "not-ready";

    // Keep the existing entry unless this row sets a new peak or marks a restart
    if (previousEntry !== undefined && !exceedsRecordedPeak && !restarted) {
      continue;
    }

    summaryByReplicaId.set(replicaId, {
      replicaId,
      replicaName: utilization.replicaName,
      replicaCategory,
      thresholdPercentages,
      clusterId: utilization.clusterId,
      clusterName: details.clusterName,
      lastRestartAt: restarted
        ? row.data.statusUpdatedAt
        : previousEntry?.lastRestartAt,
      peakStorageUtilizationPercent,
      numClusterReplicas: details.numClusterReplicas,
      memoryBytes,
      diskBytes,
      totalMemoryBytes,
      totalDiskBytes,
      occurredAt: new Date(row.mzTimestamp),
    });
  }

  return summaryByReplicaId;
}

/**
 * Hook that subscribes to replica storage utilization and returns, per replica, its peak disk
 * and memory info since the replica last restarted. We use restarts as a best guess to when the
 * replica reached its peak and we use the peak to recommend whether they should size up/down
 * their cluster.
 *
 * @returns `data` (the summary map, or undefined until both inputs are available and the
 * subscribe hasn't errored), plus combined `isLoading` and `isError` flags.
 */
function useClusterStorageUtilization() {
  // The details are fetched separately since some tables, such as mz_clusters, aren't retained
  // through history, so the subscribe would not work on a join against them.
  const {
    data: replicaDetailsByReplicaId,
    isLoading: areReplicaDetailsLoading,
    isError: didReplicaDetailsFail,
  } = useClusterReplicaDetails();

  // Memoized with no dependencies so the same query value is passed on every render
  const subscribeQuery = useMemo(
    () => buildClusterStorageUtilizationSubscribeQuery(),
    [],
  );

  const {
    data: utilizationLog,
    snapshotComplete: isSnapshotComplete,
    error: subscribeError,
  } = useRawSubscribe<ClusterStorageUtilization>({
    subscribe: subscribeQuery,
  });

  const didSubscribeFail = Boolean(subscribeError);

  const data = useMemo(() => {
    if (replicaDetailsByReplicaId === undefined) {
      return undefined;
    }
    if (!isSnapshotComplete || didSubscribeFail) {
      return undefined;
    }

    return buildClusterStorageUtilizationSummaryMap(
      utilizationLog,
      replicaDetailsByReplicaId,
    );
  }, [
    replicaDetailsByReplicaId,
    utilizationLog,
    isSnapshotComplete,
    didSubscribeFail,
  ]);

  const isLoading = !isSnapshotComplete || areReplicaDetailsLoading;
  const isError = didSubscribeFail || didReplicaDetailsFail;

  return { data, isLoading, isError };
}

// The object returned by useClusterStorageUtilization: `data`, `isLoading` and `isError`.
export type ClusterStorageUtilizationResult = ReturnType<
  typeof useClusterStorageUtilization
>;

export default useClusterStorageUtilization;
