import { Injectable } from '@angular/core';
import { Observable, BehaviorSubject, Subject, Subscription, interval } from 'rxjs';
import { filter, tap } from 'rxjs/operators';

import { environment } from '../../../environments/environment';

import { MqttPublishService } from './mqtt-publish.service';
import { ArrivedMqttService } from './arrived-mqtt.service';
import { RouteUpdateAcceptedMqttService } from './route-update-accepted-mqtt.service';
import { RouteUpdateMqttService } from './route-update-mqtt.service';
import { TelemetryDriverTabletMqttService } from './telemetry-driver-tablet-mqtt.service';
import { TelemetryUpdateMqttService } from './telemetry-update-mqtt.service';

/**
 * Coordinates the lifecycle of the individual MQTT services: starts and stops
 * them together, periodically asks each one to verify (and if needed retry)
 * its connection, and publishes the aggregated connection status.
 */
@Injectable({
  providedIn: 'root'
})
export class MqttConnectionService {
  /** Holds the latest aggregated connection status; starts as `false`. */
  private readonly connectionStatusSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);

  /** Re-emits the polling timer's tick counter after each connection check. */
  private readonly intervalSubject: Subject<number> = new Subject<number>();

  /**
   * Owns the polling timer subscription.
   * Not readonly: a `Subscription` that has been unsubscribed is closed for
   * good, so it is replaced with a fresh instance on every teardown.
   */
  private subscriptions = new Subscription();

  constructor(
    private mqttPublishService: MqttPublishService,
    private arrivedMqttService: ArrivedMqttService,
    private routeUpdateAcceptedMqttService: RouteUpdateAcceptedMqttService,
    private routeUpdateMqttService: RouteUpdateMqttService,
    private telemetryDriverTabletMqttService: TelemetryDriverTabletMqttService,
    private telemetryUpdateMqttService: TelemetryUpdateMqttService
  ) { }

  /**
   * Starts every MQTT service in sequence, then begins polling the connection
   * state every `environment.setting.interval`.
   *
   * May be called again after `stop()`; any polling timer left over from a
   * previous `start()` is discarded first so timers never accumulate.
   */
  async start(): Promise<void> {
    await this.mqttPublishService.start();
    await this.arrivedMqttService.start();
    await this.routeUpdateAcceptedMqttService.start();
    await this.routeUpdateMqttService.start();
    await this.telemetryDriverTabletMqttService.start();
    await this.telemetryUpdateMqttService.start();

    // Drop a timer from an earlier start() so that only one poll loop runs.
    this.resetSubscriptions();

    // Skips a tick while the previous tick's work is still marked in progress.
    let processing = false;

    this.subscriptions.add(interval(environment.setting.interval).pipe(
      filter(() => !processing),
      tap(() => processing = true),
      tap(() => this.checkConnect()),
      tap((i) => this.intervalSubject.next(i)),
      tap(() => processing = false)
    ).subscribe());
  }

  /**
   * Stops the polling timer, then stops every MQTT service in sequence.
   */
  async stop(): Promise<void> {
    // unsubscribe() permanently closes the Subscription, and anything add()-ed
    // to a closed Subscription is unsubscribed immediately. Swap in a fresh one
    // so that a later start() can poll again.
    this.resetSubscriptions();

    await this.mqttPublishService.stop();
    await this.arrivedMqttService.stop();
    await this.routeUpdateAcceptedMqttService.stop();
    await this.routeUpdateMqttService.stop();
    await this.telemetryDriverTabletMqttService.stop();
    await this.telemetryUpdateMqttService.stop();
  }

  /**
   * Asks every MQTT service to check its connection (and retry if needed) and
   * publishes the combined result: `true` only when all services report a
   * truthy value.
   *
   * Every service is always queried; a failing one does not short-circuit the
   * rest.
   */
  checkConnect(): void {
    // Built eagerly (no short-circuit) so each service gets its check/retry.
    const results = [
      this.mqttPublishService.checkAndRetryConnect(),
      this.arrivedMqttService.checkAndRetryConnect(),
      this.routeUpdateAcceptedMqttService.checkAndRetryConnect(),
      this.routeUpdateMqttService.checkAndRetryConnect(),
      this.telemetryDriverTabletMqttService.checkAndRetryConnect(),
      this.telemetryUpdateMqttService.checkAndRetryConnect()
    ];

    this.setConnectionStatus(results.every((ok) => !!ok));
  }

  /**
   * Returns an Observable of the aggregated connection status.
   */
  connectionStatus(): Observable<boolean> {
    return this.connectionStatusSubject.asObservable();
  }

  /**
   * Returns an Observable that emits the polling tick counter after each
   * connection check.
   */
  interval(): Observable<number> {
    return this.intervalSubject.asObservable();
  }

  /**
   * Updates the connection status.
   * Does nothing when the status is the same as the previous one.
   *
   * @param connectionStatus the new connection status
   */
  private setConnectionStatus(connectionStatus: boolean): void {
    if (this.connectionStatusSubject.value !== connectionStatus) {
      this.connectionStatusSubject.next(connectionStatus);
    }
  }

  /**
   * Tears down whatever the current composite subscription holds and replaces
   * it with a fresh, open one.
   */
  private resetSubscriptions(): void {
    this.subscriptions.unsubscribe();
    this.subscriptions = new Subscription();
  }
}
