import pollWrapper from './queryActivity_poll.sql?raw'
import { databaseObjectService, jolokiaService } from '.'
import { functions } from '@/util'
// Columns selected from sys.query and sys.query_recent, listed explicitly so
// both sides of the UNION ALL built from them have the same shape.
const sysQueryCols = [
  // Identity, state and ownership.
  'query_id', 'session_id', 'transaction_id', 'plan_id', 'state', 'blocked',
  'username', 'application_name', 'database_name', 'cluster_name',
  'type', 'tags', 'error_code', 'error_message', 'query_text',
  // Scheduling and workers.
  'pool_id', 'priority', 'slot', 'num_workers', 'longest_worker_id',
  'compile_percent', 'cpu_percent', 'cpu_percent_max', 'num_restart', 'num_error',
  // Per-phase timings.
  'parse_ms', 'wait_parse_ms', 'wait_lock_ms', 'plan_ms', 'wait_plan_ms',
  'assemble_ms', 'wait_assemble_ms', 'compile_ms', 'wait_compile_ms',
  'acquire_resources_ms', 'run_ms', 'wait_run_cpu_ms', 'wait_run_io_ms',
  'wait_run_spool_ms', 'client_ms', 'wait_client_ms', 'total_ms', 'cancel_ms',
  'restart_ms', 'wlm_runtime_ms', 'spool_ms', 'cache_efficiency',
  // Timestamps.
  'submit_time', 'done_time', 'state_time', 'restart_time',
  // I/O counters.
  'io_read_bytes', 'io_write_bytes', 'io_spill_read_bytes', 'io_spill_write_bytes',
  'io_network_bytes', 'io_client_read_bytes', 'io_client_write_bytes',
  'io_spool_write_bytes',
  // Row counts.
  'rows_inserted', 'rows_deleted', 'rows_returned',
  // Memory and spill space.
  'memory_bytes', 'memory_bytes_max', 'io_spill_space_bytes',
  'io_spill_space_bytes_max', 'io_spill_space_granted_bytes',
  'memory_estimated_bytes', 'memory_required_bytes', 'memory_granted_bytes',
  'memory_estimate_confidence'
]

// Column list used against sys.log_query: the same names in the same order,
// without `blocked`.
const sysLogQueryCols = sysQueryCols.filter((name) => name !== 'blocked')

// ---------------------------------------------------------------------------
// SQL-building helpers used by QueryActivityService.
// ---------------------------------------------------------------------------

// Statement prefix that tags every statement issued from here, so the service
// can recognise (and hide) its own traffic in the query stream.
const OWN_TAG_STATEMENT = `SET ybd_query_tags TO 'ym-query-activity';\n`

// Predicate that hides statements carrying the tag set above.
const NOT_OWN_TAG_PREDICATE = `(tags IS NULL OR tags != 'ym-query-activity')`

// The long-poll wrapper around getQueries is not enabled yet.
const LONG_POLL_WRAPPER_ENABLED = false

// Continuation-line prefix inside the generated SELECT. It carries no meaning
// for the database; it is kept so the SQL text returned to callers is unchanged.
const SELECT_LINE_BREAK = '\n' + ' '.repeat(10)

// Physical columns that are summed client-side into `total_wait_time_ms`.
const TOTAL_WAIT_TIME_COLS = ['wait_parse_ms', 'wait_plan_ms', 'wait_assemble_ms', 'wait_compile_ms', 'wait_lock_ms', 'wait_client_ms', 'wait_run_cpu_ms', 'wait_run_io_ms', 'wait_run_spool_ms']

// Physical columns that are summed client-side into `wait_run_time_ms`.
const WAIT_RUN_TIME_COLS = ['wait_run_cpu_ms', 'wait_run_io_ms']

// `job_name` is a virtual column assembled from these three.
const JOB_NAME_SOURCE_COLS = 'client_hostname, username, database_name'
const JOB_NAME_ORDER_EXPR = "client_hostname||' / '||username||' / '|| database_name"

// Comparison keywords accepted by the byte and numeric filters.
const COMPARISON_OPERATORS = new Map([
  ['greater_than', '>'],
  ['greater_than_or_equal_to', '>='],
  ['less_than', '<'],
  ['less_than_or_equal_to', '<='],
  ['equal_to', '=']
])

// Multipliers for the byte filter's unit selector; other units mean bytes.
const BYTE_UNIT_FACTORS = new Map([
  ['kb', 1024],
  ['mb', 1024 * 1024],
  ['gb', 1024 * 1024 * 1024]
])

// Builds the SELECT used as the `sessions` CTE body for the given table.
function sessionSourceSelect (table) {
  return `SELECT ss.*, su.name AS session_username, sd.name AS session_dbname FROM ${table} ss LEFT OUTER JOIN sys.user su ON ss.user_id = su.user_id   LEFT OUTER JOIN sys.database sd ON ss.database_id = sd.database_id`
}

// Streams other than the query stream: CTE name plus the live ("active") and
// logged source SELECT. Any stream not listed here is the query stream.
const STREAM_SOURCES = new Map([
  ['session', { cte: 'sessions', active: sessionSourceSelect('sys.session'), logged: sessionSourceSelect('sys.log_session') }],
  ['load', { cte: 'loads_table', active: 'SELECT * FROM sys.load', logged: 'SELECT * FROM sys.log_load' }],
  ['unload', { cte: 'unloads_table', active: 'SELECT * FROM sys.unload', logged: 'SELECT * FROM sys.log_unload' }],
  ['backup', { cte: 'backup_table', active: 'SELECT * FROM sys.backup', logged: 'SELECT * FROM sys.log_backup' }],
  ['restore', { cte: 'restore_table', active: 'SELECT * FROM sys.restore', logged: 'SELECT * FROM sys.log_restore' }]
])

// Throws the service's TypeError unless `time` is a non-null object.
function assertTimeObject (time) {
  if (time === null || typeof time !== 'object') {
    throw new TypeError('The time parameter for query activity must be an object')
  }
}

// True when `customTime` selects an explicit date range: it has a start date,
// or it is one of the 'today' / 'yesterday' presets.
function isCustomRange (customTime) {
  if (customTime === null || typeof customTime !== 'object') {
    return false
  }
  return customTime.startDate !== null || ['today', 'yesterday'].includes(customTime.label)
}

// Wraps `value` in a dollar-quoted SQL string. The plain `$$...$$` form is used
// whenever it is safe, so existing SQL text is unchanged. When the text contains
// `$$` or ends in `$` (either would terminate or corrupt a `$$` literal), a
// tagged delimiter that does not occur in the text is used instead.
// NOTE(review): assumes the server accepts tagged dollar quotes the way
// PostgreSQL does - confirm against the target database.
function dollarQuote (value) {
  const text = String(value)
  if (!text.includes('$$') && !text.endsWith('$')) {
    return '$$' + text + '$$'
  }
  let tag = 'ym'
  // `text + '$'` also catches a text that ends in `$<tag>` and would complete
  // the closing delimiter one character early.
  for (let attempt = 0; (text + '$').includes('$' + tag + '$'); attempt++) {
    tag = `ym${attempt}`
  }
  return '$' + tag + '$' + text + '$' + tag + '$'
}

// Builds the `WITH <name> AS (...)` prefix that selects the source rows for a
// stream. `useRecent` picks sys.query_recent over sys.log_query for the
// non-active query stream.
function buildSourceCte (stream, mode, useRecent) {
  const source = STREAM_SOURCES.get(stream)
  if (source) {
    const body = mode === 'active' ? `  ${source.active}` : source.logged
    return [`WITH ${source.cte} AS (`, body, ')', ''].join('\n')
  }

  if (mode === 'active') {
    const cols = sysQueryCols.join(', ')
    return [
      'WITH queries AS (',
      '  SELECT active.*, COALESCE(s.client_ip_address, ls.client_ip_address) AS client_ip_address FROM (',
      `    SELECT ${cols} FROM sys.query UNION ALL SELECT ${cols} FROM sys.query_recent`,
      '  ) active',
      '  LEFT OUTER JOIN sys.session s ON active.session_id = s.session_id',
      '  LEFT OUTER JOIN sys.log_session ls ON active.session_id = ls.session_id',
      ')',
      ''
    ].join('\n')
  }

  const [alias, table] = useRecent ? ['recent', 'sys.query_recent'] : ['historical', 'sys.log_query']
  return [
    'WITH queries AS (',
    `  SELECT ${alias}.*, s.client_ip_address`,
    `  FROM ${table} ${alias}`,
    `  LEFT OUTER JOIN sys.log_session s ON ${alias}.session_id = s.session_id`,
    ')',
    ''
  ].join('\n')
}

// Turns the requested field names into the SELECT list for a stream, expanding
// virtual columns into the physical columns they are computed from.
function buildSelectList (fields, stream, mode) {
  if (!STREAM_SOURCES.has(stream)) {
    const expanded = fields.map((field) => {
      if (field === 'total_wait_time_ms') {
        return TOTAL_WAIT_TIME_COLS.join(', ')
      }
      if (field === 'wait_run_time_ms') {
        return WAIT_RUN_TIME_COLS.join(', ')
      }
      return field
    })
    return ['query_id', ...expanded].join(', ')
  }

  if (stream === 'session') {
    return fields.join(', ')
  }

  return fields.map((field) => {
    if (field === 'job_name') {
      return JOB_NAME_SOURCE_COLS
    }
    if (stream === 'load' && field === 'end_time' && mode === 'active') {
      return 'start_time AS end_time'
    }
    if (stream === 'unload' && field === 'last_activity_time' && mode !== 'active') {
      return 'end_time AS last_activity_time'
    }
    return field
  }).join(', ')
}

// Builds the time-window predicate that follows WHERE. For the active query
// stream, statements still running for more than 100 ms are kept regardless of
// the window.
function buildTimePredicate ({ isQueryStream, mode, customRange, customTime, timeInMinutes }) {
  const timeColumn = isQueryStream ? 'COALESCE(state_time, submit_time)' : 'start_time'
  const window = customRange
    ? `BETWEEN '${customTime.startDate}'::timestamp AND '${customTime.endDate}'::timestamp`
    : `> current_timestamp - '${timeInMinutes} minutes'::INTERVAL`

  if (!isQueryStream) {
    return customRange ? `${timeColumn} ${window}` : `(${timeColumn} ${window})`
  }

  const stillRunning = mode === 'active'
    ? ` OR (done_time IS NULL and state not in ('restart error', 'restart user') and total_ms > 100)`
    : ''
  return `(${timeColumn} ${window}${stillRunning})`
}

// Converts a duration filter amount to whole milliseconds. 'miliseconds' (sic)
// is the unit label compared against; every other unit is treated as seconds.
function toMillis (amount, unit) {
  return parseInt(unit === 'miliseconds' ? amount : amount * 1000, 10)
}

// One clause of a `*_ms` filter: a single bound, or an inclusive range.
function durationClause (filter) {
  const { compare, primarySize, primaryTime, secondarySize, secondaryTime } = filter.option.value
  const from = toMillis(primaryTime, primarySize)
  if (compare === 'less_than' || compare === 'more_than') {
    return `\n${filter.value} ${compare === 'less_than' ? '<' : '>'} ${from}`
  }
  const to = toMillis(secondaryTime, secondarySize)
  return `\n${filter.value} >= ${from} AND ${filter.value} <= ${to}`
}

// Shared tail of the byte and numeric clauses; null for an unknown comparison.
function comparisonClause (filter, amount) {
  const operator = COMPARISON_OPERATORS.get(filter.option.value.compare)
  return operator ? `\n${filter.value} ${operator} ${amount}` : null
}

// One clause of a `*_bytes*` filter, with the amount scaled by its unit.
function byteClause (filter) {
  const { value, size } = filter.option.value
  return comparisonClause(filter, parseInt(value, 10) * (BYTE_UNIT_FACTORS.get(size) || 1))
}

// One clause of a plain numeric filter.
function numericClause (filter) {
  return comparisonClause(filter, parseFloat(filter.option.value.value))
}

// Comparison operator of a string filter; equality when none is given.
function textOperator (filter) {
  return filter?.option?.compare || '='
}

// One clause of a string filter.
function textClause (filter) {
  const operator = textOperator(filter)
  if (operator === 'Like') {
    return `\n${filter.value} LIKE ${dollarQuote(`%${filter.option.value}%`)}`
  }
  if (operator === 'Not Like') {
    return `\n${filter.value} NOT LIKE ${dollarQuote(`%${filter.option.value}%`)}`
  }
  if (operator === 'IS NULL' || operator === 'IS NOT NULL') {
    return `\n${filter.value} ${operator}`
  }
  return `\n${filter.value} ${operator} ${dollarQuote(filter.option.value)}`
}

// Builds the `AND (...)` group for all filters on one column. The column name
// decides how values are compared: duration, byte quantity, number or string.
// Returns '' when no clause could be built.
function buildFilterGroup (key, group) {
  let clauses
  let joiner = ' OR '
  if (/_ms$/.test(key)) {
    clauses = group.map((filter) => durationClause(filter))
  } else if (/_bytes/.test(key)) {
    clauses = group.map((filter) => byteClause(filter)).filter(Boolean)
  } else if (/(efficiency|num_|_count|_percent|cpu)/.test(key)) {
    clauses = group.map((filter) => numericClause(filter)).filter(Boolean)
  } else {
    clauses = group.map((filter) => textClause(filter))
    // Several exclusions on one column only make sense when all of them hold.
    if (group.some((filter) => textOperator(filter) === '!=')) {
      joiner = ' AND '
    }
  }
  return clauses.length > 0 ? `\nAND (${clauses.join(joiner)})` : ''
}

// Predicates for the view options. Each option, when truthy, hides a category
// of statements.
function buildOptionPredicates ({ systemWork, automaticFlush, session }) {
  const predicates = []
  if (systemWork) {
    predicates.push(`username NOT LIKE 'sys_ybd_%'`)
  }
  if (automaticFlush) {
    predicates.push(`type NOT IN ('yflush', 'ycopy')`)
  }
  if (session) {
    predicates.push(`type NOT IN ('set', 'show', 'reset')`)
  }
  return predicates
}

// Sums the named columns of a row, counting missing or falsy values as 0.
function sumColumns (row, columnNames) {
  return columnNames.reduce((total, name) => total + (row[name] || 0), 0)
}

/**
 * Builds and runs the SQL behind the query-activity views (queries, sessions,
 * loads, unloads, backups and restores) through `databaseObjectService`.
 *
 * Column names, ordering expressions, cluster names and ids are interpolated
 * into the SQL verbatim, so callers must only pass trusted identifiers. Search
 * text and string filter values are dollar-quoted.
 */
class QueryActivityService {
  /**
   * Fetches one page of rows for an activity stream.
   *
   * @param {object} params
   * @param {*} params.instanceId - Passed through to `databaseObjectService.sql`.
   * @param {Array} params.filters - Column filters; grouped by their `value`
   *   (the column name) and turned into `AND (...)` groups.
   * @param {Array<{field: string}>} params.columns - Columns to select. The
   *   virtual fields `total_wait_time_ms`, `wait_run_time_ms` and `job_name`
   *   are computed from other columns after the query returns.
   * @param {object} params.time - `{ timeInMinutes, mode, customTime, clusterName }`.
   *   `mode === 'active'` reads the live tables; any other mode reads the
   *   logged ones.
   * @param {string} [params.queryText] - Substring matched against `query_text`.
   * @param {number} [params.pollLimitSeconds] - Only used by the disabled long-poll wrapper.
   * @param {number} [params.lastStateChangeMs] - Only used by the disabled long-poll wrapper.
   * @param {number} [params.limit=10] - Page size; a falsy value omits LIMIT.
   * @param {number} [params.offset=0] - Page offset.
   * @param {{column: string, order: string}} [params.order] - Sort column and direction.
   * @param {object} [params.options] - `{ systemWork, automaticFlush, session }`;
   *   each truthy flag hides that category (query stream only).
   * @param {string} [params.stream] - 'session', 'load', 'unload', 'backup' or
   *   'restore'; anything else selects the query stream.
   * @returns {Promise<{sql: string, rows: Array<object>}>} The SQL that was run
   *   and the resulting rows.
   * @throws {TypeError} When `time` is not an object.
   */
  async getQueries({ instanceId, filters, columns, time, queryText, pollLimitSeconds, lastStateChangeMs, limit = 10, offset = 0, order, options, stream }) {
    // Check time; custom times to be done later TODO:
    assertTimeObject(time)

    const { timeInMinutes, mode, customTime, clusterName } = time
    const customRange = isCustomRange(customTime)
    const source = STREAM_SOURCES.get(stream)
    const isQueryStream = source === undefined

    // Virtual columns requested by the caller; they are filled in after the
    // query returns.
    const fields = columns.map((column) => column.field)
    const useTotalWaitTime = isQueryStream && fields.includes('total_wait_time_ms')
    const useWaitRunTime = isQueryStream && fields.includes('wait_run_time_ms')
    const useJobName = !isQueryStream && stream !== 'session' && fields.includes('job_name')

    // Tag our own statement, then pick the source rows. Non-active query
    // windows of up to two hours read sys.query_recent instead of sys.log_query.
    const useRecent = !customRange && timeInMinutes <= 120
    let sql = OWN_TAG_STATEMENT + buildSourceCte(stream, mode, useRecent)

    // Compose sql.
    const selectList = buildSelectList(fields, stream, mode)
    const fromName = isQueryStream ? 'queries' : source.cte
    const timePredicate = buildTimePredicate({ isQueryStream, mode, customRange, customTime, timeInMinutes })
    sql += `SELECT ${selectList}${SELECT_LINE_BREAK}FROM ${fromName}${SELECT_LINE_BREAK}WHERE ${timePredicate}`

    if (queryText) {
      sql += `\nAND query_text ILIKE ${dollarQuote(`%${queryText}%`)}`
    }

    // Make filters into WHERE clause.
    const groupedFilters = functions.groupBy(filters, 'value')
    for (const key of Object.keys(groupedFilters)) {
      sql += buildFilterGroup(key, groupedFilters[key])
    }

    // Do filters if supplied. They reference query-only columns, so they apply
    // whenever the query stream is in use, including when `stream` is omitted.
    if (isQueryStream) {
      for (const predicate of buildOptionPredicates(options || {})) {
        sql += `\nAND ${predicate}`
      }
    }

    // This will be used as the long poller.
    // NOTE(review): `{{pollLimitMs}}` receives a value expressed in seconds -
    // confirm the unit the wrapper expects before enabling this.
    if (LONG_POLL_WRAPPER_ENABLED && mode === 'active') {
      const pollLimit = typeof pollLimitSeconds === 'number' ? pollLimitSeconds : 45
      const sinceMs = lastStateChangeMs || 0
      const innerSql = sql

      // Replacer functions are used because `$$` inside a replacement string
      // is collapsed to `$`, which would corrupt dollar-quoted literals.
      sql = pollWrapper
        .replace(/\{\{sql\}\}/, () => innerSql)
        .replace(/\{\{lastStateChangeMs\}\}/, () => String(sinceMs))
        .replace(/\{\{pollLimitMs\}\}/, () => String(pollLimit))
        .replace(/\{\{excludeQueryTags\}\}/, () => `'ym-query-activity'`)
    }

    if (isQueryStream) {
      // Exclude YM itself.
      sql += `\nAND ${NOT_OWN_TAG_PREDICATE}`
    }

    if (order) {
      const orderColumn = order.column === 'job_name' ? JOB_NAME_ORDER_EXPR : order.column
      sql += `\nORDER BY ${orderColumn} ${order.order} NULLS LAST`
    }

    if (limit) {
      sql += `\nLIMIT ${limit}`
    }

    sql += `\nOFFSET ${offset}`

    // Add cluster.
    if (!!clusterName && mode !== 'active') {
      sql = `USE CLUSTER "${clusterName}";\n${sql}`
    }

    // Return result rows. Active views use a 15 s timeout, others two minutes.
    jolokiaService.timeout((mode === 'active' ? 15 : 2 * 60) * 1000)
    const result = await databaseObjectService.sql(instanceId, 'yellowbrick', sql)
    const rows = result.rows || []

    // Make every row addressable by column label as well as by column name.
    for (const { label, name } of result.columns || []) {
      for (const row of rows) {
        if (row[label] === undefined) {
          row[label] = row[name]
        }
      }
    }

    for (const row of rows) {
      if (useTotalWaitTime) {
        row.total_wait_time_ms = sumColumns(row, TOTAL_WAIT_TIME_COLS)
      }
      if (useWaitRunTime) {
        row.wait_run_time_ms = sumColumns(row, WAIT_RUN_TIME_COLS)
      }
      if (useJobName) {
        row.job_name = (row.client_hostname && row.username && row.database_name && `${row.client_hostname} / ${row.username} / ${row.database_name}`) || 'n/a'
      }
    }

    return { sql, rows }
  }

  /**
   * Counts queries per distinct value of `column` and returns the most
   * frequent values first.
   *
   * @param {object} params
   * @param {*} params.instanceId - Passed through to `databaseObjectService.sql`.
   * @param {string} params.column - Column to group by (interpolated verbatim).
   * @param {object} params.time - `{ timeInMinutes, mode }`. `mode === 'active'`
   *   reads sys.query and sys.query_recent; any other mode reads sys.log_query.
   *   A custom date range is not implemented here and is ignored.
   * @param {number} [params.limit] - Maximum number of values; defaults to 10.
   * @param {object} [params.options] - `{ systemWork, automaticFlush, session }`;
   *   each truthy flag hides that category.
   * @returns {Promise<Array<object>>} The result rows.
   * @throws {TypeError} When `time` is not an object.
   */
  async getColumnTopUsage({ instanceId, column, time, limit, options }) {
    // Check time; custom times to be done later TODO:
    assertTimeObject(time)
    const { timeInMinutes, mode } = time

    const sourceQuery = mode === 'active'
      ? 'select ' + sysQueryCols.join(', ') + ' from sys.query union all select ' + sysQueryCols.join(', ') + ' from sys.query_recent'
      : 'select ' + sysLogQueryCols.join(', ') + ' from sys.log_query'

    // Our own tagged statements are always left out of the counts.
    const predicates = []
    if (timeInMinutes) {
      predicates.push(`COALESCE(state_time, submit_time) > current_timestamp - '${timeInMinutes} minutes'::INTERVAL`)
    }
    predicates.push(NOT_OWN_TAG_PREDICATE, ...buildOptionPredicates(options || {}))

    // Compose sql.
    const sql = OWN_TAG_STATEMENT + [
      `SELECT ${column}, COUNT(*)`,
      'FROM (',
      `  ${sourceQuery}`,
      ') queries',
      `WHERE ${predicates.join('\nAND ')}`,
      'GROUP BY 1',
      'ORDER BY 2 DESC',
      `LIMIT ${limit || 10}`
    ].join('\n')

    // Return result rows.
    const result = await databaseObjectService.sql(instanceId, 'yellowbrick', sql)
    return result.rows
  }

  /**
   * Calls `sys.query_poll_state` and returns its rows, with the timestamp
   * columns converted to epoch milliseconds.
   *
   * @param {object} params
   * @param {*} params.instanceId - Passed through to `databaseObjectService.sql`.
   * @param {number} params.stateTimeMs - First argument of the poll function;
   *   replaced by 1 when it is not a finite number.
   * @param {number} params.pollDurationMs - Fourth argument of the poll
   *   function; the request timeout is at least this long and at least one minute.
   * @param {number} params.pollBufferDurationMs - Fifth argument of the poll function.
   * @returns {Promise<Array<object>>} The result rows, or an empty array.
   */
  async pollQueryStates({ instanceId, stateTimeMs, pollDurationMs, pollBufferDurationMs }) {
    const sinceMs = Number.isFinite(Number(stateTimeMs)) ? stateTimeMs : 1
    const sql = `SELECT query_id, state, pool_id, slot, cluster_name, extract(epoch from submit_time) * 1000 as submit_time, extract(epoch from execution_time) * 1000 as execution_time, extract(epoch from state_time) * 1000 as state_time, extract(epoch from restart_time) * 1000 as restart_time, extract(epoch from done_time) * 1000 as done_time, num_restart, run_ms, memory_granted_bytes FROM sys.query_poll_state(${sinceMs}, NULL, NULL, ${pollDurationMs}, ${pollBufferDurationMs})`

    // The flush calls before and after issuing the request are kept in their
    // original order.
    // NOTE(review): presumably they keep the long poll from being batched with
    // other requests - confirm against jolokiaService.
    jolokiaService.flush()
    jolokiaService.timeout(Math.max(pollDurationMs, 1 * 60 * 1000))
    const pendingPoll = databaseObjectService.sql(instanceId, 'yellowbrick', sql)
    jolokiaService.flush()

    const result = await pendingPoll
    return result.rows || []
  }

  /**
   * Issues `CANCEL <queryId>`.
   *
   * @param {object} params
   * @param {*} params.instanceId - Passed through to `databaseObjectService.sql`.
   * @param {*} params.queryId - Query id, interpolated verbatim.
   * @returns {Promise<*>} The raw service result.
   */
  async cancelQuery({ instanceId, queryId }) {
    return databaseObjectService.sql(instanceId, 'yellowbrick', `CANCEL ${queryId}`)
  }

  /**
   * Issues `select yb_terminate_session(<sessionId>)`.
   *
   * @param {object} params
   * @param {*} params.instanceId - Passed through to `databaseObjectService.sql`.
   * @param {*} params.sessionId - Session id, interpolated verbatim.
   * @returns {Promise<*>} The raw service result.
   */
  async terminateSession({ instanceId, sessionId }) {
    return databaseObjectService.sql(instanceId, 'yellowbrick', `select yb_terminate_session(${sessionId})`)
  }
}

// One shared instance is created at module load and exposed as the default
// export, so every importer works with the same object.
const queryActivityService = new QueryActivityService()

export { queryActivityService as default }
