import { HttpClient, HttpHeaders, HttpResponse } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, of, Subject } from 'rxjs';
import { filter, finalize, map, mergeMap, take, takeUntil, tap } from 'rxjs/operators';
import { ACCESS_TOKEN_KEY } from 'src/app/core/authentication/authentication.service';
import { ACCESS_MODE_KEY, AccessMode, SHARED_TOKEN_KEY } from 'src/app/core/authentication/share-token';

// The Queued Http Request is used to allow the tile service to subscribe to the request
// without directly triggering the http get.  It also allows us to subscribe to this as
// part of the throttling mechanism again without impacting the http get subscription
// It's important to not subscribe to the http get more than once, and for the unsubscribe
// of the http get to be available for cancelling the request.

class QueuedHttpRequest<T> {
    public output: Observable<HttpResponse<T>>;
    public cancelled = false;
    public cancel = new Subject<void>();

    private input = new Subject<HttpResponse<T>>();

    constructor(private http: HttpClient, public url: string) {
        this.output = this.createHttpResultProxy();
    }

    public next(response: HttpResponse<T>): void {
        this.input.next(response);
    }

    public error(err?: any): void {
        this.input.error(err);
    }

    public complete(): void {
        this.input.complete();
    }

    // create an observable the tile service can subscribe to without triggering the get
    private createHttpResultProxy(): Observable<HttpResponse<T>> {
        const observable = new Observable<HttpResponse<T>>(obs => {
            const sub = this.input.subscribe(obs);

            return {
                unsubscribe: () => {
                    sub.unsubscribe();
                    this.cancelled = true;
                    this.cancel.next(null);
                }
            };
        });

        return observable;
    }

    public httpGet(): Observable<HttpResponse<T>> {
        if (this.cancelled) {
            return of(null);
        }

        const httpGet = this.createRequest();

        return httpGet.pipe(takeUntil(this.cancel), tap(this.input));
    }

    private createRequest(): Observable<HttpResponse<T>> {
        let headers = new HttpHeaders({ Accept: 'application/json' });

        const accessMode = sessionStorage.getItem(ACCESS_MODE_KEY);

        headers = headers.append('NoIntercept', 'true');

        if (accessMode && (accessMode === AccessMode.SHARED || accessMode === AccessMode.BEARER)) {
            headers = headers.append('Authorization', 'Bearer ' + sessionStorage.getItem(SHARED_TOKEN_KEY));
            headers = headers.append('X-SessionMode', sessionStorage.getItem(ACCESS_MODE_KEY));
            headers = headers.append('X-SharedToken', sessionStorage.getItem(SHARED_TOKEN_KEY));
        } else {
            const token = sessionStorage.getItem(ACCESS_TOKEN_KEY);
            if (token) {
                headers = headers.append('Authorization', 'Bearer ' + token);
            }
        }

        const httpRequest = this.http.get<T>(this.url, {
            headers: headers,
            observe: 'response',
            withCredentials: true,
            responseType: 'json'
        });

        return httpRequest;
    }
}

@Injectable({
    providedIn: 'root'
})
export class ThrottledHttpService {
    private requestStream$ = new Subject<QueuedHttpRequest<any>>();

    constructor(private http: HttpClient) {
        this.startRequestHandling();
    }

    private startRequestHandling(): void {
        const concurrencyLimit = 100;
        let tokens = concurrencyLimit;

        const tokenChanged = new BehaviorSubject(tokens);
        const consumeToken = () => tokenChanged.next(--tokens);
        const renewToken = () => tokenChanged.next(++tokens);

        // tokenChanged.subscribe(tokens => console.log(`${Date.now()}: available tokens = ${tokens}`));

        const availableTokens = tokenChanged.pipe(filter(() => tokens > 0));

        // if we've zoomed again, then previous requests will get cancelled
        const openRequests = this.requestStream$.pipe(filter(request => !request.cancelled));

        openRequests
            .pipe(
                mergeMap(request =>
                    availableTokens.pipe(
                        take(1),
                        map(() => {
                            consumeToken(); // each request needs a token.
                            return request
                                .httpGet()
                                .pipe(finalize(renewToken)) // return the token when the request response/completes/errors
                                .subscribe(); // triggers the HTTP GET
                        })
                    )
                )
            )
            .subscribe();
    }

    public queueRequest<T>(url: string): Observable<HttpResponse<T>> {
        const request = new QueuedHttpRequest<T>(this.http, url);
        this.requestStream$.next(request);
        return request.output;
    }
}
