import {observe, publish} from "@/deviceShadow/mqtt"
import {tap, spread} from '@/libs/utils'
import {Shadow} from './Shadow.d'
import Observable from "zen-observable-ts"

/**
 * Handle on the device shadow(s) of one thing, as returned by `createClient`.
 *
 * `T` is the shape of the shadow state; desired updates may carry any subset of it.
 */
export interface ShadowHandler<T> {
  /**
   * Requests the current shadow state again.
   * In `createClient` the promise resolves once every handled shadow has answered with a reported state
   * and rejects with `Error("timeout")` otherwise.
   */
  refresh(): Promise<void>
  /** Promise of the initial `refresh()` that `createClient` starts on creation. */
  ready: Promise<void>
  /**
   * Publishes `shadowPart` as desired state. `createClient` awaits `ready` before publishing.
   *
   * @param shadowPart the fields to change
   * @param requestId optional token; when truthy it is sent along as `clientToken`
   */
  desire(shadowPart: Partial<T>, requestId?: string): Promise<void>
  /**
   * Registers `cb` for accepted shadow messages that carry a desired state.
   * `requestId` is the message's `clientToken`.
   *
   * @returns the subscription; unsubscribe it (or call `close()`) to stop receiving updates
   */
  onDesiredUpdated(cb: (shadowPart: Partial<T>, requestId?: string) => void): ZenObservable.Subscription
  /**
   * Registers `cb` for accepted shadow messages that carry a reported state.
   * `requestId` is the message's `clientToken`.
   *
   * @returns the subscription; unsubscribe it (or call `close()`) to stop receiving updates
   */
  onReportedUpdated(cb: (shadow: T, requestId?: string) => void): ZenObservable.Subscription
  /** Registers `cb` as the error handler of the underlying message stream. */
  onError(cb: (e: any) => void): ZenObservable.Subscription
  /** Registers `cb` to be called when the underlying message stream completes. */
  onClose(cb: () => void): ZenObservable.Subscription
  /** Unsubscribes every subscription that was created through the `on*` methods. */
  close(): void
}

/**
 * Creates a `ShadowHandler` for the device shadows of `thingName`.
 *
 * Three shadows are handled together: the classic one (`$aws/things/<thingName>/shadow`) and the named
 * shadows `pets` and `components`. Their `get/accepted` and `update/accepted` topics are observed as a
 * single stream. An initial `refresh()` is started right away and its promise is exposed as `ready`.
 *
 * @param thingName name of the thing; interpolated into the shadow topic names
 * @returns the handler; call `close()` to release the subscriptions created through its `on*` methods
 */
export function createClient<T>(thingName: string): ShadowHandler<T> {
  const subscriptions: Array<ZenObservable.Subscription> = []

  // Pause between two `get` requests while waiting for an answer, and the overall deadline.
  const getRetryIntervalMs = 3000
  const getTimeoutMs = 30100

  const shadowTopicBase = `$aws/things/${thingName}/shadow`
  // Topic prefixes of every shadow this client handles: the classic shadow first, then the named ones.
  const shadowBases = [
    shadowTopicBase,
    `${shadowTopicBase}/name/pets`,
    `${shadowTopicBase}/name/components`,
  ]

  const acceptedTopics: string[] = []
  shadowBases.forEach(base => {
    acceptedTopics.push(`${base}/get/accepted`, `${base}/update/accepted`)
  })

  const mqttObservable: Observable<Shadow<T>> = observe(acceptedTopics)
  const desiredObservable = mqttObservable
    .filter(payload => !!payload.state.desired)
    .map<[Partial<T>, string]>(payload => ([payload.state.desired, payload.clientToken]))
  const reportedObservable = mqttObservable
    .filter(payload => !!payload.state.reported)
    .map<[T, string]>(payload => ([payload.state.reported, payload.clientToken]))

  /**
   * Publishes to `<base>/get` every `getRetryIntervalMs` until a `<base>/get/accepted` message carrying a
   * reported state arrives. Rejects with `Error("timeout")` when none arrives within `getTimeoutMs`.
   * Both timers and the subscription are released as soon as the promise settles, whichever way.
   */
  function assureGetResponseReceived(base: string): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      let settled = false
      let subscription: {unsubscribe(): void} | undefined

      const interval = setInterval(() => publish(`${base}/get`, {}), getRetryIntervalMs)
      const timeout = setTimeout(() => finish(new Error("timeout")), getTimeoutMs)

      // Settles the promise exactly once and cleans up everything that was started for it.
      function finish(error?: Error) {
        if (settled) return
        settled = true
        clearInterval(interval)
        clearTimeout(timeout)
        if (subscription) subscription.unsubscribe()
        if (error) reject(error)
        else resolve()
      }

      subscription = observe<Shadow<any>>(`${base}/get/accepted`).subscribe(payload => {
        if (payload.state.reported) finish()
      })
      // A synchronous delivery settles before `subscription` is assigned; release it here in that case.
      if (settled) subscription.unsubscribe()
    })
  }

  const refresh = async (): Promise<void> => {
    await Promise.all(shadowBases.map(base => assureGetResponseReceived(base)))
  }

  const ready = refresh()
  // Mark a failed initial refresh as handled so it does not surface as an unhandled rejection when the
  // consumer never awaits `ready`. Anyone awaiting `ready` (including `desire`) still sees the rejection.
  ready.catch(() => {})

  return {
    /**
     * Refreshes shadow state from the broker.
     * The function repeatedly publishes a message to the `get` topic, since this function is usually called
     * when connections are set up and topic subscription may not be finished (they return void but take a
     * while anyway).
     *
     * Rejects with `Error("timeout")` if any of the shadows has not answered with a reported state in time.
     */
    refresh,
    ready,
    /**
     * Publishes `shadowPart_` as desired state once the initial refresh has succeeded.
     * The `pets` and `components` fields go to their named shadows; everything else goes to the classic
     * shadow. The argument itself is not modified.
     */
    async desire(shadowPart_: Partial<T>, requestId?: string) {
      await ready

      // Work on a copy: the fields routed to named shadows are deleted below.
      const shadowPart = {...shadowPart_}
      // Only a truthy request id is forwarded as `clientToken`.
      const token = requestId ? {clientToken: requestId} : {}

      if ('pets' in shadowPart) {
        await publish(`${shadowTopicBase}/name/pets/update`, {
          state: {desired: {pets: shadowPart['pets']}},
          ...token
        })
        delete shadowPart['pets']
      }
      if ('components' in shadowPart) {
        await publish(`${shadowTopicBase}/name/components/update`, {
          state: {desired: {components: shadowPart['components']}},
          ...token
        })
        delete shadowPart['components']
      }

      // The classic shadow update is always published, even when no fields remain for it.
      await publish(`${shadowTopicBase}/update`, {
        state: {desired: shadowPart},
        ...token
      })
    },
    onDesiredUpdated(cb) {
      return tap(s => subscriptions.push(s), desiredObservable.subscribe(spread(cb), () => {}))
    },
    onReportedUpdated(cb) {
      return tap(s => subscriptions.push(s), reportedObservable.subscribe(spread(cb), () => {}))
    },
    onError(cb) {
      return tap(s => subscriptions.push(s), mqttObservable.subscribe({error: cb}))
    },
    onClose(cb): ZenObservable.Subscription {
      return tap(s => subscriptions.push(s), mqttObservable.subscribe({complete: cb}))
    },
    /** Unsubscribes everything registered through the `on*` methods and forgets those subscriptions. */
    close() {
      // `splice(0)` empties the list, so a repeated `close()` is a no-op and nothing stays referenced.
      subscriptions.splice(0).forEach(s => s.unsubscribe())
    }
  }
}
