class SafeObserver<T> {
  destination: any;
  isUnsubscribed: boolean = false;
  unsub: any;

  constructor(destination: any) {
    this.destination = destination;
  }

  next(value: T) {
    // only try to next if you're subscribed have a handler
    if (!this.isUnsubscribed && this.destination.next) {
      try {
        this.destination.next(value);
      } catch (err) {
        // if the provided handler errors, teardown resources, then throw
        this.unsubscribe();
        throw err;
      }
    }
  }

  error(err: any) {
    // only try to emit error if you're subscribed and have a handler
    if (!this.isUnsubscribed && this.destination.error) {
      try {
        this.destination.error(err);
      } catch (e2) {
        // if the provided handler errors, teardown resources, then throw
        this.unsubscribe();
        throw e2;
      }
      this.unsubscribe();
    }
  }

  complete() {
    // only try to emit completion if you're subscribed and have a handler
    if (!this.isUnsubscribed && this.destination.complete) {
      try {
        this.destination.complete();
      } catch (err) {
        // if the provided handler errors, teardown resources, then throw
        this.unsubscribe();
        throw err;
      }
      this.unsubscribe();
    }
  }

  unsubscribe() {
    this.isUnsubscribed = true;
    if (this.unsub) {
      this.unsub();
    }
  }
}

export type Observer<T> = {
  next: (t: T) => void;
  error: (err: any) => void;
  complete: () => void;
};

export function mkObservable<T>(_subscribe: (observer: Observer<T>) => void): IObservable<T> {
  return new Observable(_subscribe)
}

export class Observable<T> implements IObservable<T> {
  protected _subscribe: any;

  constructor(_subscribe: (observer: Observer<T>) => void) {
    this._subscribe = _subscribe;
  }

  // @ts-ignore:
  pipe<D>(...fns: ((x: any) => any)[]): Observable<D> {
    return fns.reduce((state, fn) => fn(state), this) as any as Observable<D>;
  }

  subscribe(observer: Observer<T>) {
    const safeObserver = new SafeObserver<T>(observer);
    safeObserver.unsub = this._subscribe(safeObserver);
    return safeObserver.unsubscribe.bind(safeObserver);
  }
}

interface IPipe<A> {
  pipe<B>(fn1: (x: IObservable<A>) => IObservable<B>): IObservable<B>;
  pipe<B, C>(fn1: (x: IObservable<A>) => IObservable<B>, fn2: (x: IObservable<B>) => IObservable<C>): IObservable<C>
  pipe<B, C, D>(fn1: (x: IObservable<A>) => IObservable<B>, fn2: (x: IObservable<B>) => IObservable<C>, fn3: (x: IObservable<C>) => IObservable<D>): IObservable<D>
  pipe<B, C, D, E>(fn1: (x: IObservable<A>) => IObservable<B>, fn2: (x: IObservable<B>) => IObservable<C>, fn3: (x: IObservable<C>) => IObservable<D>, fn4: (x: IObservable<D>) => IObservable<E>): IObservable<E>
}

export interface IObservable<T> extends IPipe<T> {
  // constructor(_subscribe: (observer: Observer<T>) => void): IObservable<T>;
  subscribe(observer: Observer<T>): () => void;
}