import { Observable } from 'rxjs'
import {
  OnlineTimeout,
  RenewStore,
  isEmpty,
  network,
  getDateFrom,
  getDateTo,
} from '@wiz/utils'
import { ServiceChannel, DataSourceRequest } from '@wiz/api'
import { wizataApi } from '@/api'

// Delay handed to OnlineTimeout before the next round of requests is issued.
// NOTE(review): presumably milliseconds (i.e. one minute) — confirm against OnlineTimeout.
const RealtimeDuration = 60 * 1000

/**
 * Creates an Observable that requests data for every entry of `dataSources`
 * over the interval derived from `dataFilter`, emits the responses, and
 * schedules another round of requests once nothing is pending.
 *
 * On subscribe it issues the first round of requests and then emits
 * `refLast.current` (whatever an earlier emission left there, if anything).
 * Every emitted value is also written to `refLast.current`.
 *
 * Channel errors are not forwarded to the subscriber: the affected requests
 * are forgotten so that a later round can issue them again.
 *
 * @param {object} options
 * @param {*} options.dataFilter - Passed to `getDateFrom` / `getDateTo` to
 *   build the interval of each request.
 * @param {Iterable<*>} options.dataSources - One `DataSourceRequest` is built
 *   per entry (the entry becomes the request's `source`).
 * @param {{ current: * }} refLast - Holder whose `current` is replayed on
 *   subscribe and overwritten on every emission.
 * @returns {Observable} Emits `refLast.current` once, then arrays of responses
 *   (either cached ones or the payload of a channel `message`).
 */
export default function observeSensorsData ({
  // conditions,
  dataFilter,
  dataSources,
}, refLast) {
  return new Observable((subscriber) => {
    // Set by the teardown. Guards the channel handlers so that an event
    // delivered after unsubscription cannot overwrite `refLast.current` or arm
    // a timer that nobody would cancel.
    // NOTE(review): whether `channel.close()` already suppresses late events is
    // not visible from this file — the guard is harmless either way.
    let closed = false
    let timer
    // API calls currently in flight; kept so the teardown can abort them.
    // Tracked by identity (a Set) instead of under random keys, which could
    // collide and make one of the calls untracked.
    const apiRequests = new Set()
    // Requests posted to the channel and not yet answered, keyed by hash.
    const requests = {}
    const responses = new RenewStore()

    const sendResult = (data) => {
      refLast.current = data
      subscriber.next(data)
    }

    const channel = new ServiceChannel((chunk) => {
      const request = wizataApi.getSensorsData(chunk)
      apiRequests.add(request)
      return request.fetch().finally(() => {
        apiRequests.delete(request)
      })
    }, { isDebounce: true, bundleSize: 20 })

    // Builds one request per data source; answers it from `responses` when a
    // matching response is stored, otherwise posts it unless an identical
    // request (same hash) is already pending.
    const fetchRequest = () => {
      const nextRequests = []
      const nextResponses = []

      for (const source of dataSources) {
        const request = new DataSourceRequest({
          interval: [
            getDateFrom(dataFilter, true),
            getDateTo(dataFilter, true),
          ],
          source,
        })
        const response = responses.getItem(item => item.isForRequest(request))
        if (response) {
          nextResponses.push(response)
        } else if (!requests[request.hash]) {
          requests[request.hash] = request
          nextRequests.push(request)
        }
      }

      if (nextResponses.length) {
        sendResult(nextResponses)
      }

      if (nextRequests.length) {
        // New work is pending: the next round is scheduled when it settles.
        timer?.cancel()
        for (const request of nextRequests) {
          channel.postRequest(request)
        }
      }
    }

    // Arms the timer for the next round, but only once nothing is pending.
    const scheduleNextRound = () => {
      if (isEmpty(requests)) {
        timer?.cancel()
        timer = new OnlineTimeout(fetchRequest, RealtimeDuration)
      }
    }

    channel.on('message', (data) => {
      if (closed) {
        return
      }

      responses.setLen(data.length * 2)

      // Only responses to requests that are still pending are stored. The
      // selection is done before any request is forgotten, so several
      // responses sharing one hash are all kept.
      const expected = data.filter(response => requests[response.request.hash])
      for (const response of expected) {
        responses.setItem(response)
      }
      for (const response of expected) {
        delete requests[response.request.hash]
      }

      if (expected.length) {
        sendResult(data)
        scheduleNextRound()
      }
    })

    channel.on('error', (error, data) => {
      if (closed) {
        return
      }

      // Forget the failed requests so that a later round can post them again.
      for (const request of data) {
        delete requests[request.hash]
      }

      scheduleNextRound()
    })

    network.on('online', fetchRequest)
    fetchRequest()
    sendResult(refLast.current)

    return () => {
      closed = true
      channel.close()
      // Iterate over a snapshot: aborting may settle a call and remove it.
      for (const request of [...apiRequests]) {
        request.abort()
      }
      network.removeListener('online', fetchRequest)
      responses.clearAll()
      timer?.cancel()
    }
  })
}
