import delay from 'lodash/delay';
import isFunction from 'lodash/isFunction';
import {
	BufMessage,
	BufServiceType,
	ConnectCallOptions,
	ConnectCode,
	ConnectError,
	ConnectPromiseClient,
	ConnectTransport,
	createConnectPromiseClient,
} from '../../../../bufbuild';
import { DebugBase } from '../../../../common';
import { toJs } from '../../../../helpers';
import { createRpcTransport } from '../utility';
import { StreamCancelCode } from './constants';
import {
	IStreamClient,
	IStreamClientListeners,
	IStreamClientOpts,
	IStreamClientRunRpcStreamMethodOptions,
	IStreamController,
	IStreamEndStatusData,
	IStreamError,
	IStreamErrorData,
	IStreamMethodOptions,
	RpcClientStreamProps,
	RpcServiceStreamMethod,
} from './types';
import { asyncSleep } from './utility';

/**
 * Generic RPC streaming client. It doesn't do a whole lot, but it reduces some complexity and allows us to hook in
 * global metadata and interceptors if we choose to.
 */
class StreamClient<RpcServiceType extends BufServiceType> extends DebugBase implements IStreamClient<RpcServiceType> {
	/* #region ---- Properties --------------------------------------------------------------------------------------- */

	/**
	 * Currently assigned options.
	 */
	protected override _options: IStreamClientOpts = StreamClient.defaultOptions();

	/**
	 * Buf Connect transport instance used by this client
	 */
	protected _rpcTransport: ConnectTransport;

	/**
	 * Buf Connect promise client instance used by this client
	 */
	protected _connectRpcClient: ConnectPromiseClient<RpcServiceType>;

	/**
	 * Client props
	 */
	protected _connectRpcClientStreamProps!: RpcClientStreamProps<RpcServiceType>;

	/**
	 * The base URL being used for the client.
	 */
	protected _baseUrl: string = '';

	/**
	 * The RPC protocol being used for RPC calls made with this client.
	 */
	protected _rpcProtocol: string = '';

	/* #endregion ---- Properties ------------------------------------------------------------------------------------ */

	/* #region ---- Constructor -------------------------------------------------------------------------------------- */

	constructor(rpcService: RpcServiceType, baseUrl: string, opts?: Maybe<IStreamClientOpts>) {
		super();

		if (rpcService == null) {
			throw this.makeError('RPC client service instance must be specified. eg. GameClient');
		}

		if (baseUrl === '') {
			throw this.makeError('Base url must be specified');
		}

		opts && this.setOptions(opts);

		const transportOpts = opts?.transport ?? {};

		if (transportOpts?.instance) {
			this._rpcTransport = transportOpts.instance;
		} else {
			const { protocol, transport } = createRpcTransport(baseUrl, transportOpts?.opts, transportOpts?.protocol);
			this._rpcTransport = transport;
			this._rpcProtocol = protocol;
		}

		this._connectRpcClient = this.createConnectPromiseClient(rpcService);
		this._baseUrl = baseUrl;
	}

	/* #endregion ---- Constructor ----------------------------------------------------------------------------------- */

	/* #region ---- Public ------------------------------------------------------------------------------------------- */

	/**
	 * Sets/initializes the class options.
	 *
	 * - Overrides the parent class method.
	 */
	public override setOptions(opts: IStreamClientOpts) {
		const { newOpts } = this.resolveOptions(opts);
		this._options = newOpts;
		this.onSetOptions(newOpts);
	}

	/**
	 * Gets the method keys supported by the RPC client.
	 */
	public get rpcStreamMethods(): RpcClientStreamProps<RpcServiceType> {
		return this._connectRpcClientStreamProps;
	}

	/**
	 * @returns the Buf Connect promise client instance used by this client
	 */
	public get rpcClient(): ConnectPromiseClient<RpcServiceType> {
		return this._connectRpcClient;
	}

	/**
	 * @returns The Buf Connect transport instance used by this client.
	 */
	public get rpcTransport(): ConnectTransport {
		return this._rpcTransport;
	}

	/**
	 * @returns The RPC protocol being used for RPC calls made with this client.
	 */
	public get rpcProtocol(): string {
		return this._rpcProtocol;
	}

	/**
	 * @returns The base URL being used for RPC calls made with this client.
	 */
	public get baseUrl(): string {
		return this._baseUrl;
	}

	/**
	 * @returns The currently assigned options.
	 */
	public get options(): IStreamClientOpts {
		return { ...this._options };
	}

	/**
	 * Make the specified streaming call to the RPC client and return a controller object that allows the stream
	 * to be managed.
	 *
	 * @param   rpcMethod  The streaming RPC method to call on the RPC client.
	 * @param   request    The request object to pass to the streaming RPC method.
	 * @param   listeners  Group of listeners to attach to the stream behaviors.
	 * @param   opts       Additional options used to define the stream.
	 * @returns An object that can be used to control the stream.
	 */
	public stream = <ReqT extends BufMessage<ReqT>, ResT extends BufMessage<ResT>>(
		rpcMethod: typeof this._connectRpcClientStreamProps,
		request: ReqT,
		listeners?: IStreamClientListeners<ResT>,
		opts?: Maybe<IStreamMethodOptions>
	): IStreamController => {
		const debugMethod = 'stream';

		const rpcMethodFn = this.getStreamRpcMethod<ReqT, ResT>(rpcMethod as string);

		if (rpcMethodFn == null) {
			throw this.makeError('RPC method unavailable for client', debugMethod, { rpcMethod });
		}

		const abortController = opts?.abortController ?? new AbortController();

		// TRUE if the stream has been detected as started.
		let isStarted: boolean = false;

		// TRUE if the stream has been detected as ended or cancelled.
		let isEnded: boolean = false;

		// Called when an attempt to start the stream is made (before data/status)
		const onTryStart = () => {
			this.info('Attempting to start stream', debugMethod, { rpcMethod, request, listeners, opts });
			listeners && listeners.onTryStart && listeners.onTryStart();
		};

		// Called when a stream successfully starts (ie. after it first received valid data)
		const onStart = (data?: Maybe<ResT>) => {
			// this.info('Stream started', debugMethod, { data });
			isStarted = true;
			listeners && listeners.onStart && listeners.onStart(data);
		};

		// Called when the server sends data via the stream
		const onData = (data: ResT) => {
			// this.warn('Stream received data', debugMethod, { data });
			if (!isStarted) {
				onStart(data);
			}

			listeners && listeners.onData && listeners.onData(data);
		};

		// Called when the server sends response headers
		const onHeaders = (headers: Headers) => {
			listeners && listeners.onHeaders && listeners.onHeaders(headers);
		};

		// Called when the server sends response trailers
		const onTrailers = (trailers: Headers) => {
			listeners && listeners.onTrailers && listeners.onTrailers(trailers);
		};

		// Called when stream is cancelled (via the `cancel` method or `AbortController.abort`)
		const onCancel = (code: StreamCancelCode) => {
			// this.warn('Stream cancelled', debugMethod, { code });
			listeners && listeners.onCancel && listeners.onCancel(code);
		};

		// Called once the stream has ended (ie. was cancelled, errored, or just ended normally)
		const onEnd = (props?: { status?: Maybe<IStreamEndStatusData<ResT>>; error?: Maybe<IStreamError> }) => {
			// this.warn('Stream ended', debugMethod, { ...props });
			isEnded = true;

			const status = props?.status ?? null;
			const error = props?.error ?? status?.error ?? null;

			const defaults: IStreamEndStatusData<ResT> = {
				dataReceivedCount: 0,
				isCancelled: false,
				isStarted: false,
				isEnded: false,
				isError: false,
				lastData: null,
				lastHeaders: null,
				lastTrailers: null,
				error: null,
				summary: 'Stream ended',
				cancelCode: null,
			};

			const endStatusData: IStreamEndStatusData<ResT> = {
				...defaults,
				...(status ?? {}),
				isStarted,
				isEnded,
				isError: error != null,
				error,
			};

			endStatusData.summary = this.makeStreamEndSummary(endStatusData);

			listeners && listeners.onEnd && listeners.onEnd(endStatusData);
		};

		// Called when an error has occured
		const onError = (err: IStreamError) => {
			this.error('Stream error', debugMethod, err);
			listeners && listeners.onError && listeners.onError(err);
		};

		// Called when an unexpected fatal error has occured
		const handleUnexpectedError = (e: unknown) => {
			const err = StreamClient.newStreamError(e);
			onError(err);
			!isEnded && onEnd({ error: err });
		};

		const controller: IStreamController = {
			cancel: (reason?: StreamCancelCode) => {
				// This will cause `runRpcStream` to terminate with a cancelled status.
				abortController.abort(reason ?? StreamCancelCode.CANCEL);
			},
		};

		try {
			const runStreamOpts = {
				onTryStart,
				onStart,
				onData,
				onHeaders,
				onTrailers,
				onError,
				onCancel,
				abortController,
				callOpts: opts?.callOpts,
				expectInitialPayload: opts?.expectInitialPayload,
			};

			// Run the stream
			this.runRpcStream<ReqT, ResT>(rpcMethodFn, request, runStreamOpts)
				.then((status) => onEnd({ status }))
				.catch((e: unknown) => handleUnexpectedError(e));
		} catch (e: unknown) {
			handleUnexpectedError(e);
		}

		return controller;
	};

	/**
	 * Runs an RPC stream until cancelled or an error has occurred.
	 */
	protected async runRpcStream<ReqT extends BufMessage<ReqT>, ResT>(
		rpcMethod: RpcServiceStreamMethod<ReqT, ResT>,
		request: ReqT,
		opts?: Maybe<IStreamClientRunRpcStreamMethodOptions<ResT>>
	): Promise<IStreamEndStatusData<ResT>> {
		const expectInitialPayload = opts?.expectInitialPayload ?? true;
		const sleepDelayMs: number = Math.max(opts?.sleepDelayMs ?? 0, 0);

		const callOpts: ConnectCallOptions = { ...(opts?.callOpts ?? {}) };
		const abortController: Nullable<AbortController> = opts?.abortController ?? null;
		abortController && (callOpts.signal = abortController.signal);

		// Number of data responses received
		let dataReceivedCount: number = 0;

		// Whether the stream has been cancelled
		let isCancelled: boolean = false;

		// Cancel code issued when the stream is cancelled
		let cancelCode: Nullable<StreamCancelCode> = null;

		// Whether the stream has been started
		let isStarted: boolean = false;

		// Whether the stream has been ended
		let isEnded: boolean = false;

		// Whether the stream has errored
		let isError: boolean = false;

		// Last data received
		let lastData: Nullable<ResT> = null;

		// Last response data received
		let lastHeaders: Nullable<Headers> = null;

		// Last response trailers received
		let lastTrailers: Nullable<Headers> = null;

		// Stream error received
		let error: Nullable<IStreamError> = null;

		const signal = callOpts.signal ?? null;

		callOpts.onHeader = (headers: Headers) => {
			lastHeaders = headers;
			opts?.onHeaders != null && opts?.onHeaders(headers);
		};

		callOpts.onTrailer = (trailers: Headers) => {
			lastTrailers = trailers;
			opts?.onTrailers != null && opts?.onTrailers(trailers);
		};

		const streamTimeoutMs = Math.max(this._options.streamTimeoutMs ?? 0, 0);
		let timeoutId: number = 0;
		const clearStreamTimeout = () => timeoutId > 0 && clearTimeout(timeoutId);

		// Called when the stream is started
		const onStart = (data?: Maybe<ResT>) => {
			// Clear the stream timeout timer once the stream has started
			clearStreamTimeout();

			isStarted = true;
			opts?.onStart != null && opts?.onStart(data);
		};

		try {
			// Signal that we are about to start the stream
			opts?.onTryStart != null && opts?.onTryStart();

			// Start the stream timeout timer - this will cancel the stream when it times out
			if (streamTimeoutMs > 0 && abortController != null) {
				timeoutId = delay(() => {
					abortController.abort(StreamCancelCode.TIMEOUT);
				}, streamTimeoutMs);
			}

			// Hack: Put the stream into a STARTED state if an initial stream payload is NOT expected
			// and the stream has not yet started or ended. This is to handle streams that send no initial
			// payload like the `ChatStream`.
			delay(() => {
				const isUnknownActive = !isStarted && !isEnded;
				if (isUnknownActive && !expectInitialPayload) {
					onStart();
				}
			}, 1000);

			// TODO: What to do if this await never resolves/rejects? Maybe that is what callOpts.timeoutMs is for?
			for await (const streamData of rpcMethod(request, callOpts)) {
				// Check for cancellation mid-loop and break if necessary
				isCancelled = signal?.aborted ?? false;
				if (isCancelled) {
					break;
				}

				lastData = streamData;
				dataReceivedCount++;

				// If we haven't yet signalled starting the stream, do so now
				if (!isStarted) {
					onStart(lastData);
				}

				// Signal that we have received data
				opts?.onData != null && opts?.onData(lastData, dataReceivedCount);

				// Wait a bit before continuing - if specified
				sleepDelayMs > 0 && (await asyncSleep(sleepDelayMs));
			}
		} catch (e: unknown) {
			// Resolve all errors to ConnectError
			const streamError = StreamClient.newStreamError(e);
			const connErr = streamError.connectError.err;

			// Cancellation is treated as a non-error
			if (connErr.code === ConnectCode.Canceled) {
				isCancelled = true;
			} else {
				error = streamError;
			}
		} finally {
			isEnded = true;
		}

		if (isCancelled) {
			cancelCode = (signal?.reason || StreamCancelCode.CANCEL) as StreamCancelCode;
			opts?.onCancel != null && opts?.onCancel(cancelCode);
		} else if (error != null) {
			isError = true;
			opts?.onError != null && opts?.onError(error);
		}

		const result: IStreamEndStatusData<ResT> = {
			lastData,
			lastHeaders,
			lastTrailers,
			dataReceivedCount,
			isStarted,
			isEnded,
			isCancelled,
			isError,
			error,
			cancelCode,
			summary: '',
		};

		result.summary = this.makeStreamEndSummary(result);

		opts?.onEnd != null && opts?.onEnd(result);

		return result;
	}

	/* #endregion ---- Public ---------------------------------------------------------------------------------------- */

	/* #region ---- Protected ---------------------------------------------------------------------------------------- */

	/**
	 * Resolves the options being passed in and returns the original and new options.
	 *
	 * - Overrides the parent class method.
	 */
	protected override resolveOptions(opts?: Maybe<IStreamClientOpts>) {
		const origOpts: IStreamClientOpts = {
			...StreamClient.defaultOptions(),
			...this._options,
		};

		const newOpts: IStreamClientOpts = {
			...origOpts,
			...(opts ?? {}),
		};

		return { origOpts, newOpts };
	}

	/**
	 * Dynamically gets a bound version of the named RPC stream method from the RPC client (if it exists).
	 *
	 * @param   rpcMethodName  Name of the method on the RPC client.
	 * @returns A bound version of the method, or NULL if not found.
	 */
	protected getStreamRpcMethod<ReqT extends BufMessage<ReqT>, ResT>(
		rpcMethodName: string
	): Nullable<RpcServiceStreamMethod<ReqT, ResT>> {
		const callable = this.getRpcMethodCallable(rpcMethodName);

		if (callable == null) {
			return null;
		}

		return callable as RpcServiceStreamMethod<ReqT, ResT>;
	}

	/**
	 * Dynamically gets a bound version of the named RPC method from the RPC client (if it exists).
	 *
	 * @param   rpcMethodName  Name of the method on the RPC client.
	 * @returns A bound version of the method, or NULL if not found.
	 */
	protected getRpcMethodCallable(rpcMethodName: string): Nullable<CallableFunction> {
		// @ts-ignore : Because strict mode does not like us accessing a property on a generic like this.
		const prop: Nullable<unknown> = this.rpcClient[rpcMethodName] ?? null;
		const isCallable = prop != null && isFunction(prop);
		const rpcMethod: Nullable<CallableFunction> = isCallable ? (prop as CallableFunction) : null;

		if (rpcMethod == null) {
			return null;
		}

		return rpcMethod.bind(this.rpcClient);
	}

	/**
	 * Dynamically gets a bound version of the named RPC method from the RPC client (if it exists).
	 */
	protected createConnectPromiseClient(
		rpcService: RpcServiceType,
		transport?: Maybe<ConnectTransport>
	): ConnectPromiseClient<RpcServiceType> {
		transport = transport ?? this._rpcTransport;

		return createConnectPromiseClient(rpcService, transport);
	}

	/**
	 * @returns A summary string for the stream end status.
	 */
	protected makeStreamEndSummary<ResT>(status: IStreamEndStatusData<ResT>) {
		const { isStarted, isCancelled, isError, error, cancelCode } = status;

		if (isCancelled) {
			return `Stream cancelled${cancelCode ? ` (${cancelCode})` : ''}`;
		} else if (isError) {
			return `Stream error: ${error?.message || 'Unknown error'}`;
		} else if (!isStarted) {
			return 'Stream ended with no data received';
		} else {
			return 'Stream ended';
		}
	}

	/* #endregion ---- Protected ------------------------------------------------------------------------------------- */

	/* #region ---- Static ------------------------------------------------------------------------------------------- */

	/**
	 * STATIC
	 * @returns The default options data used by this class.
	 */
	public static defaultOptions(): IStreamClientOpts {
		return {
			...DebugBase.defaultOptions(),
			transport: null,
			streamTimeoutMs: 30 * 1000,
		};
	}

	/**
	 * STATIC
	 * @returns A `StreamError` instance from the provided error.
	 */
	public static newStreamError(err: unknown): IStreamError {
		const errToJson = (val: unknown): IStreamErrorData => {
			const defaultErrorData: IStreamErrorData = {
				code: -1,
				message: 'Unknown stream error',
			};

			return { ...defaultErrorData, ...(toJs(val, { forceObject: true }) ?? {}) };
		};

		const errData: IStreamErrorData = errToJson(err);
		const rawError: Error = err instanceof Error ? err : new Error(JSON.stringify(errData));
		const connErr = err instanceof ConnectError ? err : ConnectError.from(err);
		const connErrData: IStreamErrorData = errToJson(connErr);

		return {
			rawError: { err: rawError, js: errData },
			connectError: { err: connErr, js: connErrData },
			code: connErr.code,
			message: connErr.message,
		};
	}

	/* #endregion ---- Static ---------------------------------------------------------------------------------------- */

	/* #region ---- Debug -------------------------------------------------------------------------------------------- */

	/**
	 * Overrides the parent class property.
	 *
	 * @returns The label to use when debugging.
	 */
	protected override get debugClassLabel(): string {
		return StreamClient.debugClassLabel();
	}

	/**
	 * STATIC
	 * @returns Label assigned to this class namespace.
	 */
	protected static debugClassLabel(): string {
		return `RpcLib.Core.StreamClient`;
	}

	/* #endregion ---- Debug ----------------------------------------------------------------------------------------- */
}

// ---- Exports -------------------------------------------------------------------------------------------------------

export { StreamClient as default };
export { StreamClient };
