import {CollectionViewer, DataSource} from '@angular/cdk/collections';
import {BehaviorSubject, EMPTY, forkJoin, Observable, ObservedValueOf, Subscription} from 'rxjs';
import {catchError, finalize} from 'rxjs/operators';
import {NotificationsService} from '../notifications.service';
import {IPushNotification, NotificationsFilter} from '../notifications.models';

/**
 * CDK data source that feeds a view with pages of push notifications.
 *
 * Besides the rows themselves it exposes two companion streams:
 * `loading$` (whether a load is currently running) and `length$` (the total
 * count reported by the service for the last successful load).
 *
 * Only one load is active at a time: starting a new load cancels the previous
 * one, so an outdated response can never overwrite newer results.
 */
export class NotificationsDatasource implements DataSource<IPushNotification> {
  /** Rows of the most recently loaded page; starts empty. */
  private readonly notificationsSubject: BehaviorSubject<IPushNotification[]> = new BehaviorSubject<IPushNotification[]>([]);
  public notifications$: Observable<IPushNotification[]> = this.notificationsSubject.asObservable();

  /** `true` from the start of a load until it completes, fails or is cancelled. */
  private readonly loadingSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
  public loading$: Observable<boolean> = this.loadingSubject.asObservable();

  /** Count delivered together with the most recently loaded page; starts at 0. */
  private readonly lengthSubject: BehaviorSubject<number> = new BehaviorSubject(0);
  public length$: Observable<number> = this.lengthSubject.asObservable();

  /** Subscription of the load currently in flight, or `null` when there is none. */
  private pendingLoad: Subscription | null = null;

  constructor(private readonly notificationsService: NotificationsService) {
  }

  /**
   * Returns the stream of rows to render.
   *
   * @param collectionViewer the connecting view (not used).
   * @returns an observable that emits the current page and every later update.
   */
  connect(collectionViewer: CollectionViewer): Observable<IPushNotification[]> {
    return this.notifications$;
  }

  /**
   * Cancels any load still in flight and completes all exposed streams.
   * After this call the data source no longer emits.
   *
   * @param collectionViewer the disconnecting view (not used).
   */
  disconnect(collectionViewer: CollectionViewer): void {
    // Cancel first so `finalize` can still publish `loading = false`
    // before the loading stream is completed.
    this.cancelPendingLoad();
    this.notificationsSubject.complete();
    this.loadingSubject.complete();
    this.lengthSubject.complete();
  }

  /**
   * Requests one page of notifications together with the count for the same
   * filter, and publishes both once both requests have succeeded.
   *
   * A load that is still running when this method is called again is
   * cancelled, so only the latest request can update the streams.
   *
   * @param filter criteria forwarded to both service calls.
   * @param sortProperties property to sort by.
   * @param sortDirection sort direction forwarded to the service.
   * @param pageNumber index of the requested page.
   * @param pageSize number of rows per page.
   */
  loadNotifications(filter: NotificationsFilter,
                    sortProperties: string = 'createdAtTimeMillis',
                    sortDirection: string = 'desc',
                    pageNumber: number = 0,
                    pageSize: number = 10): void {
    // Without this, a slow response to an earlier call could arrive after a
    // newer one and replace its data, and would also reset `loading$` while
    // the newer request is still running.
    this.cancelPendingLoad();

    this.loadingSubject.next(true);
    this.pendingLoad = forkJoin({
      page: this.notificationsService.getNotificationsPage(
        filter,
        sortProperties, sortDirection,
        pageNumber, pageSize),
      count: this.notificationsService.getNotificationsCount(
        filter)
    }).pipe(
      // Best effort: on failure nothing is published and the previously
      // loaded page and count stay in place.
      // NOTE(review): errors are dropped here without reporting - confirm
      // they are surfaced elsewhere (e.g. by the service or an interceptor).
      catchError(() => EMPTY),
      // Runs on completion, on error and on cancellation alike.
      finalize(() => this.loadingSubject.next(false))
    ).subscribe((results: {
      page: ObservedValueOf<Observable<IPushNotification[]>>;
      count: ObservedValueOf<Observable<number>>;
    }) => {
      this.notificationsSubject.next(results.page);
      this.lengthSubject.next(results.count);
    });
  }

  /** Resets the published rows to an empty list and the count to 0. */
  clear(): void {
    this.notificationsSubject.next([]);
    this.lengthSubject.next(0);
  }

  /** Unsubscribes from the load in flight, if any. */
  private cancelPendingLoad(): void {
    if (this.pendingLoad !== null) {
      this.pendingLoad.unsubscribe();
      this.pendingLoad = null;
    }
  }
}
