import connectionService from './connectionService'
import jolokiaService from './jolokiaService'
import { mbeans } from './constants'
import { NotFound, ResponseError } from './errors'
import queryTables from './queryTables.sql?raw'
import querySchema from './querySchema.sql?raw'
import queryClusterUsage from './clusterUsage.sql?raw'
import queryConcurrentQueries from './concurrentQueries.sql?raw'
import queryPoolsAtTime from './poolsAtTime.sql?raw'
import * as functions from '@/util/functions'
import { parseFilter } from '@/services/expressions'
import Cluster from '@/models/Cluster'
import Database from '@/models/Database'
import Format from '@/models/Format'
import Function from '@/models/Function'
import Instance from '@/models/Instance'
import Location from '@/models/Location'
import Procedure from '@/models/Procedure.js'
import Role from '@/models/Role'
import Schema from '@/models/Schema'
import Sequence from '@/models/Sequence'
import Storage from '@/models/Storage'
import Table from '@/models/Table'
import View from '@/models/View'
import WLMProfile from '@/models/WLMProfile'
import WLMResourcePool from '@/models/WLMResourcePool'
import WLMRule from '@/models/WLMRule'

const CACHE_EXPIRATION = (15 * 60 * 1000) // 15min cache expiration

const OBJECT_IDENTIFIER_RE = /\{\{id\}\}/g // For replacing {{id}} in queries below
const OBJECT_TYPES = {
  system: {
    storedPrivilegesQuery: `select * from information_schema.system_privileges`,
    privileges: [
      'ALTER ANY DATABASE',
      'ALTER ANY ROLE',
      'ALTER ANY CLUSTER',
      'BACKUP ANY DATABASE',
      'BULK LOAD',
      'CONTROL ANY SESSION',
      'CONTROL CACHE',
      'CONTROL EXTERNAL AUTHENTICATION',
      'CONTROL LDAP',
      'CONTROL WLM',
      'CREATE CLUSTER',
      'CREATE DATABASE',
      'CREATE EXTERNAL FORMAT',
      'CREATE EXTERNAL LOCATION',
      'CREATE EXTERNAL STORAGE',
      'CREATE ROLE',
      'DROP ANY DATABASE',
      'DROP ANY EXTERNAL FORMAT',
      'DROP ANY EXTERNAL LOCATION',
      'DROP ANY EXTERNAL STORAGE',
      'DROP ANY ROLE',
      'DROP ANY CLUSTER',
      'EXPLAIN QUERY',
      'RESTORE ANY DATABASE',
      'TRACE QUERY',
      'USAGE ANY CLUSTER',
      'USAGE ANY EXTERNAL FORMAT',
      'USAGE ANY EXTERNAL LOCATION',
      'USAGE ANY EXTERNAL STORAGE',
      'VIEW QUERY TEXT',
      'VIEW ROLE'
    ]
  },
  database: {
    storedPrivilegesQuery: `select p.grantee, p.privilege_type, p.is_grantable
      from information_schema.database_privileges p
      join (
        select name as database_name
        from sys.database
        where database_id = {{id}}
      ) names
      on names.database_name = p.database_name
    `,
    privileges: [
      'ALTER ANY SCHEMA',
      'BACKUP',
      'BULK LOAD',
      'CONNECT',
      'CREATE',
      'DROP ANY SCHEMA',
      'EXPLAIN QUERY',
      'RESTORE',
      'TEMPORARY',
      'TRACE QUERY',
      'VIEW QUERY TEXT'
    ]
  },
  cluster: {
    storedPrivilegesQuery: `select p.grantee, p.privilege_type, p.is_grantable
      from information_schema.cluster_privileges p
      where p.cluster_name = '{{id}}'`,
    privileges: [
      'ALTER CLUSTER',
      'DROP CLUSTER',
      'USAGE'
    ]
  },
  schema: {
    storedPrivilegesQuery: `select p.grantee, p.privilege_type, p.is_grantable
      from information_schema.schemata_privileges p
      join (
        select name as schema_name
        from sys.schema
        where schema_id = {{id}}
      ) names
      on names.schema_name = p.schema_name
    `,
    privileges: [
      'CREATE',
      'USAGE'
    ]
  },
  table: {
    storedPrivilegesQuery: `select p.grantee, p.privilege_type, CASE WHEN p.is_grantable = 'YES' THEN TRUE ELSE FALSE END AS is_grantable
      from information_schema.table_privileges p
      join (
        select s.name as table_schema, t.name as table_name
        from sys.vt_table_info t
        join sys.schema s
        on t.schema_id = s.schema_id AND t.database_id = s.database_id
        where t.table_id = {{id}}
      ) names
      on names.table_schema = p.table_schema
      and names.table_name = p.table_name
    `,
    privileges: [
      'DELETE',
      'INSERT',
      'REFERENCES',
      'SELECT',
      'TRUNCATE',
      'UPDATE'
    ]
  },
  view: {
    storedPrivilegesQuery: `select p.grantee, p.privilege_type, CASE WHEN p.is_grantable = 'YES' THEN TRUE ELSE FALSE END AS is_grantable
      from information_schema.table_privileges p
      join (
        select s.name as table_schema, v.name as table_name
        from sys.view v
        join sys.schema s
        on v.schema_id = s.schema_id AND v.database_id = s.database_id
        where v.view_id = {{id}}
      ) names
      on names.table_schema = p.table_schema
      and names.table_name = p.table_name
      and p.privilege_type in ('SELECT')
    `,
    privileges: [
      'SELECT'
    ],
    typeOverride: 'table'
  },
  procedure: {
    storedPrivilegesQuery: `select p.grantee, p.privilege_type, CASE WHEN p.is_grantable = 'YES' THEN TRUE ELSE FALSE END AS is_grantable
      from information_schema.routine_privileges p
      join (
        select s.name as routine_schema, p.name as routine_name
        from sys.procedure p
        join sys.schema s
        on p.schema_id = s.schema_id AND p.database_id = s.database_id
        where p.procedure_id = {{id}}
      ) names
      on names.routine_schema = p.routine_schema
      and names.routine_name = p.routine_name
    `,
    privileges: [
      'EXECUTE'
    ],
    typeOverride: 'function'
  },
  sequence: {
    storedPrivilegesQuery: `select p.grantee, p.privilege_type, p.is_grantable
      from information_schema.sequence_privileges p
      join (
        select s.name as sequence_schema, t.name as sequence_name
        from sys.sequence t
        join sys.schema s
        on t.schema_id = s.schema_id AND t.database_id = s.database_id
        where t.sequence_id = {{id}}
      ) names
      on names.sequence_schema = p.sequence_schema
      and names.sequence_name = p.sequence_name
    `,
    privileges: [
      'USAGE',
      'SELECT',
      'UPDATE'
    ]
  },
  role: {
    storedPrivilegesQuery: `select grantee, role_name as privilege_type, CASE WHEN is_grantable = 'YES' THEN TRUE ELSE FALSE END AS is_grantable
      from information_schema.applicable_roles
      where grantee = (select name from sys.role where role_id = {{id}} union select name from sys.user where user_id = {{id}})
    `,
    privileges: [
      'USAGE',
      'MEMBER'
    ]
  },
  member: {
    storedPrivilegesQuery: `select grantee, role_name as privilege_type, CASE WHEN is_grantable = 'YES' THEN TRUE ELSE FALSE END AS is_grantable
      from information_schema.applicable_roles
      where role_name = (select name from sys.role where role_id = {{id}} union select name from sys.user where user_id = {{id}})
    `,
    privileges: [
      'USAGE',
      'MEMBER'
    ]
  },
  external_format: {
    storedPrivilegesQuery: `select p.grantee, p.privilege_type, p.is_grantable
      from information_schema.external_format_privileges p
      join (
        select name as external_format_name
        from sys.external_format
        where external_format_id = {{id}}
      ) names
      on names.external_format_name = p.external_format_name
    `,
    privileges: [
      'ALTER EXTERNAL FORMAT',
      'DROP EXTERNAL FORMAT',
      'USAGE EXTERNAL FORMAT'
    ]
  },
  external_storage: {
    storedPrivilegesQuery: `select p.grantee, p.privilege_type, p.is_grantable
      from information_schema.external_storage_privileges p
      join (
        select name as external_storage_name
        from sys.external_storage
        where external_storage_id = {{id}}
      ) names
      on names.external_storage_name = p.external_storage_name
    `,
    privileges: [
      'ALTER EXTERNAL STORAGE',
      'DROP EXTERNAL STORAGE',
      'USAGE EXTERNAL STORAGE'
    ]
  },
  external_location: {
    storedPrivilegesQuery: `select p.grantee, p.privilege_type, p.is_grantable
      from information_schema.external_location_privileges p
      join (
        select name as external_location_name
        from sys.external_location
        where external_location_id = {{id}}
      ) names
      on names.external_location_name = p.external_location_name
    `,
    privileges: [
      'ALTER EXTERNAL LOCATION',
      'DROP EXTERNAL LOCATION',
      'USAGE EXTERNAL LOCATION'
    ]
  }
}

class DatabaseObjectService {
  async populate(instanceId, database, options = { full: true }) {
    const { full, skiptables } = options
    if (!instanceId) {
      throw new Error(`Parameter 'instanceId' is required for populate`)
    }
    if (typeof instanceId !== 'string') {
      throw new TypeError(`Parameter 'instanceId' is not valid`)
    }
    const instance = Instance.query().whereId(instanceId).first()
    if (!instance) {
      throw new NotFound(`Could not locate instance: ${instanceId}`)
    }
    if (!instance.online) {
      return true // We can't populate a suspended instance.
    }
    const connected = await connectionService.connect(instance.id)
    if (connected) {
      // Populate the databases.
      let databases = Database.query().where('instance_id', instance.id).get()
      if (!databases.length) {
        databases = (await this.populateDatabases(instance.id))
        Database.insertWithInstance(instance.id, { data: databases })
        databases = Database.query().where('instance_id', instance.id).get()
      }
      if (!database) {
        return true
      }

      // Examine database object; is it populating, or is it fresh enough (cached)?
      const databaseObject = databases.find(d => d.name === database)
      if (!databaseObject) {
        throw new NotFound('Could not locate database: ' + database)
      }
      const { database_id } = databaseObject
      if (databaseObject.populating) {
        false && console.log('NOT populating already populating', database)
        return true // we are populating already.
      }
      if (!!databaseObject.populated && +new Date() < databaseObject.populated + CACHE_EXPIRATION) {
        false && console.log('NOT populating already populated', database, 'cache expires', new Date(databaseObject.populated + CACHE_EXPIRATION))
        return true // we haven't expired our cache.
      }
      await Database.update({
        where: [instance.id, database_id],
        data: {
          populating: true
        }
      })
      try {
        false && console.log('populating', database, database_id)

        // Populate the objects.
        await connectionService.connect(instance.id)
        const [
          roles,
          schemas,
          tables,
          views,
          sequences,
          funcs,
          procedures,
          storages,
          locations,
          formats,
          clusters
        ] = await Promise.all([
          this.populateRoles(database),
          this.populateSchemas(database, database_id),
          !skiptables ? this.populateTables(database, database_id, 0, 0) : Promise.resolve([]),
          this.populateViews(database, database_id),
          this.populateSequences(database, database_id),
          full ? this.populateFunctions(database, database_id) : Promise.resolve([]),
          this.populateProcedures(database, database_id),
          full ? this.populateStorages(database, database_id) : Promise.resolve([]),
          full ? this.populateLocations(database, database_id) : Promise.resolve([]),
          full ? this.populateFormats(database, database_id) : Promise.resolve([]),
          this.populateClusters(database, instance),
          jolokiaService.flush()
        ])

        // Populate the store.
        await Promise.all([
          !roles ? Promise.resolve([]) : Role.insertWithInstance(instance.id, { data: roles }, null, true),
          Schema.insertWithInstance(instance.id, { data: schemas }, database_id, true),
          Table.insertWithInstance(instance.id, { data: tables }, database_id, true),
          View.insertWithInstance(instance.id, { data: views }, database_id, true),
          Sequence.insertWithInstance(instance.id, { data: sequences }, database_id, true),
          Function.insertWithInstance(instance.id, { data: funcs }, database_id, true),
          Procedure.insertWithInstance(instance.id, { data: procedures }, database_id, true),
          Storage.insertWithInstance(instance.id, { data: storages }),
          Location.insertWithInstance(instance.id, { data: locations }),
          Format.insertWithInstance(instance.id, { data: formats }),
          Cluster.insertWithInstance(instance.id, { data: clusters })
        ])

        // We are now populated.  Update populated state.
        await Database.update({
          where: [instance.id, database_id],
          data: {
            populated: full ? +new Date() : 0
          }
        })
      } finally {
        // Done populating.
        await Database.update({
          where: [instance.id, database_id],
          data: {
            populating: false
          }
        })
      }
    }

    return true
  }

  async repopulate(instanceId, database) {
    // Clean out the data model for this database.
    const databaseObject = Database.query().where('instance_id', instanceId).where('name', database).first()
    if (!!databaseObject && !!databaseObject.populated) {
      Database.update({
        where: [instanceId, databaseObject.database_id],
        data: {
          populated: 0
        }
      })

      // Remove references to this database.
      if (false) { // Database objects
        Schema.delete(item => item.instance_id === instanceId && item.database_id === databaseObject.database_id)
        Table.delete(item => item.instance_id === instanceId && item.database_id === databaseObject.database_id)
        View.delete(item => item.instance_id === instanceId && item.database_id === databaseObject.database_id)
        Sequence.delete(item => item.instance_id === instanceId && item.database_id === databaseObject.database_id)
        Function.delete(item => item.instance_id === instanceId && item.database_id === databaseObject.database_id)
        Procedure.delete(item => item.instance_id === instanceId && item.database_id === databaseObject.database_id)
      }
      if (false) { // Global objects
        Role.delete(item => item.instance_id === instanceId)
        Storage.delete(item => item.instance_id === instanceId)
        Location.delete(item => item.instance_id === instanceId)
        Format.delete(item => item.instance_id === instanceId)
        Cluster.delete(item => item.instance_id === instanceId)
      }
    }

    // Issue populate.
    try {
      return await this.populate(instanceId, database)
    } catch (e) {
      if (!(e instanceof NotFound)) {
        throw e
      }
    }
  }

  async evict(instanceId) {
    const databaseObjects = Database.query().where('instance_id', instanceId).get()
    await Promise.all([
      Role.delete(item => item.instance_id === instanceId),
      Schema.delete(item => item.instance_id === instanceId),
      Table.delete(item => item.instance_id === instanceId),
      View.delete(item => item.instance_id === instanceId),
      Sequence.delete(item => item.instance_id === instanceId),
      Function.delete(item => item.instance_id === instanceId),
      Procedure.delete(item => item.instance_id === instanceId),
      Storage.delete(item => item.instance_id === instanceId),
      Location.delete(item => item.instance_id === instanceId),
      Format.delete(item => item.instance_id === instanceId),
      Cluster.delete(item => item.instance_id === instanceId),
      ...databaseObjects.map(databaseObject => Database.update({
        where: [instanceId, databaseObject.database_id],
        data: {
          populated: 0
        }
      })
      )
    ])
  }

  async populateTableSummary(instance_id, database, schema_id, limit = undefined, offset = undefined, sort = undefined, filter = undefined, count = false) {
    // Load and format initial sql with inner table.
    let sql = queryTables.replace(/{{columns}}/g, count ? 'count(*)' : '*') + '\n' +
      `and schema_id = ${schema_id}\n`

    // Add filter.
    if (filter) {
      if (typeof filter === 'object') {
        const mappedFilter = {}
        Object.entries(filter).forEach(([colId, f]) => {
          mappedFilter[colId] = f
        })
        filter = mappedFilter
      }
      const and = []
      parseFilter(and, filter, ['name', 'owner_name'])
      sql += and.join('\n')
    }

    // Add sort.
    const orderBy = []
    if (!count && !!sort) {
      sort.forEach(({ colId, sort }) => {
        orderBy.push(`${colId} ${sort} nulls last`)
      })
    }
    if (orderBy.length > 0) {
      sql += `order by ${orderBy.join(', ')}\n`
    }

    // Add offset/limit.
    if (typeof limit !== 'undefined') {
      sql += `limit ${limit}\n`
    }
    if (typeof offset !== 'undefined') {
      sql += `offset ${offset}\n`
    }

    const response = await this.sql(instance_id, database, sql)
    return count
      ? response.rows[0].count
      : Object.freeze((response.rows || []).map((table) => {
        table.table_id = table.id
        table.__type = 'table'
        table._owner_name = table.owner_name
        return table
      }))
  }

  async populateSchemaSummary(instance_id, database, database_id) {
    const sql = querySchema
    const response = await this.sql(instance_id, database, sql)
    return response.rows.map((row) => {
      row.database_id = database_id
      return row
    })
      .filter(Schema.filterCommon)
  }

  async populateTablesRowstoreData(instance_id, database_id) {
    const connected = connectionService.connect(instance_id)
    if (connected) {
      const args = {
        where: {
          database_id,
          or: {
            'rowstore_bytes:>': 0,
            'rowstore_row_count:>': 0
          }
        },
        columns: [
          'table_id',
          'rowstore_row_count',
          'rowstore_bytes'
        ]
      }
      const response = await jolokiaService.execute(mbeans.sysTable, 'retrieve', [args])
      return Object.freeze(functions.indexBy(response.rows || [], 'table_id'))
    } else {
      return {}
    }
  }

  async populateDatabases(instanceId) {
    const connected = connectionService.connect(instanceId)
    if (connected) {
      const sql = `
        select d.*, d.rows_columnstore + d.rows_rowstore as rows, coalesce(u.name, r.name) || '' as _owner_name
          , has_database_privilege(current_user_id(), d.database_id, 'CREATE') as can_create
        from sys.database d
        left outer join sys.user u on u.user_id = d.owner_id
        left outer join sys.role r on r.role_id = d.owner_id
        where has_database_privilege(current_user_id(), d.database_id, 'connect') = true`
      const response = await this.sql(instanceId, 'yellowbrick', sql)
      return (response.rows || [])
    }
  }

  async populateSchemas(database, database_id) {
    if (!database) {
      return []
    }
    const response = await jolokiaService.execute(
      mbeans.sysSchema,
      'retrieve',
      [
        {
          columns: ['database_id', 'schema_id', 'owner_id', 'name'],
          where: {
            'name:not in': ['pg_catalog'],
            database_id
          },
          options: {
            database
          }
        }
      ]
    )
    return (response.rows || [])
      .map((schema) => {
        schema.populated = true
        return schema
      })
  }

  async populateRoles(database, allowSystemRoles) {
    if (!database) {
      return []
    }
    const usersResponse = jolokiaService.execute(mbeans.datasources, 'queryEx', [database, `select user_id, name, pg_has_role(name, 'MEMBER') as is_member, pg_has_role(name, 'MEMBER WITH ADMIN OPTION') or has_system_privilege('ALTER ANY ROLE') as can_admin from sys.user where (name = current_user or user_id > ${allowSystemRoles ? 0 : 16384})`, null, 0])
    const rolesResponse = jolokiaService.execute(mbeans.datasources, 'queryEx', [database, `select role_id, name, pg_has_role(name, 'MEMBER') as is_member, pg_has_role(name, 'MEMBER WITH ADMIN OPTION') or has_system_privilege('ALTER ANY ROLE') as can_admin from sys.role where role_id > ${allowSystemRoles ? 0 : 16384}`, null, 0])
    const results = await Promise.all([usersResponse, rolesResponse])
    const users = (results[0].rows || []).map((r) => {
      return {
        ...r,
        id: r.user_id,
        __type: 'user'
      }
    }).filter(r => !r.name !== 'cdwmdiscsrva')
    const roles = (results[1].rows || []).map((r) => {
      return {
        ...r,
        id: r.role_id,
        __type: 'role'
      }
    }).filter(r => !r.name.match(/^sys_ybd_smc/))
    return [...users, ...roles]
  }

  async populateTables(database, database_id, limit = 0, offset = 0) {
    if (!database) {
      return []
    }
    const args = {
      where: { database_id },
      options: {
        database
      }
    }
    args.columns = ['database_id', 'schema_id', 'table_id', 'owner_id', 'name']

    if (limit) {
      args.limit = limit
    }
    if (offset) {
      args.offset = offset
    }
    const response = await jolokiaService.execute(mbeans.sysTable, 'retrieve', [args])
    return (response.rows || [])
  }

  async populateViews(database, database_id) {
    if (!database) {
      return []
    }
    const response = await jolokiaService.execute(mbeans.datasources, 'queryEx', [database, `select database_id, schema_id, view_id, owner_id, name from sys.view where database_id = ${database_id}`, null, 0])
    return (response.rows || [])
  }

  async populateSequences(database, database_id) {
    if (!database) {
      return []
    }
    const response = await jolokiaService.execute(mbeans.datasources, 'queryEx', [database, `select * from sys.sequence where database_id = ${database_id}`, null, 0])
    return (response.rows || [])
      .map((sequence) => {
        const parts = sequence.fullname?.split(/\./g)
        if (parts?.length === 2) {
          [sequence.schema, sequence.name] = parts
        } else {
          sequence.schema = 'public'
          sequence.name = sequence.fullname
        }
        return sequence
      })
  }

  async populateFunctions(database, database_id) {
    if (!database) {
      return []
    }
    // Source: ybsql --echo-hidden and \dfp
    const functionsSql = `
      SELECT
        p.oid as "function_id",
        p.proowner as "owner_id",
        ${database_id} as "database_id",
        current_database() as "database",
        n.oid as "schema_id",
        n.nspname as "schema",
        p.proname as "name",
        CASE
          WHEN p.proisagg THEN 'agg'
          WHEN p.proiswindow THEN 'window'
          WHEN p.prosp THEN 'stored procedure'
          WHEN p.prorettype = 'pg_catalog.trigger'::pg_catalog.regtype THEN 'trigger'
          ELSE 'normal'
        END as "function_type",
        pg_catalog.pg_get_function_result(p.oid) as "result_type",
        pg_catalog.pg_get_function_arguments(p.oid) as "argument_types"
      FROM pg_catalog.pg_proc p
          LEFT JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
      WHERE pg_catalog.pg_function_is_visible(p.oid)
            AND n.nspname <> 'pg_catalog'
            AND n.nspname <> 'information_schema'
            AND NOT p.prosp`
    const response = await jolokiaService.execute(mbeans.datasources, 'queryEx', [database, functionsSql, null, 0])
    return (response.rows || []).map((_) => {
      _.function_id = _.oid
      _.owner_id = _.proowner
      _.schema = _.nspname
      _.name = _.proname
      return _
    })
  }

  async populateProcedures(database, database_id) {
    if (!database) {
      return []
    }
    const proceduresSql = `
      select
        p.*,
        case
          when current_database_id() = p.database_id then pg_catalog.pg_get_function_result(p.procedure_id)
          else null
        end as "result_type",
        case
          when current_database_id() = p.database_id then pg_catalog.pg_get_function_arguments(p.procedure_id)
          else null
        end as "argument_types"
      from sys.procedure p`
    const response = await jolokiaService.execute(mbeans.datasources, 'queryEx', [database, proceduresSql, null, 0])
    return (response.rows || [])
  }

  async populateStorages(database, database_id) {
    if (!database) {
      return []
    }
    const sql = `select *
      , has_external_storage_privilege(external_storage_id, 'ALTER EXTERNAL STORAGE') or has_system_privilege('DROP ANY EXTERNAL STORAGE') as can_alter
      , has_external_storage_privilege(external_storage_id, 'DROP EXTERNAL STORAGE') or has_system_privilege('DROP ANY EXTERNAL STORAGE') as can_drop
      , has_external_storage_privilege(external_storage_id, 'USAGE EXTERNAL STORAGE') or has_system_privilege('USAGE ANY EXTERNAL STORAGE') as can_usage
    from sys.external_storage`
    const response = await jolokiaService.execute(mbeans.datasources, 'queryEx', [database, sql, null, 0])
    return (response.rows || [])
  }

  async populateLocations(database, database_id) {
    if (!database) {
      return []
    }
    const sql = `select *
      , has_external_location_privilege(external_location_id, 'ALTER EXTERNAL LOCATION') or has_system_privilege('DROP ANY EXTERNAL LOCATION') as can_alter
      , has_external_location_privilege(external_location_id, 'DROP EXTERNAL LOCATION') or has_system_privilege('DROP ANY EXTERNAL LOCATION') as can_drop
      , has_external_location_privilege(external_location_id, 'USAGE EXTERNAL LOCATION') or has_system_privilege('USAGE ANY EXTERNAL LOCATION') as can_usage
    from sys.external_location`
    const response = await jolokiaService.execute(mbeans.datasources, 'queryEx', [database, sql, null, 0])
    return (response.rows || [])
  }

  async populateFormats(database, database_id) {
    if (!database) {
      return []
    }
    const sql = `select *
      , has_external_format_privilege(external_format_id, 'ALTER EXTERNAL FORMAT') or has_system_privilege('DROP ANY EXTERNAL FORMAT') as can_alter
      , has_external_format_privilege(external_format_id, 'DROP EXTERNAL FORMAT') or has_system_privilege('DROP ANY EXTERNAL FORMAT') as can_drop
      , has_external_format_privilege(external_format_id, 'USAGE EXTERNAL FORMAT') or has_system_privilege('USAGE ANY EXTERNAL FORMAT') as can_usage
    from sys.external_format`
    const response = await jolokiaService.execute(mbeans.datasources, 'queryEx', [database, sql, null, 0])
    return (response.rows || [])
  }

  async populateClusters(database, instance) {
    if (!database) {
      return []
    }
    const versionPrior62 = parseFloat(instance?.softwareVersion || '6.1') < 6.2
    const result = await jolokiaService.execute(mbeans.datasources, 'queryEx', ['yellowbrick', `select
        cluster_id
      , cluster_name
      , nodes
      , default_wlm_profile_name
      , active_wlm_profile_name
      , hardware_instance_type_id
      , ${versionPrior62 ? 'null' : 'error_message'} as error_message
      , is_default_cluster
      , auto_suspend
      , auto_resume
      , max_spill_pct
      , max_cache_pct
      , type
      , state
      , has_cluster_privilege(cluster_name, 'ALTER CLUSTER') or has_system_privilege('ALTER ANY CLUSTER') as can_alter
      , has_cluster_privilege(cluster_name, 'DROP CLUSTER') or has_system_privilege('DROP ANY CLUSTER') as can_drop
      , has_cluster_privilege(cluster_name, 'USAGE') or has_system_privilege('USAGE ANY CLUSTER') as can_usage
      from sys.cluster`, 'ym', 0])
    return (!!result && !!result.rows && !!result.rows.length && result.rows) || []
  }

  async populateHardwareInstanceTypes(database) {
    if (!database) {
      return []
    }
    // NB: the sys.hardware_instance_type view actually talks to K8S CRD, so if its slow we want a request all by itself,
    // hence the flush() here.
    jolokiaService.start()
    jolokiaService.flush()
    const hwTypesQuery = jolokiaService.execute(mbeans.datasources, 'queryEx', ['yellowbrick',
    `select t.*, coalesce(u.max_nodes, -1) as max_nodes, coalesce(u.requested_reserved_nodes, -1) as reserved_nodes, coalesce(u.current_nodes, 0) as used_nodes
    from sys.hardware_instance_type t
    left outer join sys.hardware_instance_usage u
    on u.hardware_instance_type_id = t.hardware_instance_type_id
    `, 'ym', 0])
    jolokiaService.flush()
    let result = null
    try {
      result = await hwTypesQuery
    } catch (error) {

    }
    return (!!result && !!result.rows && !!result.rows.length && result.rows) || []
  }

  async populateWlm(instanceId) {
    const instance = Instance.query().whereId(instanceId).first()
    if (!instance) {
      throw new NotFound(`Could not locate instance: ${instanceId}`)
    }
    if (instance.isInstancePopulatingWLM) {
      return
    }
    if (!instance.online) {
      return
    }
    try {
      // Set populating flag.
      await Instance.update({
        where: instanceId,
        data: {
          isInstancePopulatingWLM: true
        }
      })

      // Retrieve WLM config.
      const connected = connectionService.connect(instance.id)
      if (connected) {
        const sql = `
          select p.*, pending_count from sys.wlm_pending_profile p left outer join (select profile, count(*) as pending_count from sys.wlm_pending group by 1) pend on pend.profile = p.name;
          select * from sys.wlm_pending_pool;
          select * from sys.wlm_pending_rule;
          select array_agg(cluster_name) as clusters, active_wlm_profile_name from sys.cluster group by active_wlm_profile_name`
        const result = await databaseObjectService.sql(instance.id, 'yellowbrick', sql)
        if (result && !result.error) {
          const wlmUsageMap = functions.indexBy(result.rowSets[2].rows, 'active_wlm_profile_name')
          WLMProfile.insertWithInstance(instanceId, {
            data: result.rows.map(row => ({
              ...row,
              ...{
                clusters: (wlmUsageMap[row.name]?.clusters || []).join(', ')
              }
            }))
          })
          WLMResourcePool.insertWithInstance(instanceId, {
            data: result.rowSets[0].rows.map(row => ({
              ...row
            }))
              .filter(p => p.name !== 'system') // Oh god no! hack!
          })
          WLMRule.insertWithInstance(instanceId, { data: result.rowSets[1].rows })
        }
      }
    } finally {
      // Set populating flag.
      await Instance.update({
        where: instanceId,
        data: {
          isInstancePopulatingWLM: false
        }
      })
    }
  }

  async getViewDefinition(database, database_id, view_id) {
    const response = await jolokiaService.execute(mbeans.datasources, 'queryEx', [database, `select definition from sys.view where database_id = ${database_id} and view_id = ${view_id}`, null, 0])
    return (response.rows || [])[0].definition
  }

  async getComment(database, id) {
    if (!database) {
      return ''
    }
    const response = await jolokiaService.execute(mbeans.datasources, 'queryEx', [database, `select description from pg_catalog.pg_description where objoid = ${id}`, null, 0])
    return response.rows[0]?.description
  }

  async setComment(database, type, thing, comment) {
    await jolokiaService.execute(mbeans.datasources, 'queryEx', [database, `COMMENT ON ${type} ${thing} IS '${comment}'`, null, 0])
  }

  async getRelationships(url, database, schema_id) {
    const getTablesSql = `
      select t.table_id, t.schema_id, t.name, s.name || '' as schema_name
      from sys.vt_table_info t
      join sys.schema s using (schema_id)
      where t.table_id in (
        select conrelid
        from pg_catalog.pg_constraint
        where connamespace = ${schema_id}

        union

        select confrelid
        from pg_catalog.pg_constraint
        where connamespace = ${schema_id}
      )
    `
    const getRelationshipsSql = `
      select
        conrelid::bigint + 0 as table_id,
        confrelid::bigint + 0 as target_table_id,
        unnest(conkey) as column_id,
        unnest(confkey) as target_column_id,
        pg_catalog.pg_get_constraintdef(oid, true) as constraint
      from pg_catalog.pg_constraint
      where connamespace = ${schema_id}
    `
    const getColumnsSql = `
      select c.*
      from sys.column c
      where c.table_id in (

        select conrelid
        from pg_catalog.pg_constraint
        where connamespace = ${schema_id}

        union

        select confrelid
        from pg_catalog.pg_constraint
        where connamespace = ${schema_id}
      )
    `
    const remote = [
      jolokiaService.execute(mbeans.datasources, 'queryEx', [database, getTablesSql, null, 0]),
      jolokiaService.execute(mbeans.datasources, 'queryEx', [database, getRelationshipsSql, null, 0]),
      jolokiaService.execute(mbeans.datasources, 'queryEx', [database, getColumnsSql, null, 0])
    ]
    const results = await Promise.all(remote)
    const { rows: tables } = results[0]
    const { rows: relationships } = results[1]
    const { rows: columns } = results[2]
    return { tables, relationships, columns }
  }

  async connect(instanceId) {
    const result = await connectionService.connect(instanceId)
    return result
  }

  async sql(instanceId, database, sql, asyncTimeout, initiatedCallback, pollCallback) {
    if (!instanceId) {
      throw new Error(`Parameter 'instanceId' is required for sql`)
    }
    const connected = await connectionService.connect(instanceId)
    if (connected) {
      let result

      // Do we want async timeout handling?
      if (typeof asyncTimeout === 'number') {
        jolokiaService.flush()
        jolokiaService.timeout(2 * asyncTimeout) // we can wait a long time
        let query = jolokiaService.execute(mbeans.datasources, 'queryExOptions', [database || 'yellowbrick', sql, 'ym', 0, {
          asyncTimeout: typeof pollCallback === 'function' ? 1000 : asyncTimeout // quick poll to let cb have request id if desired
        }])
        jolokiaService.pin()
        jolokiaService.flush()

        // Does caller want signal of start?
        if (typeof initiatedCallback === 'function') {
          initiatedCallback()
        }

        // We long poll the response to this query here until it completes.
        while (true) {
          result = await query
          if (result.requestId) { // Did we get a async request?
            // Did we have a poll callback?
            if (typeof pollCallback === 'function') {
              pollCallback(result.requestId)
            }

            await connectionService.connect(instanceId)
            jolokiaService.flush()
            jolokiaService.timeout(2 * asyncTimeout) // we can wait a long time
            jolokiaService.pin()
            query = jolokiaService.execute(mbeans.datasources, 'pollResponse', [result.requestId, asyncTimeout])
            jolokiaService.flush()
          } else {
            break
          }
        }
      } else {
        result = await jolokiaService.execute(mbeans.datasources, 'queryEx', [database || 'yellowbrick', sql, 'ym', 0])
      }

      // For this type of invocation, any error gets thrown.
      if (result && result.error) {
        throw new ResponseError(result)
      }

      return result
    }

    return { rows: [], errors: ['could not connect'] }
  }

  async cancelRequest(instanceId, requestId) {
    const connected = await connectionService.connect(instanceId)
    if (connected) {
      await jolokiaService.execute(mbeans.datasources, 'cancelRequest', [requestId])
    }
  }

  async requestActive(instanceId, requestId) {
    const connected = await connectionService.connect(instanceId)
    if (connected) {
      return await jolokiaService.execute(mbeans.datasources, 'requestActive', [requestId])
    } else {
      return false
    }
  }

  async getClusterUsage(instanceId, database, interval) {
    const sql = queryClusterUsage.replace(/{{interval}}/g, interval)
    return await this.sql(instanceId, database, sql)
  }

  /**
   * Given an object type and id, return the privileges a given user has capability for,
   * whether explicitly defined or defined by role inheritance.  This returns the
   * "effective privileges", including indicator of which privileges can be granted to others.
   *
   * The shape of returned tuples is [{privilege_type, granted, is_grantable}, ...]
   * @param {Instance} instance - the instance
   * @param {string} database - the database to issue the query within
   * @param {string} type - the type of object (eg. system, database, table, etc.)
   * @param {number|array} idOrIds - the identifier(s) of the object (can be nullish/missing for 'system')
   * @param {number} roleOrUserId - the identifier of the user or role being queries or nullish/missing for current logged in user
   * @returns grants for given object or empty array
   */
  async getRoleObjectPrivileges(instanceId, database, type = 'system', idOrIds = 0, roleOrUserId = 0) {
    // Get the privileges asked for.
    const objectType = OBJECT_TYPES[type]
    if (!objectType) {
      throw new Error(`Invalid object type: ${type}`)
    }
    const { privileges } = objectType
    type = objectType.typeOverride || type

    // Construct SQL.
    const grantableName = privilege =>
      `${privilege} WITH ${type === 'role' ? 'ADMIN' : 'GRANT'} OPTION`
    const systemClusterPrivilege = (privilege) => {
      switch (privilege) {
        case 'ALTER CLUSTER': return 'ALTER ANY CLUSTER'
        case 'DROP CLUSTER': return 'DROP ANY CLUSTER'
        case 'USAGE': return 'USAGE ANY CLUSTER'
        case 'ALTER CLUSTER WITH GRANT OPTION': return 'ALTER ANY CLUSTER WITH GRANT OPTION'
        case 'DROP CLUSTER WITH GRANT OPTION': return 'DROP ANY CLUSTER WITH GRANT OPTION'
        case 'USAGE WITH GRANT OPTION': return 'USAGE ANY CLUSTER WITH GRANT OPTION'
      }
      throw new Error('Unmapped cluster system privilege: ' + privilege)
    }
    const hasPrivilege = (privilege, idOrIds) => {
      let result = ' '

      result += (Array.isArray(idOrIds) ? idOrIds : [idOrIds]).map((id) => {
        let privilegeSql = ''

        if (typeof id === 'string') {
          id = `'${id}'`
        }
        if (type === 'role') {
          privilegeSql += 'pg_has_role('
          if (roleOrUserId) {
            if (typeof roleOrUserId === 'number') {
              privilegeSql += `${roleOrUserId}, `
            } else {
              privilegeSql += `'${roleOrUserId}', `
            }
          }
          if (id) {
            privilegeSql += `${id}, `
          }
          privilegeSql += `'${privilege}') as "${privilege}"`
        } else {
          privilegeSql += `has_${type}_privilege(`
          if (roleOrUserId) {
            if (typeof roleOrUserId === 'number') {
              privilegeSql += `${roleOrUserId}, `
            } else {
              privilegeSql += `'${roleOrUserId}', `
            }
          }
          if (id) {
            privilegeSql += `${id}, `
          }
          privilegeSql += `'${privilege}')`
          if (type === 'cluster') {
            privilegeSql = `(${privilegeSql} or has_system_privilege('${systemClusterPrivilege(privilege)}'))`
          }
        }
        return privilegeSql
      }).join(' and ')
      result += ` as "${privilege}"`

      return result
    }
    const privilegesSql = `SELECT\n` +
      privileges.map(privilege => hasPrivilege(privilege, idOrIds)).join(',\n') + ',\n' +
      privileges.map(privilege => hasPrivilege(grantableName(privilege), idOrIds)).join(',\n')

    // Make query and return tuples: { privilege_type, granted, is_grantable }
    const {
      rows: [privilegesResult]
    } = await this.sql(instanceId, database, privilegesSql)
    return privileges.map((privilege_type) => {
      return { privilege_type, granted: !!privilegesResult[privilege_type], is_grantable: !!privilegesResult[grantableName(privilege_type)] }
    })
  }

  /**
   * Given an object type and id, return the grants stored.
   *
   * The shape of returned tuples is [{grantee, privilege_type, is_grantable}, ...]
   * @param {Instance} instance - the instance address
   * @param {string} database - the database to issue the query within
   * @param {string} type - the type of object (eg. system, database, table, etc.)
   * @param {number|array} idOrIds - the identifier(s) of the object (can be nullish/missing for 'system')
   * @returns grants for given object(s) or empty array
   */
  async getStoredObjectPrivileges(instanceId, database, type, idOrIds) {
    // Get the object type asked for.
    const objectType = OBJECT_TYPES[type]
    if (!objectType) {
      throw new Error(`Invalid object type: ${type}`)
    }
    const privilegesSql = (Array.isArray(idOrIds) ? idOrIds : [idOrIds]).map(id =>
      objectType.storedPrivilegesQuery.replace(OBJECT_IDENTIFIER_RE, id)
    ).join('\nINTERSECT\n')

    const result = await this.sql(instanceId, database, privilegesSql)
    return result.rows
  }

  getObjectPrivilegeNames(type) {
    const objectType = OBJECT_TYPES[type]
    if (!objectType) {
      throw new Error(`Invalid object type: ${type}`)
    }
    return objectType.privileges
  }

  async connectInstance(instanceId) {
    const instance = Instance.query().whereId(instanceId).first()
    if (!instance) {
      throw new NotFound(`Could not locate instance: ${instanceId}`)
    }
    if (!instance.online) {
      return null // We can't populate a suspended instance.
    }
    const connected = await connectionService.connect(instance.id)
    return connected && instance
  }

  async concurrentQueries(instanceId, startTime, endTime, clusterName) {
    const instance = await this.connectInstance(instanceId)
    if (instance) {
      const sql = queryConcurrentQueries
        .replace(/{{start}}/g, startTime.toISOString())
        .replace(/{{end}}/g, endTime.toISOString())
        .replace(/{{cluster_name}}/g, clusterName)
      const response = await this.sql(instance.id, 'yellowbrick', sql)
      return response.rows
    } else {
      return []
    }
  }

  async poolsAtTime(instanceId, queryId) {
    const instance = await this.connectInstance(instanceId)
    if (instance) {
      const sql = queryPoolsAtTime
        .replace(/{{queryId}}/g, queryId)
      const response = await this.sql(instance.id, 'yellowbrick', sql)
      return response.rows
    } else {
      return []
    }
  }
}

const databaseObjectService = new DatabaseObjectService()
export default databaseObjectService
