import { Injectable } from '@angular/core';
import { Observable, BehaviorSubject } from 'rxjs';
import { filter } from 'rxjs/operators';

import { Paho } from 'ng2-mqtt/mqttws31';

import { environment } from '../../../environments/environment';

import { Arrived } from './models/arrived';
import { MqttPublishService } from './mqtt-publish.service';
import { MqttService } from './mqtt.service';

import { MqttResponse } from './models/mqtt-response';
import { MqttResultType } from './types/mqtt-result-type';

import { SharedSettingService } from '../shared-setting.service';

@Injectable({
  providedIn: 'root'
})
export class ArrivedMqttService {
  private cache: BehaviorSubject<MqttResponse<Arrived> | null> = new BehaviorSubject<MqttResponse<Arrived> | null>(null);

  private publishClient: MqttService;
  private subscribeClient: MqttService;

  private topic: string;

  constructor(
    private mqttPublishService: MqttPublishService,
    private sharedSettingService: SharedSettingService
  ) { }

  /**
   * 開始します.
   */
  async start() {
    this.topic = this.sharedSettingService.getVehicle().vin + environment.setting.arrivedTopic;
    this.publishClient = this.mqttPublishService.getMqttClient();

    if (this.sharedSettingService.getServicePCFunctionEnabled()) {
      this.subscribeClient = new MqttService();
      this.subscribeClient.setSubscribe(this.topic, this.onMessageArrived);
      this.subscribeClient.setConnection();
    }
  }

  /**
   * 停止します.
   */
  async stop() {
    this.publishClient = null;

    if (this.subscribeClient) {
      this.subscribeClient.disconnect();
      this.subscribeClient = null;
    }
  }

  /**
   * トピックを取得します.
   */
  getTopic(): string {
    return this.topic;
  }

  /**
   * メッセージを送信します.
   * 
   * @param message 送信メッセージ
   * @returns 送信結果
   */
  publishMessage(message: Arrived): boolean {
    if (this.publishClient?.checkConnect()) {
      this.publishClient.sendTopic(JSON.stringify(message), this.topic);

      return true;
    }

    return false;
  }

  /**
   * コネクションの状態を確認し、接続が切れている場合、再接続します.
   * 
   * @returns 接続状態
   */
  checkAndRetryConnect(): boolean {
    if (!this.sharedSettingService.getServicePCFunctionEnabled()) {
      // サービスPC機能がOFFの場合、subscribeClientは再接続しない
      return true;
    }

    if (!this.subscribeClient) {
      return false;
    }

    if (this.subscribeClient.checkConnect()) {
      return true;
    }

    this.subscribeClient.setConnection();

    return false;
  }

  /**
   * コールバック.
   */
  onMessageArrived = (message: Paho.MQTT.Message) => {
    try {
      const messageObject = JSON.parse(message.payloadString) as Arrived;

      this.cache.next({ result: MqttResultType.SUCCESS, message: messageObject });
    } catch (e) {
      console.error(e);

      this.cache.next({ result: MqttResultType.FAIL, message: null });
    }
  }

  /**
   * Observable を取得します.
   */
  message(): Observable<MqttResponse<Arrived>> {
    return this.cache.asObservable().pipe(filter((response) => response !== null));
  }
}
