import { useSuspenseQuery } from "@tanstack/react-query";
import { subDays } from "date-fns";

import {
  buildQueryKeyPart,
  buildRegionQueryKey,
} from "~/api/buildQueryKeySchema";
import { fetchReplicaUtilizationHistory } from "~/api/materialize/cluster/replicaUtilizationHistory";
import {
  ClusterReplicaDetails,
  fetchClusterDetails,
} from "~/api/materialize/environment-overview/clusterDetails";

import {
  BucketMap,
  BucketWithUtilizationData,
  calculateClusterCategory,
  calculateClusterStatus,
  calculateMemDiskUtilizationStatus,
  calculatePeakMemDiskUtilization,
  initializeBuckets,
  MemDiskUtilizationStatus,
} from "./utils";

/**
 * Query key factory for the environment overview queries.
 *
 * - `all` is the shared prefix, produced by `buildRegionQueryKey("environmentOverview")`.
 * - `clusterMemDiskUtilization` is that prefix followed by the key part built
 *   from `"clusterMemDiskUtilization"`.
 */
const environmentOverviewQueryKeys = {
  all: () => {
    return buildRegionQueryKey("environmentOverview");
  },
  clusterMemDiskUtilization: () => {
    const overviewPrefix = environmentOverviewQueryKeys.all();
    const utilizationPart = buildQueryKeyPart("clusterMemDiskUtilization");

    return [...overviewPrefix, utilizationPart] as const;
  },
};

/** Number of days, counted back from "now", of utilization history to request. */
const UTILIZATION_LOOKBACK_DAYS = 14;

/** Milliseconds in one hour. */
const MS_PER_HOUR = 60 * 60 * 1000;

/** Size of a single utilization bucket: eight hours, in milliseconds. */
const EIGHT_HOURS_IN_MS = 8 * MS_PER_HOUR;

/**
 * Suspense query that summarizes memory/disk utilization per cluster over the
 * last `UTILIZATION_LOOKBACK_DAYS` days, split into buckets of
 * `EIGHT_HOURS_IN_MS` milliseconds.
 *
 * Cluster details and replica utilization history are fetched concurrently.
 * For each cluster and bucket start time:
 * - the replica with the highest peak utilization supplies the bucket's
 *   utilization figures, and
 * - "oom-killed" offline events are collected from every replica, so an OOM on
 *   any replica marks the bucket as "underProvisioned" regardless of which
 *   replica reported the highest utilization or the order replicas are visited.
 *
 * @returns The `useSuspenseQuery` result. Its data is a `Map` keyed by cluster
 * id whose values hold the cluster id, cluster name, overall status and the
 * gap-filled bucket map. Clusters for which no utilization bucket was returned
 * are not present in the map.
 */
export function useClusterMemDiskUtilization() {
  return useSuspenseQuery({
    queryKey: environmentOverviewQueryKeys.clusterMemDiskUtilization(),
    queryFn: async ({ queryKey, signal }) => {
      const endDate = new Date();

      const startDate = subDays(endDate, UTILIZATION_LOOKBACK_DAYS);

      const [replicaDetailsRes, replicaUtilizationHistoryRes] =
        await Promise.all([
          fetchClusterDetails({
            queryKey,
            requestOptions: { signal },
          }),
          fetchReplicaUtilizationHistory({
            params: {
              bucketSizeMs: EIGHT_HOURS_IN_MS,
              startDate: startDate.toISOString(),
            },
            queryKey,
            requestOptions: { signal },
          }),
        ]);

      // Index the details rows by cluster id; a later row with the same
      // cluster id replaces an earlier one.
      const clusterDetailsById = replicaDetailsRes.rows.reduce((accum, row) => {
        accum.set(row.clusterId, row);
        return accum;
      }, new Map<string, ClusterReplicaDetails>());

      // clusterId -> (bucket start timestamp in ms -> aggregated bucket)
      const clusterBucketsMap = new Map<
        string,
        Map<number, BucketWithUtilizationData>
      >();

      // For each bucket per replica, we aggregate the peak memory and disk utilization
      // by each replica's cluster. A replica without buckets contributes nothing.
      for (const buckets of Object.values(
        replicaUtilizationHistoryRes.bucketsByReplicaId,
      )) {
        for (const bucket of buckets) {
          const clusterDetails = clusterDetailsById.get(bucket.clusterId);
          // Skip buckets whose clusters don't belong in the current list.
          if (!clusterDetails) {
            continue;
          }

          let extantClusterBuckets = clusterBucketsMap.get(bucket.clusterId);

          // If a cluster doesn't have buckets yet, initialize it with an empty map
          if (!extantClusterBuckets) {
            extantClusterBuckets = new Map();
            clusterBucketsMap.set(bucket.clusterId, extantClusterBuckets);
          }

          const bucketStartTs = bucket.bucketStart.getTime();

          const clusterCategory = calculateClusterCategory({
            numSources: Number(clusterDetails.numSources),
            numSinks: Number(clusterDetails.numSinks),
            numIndexes: Number(clusterDetails.numIndexes),
            numMaterializedViews: Number(clusterDetails.numMaterializedViews),
          });

          const peakUtilization = calculatePeakMemDiskUtilization({
            category: clusterCategory,
            bucket,
          });

          const extantBucket = extantClusterBuckets.get(bucketStartTs);

          const replicaOomEvents =
            bucket.offlineEvents?.filter(
              ({ reason }) => reason === "oom-killed",
            ) ?? [];

          // OOM events accumulate across all replicas of the cluster for this
          // bucket, so they are never lost when another replica wins on peak
          // utilization.
          const oomEvents = [
            ...(extantBucket?.oomEvents ?? []),
            ...replicaOomEvents,
          ];

          // For a cluster with many replicas, if the current replica has higher utilization
          // than the replica currently stored in clusterBucketsMap, its utilization
          // figures replace the stored ones.
          if (
            !extantBucket ||
            (extantBucket.peakMemDiskUtilizationPercent ?? 0) <
              (peakUtilization.peakMemDiskUtilizationPercent ?? 0)
          ) {
            extantClusterBuckets.set(bucketStartTs, {
              status:
                oomEvents.length > 0
                  ? "underProvisioned"
                  : calculateMemDiskUtilizationStatus({
                      peakMemDiskUtilizationPercent:
                        peakUtilization.peakMemDiskUtilizationPercent,
                      thresholdPercentages:
                        peakUtilization.thresholdPercentages,
                    }),
              oomEvents,
              peakMemDiskUtilizationPercent:
                peakUtilization.peakMemDiskUtilizationPercent,
              memoryPercent: peakUtilization.memoryPercent,
              diskPercent: peakUtilization.diskPercent,
              occurredAt: peakUtilization.occurredAt,
              thresholdPercents: peakUtilization.thresholdPercentages,
              bucketStart: bucket.bucketStart,
              bucketEnd: bucket.bucketEnd,
            });
          } else if (replicaOomEvents.length > 0) {
            // The stored replica keeps supplying the utilization figures, but
            // this replica was OOM-killed in the same bucket, so the bucket
            // must still be reported as under-provisioned.
            extantClusterBuckets.set(bucketStartTs, {
              ...extantBucket,
              status: "underProvisioned",
              oomEvents,
            });
          }
        }
      }

      const res = new Map<
        string,
        {
          clusterId: string;
          clusterName: string;
          status: MemDiskUtilizationStatus;
          buckets: BucketMap;
        }
      >();

      for (const [clusterId, buckets] of clusterBucketsMap.entries()) {
        const clusterDetails = clusterDetailsById.get(clusterId);

        if (!clusterDetails) {
          continue;
        }

        // Each cluster gets its own freshly initialized bucket map, since the
        // map is mutated below.
        const bucketsWithGapsFilled: BucketMap = initializeBuckets({
          startMs: startDate.getTime(),
          endMs: endDate.getTime(),
          minBucketStartMs: replicaUtilizationHistoryRes.minBucketStartMs,
          maxBucketEndMs: replicaUtilizationHistoryRes.maxBucketEndMs,
          bucketSizeMs: EIGHT_HOURS_IN_MS,
        });

        for (const bucket of buckets.values()) {
          const bucketStartTs = bucket.bucketStart.getTime();
          // If there exists a bucket for that timestamp, we update it.
          // Buckets whose start time is not in the initialized map are dropped.
          if (bucketsWithGapsFilled.has(bucketStartTs)) {
            bucketsWithGapsFilled.set(bucketStartTs, bucket);
          }
        }

        res.set(clusterId, {
          clusterId,
          clusterName: clusterDetails.clusterName,
          status: calculateClusterStatus(bucketsWithGapsFilled),
          buckets: bucketsWithGapsFilled,
        });
      }

      return res;
    },
  });
}
