import { Observable, GroupedObservable } from "rxjs";
import { filter, groupBy, mergeMap, map, bufferTime } from "rxjs/operators";
import { Dependencies } from ".";
import { Action } from "store/types/actions";
import { XmppClient } from "@xmpp/client";
import { backOff } from "exponential-backoff";

export function inputIsNotNull<T>(input: null | undefined | T): input is T {
  return input != null;
}

export function filterNulls<T>() {
  return (source$: Observable<null | undefined | T>) =>
    source$.pipe(filter(inputIsNotNull));
}

export const getPartnerId = (
  myId: string,
  listenerId: string,
  memberId: string
) => (myId === listenerId ? memberId : listenerId);

export const retryIfOffline = <T>(
  service: Dependencies["service"],
  additionalCheck: (action: T) => boolean = () => true,
  debugName: string = "Unknown"
) => (action$: Observable<T>): Observable<T> =>
  new Observable((observer) => {
    return action$.subscribe((x) => {
      const retry = () => {
        setTimeout(check, 500);
      };
      const check = () => {
        if (additionalCheck(x) && service.xmpp.status === "online") {
          observer.next(x);
        } else {
          retry();
        }
      };
      check();
    });
  });

export const batchActions = <T extends Action, A>(
  type: string,
  batchAction: (item: A) => Action,
  mapper: (items: T[]) => A,
  batchTime: number = 1000
) => (action$: Observable<Action>): Observable<Action> =>
  action$.pipe(
    groupBy((a) => a.type),
    mergeMap((group$) => {
      if (group$.key === type) {
        return (group$ as GroupedObservable<typeof type, T>).pipe(
          bufferTime(batchTime),
          filter((items) => items.length > 0),
          map((items) => batchAction(mapper(items)))
        );
      } else {
        return group$;
      }
    })
  );

export const xmppSend = (xmpp: XmppClient, stanza: string) =>
  backOff(
    async () => {
      return xmpp.send(stanza);
    },
    { jitter: "full", numOfAttempts: 6 }
  );
