import {BehaviorSubject, Observable, Subject} from 'rxjs';
import {filter, first, map} from 'rxjs/operators';
import {setWsHeartbeat} from 'ws-heartbeat/client';

export interface Message {
  id?: number;
  method: string;
  result?: any;
  params?: any;
  error?: any;
}

export class WebsocketService {

  private static id: number = 1;

  private uriGenerator: null | (() => string) = null;
  private c: null | WebSocket = null;

  /**
   *
   */
  private connectionStatus$: BehaviorSubject<null | boolean> = new BehaviorSubject<null | boolean>(null);
  private message$: Subject<Message> = new Subject<Message>();

  constructor() {
    console.log('[WebsocketService] construct');
  }

  /**
   * Connect to the satellite host system.
   */
  connect(uriGen: () => string, token: string): void {
    console.log('[WebsocketService] connect to: ' + uriGen());
    this.uriGenerator = uriGen;

    // Try to close the connection.
    if (this.c) {
      try {
        this.c.close();
      } catch (_) {
        // Ignore any errors
      }

      this.c = null;
    }

    if (!this.uriGenerator) {
      throw new Error('unable to connect, uri generator not set');
    }

    this.c = new WebSocket(this.uriGenerator(), ['access_token', token]);

    this.c.onopen = this.onOpen.bind(this);
    this.c.onclose = this.onClosed.bind(this);
    this.c.onmessage = this.onMessage.bind(this);
    this.c.onerror = this.onError.bind(this);
  }

  destroy(): void {
    console.log('[WebsocketService] destroy');

    if (this.connectionStatus$) {
      this.connectionStatus$.complete();
    }

    if (this.message$) {
      this.message$.complete();
    }
  }

  public isConnected(): boolean {
    return this.connectionStatus$.getValue() || false;
  }

  public afterConnectionChanged(): Observable<null | boolean> {
    return this.connectionStatus$.asObservable();
  }

  public pub(wsm: Message): boolean {
    if (!this.c) {
      console.error(`unable to publish message ${wsm.method} - no connection available`);
      return false;
    }

    if (this.c.readyState === this.c.OPEN) {
      // console.log('WS >> ' + wsm.method);
      this.c.send(JSON.stringify(wsm));
      return true;
    } else {
      console.error(`unable to publish message ${wsm.method} - still connecting`);
      return false;
    }
  }

  private onOpen(evt: Event): void {
    if (!this.c) {
      console.error('unable to handle opened connection - connection not set');
      return;
    }

    setWsHeartbeat(this.c, '{"method": "heartbeat"}', {
      pingTimeout: 5000,
      pingInterval: 3000,
    });

    this.connectionStatus$.next(true);
  }

  private onClosed(evt: CloseEvent): void {
    this.connectionStatus$.next(false);
  }

  private onMessage(evt: MessageEvent): void {
    const msg: Message = JSON.parse(evt.data);
    this.message$.next(msg);
  }

  public afterConnectionClosed(): Observable<any> {
    return this.connectionStatus$.pipe(filter(c => (c === false)));
  }

  public afterConnectionOpened(): Observable<any> {
    return this.connectionStatus$.pipe(filter(c => (c === true)));
  }

  public sub(s: string = ''): Observable<any> {
    return this.message$.pipe(
      filter(wsm => {
        if (!wsm.method) {
          return false;
        }
        return wsm.method.startsWith(s);
      }),
      map(resp => resp.params),
    );
  }

  private onError(evt: Event): void {
    console.error(evt);
  }

  public rpc(out: Message): Promise<any> {
    out.id = WebsocketService.id++;

    const prom = this.message$.pipe(
      filter((wsmIn) => {
        return wsmIn.id === out.id;
      }),
      first(),
      map(wsm => {
        if (wsm.error !== undefined) {
          throw new Error(wsm.error);
        }
        return wsm.result;
      }),
    ).toPromise();

    if (this.pub(out)) {
      return prom;
    }

    return Promise.reject('unable to send rpc request');
  }
}
