import { Injectable, OnDestroy } from '@angular/core';
import { environment } from '../../../environments/environment';
import { ProfileService } from '../user/profile/profile.service';
import * as SockJS from 'sockjs-client';
import { CompatClient, Stomp, StompSubscription } from '@stomp/stompjs';
import { BehaviorSubject, Observable, Subscriber } from 'rxjs';
import { EventBusNames, EventBusService } from '../event-bus/event-bus.service';
import { WebSocketHeaders } from '../../models/web-socket-headers';

/**
 * Application-wide STOMP-over-SockJS client.
 *
 * The service (re)connects every time a profile event is published on the
 * event bus and lets consumers subscribe to STOMP destinations, deferring the
 * actual STOMP subscription until the socket is connected.
 */
@Injectable({
  providedIn: 'root'
})
export class WebsocketService implements OnDestroy {
  /** STOMP client created by the most recent `connect()` call; `null` before the first call. */
  ws: CompatClient | null = null;

  /**
   * Shared connection status: starts as `null`, receives `false` when a
   * connection attempt reports an error, and receives `true` and then
   * completes on the first successful connect.
   */
  connectionStatus = new BehaviorSubject<any>(null);

  constructor(
    private profileService: ProfileService,
    private eventBusService: EventBusService
  ) {
    // NOTE(review): this listener is never removed; confirm whether
    // EventBusService.on() returns a handle that should be released in ngOnDestroy.
    this.eventBusService.on(EventBusNames.profile, () => {
      this.connect();
    });
  }

  /** Closes the socket when Angular destroys the service. */
  ngOnDestroy(): void {
    this.disconnect();
  }

  /**
   * Opens a new STOMP connection using headers built from the current profile
   * and the stored token.
   *
   * NOTE(review): a client left over from a previous call is replaced but not
   * disconnected, so its socket and topic subscriptions stay alive. Confirm
   * whether that is intended before adding a disconnect here.
   *
   * @returns a subject for this attempt only: it starts as `null`, emits
   *          `true` and completes on success, and emits `false` on error.
   */
  connect(): BehaviorSubject<any> {
    const attemptStatus = new BehaviorSubject<any>(null);

    // Build the socket inside the factory so each factory invocation yields a
    // fresh SockJS instance instead of re-using one that may already be closed.
    const client = Stomp.over(() => new SockJS(environment.wsUrl));
    this.ws = client;

    if (environment.production) {
      // Silence the STOMP frame logging in production builds.
      client.debug = () => {};
    }

    client.connect(
      this.getHeaders(),
      () => {
        // Report on the returned subject as well as on the shared one;
        // previously the returned subject never received any value.
        attemptStatus.next(true);
        attemptStatus.complete();
        this.connectionStatus.next(true);
        this.connectionStatus.complete();
      },
      (error: unknown) => {
        console.error(error);
        attemptStatus.next(false);
        this.connectionStatus.next(false);
      }
    );

    return attemptStatus;
  }

  /** Builds the STOMP connect headers from the current user identity and the token in localStorage. */
  private getHeaders(): WebSocketHeaders {
    return new WebSocketHeaders(
      this.profileService.user?.identity ?? '',
      '2',
      'user',
      localStorage.getItem('token') ?? ''
    );
  }

  /** Disconnects the current client if it is connected; otherwise does nothing. */
  disconnect(): void {
    if (this.ws && this.ws.connected) {
      this.ws.disconnect();
      console.debug('WebSocket connection closed');
    }
  }

  /**
   * Subscribes to a STOMP destination as soon as the socket is connected.
   *
   * The returned observable is cold: nothing happens until it is subscribed.
   * It emits the created `StompSubscription` and never completes.
   * Unsubscribing from the observable cancels any pending wait/retry, but it
   * does NOT cancel a STOMP subscription that was already emitted; the
   * consumer owns that handle and must call `unsubscribe()` on it.
   *
   * @param destination STOMP destination to subscribe to.
   * @param callback invoked with the body of every non-empty message.
   * @returns observable emitting the STOMP subscription handle.
   */
  subscribeToTopic(
    destination: string,
    callback: (message: string) => void
  ): Observable<StompSubscription> {
    return new Observable<StompSubscription>(
      (subscriber: Subscriber<StompSubscription>) => {
        let retryTimer: ReturnType<typeof setInterval> | undefined;
        let statusSubscription: { unsubscribe(): void } | undefined;

        const subscribeLogic = (): void => {
          const subscription = this._makeSubscription(destination, callback);
          if (subscription) {
            subscriber.next(subscription);
            return;
          }

          // The client is not connected yet: poll until the subscription succeeds.
          retryTimer = setInterval(() => {
            const retried = this._makeSubscription(destination, callback);
            if (retried) {
              clearInterval(retryTimer);
              retryTimer = undefined;
              subscriber.next(retried);
            }
          }, 500);
        };

        if (this.connectionStatus.isStopped) {
          // The status subject has completed, i.e. a connection was established before.
          subscribeLogic();
        } else {
          // Wait until the status subject reports a successful connection.
          statusSubscription = this.connectionStatus.subscribe(
            (val: boolean | null) => {
              if (val) {
                subscribeLogic();
              }
            }
          );
        }

        // Teardown: stop waiting/polling so an abandoned consumer does not
        // end up with a STOMP subscription created behind its back.
        return () => {
          if (retryTimer !== undefined) {
            clearInterval(retryTimer);
            retryTimer = undefined;
          }
          statusSubscription?.unsubscribe();
        };
      }
    );
  }

  /**
   * Creates the STOMP subscription on the current client.
   *
   * @returns the subscription handle, or `null` (after logging an error) when
   *          there is no connected client.
   */
  private _makeSubscription(
    destination: string,
    callback: (message: string) => void
  ): StompSubscription | null {
    if (!this.ws || !this.ws.connected) {
      console.error('Could not make subscription: websocket is not connected');
      return null;
    }

    // Only the message body is forwarded; frames with an empty body are dropped with a warning.
    const stompMessageCallback = (message: { body: string }): void => {
      if (message.body) {
        callback(message.body);
      } else {
        console.warn('Received message with empty body from websocket');
      }
    };

    return this.ws.subscribe(destination, stompMessageCallback);
  }
}
