import { fromEvent, from, merge, Observable, iif, of, EMPTY } from "rxjs";
import {
  map,
  exhaustMap,
  filter,
  buffer,
  startWith,
  mergeMap,
  takeWhile,
  catchError,
} from "rxjs/operators";
import { Action$, State$ } from ".";
import { v4 as uuid } from "uuid";
import {
  getPreviousMessagesAction,
  addPreviousMessagesAction,
  setConversationLoaded,
  getParticipantAction,
  setOnlineStatusAction,
  Action,
  GetParticipantNextAction,
  getAllPreviousMessagesAction,
} from "../types/actions";
import {
  getArchivedMessagesRequest,
  getMessageFromArchive,
  getActionFromEvent,
  getEventFromStanza,
} from "../service";
import { filterNulls, retryIfOffline } from "./helpers";
import { IService } from "../service/xmppService";
import { Message } from "../types/entities";
import { EventMessage } from "store/types/events";
import { backOff } from "exponential-backoff";

export type SetOnlineStatus = ReturnType<typeof setOnlineStatusAction>;

export const getRealTimeEventsHandler = (
  action$: Action$,
  state$: State$,
  conversationId: string,
  service: IService
): Observable<Action> =>
  fromEvent(service.xmpp, "stanza").pipe(
    filter(
      (stanza: any) =>
        stanza.attrs.from && stanza.attrs.from.startsWith(conversationId)
    ),
    map(getEventFromStanza),
    filterNulls(),
    map(getActionFromEvent),
    mergeMap((a) =>
      iif(
        () =>
          (a.type === "AddMessage" &&
            a.payload.authorId !== "system" &&
            !state$.value.participants[a.payload.authorId]) ||
          (a.type === "SetUserOnline" &&
            !state$.value.participants[a.payload.userId]),
        of(
          getParticipantAction(
            a.type === "AddMessage"
              ? a.payload.authorId
              : (a as SetOnlineStatus).payload.userId,
            conversationId,
            a as GetParticipantNextAction
          )
        ),
        of(a)
      )
    ),
    startWith(getPreviousMessagesAction(conversationId))
  );

type GetPrev =
  | ReturnType<typeof getPreviousMessagesAction>
  | ReturnType<typeof getAllPreviousMessagesAction>;

export const getPrevMessagesHandler = (
  action$: Action$,
  state$: State$,
  conversationId: string,
  service: IService
): Observable<Action> =>
  action$.pipe(
    filter(
      (a): a is GetPrev =>
        a.type === "GetPreviousMessagesAction" ||
        a.type === "GetAllPreviousMessagesAction"
    ),
    filter(
      (getPrev) =>
        getPrev.type === "GetAllPreviousMessagesAction" ||
        getPrev.payload.conversationId === conversationId
    ),
    filter((_getPrev) => {
      const convo = state$.value.conversations[conversationId];
      return convo.type === "group" || !convo.isEnded;
    }),
    map((getPrev) => {
      let message: Message;
      const convo = state$.value.conversations[conversationId]!;
      if (getPrev.payload.after) {
        const sentMessages = convo.messages.filter((m) => m.status === "sent");
        message = sentMessages[sentMessages.length - 1];
      } else {
        message = convo.messages[0];
      }
      const before =
        !message || getPrev.payload.after
          ? new Date()
          : new Date(message.createdAt);
      const after =
        getPrev.payload.after && message
          ? new Date(message.createdAt)
          : undefined;
      return {
        ...getPrev,
        queryId: uuid(),
        before,
        after,
      };
    }),
    retryIfOffline(service, undefined, "getConversationHandlers"),
    exhaustMap((getPrev) =>
      merge(
        fromEvent(service.xmpp, "stanza").pipe(
          filter(
            (stanza: any) =>
              stanza.getChild("result") &&
              stanza.getChild("result").attrs.queryid === getPrev.queryId
          ),
          map(getMessageFromArchive),
          filterNulls(),
          buffer(
            from(
              backOff(() =>
                service.xmpp.iqCaller.request(
                  getArchivedMessagesRequest(conversationId, getPrev.queryId, {
                    before: getPrev.before,
                    after: getPrev.after,
                  }),
                  60 * 1000 // Increase timeout to 60s from default of 30s
                )
              )
            ).pipe(
              catchError(() => {
                return EMPTY;
              }),
            ),
          ),
          mergeMap((events) => {
            const users: { id: string; e: EventMessage }[] = [];
            events.forEach((e) => {
              if (
                e.type === "message" &&
                !state$.value.participants[e.authorId]
              ) {
                if (
                  e.authorId !== "system" &&
                  !users.find((a) => a.id === e.authorId)
                ) {
                  users.push({ id: e.authorId, e });
                }
              }
            });
            const actions = users.map((u) =>
              getParticipantAction(u.id, u.e.conversationId)
            );
            return from([
              ...actions,
              addPreviousMessagesAction(events, conversationId),
            ]);
          })
        ),
        fromEvent(service.xmpp, "stanza").pipe(
          filter(
            (stanza: any) =>
              stanza.getChild("fin") &&
              stanza.getChild("fin").attrs.queryid === getPrev.queryId
          ),
          map((stanza: any) =>
            setConversationLoaded(
              conversationId,
              getPrev.after === undefined &&
                stanza.getChild("fin").attrs.complete === "true"
            )
          ),
          filter((a) => a.payload.loaded)
        )
      ).pipe(
        takeWhile((a) => {
          return a.type !== "AddPreviousMessages";
        }, true)
      )
    )
  );
