import { NotFound } from './errors'
import { functions, Logger } from '@/util'
import * as g64 from '@/util/g64'
import { jolokiaService, mbeans, app, workerQuery } from '@/services'
import queryStatService from '@/services/queryStatService'
import queryPlanBuilderService from '@/services/queryPlanBuilderService'

const log = Logger.get('yb.query-details')

/**
 * Default values for a query record.
 *
 * QueryDetailService.getDetails() starts from a copy of this object and then
 * overlays the columns of the fetched row. A column only replaces a key whose
 * value here is falsy (or which is missing here), so any truthy default below
 * survives the overlay.
 */
const EMPTY_QUERY = {
  application_name: null,
  authenticatedUserName: null,
  avg_cpu_percent: 0,
  cluster_id: null,
  cost: 0,
  cpu_stats: {},
  cpu_percent: 0,
  cpu_percent_avg: 0,
  cpu_percent_max: 0,
  database_name: null,
  dequeue_time: null,
  end_time: null,
  execution_cycles: 0,
  execution_time: null,
  first_run: null,
  host_name: null,
  io_network_bytes: 0,
  io_read_bytes: 0,
  io_read_count: 0,
  io_read_ms: 0,
  io_spill_read_bytes: 0,
  io_spill_space_bytes: 0,
  io_spill_write_bytes: 0,
  io_write_bytes: 0,
  io_write_count: 0,
  io_write_ms: 0,
  io_total_bytes: 0,
  last_run: null,
  lock_ms: 0,
  max_execution_cycles: 0,
  // NOTE(review): the only non-zero numeric default. Being truthy, it is never
  // replaced by a `max_time` column from the fetched row — confirm this is intended.
  max_time: 0.511,
  mem_stats: {},
  memory_bytes: 0,
  memory_estimate_confidence: null,
  memory_estimate_diff_bytes: 0,
  memory_estimated_bytes: 0,
  memory_granted_bytes: 0,
  memory_required_bytes: 0,
  memory_total_bytes: 0,
  min_time: 0,
  next_id: null,
  parse_ms: 0,
  plan_ms: 0,
  assemble_ms: 0,
  compile_ms: 0,
  acquire_resources_ms: 0,
  run_ms: 0,
  client_ms: 0,
  cancel_ms: 0,
  plan_id: null,
  planning_ms: 0,
  pool_id: null,
  prepare_ms: 0,
  prior_id: 0,
  priority: 0,
  query_id: 0,
  query_plan_stats: {},
  query_plan_yb_original: null,
  query_plan_yb: null,
  query_text_long: null,
  query_text: null,
  queue_ms: 0,
  queue_time: null,
  requeue_status: null,
  rows_deleted: 0,
  rows_inserted: 0,
  rows_returned: 0,
  rows_scanned: 0,
  rows_total: 0,
  run_count: 0,
  runtime_execution_ms: 0,
  runtime_ms: 0,
  runtime_unscheduled_ms: 0,
  runtime_wait_ms: 0,
  session_id: 0,
  slot: 0,
  spool_ms: 0,
  start_time: null,
  status: 0,
  submit_time_ms: 0,
  submit_time: null,
  sum_time: 0,
  tableNames: null,
  tags: null,
  tenant_id: null,
  text_index: null,
  total_cluster_ms: 0,
  total_known_ms: 0,
  total_ms: 0,
  transaction_id: 0,
  type: null,
  user_name: null,
  wait_admission_ms: 0,
  wait_concurrency_ms: 0,
  worker_count: 0,
  worker_elapsed_ms: 0,
  worker_sync: 0,
  worker_worst_sync: 0,
  // Derived data that getDetails() fills in (statistics, alerts, rule events,
  // plan nodes, per-worker CPU/memory figures).
  detail: {
    executions: [],
    statistics: [],
    queryAlerts: [],
    ruleEvents: [],
    // NOTE(review): defaults to true; nothing in this file changes it — confirm intended.
    hasLongText: true,
    nodesByType: {},
    nodes: [],
    queries: [],
    // NOTE(review): Number.MIN_VALUE is the smallest positive double, not the most
    // negative number — confirm this is the seed the consumers expect.
    queryMaxExecutionTime: Number.MIN_VALUE,
    cpus: [],
    memory: [],
    executionTime: 0,
    executionTicks: 0,
    totalMem: 0,
    max_duration_ms: 0,
    cpuStats: [],
    memStats: []
  }
}
export { EMPTY_QUERY }

/**
 * Gathers everything shown for a single query: the query row itself, its plan
 * and per-node analysis, alerts, rule events, query-log statistics (debug mode
 * only) and per-worker CPU/memory usage.
 */
class QueryDetailService {
  /**
   * Load all available details for one query.
   *
   * @param {number|string} query_id Query id; parsed as a base-10 integer. An id that
   *   does not parse is looked up as 0 in the generated SQL.
   * @param {boolean} ybSystemDebug When truthy, the query log mbean is asked for statistics.
   * @param {boolean} historical When truthy, the `sys.log_*` views are read and alerts,
   *   analysis detail and the prior execution are fetched as well; otherwise
   *   `sys.query` / `sys.query_analyze` are read.
   * @returns {Promise<Object>} Resolves to a copy of EMPTY_QUERY overlaid with the fetched
   *   row and the derived `detail` data. Rejects with NotFound when the query row does not
   *   exist, and with the underlying error when the row, plan or analysis queries fail.
   *   Failures of the query log, alert and usage lookups are handed to `app.errorResult`.
   */
  getDetails(query_id, ybSystemDebug, historical) {
    query_id = parseInt(query_id, 10)

    // Setup empty query state. Object.assign only copies one level, so `detail` and the
    // containers inside it are rebuilt for every call; sharing EMPTY_QUERY.detail would
    // let one lookup's results (e.g. alerts) show up in the next lookup.
    const detail = {}
    for (const [key, value] of Object.entries(EMPTY_QUERY.detail)) {
      if (Array.isArray(value)) {
        detail[key] = value.slice()
      } else if (value !== null && typeof value === 'object') {
        detail[key] = Object.assign({}, value)
      } else {
        detail[key] = value
      }
    }
    const query = Object.assign({}, EMPTY_QUERY, { query_id, detail })

    // An id that failed to parse is NaN; the lookups below then use id 0.
    const sqlQueryId = query_id || 0

    // We can wait a bit for this data.
    jolokiaService.timeout(30 * 1000)

    // How to query: `sql` is an array of lines that is joined into one statement.
    const executeQuery = sql =>
      jolokiaService.execute(mbeans.datasources, 'queryEx', ['yellowbrick', sql.join('\n'), 'ym', 0])

    // Only grab the query log in debug mode.
    const queryLogRetrieval = (!ybSystemDebug
      ? Promise.resolve({ rows: [] })
      : jolokiaService.execute(mbeans.queryLog, 'retrieve', [
        {
          where: {
            query_id: sqlQueryId
          }
        }
      ])
    )
      .then((response) => {
        log.debug('Got query log response', response)

        // Store the statistics as a sorted array of stat_name, workerId.
        const statistics = response.rows.sort((s1, s2) => {
          const byName = app.i18NCompare(s1.stat_name, s2.stat_name)
          return byName !== 0
            ? byName
            : String(s1.id_worker).localeCompare(String(s2.id_worker))
        })

        query.detail.statistics = Object.freeze(statistics)
      })
      .catch(response => app.errorResult(response))

    // Get the alerts (only recorded for historical queries).
    const queryAlertRetrieval = !historical
      ? Promise.resolve([])
      : executeQuery(
        [
          'select *',
          'from sys.log_query_alert',
          `where query_id = ${sqlQueryId}`
        ])
        .catch(response => app.errorResult(response))
        .then((response) => {
          query.detail.queryAlerts = Object.freeze((response && response.rows) || [])
        })

    // Order the nodes by actual row count; nodes without stats count as 0 rows.
    const rowsActual = node => node.stats?.rows_actual || 0

    const queryDetailRetrieval = Promise.all([
      // Get the execution data.
      executeQuery(
        [
          'select',
          '*',
          ', (extract(epoch from submit_time) * 1000)::bigint as submit_time_ms',
          ', (extract(epoch from state_time) * 1000)::bigint as state_time_ms',
          ', (num_restart + num_error) as execution',
          historical ? 'from sys.log_query' : 'from sys.query',
          `where query_id = ${sqlQueryId}`,
          'order by done_time desc',
          'limit 1'
        ])
        .then((response) => {
          log.debug('Got query response', response)

          if (!response.rows || !response.rows.length) {
            // Pop an error; not found.
            throw new NotFound()
          }

          // Process the response: a column only overrides a missing or falsy default.
          const row = response.rows[0]
          query.plan_id = row.plan_id
          for (const p in row) {
            if (!(p in query) || !query[p]) {
              query[p] = row[p]
            }
          }

          // Do the time segmenting.
          query.submit_time = new Date(query.submit_time_ms)
          query.end_time = query.done_time
          query.memory_estimate_diff_bytes = query.memory_estimated_bytes - query.memory_bytes
          query.total_cluster_ms = query.run_ms
          query.total_known_ms =
            query.parse_ms +
            query.plan_ms +
            query.assemble_ms +
            query.compile_ms +
            query.acquire_resources_ms +
            query.run_ms +
            query.client_ms +
            query.cancel_ms +
            query.restart_ms
          query.total_wait_ms =
            query.wait_parse_ms +
            query.wait_lock_ms +
            query.wait_plan_ms +
            query.wait_assemble_ms +
            query.wait_compile_ms +
            query.wait_run_cpu_ms +
            query.wait_run_io_ms +
            query.wait_run_spool_ms +
            query.spool_ms +
            query.wait_client_ms +
            query.wlm_runtime_ms
          query.io_total_bytes =
            (query.io_read_bytes || 0) +
            (query.io_write_bytes || 0) +
            (query.io_spill_read_bytes || 0) +
            (query.io_spill_write_bytes || 0)
          query.rows_total =
            (query.rows_returned || 0) +
            (query.rows_inserted || 0) +
            (query.rows_deleted || 0)

          return Promise.all([
            // Get the query plan; keep a shallow copy of it as the "original".
            queryStatService.getQueryPlan(query.plan_id, historical).then((plan) => {
              query.query_plan_yb = plan
              query.query_plan_yb_original = Object.assign({}, plan)
            }),

            // Get the previous execution of this query id (an earlier state_time),
            // which only exists for historical queries that were restarted.
            (!historical || query.num_restart === 0
              ? Promise.resolve()
              : executeQuery(
                [
                  'select query_id, (extract(epoch from state_time) * 1000)::bigint as state_time_ms',
                  'from sys.log_query',
                  `where query_id = ${sqlQueryId}`,
                  `and state_time < '${new Date(query.state_time_ms).toISOString()}'`,
                  'order by state_time desc',
                  'limit 1'
                ])
                .then((response) => {
                  query.prior_query = response && response.rows && response.rows[0]
                })),

            // Resolve the name of the user that owns the session.
            (!query.session_id
              ? Promise.resolve({})
              : executeQuery(
                [
                  'select name',
                  'from sys.user',
                  `where user_id = (select user_id from sys.log_session where session_id = ${query.session_id})`
                ])
                .then((response) => {
                  query.authenticatedUserName = response && response.rows && response.rows[0] && response.rows[0].name
                }))
          ])
        }),

      // Get the analysis, indexed by plan node.
      executeQuery(
        [
          'select *',
          historical ? 'from sys.log_query_analyze' : 'from sys.query_analyze',
          `where query_id = ${sqlQueryId}`,
          `limit ${1000 * 100}` // A query plan with count(*) > 100,000 is going to need a different approach
        ])
        .then((response) => {
          query.query_plan_stats = functions.indexBy(
            response.rows || [],
            'node_id'
          )
        }),

      // Get the analysis detail, grouped by plan node. This is the third entry of the
      // Promise.all result and is consumed by position below.
      !historical ? Promise.resolve({}) : executeQuery(
        [
          'select *',
          'from sys.log_query_analyze_detail',
          `where query_id = ${sqlQueryId}`,
          `limit ${1000 * 100}` // A query plan with count(*) > 100,000 is going to need a different approach
        ])
        .then((response) => {
          return functions.groupBy(
            response.rows || [],
            'node_id'
          )
        }),

      // Get the rule events; best effort, a failure leaves an empty list.
      executeQuery(
        [
          'select *',
          'from sys.query_rule_event',
          `where query_id = ${sqlQueryId}`
        ])
        .then((response) => {
          // NOTE(review): this stores the whole response, whereas the default and the
          // failure value are arrays (and alerts store response.rows) — confirm which
          // shape the consumers expect before changing it.
          query.detail.ruleEvents = Object.freeze(response)
        })
        .catch((error) => {
          log.debug('Rule events unavailable', error)
          query.detail.ruleEvents = Object.freeze([])
        })
    ])
      .then(([, , analysisDetail]) => {
        // Fold in analysis detail.
        const queryAnalysisDetail = analysisDetail || {}
        Object.values(query.query_plan_stats).forEach((node) => {
          node.custom = functions.indexBy(queryAnalysisDetail[node.node_id] || [], 'name')
        })

        // Get node summaries by walking the plan tree depth first.
        const rows = []
        query.tableNames = []
        const nodesByType = {}
        const root = queryPlanBuilderService.createTree(query.query_plan_yb, query.query_plan_stats, false)
        const gather = (node) => {
          // Distribute nodes whose expressions mention "Rand" are left out of the by-type index.
          if (node.nodeTypeShort !== 'distribute' || !(node.expressions || []).join().match(/Rand/i)) {
            if (!nodesByType[node.nodeTypeShort]) {
              nodesByType[node.nodeTypeShort] = []
            }
            nodesByType[node.nodeTypeShort].push(node)
          }
          rows.push(node)
          query.tableNames.push(...(node.tableNames || []))
          const children = node.children || []
          children.forEach(child => gather(child))
        }
        if (root) {
          gather(root)
        }

        // Composite table scans count as table scans.
        if (nodesByType.tableScanComposite) {
          nodesByType.tableScan = (nodesByType.tableScan || []).concat(nodesByType.tableScanComposite)
        }
        query.tableNames = functions.uniq(query.tableNames.sort(), true)
        query.rows_scanned = nodesByType.tableScan
          ? functions.sum(nodesByType.tableScan.map(rowsActual))
          : 0

        // Merge the hash types into single list.
        const hashJoin = (nodesByType.hashJoin || []).concat(nodesByType.hashEquiJoin || [])
        if (hashJoin.length) {
          nodesByType.hashJoin = hashJoin
        } else {
          delete nodesByType.hashJoin
        }

        // Merge the distribute types into single list.
        const distribute = (nodesByType.distribute || [])
          .concat(nodesByType.distributeMap || [])
          .concat(nodesByType.distributeSortMerge || [])
        if (distribute.length) {
          nodesByType.distribute = distribute
        } else {
          delete nodesByType.distribute
        }

        // Reverse sort the nodes (largest actual row count first).
        functions.forEach(nodesByType, (nodes) => {
          nodes.sort((n1, n2) => rowsActual(n2) - rowsActual(n1))
        })

        // Commit the nodes.
        query.detail.nodesByType = Object.freeze(nodesByType)
        query.detail.nodes = Object.freeze(rows)

        // Calculate the max ticks active (largest average ticks of any node in the
        // tree); fall back to execution_cycles when there is no tree or no ticks.
        const maxTicks = (node) => {
          const childTicks = (node.children || []).map(child => maxTicks(child))
          const selfTicks = node.stats && node.stats.ticks && node.stats.ticks.avg
          return Math.max(selfTicks || 0, functions.max(childTicks) || 0)
        }
        query.max_execution_cycles = (root && maxTicks(root)) || query.execution_cycles

        // Store scope variables for tooltip access.
        query.detail.executionTime = query.run_ms
        query.detail.executionTicks = query.execution_cycles

        return query
      })

    // Decode one encoded stats column of the usage row; anything missing or
    // unparsable yields an empty frozen object.
    const decodeStats = (response, column) => {
      if (!response || !response.rows || !response.rows.length || !response.rows[0][column]) {
        return Object.freeze({})
      }
      try {
        return Object.freeze(JSON.parse(g64.decode(response.rows[0][column])))
      } catch (e) {
        log.warn(`Unexpected exception parsing ${column}: ` + e)
        return Object.freeze({})
      }
    }

    // Get usage, if present.
    const queryUsage = executeQuery(
      [
        'select *',
        'from sys.log_query_usage',
        `where query_id = ${sqlQueryId}`
      ])
      .then((response) => {
        // Process CPU stats: expose the indexes 0..n-1 of the longest per-worker list.
        query.cpu_stats = decodeStats(response, 'cpu_stats')
        let maxCPUs = 0
        functions.forEach(query.cpu_stats, (stats) => {
          if (stats.length > maxCPUs) {
            maxCPUs = stats.length
          }
        })
        const cpus = []
        for (let i = 0; i < maxCPUs; i++) {
          cpus.push(i)
        }
        query.detail.cpus = Object.freeze(cpus)

        // Process memory stats: total the high-water marks and express each worker's
        // mark as a percentage of the largest one.
        query.mem_stats = decodeStats(response, 'mem_stats')
        // Number.MIN_VALUE is the smallest positive number, so the division below
        // never divides by zero even when every high-water mark is 0.
        let maxMem = Number.MIN_VALUE
        let totalMem = 0
        functions.forEach(query.mem_stats, (stats) => {
          if (stats.highWaterBytes > maxMem) {
            maxMem = stats.highWaterBytes
          }
          totalMem += stats.highWaterBytes
        })
        functions.forEach(query.mem_stats, (stats) => {
          stats.stat = (stats.highWaterBytes / maxMem) * 100
        })
        query.detail.totalMem = totalMem
      })
      .then(() => workerQuery({ skinny: true }))
      .then(({ workerMap }) => {
        // Attach the worker's chassis and key to each stats entry and order by key.
        const mapStats = (stats, workerId) => {
          const worker = workerMap[workerId]
          return {
            stats,
            workerId,
            chassisId: worker && worker.chassisId,
            key: worker && worker.key
          }
        }
        query.detail.cpuStats = Object.freeze(functions.sortBy(functions.map(query.cpu_stats, mapStats), 'key'))
        query.detail.memStats = Object.freeze(functions.sortBy(functions.map(query.mem_stats, mapStats), 'key'))

        if (query.detail.cpuStats && query.detail.cpuStats.length) {
          query.cpu_percent_avg = functions.mean(query.detail.cpuStats.map(c => functions.mean(c.stats)))
        }
      })
      .catch(response => app.errorResult(response))

    return Promise.all([queryLogRetrieval, queryAlertRetrieval, queryDetailRetrieval, queryUsage])
      .then(() => query)
  }
}
// Single shared service instance; it is the module's default export.
const queryDetailService = new QueryDetailService()
export { queryDetailService as default }
