import {Credentials, Signer} from '@aws-amplify/core'
import * as Paho from 'paho-mqtt';
import env from '@/env'
import Observable from "zen-observable-ts"
import PushStream from 'zen-push'
import {v4 as uuid} from 'uuid'

/**
 * Convenience wrapper: observes the given topic filter(s) and attaches
 * `observer` to the resulting observable in one call.
 *
 * @param topics   A single topic filter or a list of them.
 * @param observer Observer attached to the observable returned by `observe`.
 * @returns The subscription produced by attaching `observer`.
 */
export function subscribe<T>(
  topics: string | string[],
  observer: ZenObservable.Observer<T>
) {
  const source = observe<T>(topics)
  return source.subscribe(observer)
}

/**
 * Creates a push stream for the given topic filter(s) and returns its
 * observable side.
 *
 * Once the shared client is available, every filter is subscribed on it and
 * then recorded in the module-level `subscriptions` list together with the
 * stream and the client's id. If obtaining the client or subscribing fails,
 * the error is pushed into the stream.
 *
 * @param topics A single topic filter or a list of them.
 * @returns The observable side of the newly created stream.
 */
export function observe<T>(
  topics: string | string[]
): Observable<T> {
  const filters: string[] = Array.isArray(topics) ? [...topics] : [topics]
  const stream = new PushStream()

  const register = (client: Paho.Client) => {
    // Subscribe every filter first; only record them once all calls returned.
    for (const filter of filters) {
      client.subscribe(filter)
    }
    console.log('subscribed to', filters)
    for (const filter of filters) {
      subscriptions.push({filter, stream, clientId: client.clientId})
    }
  }
  const fail = err => {
    stream.error(err)
  }

  // `.catch` is chained after `.then` so failures inside `register` are
  // reported on the stream as well.
  getClient().then(register).catch(fail)

  return stream.observable
}

/**
 * Sends `payload`, serialised as JSON, to `topic` using the shared client.
 *
 * @param topic   Topic the message is sent to.
 * @param payload Value passed through `JSON.stringify` before sending.
 */
export async function publish<T>(
  topic: string,
  payload: T
) {
  const mqttClient = await getClient()
  console.log('publishing. topic:', topic, 'payload:', JSON.stringify(payload))
  const body = JSON.stringify(payload)
  mqttClient.send(topic, body)
}

/**
 * Disconnects the shared client, if one exists, and forgets it so the next
 * `getClient()` call builds a fresh connection.
 *
 * Does nothing when no client has been created: going through `getClient()`
 * here would sign a URL and open a brand-new connection only to close it
 * again. Errors while awaiting or disconnecting the client are logged, not
 * rethrown.
 */
export async function terminateConnection() {
  const pending = pahoClient
  if (!pending) return

  try {
    const client = await pending
    client.disconnect()
  } catch (e) {
    console.error(e)
  }

  // Only clear the slot if it still holds the client we just terminated; a
  // newer client may have been stored while we were awaiting.
  if (pahoClient === pending) pahoClient = undefined
}

// Promise for the shared client; undefined while no client exists (it is
// reset elsewhere in this module when the connection is lost or terminated).
let pahoClient: Promise<Paho.Client> | undefined
// Active subscription records; `observe` pushes `{filter, stream, clientId}`
// entries here.
let subscriptions = []

/**
 * Returns the shared client promise, creating it on first use.
 *
 * A failed attempt is evicted from the cache so that a later call retries
 * instead of receiving the same rejected promise forever.
 *
 * @returns Promise resolving to the connected client; rejects when
 *          `makeClient` fails.
 */
async function getClient(): Promise<Paho.Client> {
  if (pahoClient) return pahoClient

  const pending = makeClient()
  pahoClient = pending
  // Side-chain only: callers still observe the rejection through `pending`.
  void pending.catch(() => {
    if (pahoClient === pending) pahoClient = undefined
  })

  return pending
}

/**
 * Creates a Paho client with a fresh UUID as client id, wires its message and
 * connection-lost handlers to the module-level `subscriptions` list, and
 * connects it to the signed endpoint URL.
 *
 * @returns The client once `connect` reports success.
 * @throws Rejects with whatever `makeSignedEndpointUrl` rejects with, or with
 *         the value Paho passes to `onFailure` when connecting fails.
 */
async function makeClient(): Promise<Paho.Client> {
  const url = await makeSignedEndpointUrl()
  const clientId = uuid()

  const client = new Paho.Client(url, clientId);
  client.onMessageArrived = (msg: Paho.Message) => {
    // This callback runs inside Paho's own message handling, so a throw here
    // (payload that is not valid JSON, or that cannot be decoded to a string)
    // would escape into the library.
    // NOTE(review): Paho appears to treat that as an internal error and drop
    // the connection - confirm against the Paho client documentation.
    try {
      console.log('message received. topic:', msg.destinationName, 'payload:', msg.payloadString)
      subscriptions.filter(x => mqttTopicMatch(x.filter, msg.destinationName))
        .forEach(x => {
          // Parsed per subscriber so each stream receives its own object.
          x.stream.next(JSON.parse(msg.payloadString))
        })
    } catch (err) {
      console.error('failed to dispatch message. topic:', msg.destinationName, err)
    }
  }
  client.onConnectionLost = ({errorCode, errorMessage}) => {
    console.error('socket connection lost', clientId, errorCode, errorMessage)
    pahoClient = undefined

    // Complete and drop only the records registered on this client, so that
    // dead entries do not pile up and streams of another client are untouched.
    const lost = subscriptions.filter(x => x.clientId === clientId)
    subscriptions = subscriptions.filter(x => x.clientId !== clientId)
    // A stream observing several filters has several records; complete it once.
    new Set(lost.map(x => x.stream)).forEach(stream => stream.complete())
  }

  await new Promise<void>((resolve, reject) => {
    client.connect({
      useSSL: true,
      mqttVersion: 3,
      onSuccess: () => resolve(),
      onFailure: reject,
    });
  });

  return client;
}

/* eslint-disable camelcase */
/**
 * Signs the configured IoT endpoint address with the current credentials.
 *
 * Reads the endpoint and region from `env`, fetches credentials via
 * `Credentials.get()`, and passes them to `Signer.signUrl` for the
 * `iotdevicegateway` service.
 *
 * @returns Promise for the value returned by `Signer.signUrl`.
 */
async function makeSignedEndpointUrl() {
  const endpoint = env.aws.iot.endpointAddress
  const serviceInfo = {
    service: 'iotdevicegateway',
    region: env.aws.region,
  };

  const credentials = await Credentials.get();
  // The signer expects snake_case keys, hence the surrounding eslint override.
  const accessInfo = {
    access_key: credentials.accessKeyId,
    secret_key: credentials.secretAccessKey,
    session_token: credentials.sessionToken,
  };

  return Signer.signUrl(endpoint, accessInfo, serviceInfo);
}
/* eslint-enable camelcase */

/**
 * Tests whether a concrete topic is covered by a topic filter.
 *
 * Both strings are split on `/` and compared level by level:
 * - `+` in the filter accepts any value at that level;
 * - `#` in the filter ends the comparison and matches when the topic has at
 *   least as many levels as the filter (so `a/#` does not match `a`);
 * - any other filter level must equal the topic level exactly.
 * Without a `#`, filter and topic must have the same number of levels.
 *
 * @param filter Topic filter, possibly containing `+` / `#` levels.
 * @param topic  Concrete topic name to test.
 * @returns `true` when the topic matches the filter.
 */
export function mqttTopicMatch(filter: string, topic: string) {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');
  const filterDepth = filterLevels.length;

  let level = 0;
  for (const pattern of filterLevels) {
    if (pattern === '#') {
      return topicLevels.length >= filterDepth;
    }
    const isSingleLevelWildcard = pattern === '+';
    if (!isSingleLevelWildcard && pattern !== topicLevels[level]) {
      return false;
    }
    level += 1;
  }

  return topicLevels.length === filterDepth;
}
