import { reactive } from 'vue'
import lru from 'tiny-lru'
import store from '@/store/storeService'

import { jolokiaService, mbeans, app, baseUrl, connectionService, errorHandler, databaseObjectService } from '@/services'
import { Logger } from '@/util'
import apollo from '@/apollo'
import INSTANCE_TASKS_ACTIVE from '@/graphql/instanceTasksActive.gql'
import { getInstance, hasRole } from '@/auth'
import { clearCommunicationsError } from '@/services/app'
import { useConfig } from '~/utils/helpers'

const log = Logger.get('yb.tasks')

const POLL_SHORT = 1000
const POLL_LONG = 60 * 1000
const POLL_COUNTDOWN = 10
const POLL_QUERY_TEXT_LENGTH = 100
const FINAL_STATES = new Set(['done', 'error', 'cancel'])
const FINAL_STATES_WITH_RESTART = new Set(['done', 'error', 'cancel', 'restart user', 'restart error'])
const QUERY_SOURCES = {
  active: `SELECT * FROM (SELECT {{columns}} FROM sys.query WHERE state NOT IN (${Array.from(FINAL_STATES).map(s => `'${s}'`).join(', ')})) q`,
  recent: 'SELECT {{columns}} FROM sys.query_recent',
  historical: 'SELECT {{columns}} FROM sys.log_query',
  active_recent: 'SELECT * FROM (select {{columns}} from sys.query union all select {{columns}} from sys.query_recent) q',
  all: 'SELECT * FROM (SELECT {{columns}} FROM sys.query UNION ALL SELECT {{columns}} FROM sys.query_recent UNION ALL SELECT {{columns}} FROM sys.log_query) q'
}

let windowUnloading = false
window.addEventListener('beforeunload', () => {
  windowUnloading = true
})

class PollInstanceState {
  constructor(instance, database, key) {
    if (!instance) {
      throw new Error('Instance parameter is required')
    }
    if (!database) {
      throw new Error('Database parameter is required')
    }
    if (!key) {
      throw new Error('Database parameter is required')
    }
    this._instance = {
      id: instance.id,
      name: instance.name
    }
    this._database = database
    this._key = key
    this._idleCountdown = POLL_COUNTDOWN
    this.expire(-1) // We are immediately expired.
  }

  expire(offset) {
    this._nextPoll = +new Date() + offset
    return this
  }

  reset() {
    this._idleCountdown = POLL_COUNTDOWN
  }

  idleCountdown() {
    return --this._idleCountdown < 0
  }

  get instance() {
    return this._instance
  }

  get database() {
    return this._database
  }

  get key() {
    return this._key
  }

  get expired() {
    return +new Date() > this._nextPoll
  }
}

class TasksService {
  constructor() {
    this._pollQueriesActive = false
    this._pollTasksActive = false
    this._pollInstanceStates = {}
    this._queryTexts = lru(100)
    this._taskState = reactive({
      pollActive: false,
      tasks: [],
      history: [],
      activeTasks: 0
    })
    this._restored = false
    const originalStore = store
    store.observe((store) => {
      store.watch(state => state?.tasks?.settings?.pollInstanceStates,
        (_) => {
          if (_ && _.length > 0 && !this._restored) {
            this._restored = true
            const pollInstanceStates = store.get('tasks/settings@pollInstanceStates')
            if (pollInstanceStates != null) {
              pollInstanceStates.forEach(pollInstanceState => this.pollInstance(pollInstanceState.instance, pollInstanceState.database))
            }
            this._taskState.history = store.get('tasks/settings@history')
          }
          if (originalStore?.roles && hasRole('consumeradmin', originalStore.roles)) {
            this._pollNextTasks()
          }
        },
        { deep: true }
      )
      store.watch(state => state?.global?.settings?.user?.roles,
        (roles, oldVal) => {
          if (oldVal === null && hasRole('consumeradmin', roles)) {
            // Create periodic poller.
            this._tasksPollInterval = window.setInterval(this._pollNextTasks.bind(this), 60000)
          }
        },
        { deep: true }
      )
    })
  }

  destroy () {
    this._tasksPollInterval && window.clearInterval(this._tasksPollInterval)
    delete this._tasksPollInterval
  }

  start() {
    if (!this._queryPollInterval) {
      this._queryPollInterval = window.setInterval(this._pollNextQueries.bind(this), 1000)
    }
    this._restored = true // no longer need to check.
  }

  stop() {
    this._queryPollInterval && window.clearInterval(this._queryPollInterval)
    delete this._queryPollInterval
  }

  get taskState() {
    return this._taskState
  }

  set taskState(_) {
    this._taskState = _
  }

  async queriesForInstance(instance, database, source, andWhere = '', limit = 0, offset = 0, orderBy = null, defaultValue = [], pollInstanceState = null) {
    // If database is not sent, error.
    if (typeof database !== 'string') {
      throw new TypeError('Database parameter must be string')
    }
    if (!QUERY_SOURCES[source]) {
      throw new Error('Invalid source: ' + source)
    }

    // We are using a fixed user here.  PUKE.
    const url = baseUrl.get()
    let poll
    try {
      // Start jolokia.
      if (!jolokiaService.started()) {
        jolokiaService.start()
      }

      // Include limit/offset?
      const options = {}
      if (limit) {
        options.limit = limit
      }
      if (offset) {
        options.offset = offset
      }

      // Get the queries for this instance.
      connectionService.connect(instance.id)
      jolokiaService.flush()
      jolokiaService.timeout((source === 'historical' ? 60 : 15) * 1000)
      const username = store.get().get('global/settings@user.email')
      const columns = `
        query_id,
        cluster_name,
        database_name,
        application_name,
        session_id,
        error_code,
        error_message,
        state,
        type,
        submit_time,
        done_time,
        total_ms,
        tags,
        io_read_bytes,
        io_write_bytes,
        io_spill_write_bytes,
        rows_inserted,
        rows_deleted,
        rows_returned,
        substring(query_text, 0, ${POLL_QUERY_TEXT_LENGTH}) as query_text,
        length(query_text) as query_text_length,
        ${source === 'active' ? 'blocked' : `'' as blocked`}
      `
      let sql = `
        SET ybd_query_tags TO 'ym-query-activity';
        ${QUERY_SOURCES[source].replace(/\{\{columns\}\}/g, columns)}
        WHERE application_name = 'ym'
          AND database_name = '${database}'
          AND tags LIKE 'ym:${username}:%'
          AND type NOT IN ('set', 'reset', 'show', 'session')`
      if (andWhere) {
        sql += `\n${andWhere}`
      }
      sql += `\n${orderBy || 'ORDER BY submit_time desc'}`
      if (limit) {
        sql += `\nLIMIT ${limit}`
      }
      if (offset) {
        sql += `\nOFFSET ${offset}`
      }
      jolokiaService.flush()
      poll = jolokiaService.execute(mbeans.datasources, 'queryEx', [database, sql, 'ym', 0])
      jolokiaService.pin()
      jolokiaService.flush()
    } finally {
      baseUrl.set(url)
    }

    // Wait on the poll for queries.
    try {
      const { rows } = await poll
      return rows
    } catch (e) {
      if (typeof e?.status === 'number' && e.status === 404) {
        log.debug('Received instance not found for instance', instance.name, String(e))
      } else {
        log.warn('Unexpected exception polling instance', instance.name, String(e))
        log.error(e)
      }
      if (!windowUnloading) {
        !!pollInstanceState && this._unpollInstance(pollInstanceState)
        return defaultValue
      } else {
        return null
      }
    }
  }

  pollInstance(instance, database) {
    this.start()
    const pollInstanceState = this._pollInstanceState(instance, database).expire(-1)
    log.debug(`Start polling instance ${pollInstanceState.key}`)
    this._storePollInstanceStates()
  }

  _unpollInstance(pollInstanceState) {
    delete this._pollInstanceStates[pollInstanceState.key]
    log.debug(`Stop polling instance ${pollInstanceState.key}`)
    this._storePollInstanceStates()
  }

  _storePollInstanceStates() {
    const pollInstanceStates = Object.values(this._pollInstanceStates).map((pollInstanceState) => {
      return { instance: pollInstanceState.instance, database: pollInstanceState.database }
    })
    store.get().dispatch('tasks/setPollInstanceStates', { pollInstanceStates })
  }

  _pollInstanceState(instance, database) {
    if (typeof instance !== 'object') {
      throw new TypeError('Expected instance object for poll instance state')
    }
    if (!database) {
      throw new Error('Expected database for poll instance')
    }
    const key = `${instance.id}:${database}`
    let pollInstanceState = this._pollInstanceStates[key]
    if (!pollInstanceState) {
      pollInstanceState = new PollInstanceState(instance, database, key)
      this._pollInstanceStates[key] = pollInstanceState
    } else {
      pollInstanceState.reset()
    }

    return pollInstanceState
  }

  _nextPollInstanceState() {
    // Loop through all states, determining which instance needs a poll.
    for (const pollInstanceState of Object.values(this._pollInstanceStates)) {
      if (pollInstanceState.expired) {
        return pollInstanceState
      }
    }
  }

  _mapQueryText(query) {
    // How we overlay query text.
    const key = `${query.instance?.id}:${query.query_id}`
    const query_text = this._queryTexts.get(key)
    if (query_text) {
      return query_text
    } else {
      const remaining = query.query_text_length - query.query_text?.length
      if (remaining > 0) {
        query.query_text += '... (' + remaining + ' more characters)'
      }
      return Promise.resolve(query.query_text)
    }
  }

  async _pollNextQueries() {
    // If polling, skip.
    if (this._pollQueriesActive) {
      return
    }

    // Determine next instance that needs a poll.
    const pollInstanceState = this._nextPollInstanceState()
    if (!pollInstanceState) {
      return
    }

    // Poll this one instance.
    this._pollQueriesActive = true
    try {
      // Wait on the poll for queries.
      const queries = await this.queriesForInstance(pollInstanceState.instance, pollInstanceState.database, 'active', '', 0, 0, null, [], pollInstanceState)
      if (!queries) {
        // Poll failed; skip this round.
        return
      }
      let history = []

      // Remove prior queries for this instance.
      for (let i = this._taskState.tasks.length - 1; i >= 0; --i) {
        if (this._taskState.tasks[i].instance.id === pollInstanceState.instance.id &&
            this._taskState.tasks[i].database_name === pollInstanceState.database) {
          history.push(...this._taskState.tasks.splice(i, 1))
        }
      }

      // If there are queries, we will expire short, otherwise long.
      if (!queries?.length && pollInstanceState.idleCountdown()) {
        this._unpollInstance(pollInstanceState)
      } else {
        pollInstanceState.expire(POLL_SHORT)
      }

      // Integrate the queries into the task list.
      await Promise.all(queries.map((query) => {
        query.instance = pollInstanceState.instance
        return this._mapQueryText(query)
          .then(async (queryText) => {
            if (queryText !== query.query_text) {
              const updated = await store.get().dispatch('tasks/updateQueryText', { query, queryText })
              if (!updated) {
                query.query_text = queryText
              }
            }
          })
          .then(() => this._taskState.tasks.push(query))
      }))

      // Update history if we need to.
      // NB: we are finding historical queries that have non-final state that just disappeared.
      const activeQueryIds = new Set(queries
        .filter(q => !FINAL_STATES.has(q.state))
        .map(q => q.query_id))
      history = history
        .filter(q => !activeQueryIds.has(q.query_id)) // filter out those that are active
        .filter(q => !FINAL_STATES.has(q.state))

      if (history.length > 0) {
        // This is temporary; state and final stats are coming below....
        const now = new Date().toISOString()
        history.forEach((q) => {
          q.done_time = q.done_time || now
          if (!FINAL_STATES_WITH_RESTART.has(q.state)) {
            q.state = 'completing'
          }
        })
        store.get().dispatch('tasks/addHistory', { queries: history })

        // We have some final queries; wait a bit then get their final disposition.
        window.setTimeout(async () => {
          // Query for final stats about these queries and add them to history.
          const added = await this.addCompletedQueriesById(pollInstanceState.instance, pollInstanceState.database, history.map(q => q.query_id))
          if (!added) {
            store.get().dispatch('tasks/addHistory', {
              queries: history.map((query) => {
                if (!FINAL_STATES_WITH_RESTART.has(query.state)) {
                  query.state = 'completing'
                  console.log(`Could not get final state of query; states will be stale in cached query history:\nID: ${query.query_id}\nSTATE: ${query.state}\nTYPE: ${query.type}\nQUERY: ${query.query_text}`)
                }
                query.instance = pollInstanceState.instance
                return query
              })
            })
          }
        }, 5000) // A bit of a delay to ensure history is written....
      }
    } catch (e) {
      pollInstanceState.expire(POLL_LONG)
    } finally {
      this._pollQueriesActive = false
    }
  }

  async addCompletedQueriesById(instance, database, queryIds) {
    const historicalQueries = await this.queriesForInstance(instance, database, 'active_recent', `AND query_id IN (${queryIds.join(', ')})`)
    if (historicalQueries?.length > 0) {
      store.get().dispatch('tasks/addHistory', {
        queries: historicalQueries.map((query) => {
          query.instance = instance
          this._mapQueryText(query)
            .then((queryText) => {
              if (queryText !== query.query_text) {
                store.get().dispatch('tasks/updateQueryText', { query, queryText })
              }
            })
          return query
        })
      })
      return true
    } else {
      return false
    }
  }

  async addCompletedQueriesByRowset(instance, database, rowset) {
    const queryIds = []
    if (rowset.execId) {
      queryIds.push(rowset.execId)
    }
    if (rowset.rowSets) {
      queryIds.push(...rowset.rowSets
        .filter(r => !!r.execId)
        .map(r => r.execId))
    }
    return queryIds.length > 0 ? this.addCompletedQueriesById(instance, database, queryIds) : true
  }

  async cancel(query) {
    // Issue the request.
    let cancelRequest
    const url = baseUrl.get()
    try {
      const connected = await connectionService.connect(query.instance.id)
      if (connected) {
        cancelRequest = jolokiaService.execute(mbeans.datasources, 'queryEx', [query.database_name, `CANCEL ${query.query_id}`, 'ym', 1])
      } else {
        return
      }
    } finally {
      baseUrl.set(url)
    }

    // Wait on it.
    try {
      await cancelRequest
      return true
    } catch (e) {
      log.error(e)
      app.errorResult(e, false, 'Could not cancel query: ')
      return false
    }
  }

  async getQueryText(query, historical) {
    // If the query text needs to be adjusted.
    if (query.query_text_length !== query.query_text?.length) {
      const key = `${query.instance?.id}:${query.query_id}`
      let result = this._queryTexts.get(key)
      if (!result) {
        // Make a new promise holding the query text.
        // eslint-disable-next-line no-async-promise-executor
        result = new Promise(async (resolve, reject) => {
          // Issue the request.
          const url = baseUrl.get()
          try {
            const connected = await connectionService.connect(query.instance.id)
            if (connected) {
              const result = await jolokiaService.execute(historical ? mbeans.sysLogQuery : mbeans.sysQuery, 'retrieve', [{
                columns: ['query_text'],
                where: {
                  query_id: query.query_id
                },
                options: {
                  database: query.database_name
                }
              }])
              let queryText
              if (result && result.rows && result.rows.length > 0) {
                queryText = result.rows[0].query_text

                if (historical) {
                  store.get().dispatch('tasks/updateQueryText', { query, queryText })
                } else {
                  // Done on poll cycle.
                }
              } else {
                queryText = query.query_text // could not find it, just use what we have and try again later.
                this._queryTexts.delete(key)
              }

              // Store results.
              resolve(queryText)
            }
          } catch (e) {
            this._queryTexts.delete(key)
            reject(e)
          } finally {
            baseUrl.set(url)
          }
        })

        this._queryTexts.set(key, result)
      }

      return result
    }
  }

  pollTasks() {
    if (hasRole('consumeradmin', store.roles)) {
      log.debug('Polling next active tasks')
      window.setTimeout(this._pollNextTasks.bind(this), 1000)
    }
  }

  async _pollNextTasks() {
    if(useConfig().deploymentMode === 'ce') {
      return
    }
    if (this._pollTasksActive) {
      return
    }
    const instance = getInstance()
    if (!instance || !instance.authenticated()) {
      return
    }
    this._pollTasksActive = true
    try {
      const response = await apollo.query({
        query: INSTANCE_TASKS_ACTIVE
      })
      this._taskState.activeTasks = response?.data?.activeTasks || 0
      clearCommunicationsError(true)
    } catch (e) {
      this._taskState.activeTasks = 0
      await errorHandler.thrown(e, 'Could not retrieve task information.')
    } finally {
      this._pollTasksActive = false
    }
  }
}

export default new TasksService()
