import { Observable, of, OperatorFunction, race, timer, zip } from 'rxjs';
import { Actions, ofType } from '@ngrx/effects';
import { Injectable } from '@angular/core';
import { filter, map, switchMap, take, tap } from 'rxjs/operators';
import { Action, Store } from '@ngrx/store';

export type ExpectedActions = {
  success: { type: string; correlationId: string }[];
  error: { type: string; correlationId: string }[];
}[];

export type CorrelatedAction = Action & { correlationId?: string };
export type Result = 'Success' | 'Error' | 'Timeout';
export type ActionResult = { result: Result; action?: CorrelatedAction };

@Injectable({
  providedIn: 'root',
})
export class ActionAggregatorService {
  constructor(private actions$: Actions, private store: Store) {}

  aggregate(
    expected: ExpectedActions,
    callbacks: {
      onSuccess: (store: Store, actions?: (Action | undefined)[]) => void;
      onError: (store: Store, actions?: (Action | undefined)[]) => void;
      onTimeout: (store: Store) => void;
    },
    timeout: number = 120000
  ): void {
    if (expected.length === 0) {
      callbacks.onSuccess(this.store, []);
      return;
    }
    const returnObservables = expected.map(
      ({ success: successActions, error: failureActions }) => {
        return race(
          race(
            ...successActions.map((successAction) =>
              this.actions$.pipe(
                filter((a) => a.type === successAction.type),
                filter(
                  (a: CorrelatedAction) =>
                    successAction.correlationId === a.correlationId
                ),
                take(1),
                map((a) => ({ result: 'Success', action: a } as ActionResult))
              )
            )
          ),
          race(
            ...failureActions.map((failureAction) =>
              this.actions$.pipe(
                filter((a) => a.type === failureAction.type),
                filter(
                  (a: CorrelatedAction) =>
                    failureAction.correlationId === a.correlationId
                ),
                take(1),
                map((a) => ({ result: 'Error', action: a } as ActionResult))
              )
            )
          ),
          timer(timeout).pipe(
            take(1),
            map((_) => ({ result: 'Timeout' } as ActionResult))
          )
        );
      }
    );

    zip(...returnObservables)
      .pipe(take(1))
      .subscribe((results) => {
        if (results.every((result) => result.result === 'Success')) {
          callbacks.onSuccess(
            this.store,
            results.map((r) => r.action)
          );
        } else if (results.some((result) => result.result === 'Timeout')) {
          callbacks.onTimeout(this.store);
        } else if (results.some((result) => result.result === 'Error')) {
          callbacks.onError(
            this.store,
            results.filter((r) => r.result === 'Error').map((r) => r.action)
          );
        }
      });
  }

  /**
   * Same idea as aggregate() but each action fires only after its predecessor completes.
   * (actions) and (expected) must be the same length.
   * If any expectation is empty, it gets a placeholder response and the next one fires immediately.
   */
  aggregateSerially(
    actions: Action[],
    expected: ExpectedActions,
    callbacks: {
      onSuccess?: (store: Store, actions?: (Action | undefined)[]) => void;
      onError?: (store: Store, actions?: (Action | undefined)[]) => void;
      onTimeout?: (store: Store) => void;
    },
    timeout: number = 30000
  ): void {
    if (!actions.length) {
      callbacks.onSuccess?.(this.store, []);
      return;
    }
    if (actions.length !== expected.length) {
      callbacks.onError?.(this.store, []);
      return;
    }
    const reactions: Action[] = [];
    race(
      timer(timeout).pipe(
        take(1),
        map(() => 'Timeout')
      ),
      this.aggregateSeriallyInternal(reactions, actions, expected)
    )
      .pipe(take(1))
      .subscribe(
        (result) => {
          if (result === 'Timeout') {
            callbacks.onTimeout?.(this.store);
          } else if (result === 'Error') {
            callbacks.onError?.(this.store, reactions);
          } else {
            callbacks.onSuccess?.(this.store, reactions);
          }
          callbacks = {};
        },
        (error) => {
          callbacks.onError?.(this.store, reactions);
        },
        () => {
          // If our callbacks haven't been blanked, call it a timeout.
          // (but this really shouldn't happen)
          callbacks.onTimeout?.(this.store);
        }
      );
  }

  private aggregateSeriallyInternal(
    reactions: Action[],
    actions: Action[],
    expected: ExpectedActions
  ): Observable<Result> {
    if (!actions.length) {
      return of('Success');
    }
    const action = actions[0] as CorrelatedAction;
    const expect = expected[0];
    actions.splice(0, 1);
    expected.splice(0, 1);
    const outcomes = [...expect.success, ...expect.error];
    if (outcomes.length) {
      const observation = this.actions$.pipe(
        ofType(
          ...outcomes.map((a) => a.type),
        ) as OperatorFunction<Action, CorrelatedAction>,
        filter((reaction) => reaction.correlationId === action.correlationId),
        tap((reaction) => {
          reactions.push(reaction);
          if (expect.error.find((a) => a.type === reaction.type)) {
            throw new Error();
          }
        }),
        switchMap(() =>
          this.aggregateSeriallyInternal(reactions, actions, expected)
        )
      );
      this.store.dispatch(action);
      return observation;
    } else {
      this.store.dispatch(action);
      reactions.push({ type: 'No response expected' });
      return this.aggregateSeriallyInternal(reactions, actions, expected);
    }
  }
}
