import {
  SQSClient as SQSClientAws,
  SendMessageCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
  GetQueueAttributesCommand
} from '@aws-sdk/client-sqs'

import { AWS_IDENTITY_POOL_REGION } from '../config/env'
import { delay, getIdToken, resolveCredentials } from '../utils'

const FIRST_DELAY = 20 // 20s
const SECOND_DELAY = 50 // 50s

const TIMEOUT_TO_CHANGE_DELAY = 5 * 60 * 1000 // 5min

const changeDelay = async () => {
  const accessToken = await getIdToken()
  const consumer = await SQSConsumer.getInstance({ accessToken })
  consumer.delay = SECOND_DELAY
  const timer = consumer.timers.shift()

  if (timer) {
    clearTimeout(timer)
  }
}

class InternalSQSConsumer {
  constructor ({
    callback,
    queueUrl,
    delay = FIRST_DELAY,
    credentials,
    region = AWS_IDENTITY_POOL_REGION,
    waitEnd = false,
    sqsClient = undefined
  }) {
    this.region = region
    this.providers = []
    this.delay = delay
    this.queueUrl = queueUrl
    this.callback = callback
    this.waitEnd = waitEnd
    this.running = false
    this.pendingMessages = []
    this.startedTimer = false
    this._credentials = credentials
    this.timers = []

    if (sqsClient) {
      this.client = sqsClient
    } else {
      this.client = new SQSClientAws({
        region: this.region,
        credentials: this._credentials
      })
    }
  }

  addProvider (providerName) {
    if (this.providers.length === 0) {
      this._stopAllTimers()
      this._startDelayTimer()
      this.delay = FIRST_DELAY
    }

    this.providers.push(providerName)
  }

  _someMessageContainsEnd (messages) {
    return messages.some(message => {
      const content = JSON.parse(message.Body)
      return content.action === 'end'
    })
  }

  _endedProviders (messages) {
    return (messages || [])
      .filter(message => {
        const content = JSON.parse(message.Body)
        return content.action === 'end'
      })
      .map(message => message.Attributes?.MessageGroupId)
  }

  async _getAmountOfUndeliveredMessages () {
    const { Attributes } = await this.client.send(
      new GetQueueAttributesCommand({
        QueueUrl: this.queueUrl,
        AttributeNames: ['ApproximateNumberOfMessages']
      })
    )
    return parseInt(Attributes.ApproximateNumberOfMessages)
  }

  async sendMessage ({ queueUrl, message }) {
    const jsonMessage = JSON.stringify(message)

    const response = await this.client.send(
      new SendMessageCommand({
        QueueUrl: queueUrl,
        MessageBody: jsonMessage,
        MessageGroupId: 'MessageGroupId',
        ContentBasedDeduplication: false
      })
    )

    return response
  }

  async _getMessages (maxNumberOfMessages) {
    const response = await this.client.send(
      new ReceiveMessageCommand({
        QueueUrl: this.queueUrl,
        MaxNumberOfMessages: maxNumberOfMessages,
        AttributeNames: ['MessageGroupId']
      })
    )

    const parsedMessages = (response.Messages || []).map(message => {
      const id = message.MessageId
      const receiptHandler = message.ReceiptHandle
      const messageGroupId = message.Attributes?.MessageGroupId
      const content = JSON.parse(message.Body)

      return { id, content, messageGroupId, receiptHandler }
    })
    if (this._someMessageContainsEnd(response.Messages || [])) {
      const endedProviders = this._endedProviders(response.Messages || [])
      const filteredProviders = this.providers.filter(
        provider => !endedProviders.includes(provider)
      )
      this.providers = filteredProviders
    }

    await Promise.allSettled(
      parsedMessages.map(async message =>
        this._ackMessage({
          queueUrl: this.queueUrl,
          receiptHandler: message.receiptHandler
        })
      )
    )

    return parsedMessages
  }

  async consume (maxNumberOfMessages = 10) {
    this.running = true
    while (this.providers.length > 0) {
      await delay(this.delay * 1000)
      const totalUndelivered = await this._getAmountOfUndeliveredMessages()
      const expectedSteps = Math.ceil(totalUndelivered / maxNumberOfMessages)
      const steps = Math.min(expectedSteps, 10)

      console.debug(`mensagens: ${totalUndelivered}, steps: ${steps}`)
      for (let i = 0; i < steps; i += maxNumberOfMessages) {
        const messages = await this._getMessages(maxNumberOfMessages)
        this.pendingMessages.push(...messages)
      }

      if (this.callback && this.pendingMessages.length > 0 && !this.waitEnd) {
        if (!this.startedTimer) {
          this._startDelayTimer()
        }
        this.callback(this.pendingMessages)
      }
    }

    if (this.pendingMessages.length > 0 && this.callback && this.waitEnd) {
      this.callback(this.pendingMessages)
    }

    this.running = false
  }

  _startDelayTimer () {
    this.startedTimer = true
    const timeoutId = setTimeout(changeDelay, TIMEOUT_TO_CHANGE_DELAY)
    this.timers.push(timeoutId)
  }

  _stopAllTimers () {
    this.timers.forEach(timer => clearTimeout(timer))
    this.timers = []
  }

  stop () {
    this.providers = []
    this._stopAllTimers()
    this.startedTimer = false
    this.running = false
  }

  async _ackMessage ({ queueUrl, receiptHandler }) {
    try {
      const response = await this.client.send(
        new DeleteMessageCommand({
          QueueUrl: queueUrl,
          ReceiptHandle: receiptHandler
        })
      )

      return response
    } catch (err) {
      console.error(err)
    }
  }
}

export class SQSConsumer {
  static _instance
  static _accessToken
  static _credentials

  static async _resolveCredentials () {
    if (!SQSConsumer._accessToken) {
      throw new Error('Access token is required')
    }

    const credentials = await resolveCredentials({
      accessToken: SQSConsumer._accessToken
    })

    SQSConsumer._credentials = credentials
  }

  static _resolveInstance ({ waitEnd }) {
    if (!SQSConsumer._accessToken) {
      throw new Error('Access token is required')
    }

    if (!SQSConsumer._instance) {
      SQSConsumer._instance = new InternalSQSConsumer({
        queueUrl: undefined,
        delay: FIRST_DELAY,
        waitEnd,
        credentials: SQSConsumer._credentials
      })
    } else {
      SQSConsumer._instance.client = new SQSClientAws({
        region: SQSConsumer._instance.region,
        credentials: SQSConsumer._credentials
      })
    }
  }

  static async getInstance ({ accessToken, waitEnd }) {
    const isNewToken =
      !SQSConsumer._accessToken || SQSConsumer._accessToken !== accessToken
    if (isNewToken) {
      SQSConsumer._accessToken = accessToken
      await SQSConsumer._resolveCredentials()
    }

    const isCredentialsExpired =
      SQSConsumer._credentials?.expiration &&
      SQSConsumer._credentials.expiration.getTime() <= Date.now()
    if (isCredentialsExpired) {
      await SQSConsumer._resolveCredentials()
    }

    console.log(
      'SQS:',
      'isNewToken',
      isNewToken,
      'isCredentialsExpired',
      isCredentialsExpired,
      SQSConsumer._credentials?.expiration
    )

    const isNewInstanceRequired = isNewToken || isCredentialsExpired

    if (!SQSConsumer._instance || isNewInstanceRequired) {
      SQSConsumer._resolveInstance({ waitEnd })
    }

    return SQSConsumer._instance
  }
}
