import { QueryKey } from "@tanstack/react-query";
import { InferResult, sql } from "kysely";

import { executeSqlV2, queryBuilder } from "..";

/**
 * Collection interval in milliseconds (60,000 ms, i.e. one minute).
 *
 * NOTE(review): presumably the cadence at which source statistics are
 * collected; it is not referenced in the visible part of this file, so confirm
 * against its importers.
 */
export const COLLECTION_INTERVAL_MS = 60_000;

/**
 * Builds the statistics query for one source.
 *
 * The requested source row and every object that `mz_object_dependencies`
 * records as referencing it are unioned together, joined against
 * `mz_source_statistics_with_history`, and aggregated into rows grouped by the
 * id of the requested source.
 *
 * Selected columns:
 * - `id`: the requested source id.
 * - `messagesReceived`, `bytesReceived`, `updatesStaged`, `updatesCommitted`,
 *   `snapshotRecordsKnown`, `snapshotRecordsStaged`: sums over all joined
 *   statistics rows.
 * - `offsetDelta`: NULL when either `max(offset_known)` or
 *   `max(offset_committed)` is NULL, otherwise their difference clamped to be
 *   at least 0.
 * - `rehydrationLatency`: NULL when any joined row has a NULL
 *   `rehydration_latency`, otherwise the maximum across the rows.
 *
 * @param sourceId - id of the source (matched against `mz_sources.id`).
 * @returns the uncompiled query builder; call `.compile()` to obtain the SQL.
 */
export function buildSourceStatisticsQuery({ sourceId }: { sourceId: string }) {
  // Objects whose dependency row references the requested source, each tagged
  // with the parent id. The type filter applies to the requested source, so
  // this CTE is empty when that source is of type "progress".
  const withSubsources = queryBuilder.with("subsources", (cte) =>
    cte
      .selectFrom("mz_sources as s")
      .innerJoin(
        "mz_object_dependencies as od",
        "referenced_object_id",
        "s.id",
      )
      .where("s.id", "=", sourceId)
      .where("s.type", "<>", "progress")
      .select(["od.object_id as id", "s.id as sourceId"]),
  );

  // The requested source itself, in the same (id, sourceId) shape.
  const withSources = withSubsources.with("sources", (cte) => {
    return cte
      .selectFrom("mz_sources as s")
      .where("s.id", "=", sourceId)
      .select(["s.id", "s.id as sourceId"]);
  });

  // Union of both sets, so one join below covers the source and its dependents.
  const withCombinedSources = withSources.with("combined_sources", (cte) => {
    return cte
      .selectFrom("subsources")
      .unionAll((union) =>
        union.selectFrom("sources").select(["id", "sourceId"]),
      )
      .select(["id", "sourceId"]);
  });

  const joinedStatistics = withCombinedSources
    .selectFrom("combined_sources as s")
    .innerJoin("mz_source_statistics_with_history as ss", "ss.id", "s.id");

  return joinedStatistics
    .select((agg) => [
      "s.sourceId as id",
      agg.fn.sum<number>("messages_received").as("messagesReceived"),
      agg.fn.sum<number>("bytes_received").as("bytesReceived"),
      agg.fn.sum<number>("updates_staged").as("updatesStaged"),
      agg.fn.sum<number>("updates_committed").as("updatesCommitted"),
      agg.fn.sum<number>("snapshot_records_known").as("snapshotRecordsKnown"),
      agg.fn
        .sum<number>("snapshot_records_staged")
        .as("snapshotRecordsStaged"),
      // Offset lag: unknown (NULL) if either side is missing, never negative.
      agg
        .case()
        .when(
          agg.or([
            agg(agg.fn.max("offset_known"), "is", null),
            agg(agg.fn.max("offset_committed"), "is", null),
          ]),
        )
        .then(null)
        .else(
          sql<number>`greatest(max("offset_known")::numeric - max("offset_committed")::numeric, 0)`,
        )
        .end()
        .as("offsetDelta"),
      // A single NULL latency among the joined rows makes the aggregate NULL.
      agg
        .case()
        .when(sql`bool_or("rehydration_latency" IS NULL)`)
        .then(null)
        .else(agg.fn.max("rehydration_latency"))
        .end()
        .as("rehydrationLatency"),
    ])
    .groupBy("s.sourceId");
}

/**
 * Shape of a single row returned by the query that
 * `buildSourceStatisticsQuery` builds.
 */
export type SourceStatisticsDataPoint = InferResult<
  ReturnType<typeof buildSourceStatisticsQuery>
>[number];

/**
 * Row type of the source statistics query; identical to
 * `SourceStatisticsDataPoint`.
 */
export type Source = SourceStatisticsDataPoint;

/**
 * Compiles the source statistics query for the given filters and hands it to
 * `executeSqlV2`.
 *
 * @param queryKey - passed through unchanged to `executeSqlV2`.
 * @param filters - identifies the source whose statistics are requested.
 * @param requestOptions - passed through unchanged to `executeSqlV2`.
 * @returns the result of `executeSqlV2` for the compiled query.
 */
export async function fetchSourceStatistics(
  queryKey: QueryKey,
  filters: { sourceId: string },
  requestOptions: RequestInit,
) {
  const statisticsQuery = buildSourceStatisticsQuery(filters);
  return executeSqlV2({
    queries: statisticsQuery.compile(),
    queryKey,
    requestOptions,
  });
}
