import { of as of$, Observable } from 'rxjs'
import { switchMap } from 'rxjs/operators'
import PQueue from 'p-queue'
import EventEmitter from 'eventemitter3'
import { OnlineTimeout, debounce, network } from '@wiz/utils'
import { Q, dbProvider } from '@wiz/store'
import { registerSensorStream, unregisterSensorStream, wizataApi } from '@/api'

class SensorStatistics extends EventEmitter {
  constructor () {
    super()
    this.queue = new PQueue({ concurrency: 1 })
    this.bundle = []
  }

  fetchStatistics = debounce(() => {
    this.queue.add(() => {
      const ids = this.bundle.splice(0)
      if (!ids.length) {
        return Promise.resolve()
      }
      return wizataApi.getSensorStatistics(ids)
        .then((data) => {
          for (const key in data) {
            if (Object.hasOwnProperty.call(data, key)) {
              this.emit(`SensorStatistics:${key}`, data[key])
            }
          }
        })
    })
  }, 100)

  registerStatisticsRequest (id) {
    const idx = this.bundle.indexOf(id)
    if (idx === -1) {
      this.bundle.push(id)
      this.fetchStatistics()
    }
  }

  unregisterStatisticsRequest = (id) => {
    const idx = this.bundle.indexOf(id)
    if (idx !== -1) {
      this.bundle.splice(idx, 1)
      if (!this.bundle.length) {
        this.fetchStatistics.cancel()
      }
    }
  }
}

const sensorStat = new SensorStatistics()
const RealtimeDuration = 5 * 60 * 1000

function _observeSensorValue (sensor) {
  const { id: sensorId, hardwareId } = sensor

  return new Observable((subscriber) => {
    let timer
    let prevValue = null

    const updateValue = (data) => {
      const ts = prevValue?.timestamp ?? 0
      if (data.value !== null && data.timestamp && data.timestamp > ts) {
        prevValue = data
        subscriber.next(data)
      }
    }

    const handleMessage = debounce(({ value, timestamp }) => {
      updateValue({
        value: Number.isFinite(value) ? value : null,
        timestamp: timestamp || null,
        sensorId,
        hardwareId,
      })
    }, 1000)

    const handleStatisticsRequest = () => {
      sensorStat.registerStatisticsRequest(hardwareId)
    }

    const handleUpdateStatistics = (data) => {
      updateValue({
        value: Number.isFinite(data.lastValue) ? data.lastValue : null,
        timestamp: data.lastOccurrence || null,
        sensorId,
        hardwareId,
      })

      timer?.cancel()
      timer = new OnlineTimeout(handleStatisticsRequest, RealtimeDuration)
    }

    sensorStat.addListener(`SensorStatistics:${hardwareId}`, handleUpdateStatistics)
    handleStatisticsRequest()
    registerSensorStream(hardwareId, handleMessage)
    network.on('online', handleStatisticsRequest)
    timer = new OnlineTimeout(handleStatisticsRequest, RealtimeDuration)

    subscriber.next(prevValue)

    return () => {
      handleMessage.cancel()
      unregisterSensorStream(hardwareId, handleMessage)
      sensorStat.removeListener(`SensorStatistics:${hardwareId}`, handleUpdateStatistics)
      sensorStat.unregisterStatisticsRequest(hardwareId)
      network.removeListener('online', handleStatisticsRequest)
      timer?.cancel()
    }
  })
}

export default function observeSensorValue (sensorId) {
  return dbProvider.database.collections.get('sensors')
    .query(Q.where('id', sensorId))
    .observeWithColumns([ 'hardware_id' ])
    .pipe(
      switchMap(items => (items.length ? _observeSensorValue(items[0]) : of$(null))),
    )
}
