import { RSocket, RSocketConnector } from 'rsocket-core'
import { WebsocketClientTransport } from 'rsocket-websocket-client'
import {
  encodeCompositeMetadata,
  encodeRoute,
  encodeBearerAuthMetadata,
  WellKnownMimeType,
} from 'rsocket-composite-metadata'
import { Buffer } from 'buffer'
import { useAuth } from '@/auth'
import useNotifications from '@/composables/useNotifications'
import { SaiftyNotification } from '@/config'
import { ref } from 'vue'

// Module-level RSocket connection shared by every `useRSockets()` caller.
// It stays `undefined` until `createSocket` (inside `useRSockets`) assigns a connected client.
const rsocket = ref<RSocket>()

/**
 * Closes the shared RSocket connection, if there is one, and forgets it.
 *
 * The reference is cleared so that later `runSocket` / `notificationsRead`
 * calls see "no socket" and open a fresh connection instead of issuing
 * requests on an already-closed one. Calling this with no open connection
 * is a no-op.
 */
export function closeSocketConnection(): void {
  const socket = rsocket.value
  if (!socket) return

  // Clear the shared reference before closing so nothing can pick up the
  // socket while (or after) it is being torn down.
  rsocket.value = undefined
  socket.close()
}

/**
 * `true` when a notifications websocket host is configured, either through
 * the `VITE_NOTIFICATIONS_WS_HOST` env variable or through
 * `window.ENV.NOTIFICATIONS_WS_HOST`.
 *
 * The literal `'**NOTIFICATIONS_WS_HOST**'` is treated as "not configured"
 * (presumably a deployment placeholder that was never substituted — confirm).
 * The value is always a strict boolean, and a missing `window.ENV` object
 * counts as "not configured" instead of throwing at import time.
 */
export const ARE_NOTIFICATIONS_ENABLED: boolean =
  Boolean(import.meta.env.VITE_NOTIFICATIONS_WS_HOST) ||
  (Boolean(window.ENV?.NOTIFICATIONS_WS_HOST) && window.ENV?.NOTIFICATIONS_WS_HOST !== '**NOTIFICATIONS_WS_HOST**')

/**
 * Composable around the shared notifications RSocket connection.
 *
 * Returns:
 * - `rsocket`: the shared connection ref (`undefined` while disconnected).
 * - `runSocket`: subscribes to the `notify` stream and appends incoming
 *   notifications to the `notifications` list from `useNotifications`.
 * - `notificationsRead`: marks every unread notification as `READ` on the server.
 * - `closeSocketConnection`: closes the shared connection.
 *
 * `runSocket` and `notificationsRead` connect lazily when no socket exists yet.
 */
export default function useRSockets() {
  const { notifications, notificationsNotRead } = useNotifications()
  const auth = useAuth()

  // Flow-control settings for the `notify` stream.
  const INITIAL_STREAM_DEMAND = 3 // payloads requested when the stream is opened
  const STREAM_DEMAND_BATCH = 2 // extra payloads requested after every 2nd payload
  const MAX_STREAM_PAYLOADS = 1000 // the stream is cancelled once this many arrived

  /**
   * Opens an RSocket connection over a secure websocket.
   *
   * The setup frame is routed to `connect` and carries the current access
   * token as bearer authentication (an empty token when not authenticated).
   *
   * @param options host and port of the websocket endpoint
   * @returns the connected RSocket client
   */
  async function createClient(options: { host: string; port: number }): Promise<RSocket> {
    const transportOptions = {
      url: `wss://${options.host}:${options.port}`,
      // Browsers provide a native WebSocket, so no `wsCreator` factory is
      // supplied here; non-browser environments would need one.
    }

    // NOTE(review): if `auth.isAuthenticated` is a Ref rather than a plain
    // boolean, this check is always truthy — confirm against `useAuth`.
    let token = ''
    if (auth.isAuthenticated) token = await auth.getAccessToken()

    const setupOptions = {
      // NOTE(review): these very large values (presumably milliseconds)
      // effectively disable keep-alive frames — confirm this is intended.
      keepAlive: 1000000000,
      lifetime: 1000000000,
      dataMimeType: 'text/plain',
      metadataMimeType: WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
      payload: {
        data: Buffer.from(''),
        metadata: encodeCompositeMetadata([
          [WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, encodeRoute('connect')],
          [WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION, encodeBearerAuthMetadata(token)],
        ]),
      },
    }

    const connector = new RSocketConnector({
      transport: new WebsocketClientTransport(transportOptions),
      setup: setupOptions,
    })
    return connector.connect()
  }

  /**
   * Connects to the configured notifications host on port 443 and stores the
   * connection in the shared `rsocket` ref.
   *
   * @throws Error when no notifications host is configured
   */
  async function createSocket(): Promise<RSocket> {
    const host = import.meta.env.VITE_NOTIFICATIONS_WS_HOST || window.ENV.NOTIFICATIONS_WS_HOST
    if (!host) {
      // Fail fast with a clear message instead of dialling `wss://undefined:443`.
      throw new Error('Notifications websocket host is not configured')
    }

    const socket = await createClient({ host, port: 443 })
    rsocket.value = socket
    return socket
  }

  /** Returns the shared connection, creating it first when none exists yet. */
  async function ensureSocket(): Promise<RSocket> {
    return rsocket.value ?? (await createSocket())
  }

  /**
   * Parses one `notify` payload and appends it to `notifications`, unless a
   * notification with the same (truthy) `messageId` is already present.
   * Payloads that are not JSON objects are logged and discarded so that a
   * single bad message cannot throw inside the stream callback.
   */
  function storeNotification(rawPayload: string): void {
    let incoming: SaiftyNotification
    try {
      const parsed: unknown = JSON.parse(rawPayload)
      if (typeof parsed !== 'object' || parsed === null) {
        console.error('Discarding notification payload that is not a JSON object', rawPayload)
        return
      }
      incoming = parsed as SaiftyNotification
    } catch (error) {
      console.error('Discarding malformed notification payload', error)
      return
    }

    // Notifications without a messageId can never match and are always appended.
    const alreadyKnown =
      Boolean(incoming.messageId) && notifications.value.some((n) => n.messageId === incoming.messageId)

    if (!alreadyKnown) {
      notifications.value = [...notifications.value, incoming]
    }
  }

  /**
   * Subscribes to the `notify` stream and stores every received notification.
   *
   * The returned promise settles when the stream ends: it resolves on
   * completion or after `MAX_STREAM_PAYLOADS` payloads (the stream is then
   * cancelled), and rejects on a stream error. This holds whether or not the
   * connection had to be created first.
   */
  async function runSocket(): Promise<void> {
    const socket = await ensureSocket()

    await new Promise<void>((resolve, reject) => {
      let payloadsReceived = 0

      const requester = socket.requestStream(
        {
          data: Buffer.from(''),
          metadata: encodeCompositeMetadata([[WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, encodeRoute('notify')]]),
        },
        INITIAL_STREAM_DEMAND,
        {
          onError: (e) => reject(e),
          onNext: (payload, isComplete) => {
            const payloadString = payload.data?.toString()
            if (payloadString) {
              storeNotification(payloadString)
            }

            payloadsReceived++

            if (payloadsReceived >= MAX_STREAM_PAYLOADS) {
              requester.cancel()
              // Resolve on a later tick, after the cancel has been issued.
              setTimeout(() => {
                resolve()
              })
            } else if (payloadsReceived % STREAM_DEMAND_BATCH === 0) {
              // Request 2 more payloads after every 2nd payload received.
              requester.request(STREAM_DEMAND_BATCH)
            }

            if (isComplete) {
              resolve()
            }
          },
          onComplete: () => {
            resolve()
          },
          onExtension: () => {
            // Extension frames are not used by this client.
          },
        }
      )
    })
  }

  /**
   * Sends an `updateNotificationStatus` request with status `READ` for every
   * notification in `notificationsNotRead` and waits for all of them.
   *
   * Updates are best-effort: a failed request is logged and does not reject
   * the returned promise.
   */
  async function notificationsRead(): Promise<void> {
    const socket = await ensureSocket()

    const statusUpdates = notificationsNotRead.value.map(
      (notification) =>
        new Promise<void>((resolve) => {
          socket.requestResponse(
            {
              data: Buffer.from(JSON.stringify({ messageId: notification.messageId, status: 'READ' })),
              metadata: encodeCompositeMetadata([
                [WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, encodeRoute('updateNotificationStatus')],
              ]),
            },
            {
              onComplete: () => {
                resolve()
              },
              onError: (error) => {
                console.error(error)
                resolve()
              },
              onNext: () => {
                resolve()
              },
              onExtension: () => {
                // Extension frames are not used by this client.
              },
            }
          )
        })
    )

    await Promise.all(statusUpdates)
  }

  return {
    rsocket,
    runSocket,
    notificationsRead,
    closeSocketConnection,
  }
}
