import { ApolloLink } from '@apollo/client';
import type {
    ApolloClient,
    FetchResult,
    NextLink,
    NormalizedCacheObject,
    Operation,
} from '@apollo/client';
import { errorThatWillCauseAlert, trace } from 'owa-trace';
import type { TraceErrorObject } from 'owa-trace';
import type {
    QueuedOperation,
    QueuedAction,
    QueuedActionV0,
    QueuedActionV1,
} from '../types/QueuedAction';
import { getQueuedActionDb, QueuedActionId } from '../database/queuedActionDb';
import { isResultAccepted, processResult } from '../types/QueuedActionResultProcessor';
import { isLeader } from 'owa-offline-worker-leader';
import { toBasicContext, fromBasicContext } from 'owa-data-worker-utils';
import { createDependencyMap } from './dependencyMap';
import { serializeError, serializeResultErrors } from 'owa-offline-errors';
import { subscribeToNetworkChange } from 'owa-offline/lib/subscribeToNetworkChange';
import { updateIds } from './idUpdater';
import type {
    RejectedQueuedResult,
    AcceptedQueuedResult,
    DataWorkerBroadcast,
    QueuedActionResult,
} from 'owa-data-worker-utils';
import type { QueuedActionContext } from '../types/QueuedActionContext';
import { getTime } from '../util/getTime';
import { logUsage } from 'owa-analytics';
import { type ResolverContext } from 'owa-graph-schema';
import { emitSyncEvent, type ActionQueueSummary } from 'owa-offline-sync-diagnostics';
import { type LogData } from 'owa-logging-utils';
import { getThrottleState, updateRejectionLog } from './rejectionLog';
import { reportQueueStatus, ReportType } from './queueStatusReport';
import { isAlertableQueueError } from '../util/isAlertableQueueError';
import { trackQueueDbHealth } from '../util/trackQueueHealth';
import { NO_ID_CHANGES } from '../resultProcessors/defaultResultProcessor';
import { ORPHANED_ACTION_THRESHOLD_MS } from '../types/Constants';
import { isClientId } from 'owa-identifiers';
import { removeTombstones } from 'owa-offline-tombstone';
import { getDatabase } from 'owa-offline-database';
import {
    type QueuedActionEdit,
    type QueuedActionUpdate,
} from '../types/QueuedActionSubmitProcessor';
import { processSubmit } from '../submitProcessors/processSubmit';
import { assertNever } from 'owa-assert';
import { type SubmitProcessor } from '../types/QueuedActionOptions';
import { findAllSerializationErrors } from 'owa-worker-wrapper';

/**
 * Callbacks the action queue uses to communicate with its host.
 *
 * The 'rejected' callback is used at this time as a test hook -- there's currently no known
 * scenario where the UX needs to know an action's result is 'rejected' by the queue.  Rejected
 * doesn't mean error -- it means the queue considers the action to still be pending in the queue.
 */
export type ActionQueueCallbacks = Pick<
    DataWorkerBroadcast,
    'actionQueueChanged' | 'queuedActionAccepted'
> & {
    // optional; when present it is invoked with the rejected result and the action's operation
    // each time a result is not accepted.  the queue does not wait on the returned promise.
    queuedActionRejected?: (
        result: RejectedQueuedResult,
        op: Pick<Operation, 'extensions' | 'operationName' | 'query' | 'variables'>
    ) => Promise<void>;
    // awaited by the periodic sweeper before it runs the queue, and again when a result is
    // rejected with the 'Canary' code
    updateRequestSettings: () => Promise<void>;
    // handed a getter for the queue summary after the queue runs and after an action completes
    updateDiagnosticPanel: (getSummary: () => ActionQueueSummary) => void;
};

// set to true by startQueue; the queue never dispatches actions while this is false
let isQueueStarted = false;
// host callbacks, the link used to execute actions online, and the client placed on each
// action's context.  all three are assigned by startQueue.
let queueCallbacks: ActionQueueCallbacks;
let executeLink: ApolloLink;
let contextClient: ApolloClient<NormalizedCacheObject>;

// the actions currently running.  these actions remain in the queue until their
// results are 'accepted' by their response processor
type RunningAction = QueuedAction & {
    // timestamp taken when the action was dispatched; used for orphan detection
    timeStarted: number;
};
// keyed by the action's db id
const runningActions: Map<number, RunningAction> = new Map();

// map of queue dependencies, tracking blocking actions, replaceable actions, and cancelling actions
const queueDependencies = createDependencyMap();

// in memory queue of actions that need to be played against the server
let queue: QueuedAction[] = [];

// list of actions that need to have their submitProcessor executed
let submitProcessorQueue: QueuedAction[] = [];

// at some point, the BE services are going to fall over if we flood them with too
// many simultaneous actions.  it's debatable what the "correct" throttle value is, but
// we need *something*.
const maxRunningActions = 5;

// the queue should execute when actions are submitted and completed, but as a backup to that
// a periodic sweeper runs on this interval to see if the queue can be nudged along
const RunQueueBackupIntervalMs = 30000;

// actions are queued with an auto-incremented db id.  maxActionIdLoadedSoFar is the largest one we know about
// so far, so when reading new actions from the db only read actions with greater ids.  since we never purge actions
// from memory until they complete we should never have to re-read actions with a lower id than this
let maxActionIdLoadedSoFar = -1;

// actions are dispatched in a circular queue fashion.  lastActionIdVisited is the last action we tried to run.
let lastActionIdVisited = -1;

// recent id remappings.  some actions cause id values to change, like createitem which changes an item's
// id from a local id to a server id.  the queue will remap those ids in memory so that future actions
// use the new id, but it's possible the UX will dispatch a new action using the old id before the replacement happens.
// we keep the replacements around for a while to handle that race.
let recentIdRemappings: Map<string, string>[] = [new Map(), new Map()];

// assigned by startQueue from the value returned by subscribeToNetworkChange.
// NOTE(review): this annotation parses as "a function returning `void | undefined`", not as
// "an optional function" -- confirm which was intended.
let unsubscribeFromNetworkChange: () => void | undefined;

/**
 * Starts the action queue: records the execution link, host callbacks and client, subscribes
 * to network changes, installs the periodic backup sweeper, and kicks the queue once.
 *
 * The interval handle is not retained, so every call installs an additional sweeper.
 *
 * @param link link used to execute queued actions online
 * @param callbacks host callbacks used for broadcasting and request-setting refreshes
 * @param client client attached to each action's context before execution
 * @returns the promise from the initial queue signal; it rejects if that first run rejects
 */
export async function startQueue(
    link: ApolloLink,
    callbacks: ActionQueueCallbacks,
    client: ApolloClient<NormalizedCacheObject>
) {
    isQueueStarted = true;
    executeLink = link;
    queueCallbacks = callbacks;
    contextClient = client;

    unsubscribeFromNetworkChange = subscribeToNetworkChange(handleNetworkChange);

    // one pass of the backup sweeper.  it never rejects: the timer callback has nobody to hand a
    // rejection to, so errors are reported here instead of surfacing as unhandled rejections.
    const sweep = async (): Promise<void> => {
        try {
            trace.info('action queue interval sweeper', 'actionQueue');
            try {
                await queueCallbacks.updateRequestSettings();
                checkObsoleteLeaderStatus();
                await runQueue();
            } finally {
                // housekeeping must run even when refreshing settings or running the queue failed,
                // otherwise orphaned actions could stall the queue for as long as the failure persists
                detectOrphanedActions();
                ageOutRecentIdRemappings();
                reportQueueStatus(queue, ReportType.Periodic, 0);
            }
        } catch (e) {
            if (isAlertableQueueError(e)) {
                /* eslint-disable-next-line owa-custom-rules/no-error-dynamic-event-names -- (https://aka.ms/OWALintWiki)
                 * The error name (message) must be a string literal (no variables in it).
                 *	> Error names can only be a string literals. Use the diagnosticInfo to add custom data. */
                errorThatWillCauseAlert(e);
            } else {
                trace.warn(`action queue sweeper failed ${e?.message ?? e}`, 'actionQueue');
            }
        }
    };

    // set up a ping process for running the queue periodically even if the online event doesn't trigger.
    // use this opportunity to perform orphan detection, to handle cases where online actions never return and
    // could stall the queue.
    self.setInterval(() => {
        void sweep();
    }, RunQueueBackupIntervalMs);

    // just in case we queued things before starting, signal once that this queue started to start
    // consuming queued actions
    return signalQueueChanged();
}

/**
 * Persists an operation to the queued action database so it can be replayed online, then
 * signals that the queue changed.  If the action cannot be persisted, it is executed through
 * `onlineFallback` on a best-effort basis instead.
 *
 * @param operation the operation to queue
 * @param onlineFallback link used to run the action online when it cannot be persisted
 * @returns the promise of the queue-changed signal, with rejections handled
 */
export async function enqueue(operation: Operation, onlineFallback: NextLink) {
    // Operation keeps its context behind getContext(), so it isn't serializable as-is;
    // flatten it into a plain shape first
    const flattenedOp: QueuedOperation = {
        extensions: operation.extensions,
        operationName: operation.operationName,
        query: operation.query,
        variables: operation.variables,
        context: toBasicContext(operation.getContext()) as unknown as QueuedActionContext,
    };

    // take a deep copy so the queued action is serializable and shares no in-memory state with
    // the operation as it continues down the 'offline' resolver execution path.  structuredClone
    // is preferred over a JSON round trip because it handles objects such as Blob that
    // sometimes appear in variables.
    let simpleOp: QueuedOperation;
    try {
        simpleOp = self.structuredClone(flattenedOp);
    } catch (cloneError) {
        // fall back to a JSON round trip to try to avoid losing the action, but raise an alert
        // because an object may have been flattened in a way that makes the action fail
        const serializationIssues = findAllSerializationErrors(flattenedOp);
        if (serializationIssues.length > 0) {
            (cloneError as TraceErrorObject).additionalInfo = {
                serialziationIssue: { ...serializationIssues[0] },
            };
        }
        errorThatWillCauseAlert('ActionQueueCloneError', cloneError);

        simpleOp = JSON.parse(JSON.stringify(flattenedOp));
    }

    const queuedActionOptions = simpleOp.context.queuedAction;
    const blockingKeys = Array.from(queuedActionOptions.blockingKeys).filter(key => !!key);

    const submitProcessor = queuedActionOptions.submitProcessor;
    const submitProcessorKey = submitProcessor?.key || null;
    const isReplaceSubmit = submitProcessor?.name === 'Replace';
    const isCancelSubmit = submitProcessor?.name === 'Cancel';

    // the persisted shape deliberately carries the fields older script versions expect, so
    // that actions queued to disk can still be processed by earlier versions.  avoid changing
    // action schemas in ways that are not backwards compatible.
    const queueableAction: Omit<QueuedAction & QueuedActionV0 & QueuedActionV1, 'id'> = {
        version: 2,
        operation: simpleOp,
        opName: simpleOp.operationName,
        blockingKeys,
        resultProccesor: queuedActionOptions.resultProcessor,
        attempts: 0,
        cachePolicy: queuedActionOptions.cachePolicy,
        uuid: queuedActionOptions.uuid,
        lastAttempt: 0,
        rejectionLog: { records: {} },
        submitProcessor,

        // v1 compatibility
        replacesKey: isReplaceSubmit ? submitProcessorKey : null,
        cancelKey: isCancelSubmit ? submitProcessorKey : null,

        // v0 compatibility
        depends: blockingKeys,
    };

    try {
        // writing the action yields its auto-incremented id and lets the in-memory queue be
        // rehydrated across app cycle boundaries
        const db = await getQueuedActionDb();
        const actionId = await trackQueueDbHealth(() => db.actions.put(queueableAction));

        trace.info(`queued action ${queueableAction.uuid} ${actionId}`, 'actionQueue');
        logActionEvent('queued action', queueableAction);
    } catch (persistError) {
        // the db is in serious trouble.  rather than dropping the online part of the action,
        // run it online with best effort
        if (isAlertableQueueError(persistError)) {
            /* eslint-disable-next-line owa-custom-rules/no-error-dynamic-event-names -- (https://aka.ms/OWALintWiki)
             * The error name (message) must be a string literal (no variables in it).
             *	> Error names can only be a string literals. Use the diagnosticInfo to add custom data. */
            errorThatWillCauseAlert(persistError);
        }

        executeOnlineFallback(queueableAction, ApolloLink.from([onlineFallback]));
    } finally {
        // tell other tabs (and this one) that something was added to the queue.  because this
        // `finally` returns, its value is what enqueue resolves with and it supersedes any
        // exception raised by the catch block above.
        return signalQueueChanged().catch(signalError => {
            if (isAlertableQueueError(signalError)) {
                /* eslint-disable-next-line owa-custom-rules/no-error-dynamic-event-names -- (https://aka.ms/OWALintWiki)
                 * The error name (message) must be a string literal (no variables in it).
                 *	> Error names can only be a string literals. Use the diagnosticInfo to add custom data. */
                errorThatWillCauseAlert(signalError);
            }
        });
    }
}

/**
 * The local queue changed: broadcast that through the host callback (when callbacks have
 * been registered) and then handle the change in this context as well.
 *
 * @returns the promise produced by handling the change locally
 */
function signalQueueChanged() {
    queueCallbacks?.actionQueueChanged();

    return handleQueueChanged();
}

/**
 * Entry point for "the queue changed" notifications.  When another tab queues an action to
 * the database it uses the broadcast channel to tell other tabs, and this tab's broadcast
 * handler for that event calls this function.  It is also called for local changes.
 *
 * @returns a promise that settles when this attempt to run the queue has finished
 */
export function handleQueueChanged(): Promise<void> {
    return runQueue();
}

/**
 * Loads newly persisted actions and dispatches as many queued actions as the running-action
 * limit allows, resuming round-robin from where the previous pass stopped.
 *
 * Errors that are not alertable are swallowed; alertable ones are rethrown to the caller.
 */
async function runQueue(): Promise<void> {
    if (!isQueueRunnable()) {
        checkObsoleteLeaderStatus();
        return;
    }

    try {
        // pull in anything that was persisted since the last pass
        const lengthBeforeLoad = queue.length;
        await loadQueue();
        const lengthAfterLoad = queue.length;

        // resume at the first action whose id is beyond the last one visited.  when there is
        // none (the last visit was at the highest id) wrap around to the start of the queue.
        const resumeIdx = queue.findIndex(a => a.id > lastActionIdVisited);
        let cursor = resumeIdx === -1 ? 0 : resumeIdx;

        // visit each action at most once per pass, stopping early once all running slots are taken
        let visited = 0;
        while (visited < lengthAfterLoad && runningActions.size < maxRunningActions) {
            const candidate = queue[cursor];
            lastActionIdVisited = candidate.id;
            tryRunQueuedAction(candidate);

            cursor = (cursor + 1) % lengthAfterLoad;
            visited++;
        }

        queueCallbacks?.updateDiagnosticPanel(getActionQueueSummary);

        // report queue growth to telemetry
        const addedCount = lengthAfterLoad - lengthBeforeLoad;
        if (addedCount > 0) {
            reportQueueStatus(queue, ReportType.Waxing, addedCount);
        }
    } catch (e) {
        if (isAlertableQueueError(e)) {
            throw e;
        }
    }
}

/**
 * Dispatches the action when it is currently runnable; otherwise does nothing.
 *
 * @param action the queued action to attempt
 */
function tryRunQueuedAction(action: QueuedAction) {
    if (!isActionRunnable(action)) {
        return;
    }

    trace.info(`running action ${action.uuid}`, 'actionQueue');
    logActionEvent('running action', action);

    // record a copy stamped with its start time so the action isn't dispatched a second time
    const startedAction = { ...action, timeStarted: getTime('action_start') };
    runningActions.set(action.id, startedAction);

    // execute it, and complete the queued action with whatever result comes back
    const onFinished = (result: QueuedActionResult) => completeQueuedAction(action, result);
    internalRunAction(action, onFinished);
}

/**
 * Executes an action's operation through the given link and reports the outcome.
 *
 * @param action the action to execute
 * @param completeHandler called once with `{ fetchResult }` when the link completes, or with
 *     `{ fetchError }` when it errors or cannot be executed at all; its promise is not awaited
 * @param link link to execute against; defaults to the link supplied to startQueue
 */
function internalRunAction(
    action: Omit<QueuedAction, 'id'>,
    completeHandler: (result: QueuedActionResult) => Promise<void>,
    link: ApolloLink = executeLink
) {
    // work on a private copy of the context so we can add things the resolvers need but that
    // aren't queueable/clonable, without touching the stored action
    const runContext = fromBasicContext(
        JSON.parse(JSON.stringify(action.operation.context))
    ) as unknown as QueuedActionContext;

    // this tells the links to route this for online execution
    runContext.queuedAction.state = 'OnlineExecution';

    // some resolvers expect a gql client available on the context
    (runContext as unknown as ResolverContext).client = contextClient;

    const executableOperation = {
        ...action.operation,
        context: runContext,
    };

    try {
        // holds the most recent value emitted by the link; reported on completion
        let fetchResult: FetchResult<Record<string, any>, Record<string, any>, Record<string, any>>;
        const subscription = ApolloLink.execute(link, executableOperation).subscribe({
            next: (value: FetchResult<any, any, any>) => {
                fetchResult = value;
            },
            error: (fetchError: Error) => {
                trace.info(`error ${action.uuid} ${fetchError.message}`, 'actionQueue');
                logActionEvent('action error', action, { error: fetchError.message });
                subscription.unsubscribe();
                completeHandler({ fetchError });
            },
            complete: () => {
                trace.info(`completed normally ${action.uuid}`, 'actionQueue');
                logActionEvent('action completed', action);
                subscription.unsubscribe();
                completeHandler({ fetchResult });
            },
        });
    } catch (e) {
        const fetchError: Error = e;
        const failureMessage = `${action.opName} ${action.uuid} failed to execute.`;
        trace.warn(failureMessage, 'actionQueue');

        // merely executing/subscribing to the link is not expected to throw, so alert when it does
        /* eslint-disable-next-line owa-custom-rules/no-error-dynamic-event-names -- (https://aka.ms/OWALintWiki)
         * The error name (message) must be a string literal (no variables in it).
         *	> Error names can only be a string literals. Use the diagnosticInfo to add custom data. */
        errorThatWillCauseAlert(failureMessage, fetchError);
        completeHandler({ fetchError });
    }
}

/**
 * Decides whether an action may be dispatched right now.  Checks are made in order and the
 * first failing one is logged as the reason.
 *
 * @param action the queued action to check
 * @returns true when nothing prevents the action from running
 */
function isActionRunnable(action: QueuedAction) {
    // already in flight
    if (runningActions.has(action.id)) {
        trace.info(`${action.uuid} already running`, 'actionQueue');
        return false;
    }

    // shares a blocking key with an earlier action that hasn't finished
    if (queueDependencies.isBlockedByEarlierAction(action)) {
        trace.info(`${action.uuid} blocked by earlier action`, 'actionQueue');
        logActionEvent('action blocked', action);
        return false;
    }

    // still refers to a client id that hasn't been replaced with a server id
    if (action.blockingKeys.some(k => isClientId(k))) {
        trace.info(`${action.uuid} blocked by clientid`, 'actionQueue');
        logActionEvent('action blocked by clientid', action);
        return false;
    }

    // held back by its recent rejection history
    if (getThrottleState(action.rejectionLog) !== 'Runnable') {
        trace.info(`${action.uuid} throttled`, 'actionQueue');
        logActionEvent('action throttled', action, { ...action.rejectionLog.latest });
        return false;
    }

    return true;
}

/**
 * The queue may dispatch only when it has been started, this context is the leader, and
 * there is a free running-action slot.  Traces the state whenever it may not.
 *
 * @returns whether the queue may dispatch actions right now
 */
function isQueueRunnable(): boolean {
    const canRun = isQueueStarted && isLeader() && runningActions.size < maxRunningActions;

    if (!canRun) {
        const runningCount = runningActions.size;
        trace.info(
            `queue not runnable ${isQueueStarted} leader: ${isLeader()}, running: ${runningCount}`,
            'actionQueue'
        );
    }

    return canRun;
}

/**
 * Handles the result of running a queued action.
 *
 * The action's result processor decides whether the result is final ("accepted"), in which
 * case the action is removed from the in-memory and persisted queue and the result is
 * published, or not ("rejected"), in which case the action stays queued with updated attempt
 * bookkeeping for re-execution later (typically because the network was down).
 *
 * @param action the action that finished running
 * @param result the fetch result or fetch error produced by running it
 * @param forceAccept when true the result processor is bypassed and the result is treated as
 *     accepted with no id changes; used for orphaned actions
 * @returns a promise that settles once the bookkeeping is done.  it rejects if processing or
 *     persisting fails; the action is removed from the running set either way.
 */
async function completeQueuedAction(
    action: QueuedAction,
    result: QueuedActionResult,
    forceAccept: boolean = false
): Promise<void> {
    let runQueueAfterComplete = true;
    try {
        // the caller may hand us the running copy of the action; drop its start timestamp
        delete (action as Partial<RunningAction>).timeStarted;

        // force accept things like orphaned actions -- don't want custom result processors to keep these in the queue
        const processed = forceAccept
            ? {
                  fetchResult: result.fetchResult,
                  fetchError: result.fetchError,
                  idChanges: NO_ID_CHANGES,
              }
            : await processResult(action, result);

        if (isResultAccepted(processed)) {
            trace.info(`action result ${action.uuid} accepted.`, 'actionQueue');
            logActionEvent('action result accepted', action);

            // NOTE: we want to update the in memory data structures: dependencies, queue, and id remappings all
            // in the same tick so that when actions are unblocked/reattempted, they have the proper dependencies
            // and id remappings.  If we interleave async actions then an action might become runnable before its ids
            // are updated, for example.  After the in-memory state is updated, then do async work to persist the changes to the disk

            // remove from in memory dependency tree to unblock dependent actions
            queueDependencies.remove(action);

            // remove from in memory queue to stop trying to run it
            const beforeLength = queue.length;
            const queueIdx = queue.findIndex(queued => queued.id === action.id);
            if (queueIdx !== -1) {
                queue.splice(queueIdx, 1);
            }
            const afterLength = queue.length;

            // update in memory actions that reference ids this action remapped
            const modifiedActions = updateQueuedIds(processed);

            // remove from persisted queue so that another leader won't try to run this
            const queuedActionDb = await getQueuedActionDb();
            await queuedActionDb.actions.delete(action.id);

            // Remove any tombstone entries for this action uuid
            await removeTombstones(
                await getDatabase(action.operation.variables?.mailboxInfo),
                action.uuid,
                !!processed?.fetchResult?.data /* isActionSucceeded */
            );

            // update the actions on disk that need their ids remapped
            await persistRemappedIds(modifiedActions);

            // call back the result to main for writing into the UI cache, etc.
            publishAcceptedResult(processed, action);

            // report the queue status to telemetry and diagnostic pane
            if (afterLength < beforeLength) {
                reportQueueStatus(queue, ReportType.Waning, beforeLength - afterLength);
            }
        } else {
            trace.info(
                `action result ${action.uuid} not accepted with code ${processed.rejectCode}`,
                'actionQueue'
            );
            logActionEvent('action result rejected', action, {
                code: processed.rejectCode,
            });

            // this just avoids spamming attempts from the queue if we appear to be offline or need to refresh the canary, etc
            runQueueAfterComplete = false;

            // if the action was rejected because of a canary, refresh the canary
            if (processed.rejectCode === 'Canary') {
                await queueCallbacks.updateRequestSettings();
            }

            // keep in the queue and record when we last ran this, for usage TBD (orphan/quarantine requests at some point)
            action.attempts++;
            action.lastAttempt = getTime('action_attempted');

            // keep an updated rejection log for this action
            action.rejectionLog = updateRejectionLog(action.rejectionLog, processed);

            const queuedActionDb = await getQueuedActionDb();
            await queuedActionDb.actions.update(action.id, {
                attempts: action.attempts,
                lastAttempt: action.lastAttempt,
                rejectionLog: action.rejectionLog,
            });

            // mainly a test hook to give visibility into queue rejection, but could be used for logging TBD
            publishRejectedResult(processed, action);
        }
    } finally {
        // this gets removed from running datastructure regardless of whether its results are "acceptable"
        // if it is accepted, it should not be removed from running actions before being removed from the queue,
        // otherwise it might execute again
        runningActions.delete(action.id);
        queueCallbacks?.updateDiagnosticPanel(getActionQueueSummary);
    }

    // after completing an action, try to run things again because previously blocking dependencies
    // might have resolved, but don't be so eager if the last action failed because of offline.
    // there's a backup timer to sweep again if we do get connectivity back.
    if (runQueueAfterComplete) {
        // deliberately not awaited, but the rejection must still be observed: runQueue rethrows
        // alertable errors and nothing else is holding this promise
        runQueue().catch(e => {
            if (isAlertableQueueError(e)) {
                /* eslint-disable-next-line owa-custom-rules/no-error-dynamic-event-names -- (https://aka.ms/OWALintWiki)
                 * The error name (message) must be a string literal (no variables in it).
                 *	> Error names can only be a string literals. Use the diagnosticInfo to add custom data. */
                errorThatWillCauseAlert(e);
            } else {
                trace.warn(`failed to run queue after completion ${e?.message}`, 'actionQueue');
            }
        });
    }
}

/**
 * Reads actions persisted since the last load into the in-memory queue, registers their
 * dependencies and submit processors, runs pending submit processors, and cleans up any
 * malformed actions found along the way.
 */
async function loadQueue(): Promise<void> {
    const db = await getQueuedActionDb();
    const persisted: (QueuedAction | QueuedActionV0)[] = await trackQueueDbHealth(() =>
        db.actions.where(QueuedActionId).above(maxActionIdLoadedSoFar).sortBy(QueuedActionId)
    );

    // bring every action up to the current version
    const upgraded = persisted.map(coerceVersion);

    // the db query already excludes loaded ids, but another load may have completed while this
    // one was awaiting, so re-check against the current high-water mark
    const freshActions = upgraded.filter(a => isWellFormed(a) && a.id > maxActionIdLoadedSoFar);

    // actions that are not well formed are set aside for deletion
    const malformedActions = upgraded.filter(a => !isWellFormed(a));

    if (freshActions.length) {
        // apply recently remapped ids to the newly loaded actions.  this covers the race where
        // the ux submits an action with a previously remapped id before learning of the change
        const remapped = [
            ...updateIds(freshActions, recentIdRemappings[0]),
            ...updateIds(freshActions, recentIdRemappings[1]),
        ];

        // append to the in memory queue and advance the high-water mark
        queue.push(...freshActions);
        maxActionIdLoadedSoFar = queue[queue.length - 1].id;

        // register the new actions in the dependency mapping
        for (const fresh of freshActions) {
            queueDependencies.add(fresh);
        }

        // actions carrying a submit processor still need it to be run
        submitProcessorQueue.push(...freshActions.filter(a => !!a.submitProcessor));

        // write id updates back to disk; a failure here doesn't need to halt the queue
        if (remapped.length > 0) {
            persistRemappedIds(remapped).catch(e => {
                trace.warn(`failed to persist remapped ids ${e.message}`, 'actionQueue');
            });
        }
    }

    await runSubmitProcessors();

    // malformed actions should be very rare.  try to delete them from the db without blocking on it.
    if (malformedActions.length) {
        const malformedIds = malformedActions.map(a => a.id);
        logUsage('actionqueue_corrupt_actions', { count: malformedIds.length });
        db.actions.bulkDelete(malformedIds).catch(e => {
            trace.warn(`failed to delete corrupt actions ${e.message}`, 'actionQueue');
        });
    }
}

// one entry of a bulk update against the persisted queue: the db id of the action and the
// fields to overwrite on it
type KeyChange = {
    key: number;
    changes: Partial<QueuedAction>;
};

/**
 * Applies an accepted result's id changes to all in-memory state: the queued actions, the
 * dependency map, and the list of recent remappings used for newly loaded actions.
 *
 * @param accepted the accepted result carrying the id changes
 * @returns the queued actions that were modified by the remapping
 */
function updateQueuedIds(accepted: AcceptedQueuedResult) {
    const { idChanges } = accepted;

    // queued actions first, then the dependency map
    const modifiedActions = updateIds(queue, idChanges);
    queueDependencies.update(idChanges);

    // keep the remappings around so they can be applied to actions submitted later
    const newestRemappings = recentIdRemappings[0];
    for (const [previousId, replacementId] of idChanges) {
        newestRemappings.set(previousId, replacementId);
    }

    return modifiedActions;
}

/**
 * Writes the remappable fields (operation, blocking keys, submit processor) of the given
 * actions back to the persisted queue.  Does nothing for an empty list.
 *
 * @param modifiedActions actions whose ids were remapped in memory
 */
async function persistRemappedIds(modifiedActions: QueuedAction[]) {
    if (modifiedActions.length === 0) {
        return;
    }

    const dbChanges: KeyChange[] = modifiedActions.map(
        ({ id, operation, blockingKeys, submitProcessor }) => ({
            key: id,
            changes: { operation, blockingKeys, submitProcessor },
        })
    );

    const db = await getQueuedActionDb();
    await db.actions.bulkUpdate(dbChanges as any);
}

/**
 * Sends an accepted result to the host through the `queuedActionAccepted` callback.
 * Note that `accepted` is modified in place: its errors are replaced by serialized forms.
 *
 * @param accepted the accepted result to publish
 * @param action the action the result belongs to
 */
function publishAcceptedResult(accepted: AcceptedQueuedResult, action: Omit<QueuedAction, 'id'>) {
    // native error objects don't survive cloning, so serialize them by hand before they
    // cross the bridge
    if (accepted.fetchResult) {
        accepted.fetchResult = serializeResultErrors(accepted.fetchResult);
    }
    if (accepted.fetchError) {
        accepted.fetchError = serializeError(accepted.fetchError);
    }

    // fire and forget -- the broadcast result isn't needed
    const { operation, uuid, cachePolicy } = action;
    queueCallbacks.queuedActionAccepted(operation, uuid, cachePolicy, accepted);
}

/**
 * Notifies the host that a result was rejected, when it registered the optional
 * `queuedActionRejected` callback.
 *
 * @param rejected the rejected result
 * @param action the action that remains queued
 */
function publishRejectedResult(rejected: RejectedQueuedResult, action: QueuedAction) {
    // fire and forget -- the broadcast result isn't needed
    if (queueCallbacks.queuedActionRejected) {
        queueCallbacks.queuedActionRejected(rejected, action.operation);
    }
}

/**
 * Runs an action that could not be persisted directly against the fallback link, and
 * publishes its result when the result processor accepts it.  A result that is not accepted
 * is only logged.
 *
 * @param queueableAction the action that failed to persist
 * @param fallback link to execute the action against
 */
function executeOnlineFallback(queueableAction: Omit<QueuedAction, 'id'>, fallback: ApolloLink) {
    const onFallbackResult = async (result: QueuedActionResult): Promise<void> => {
        trace.info(`ran backup online result for ${queueableAction.uuid}.`, 'actionQueue');
        logActionEvent('online fallback completed', queueableAction);

        const processed = await processResult(queueableAction, result);
        if (!isResultAccepted(processed)) {
            return;
        }

        logActionEvent('online fallback accepted', queueableAction);
        publishAcceptedResult(processed, queueableAction);
    };

    internalRunAction(queueableAction, onFallbackResult, fallback);
}

type OrphanedAction = [QueuedAction, QueuedActionResult];

/**
 * Finds actions that can no longer make progress and force-completes them with an error so
 * they stop occupying the queue.  Three kinds of orphan are detected:
 *  - running actions that have not completed within ORPHANED_ACTION_THRESHOLD_MS
 *  - the lowest-id action, when nothing is running and it is blocked on a client id
 *  - queued actions whose throttle state is 'Dead'
 *
 * An action matching more than one kind is force-completed only once.  Completion is
 * asynchronous and is not awaited here; failures are reported instead of being left unhandled.
 */
function detectOrphanedActions() {
    // keyed by action id so that an action matching several categories is only completed (and
    // therefore only published) once.  the first category to claim an action wins.
    const orphans = new Map<number, OrphanedAction>();
    const recordOrphan = (action: QueuedAction, result: QueuedActionResult) => {
        if (!orphans.has(action.id)) {
            orphans.set(action.id, [action, result]);
        }
    };

    // one type of orphan is an action that has been running for a long time without completing, taking up a running action slot
    runningActions.forEach(runningAction => {
        const now = getTime('action_orphan');
        const duration = now - runningAction.timeStarted;
        const opName = runningAction?.operation?.operationName || 'unknown';
        const uuid = runningAction.uuid;
        if (duration > ORPHANED_ACTION_THRESHOLD_MS) {
            const fetchError = new Error("Action running didn't complete within orphan interval.");
            logUsage('actionqueue_orphan_duration', { duration, opName, uuid });
            trace.warn(fetchError.message, 'actionQueue');
            recordOrphan(runningAction, { fetchError });
        }
    });

    // another type of orphan is if the lowest id action in the queue has an unmapped blocking clientid
    // because there's no hope of an action with a lower id ever completing and remapping its clientid
    if (runningActions.size === 0 && queue.length > 0) {
        // the queue is always sorted by id in memory so the lowest id action is always at index 0
        const lowestIdAction = queue[0];

        if (lowestIdAction.blockingKeys.some(k => isClientId(k))) {
            const opName = lowestIdAction?.operation?.operationName || 'unknown';
            const uuid = lowestIdAction.uuid;
            const fetchError = new Error('Action has an unmappable clientid.');
            logUsage('actionqueue_orphan_clientid', { opName, uuid });
            trace.warn(fetchError.message, 'actionQueue');
            recordOrphan(lowestIdAction, { fetchError });
        }
    }

    // another type of orphan is one that has exceeded the final throttle threshold for a given error
    queue.forEach(action => {
        const throttleState = getThrottleState(action.rejectionLog);
        if (throttleState === 'Dead') {
            const opName = action?.operation?.operationName || 'unknown';
            const fetchError = new Error('Action has exceeded throttle threshold.');
            const latest = action.rejectionLog.latest;
            logUsage('actionqueue_orphan_throttle', { opName, ...latest });
            trace.warn(fetchError.message, 'actionQueue');
            recordOrphan(action, { fetchError });
        }
    });

    // force complete all the orphaned actions with errors
    orphans.forEach(([action, error]) => {
        logActionEvent('orphaned action detected', action, { error: error.fetchError?.message });
        completeQueuedAction(action, error, true /* force accept */).catch(e => {
            if (isAlertableQueueError(e)) {
                /* eslint-disable-next-line owa-custom-rules/no-error-dynamic-event-names -- (https://aka.ms/OWALintWiki)
                 * The error name (message) must be a string literal (no variables in it).
                 *	> Error names can only be a string literals. Use the diagnosticInfo to add custom data. */
                errorThatWillCauseAlert(e);
            } else {
                trace.warn(`failed to complete orphaned action ${e?.message}`, 'actionQueue');
            }
        });
    });
}

/**
 * Network-change subscriber: when connectivity becomes available, nudges the queue so
 * pending actions get another chance to run.  Loss of connectivity is ignored.
 *
 * @param networkAvailable whether the network is now available
 */
function handleNetworkChange(networkAvailable: boolean): void {
    if (!networkAvailable) {
        return;
    }

    trace.info('detected connectivity change +', 'actionQueue');

    // nothing awaits this handler, so observe the rejection here the same way enqueue does
    // rather than leaving it unhandled
    signalQueueChanged().catch(e => {
        if (isAlertableQueueError(e)) {
            /* eslint-disable-next-line owa-custom-rules/no-error-dynamic-event-names -- (https://aka.ms/OWALintWiki)
             * The error name (message) must be a string literal (no variables in it).
             *	> Error names can only be a string literals. Use the diagnosticInfo to add custom data. */
            errorThatWillCauseAlert(e);
        }
    });
}

/**
 * Runs the submit processors that are waiting in the submit processor queue.
 *
 * Each pending action is handed to `processSubmit`, which answers either 'Retry' (the action
 * stays in the submit processor queue for another pass) or a list of edits keyed by action id.
 * The collected edits are applied to the in-memory queue synchronously, and only afterwards are
 * they persisted to the queued action db, tombstones removed for deleted actions, and cancelled
 * actions published with an 'Action cancelled' error.
 *
 * Does nothing when the submit processor queue is empty.
 */
async function runSubmitProcessors() {
    if (submitProcessorQueue.length > 0) {
        // edits keyed by the id of the action they target
        const edits = new Map<number, QueuedActionEdit>();
        // actions whose submit processor asked to be run again later
        const retries = new Array<QueuedAction>();

        for (const action of submitProcessorQueue) {
            const rv = processSubmit(
                action,
                queue,
                queueDependencies.getSubmitKeyActions,
                // lets the processor ask whether a given action id is currently running
                (id: number) => runningActions.has(id)
            );

            if (rv === 'Retry') {
                // if the submit processor needs to retry, keep it around
                retries.push(action);
            } else {
                // gather the edits to apply.  if an action would be edited multiple times, the last edit wins
                for (const edit of rv) {
                    edits.set(edit.id, edit);
                }
            }
        }

        // save the retries for another pass
        submitProcessorQueue = retries;

        // if there were edits to apply to the queue, apply them.
        // these should all be done in the same tick so that the queue is in a consistent state before runQueue executes again
        if (edits.size > 0) {
            // the actions that survive the edits, in their original queue order
            const updatedQueue: QueuedAction[] = [];
            // every action leaving the queue (both deleted and cancelled)
            const deletedActions: QueuedAction[] = [];
            // the subset of removed actions that must also be published as cancelled
            const cancelledActions: QueuedAction[] = [];
            // the 'Update' edits that need to be written back to the db
            const updatedActions: QueuedActionUpdate[] = [];

            for (let i = 0; i < queue.length; ++i) {
                const action = queue[i];
                // actions without an edit are treated as 'None' and kept untouched
                const edit = edits.get(action.id) || { kind: 'None' };
                const kind = edit.kind;
                switch (kind) {
                    case 'Delete':
                        // drop from the queue and the dependency map
                        deletedActions.push(action);
                        queueDependencies.remove(action);
                        break;
                    case 'Cancel':
                        // same removal as 'Delete', plus a cancel result is published below
                        cancelledActions.push(action);
                        deletedActions.push(action);
                        queueDependencies.remove(action);
                        break;
                    case 'Update':
                        // the action stays queued; only the edit is recorded for persistence.
                        // NOTE(review): the in-memory action is not modified here -- presumably
                        // processSubmit already applied the change in memory; confirm.
                        updatedActions.push(edit);
                        updatedQueue.push(action);
                        break;
                    case 'None':
                        updatedQueue.push(action);
                        break;
                    default:
                        // exhaustiveness check over the edit kinds
                        assertNever(kind);
                }
            }

            // swap in the edited queue before the first await below
            queue = updatedQueue;
            const queuedActionDb = await getQueuedActionDb();

            // now that the queue is in a consistent state in memory, persist the changes to the action queue db
            if (deletedActions.length > 0) {
                await queuedActionDb.actions.bulkDelete(deletedActions.map(a => a.id));
            }

            if (updatedActions.length > 0) {
                // the edit object itself is passed as the change set for its action id
                await queuedActionDb.actions.bulkUpdate(
                    updatedActions.map(a => ({ key: a.id, changes: a }))
                );
            }

            // remove tombstones for every removed action (deleted and cancelled), one at a time
            for (const action of deletedActions) {
                await removeTombstones(
                    await getDatabase(action.operation.variables?.mailboxInfo),
                    action.uuid,
                    true /* isActionSucceeded */
                );
            }

            // publish any cancelled actions with a cancel status
            cancelledActions.forEach(action => {
                logActionEvent('action cancelled', action);
                const acceptedResult: AcceptedQueuedResult = {
                    fetchError: new Error('Action cancelled'),
                    idChanges: NO_ID_CHANGES,
                };

                publishAcceptedResult(acceptedResult, action);
            });
        }
    }
}

/**
 * Ages out the recent id remappings by one generation.
 *
 * The remapping list is reversed in place, so the map that was last becomes first, and the map
 * now at index 0 is then emptied.
 */
function ageOutRecentIdRemappings() {
    // in-place swap: the former last map moves to the front
    recentIdRemappings.reverse();

    // the map now at the front starts over empty
    const [front] = recentIdRemappings;
    front.clear();
}

/**
 * Tries to coerce an unknown action blob into an action of the current version (2).
 *
 * - current (v2) actions are returned as-is
 * - v0 actions are upgraded: `depends` becomes `blockingKeys`, `opName` is derived from the
 *   operation (or 'unknown'), and an empty rejection log and no submit processor are set
 * - v1 actions are upgraded: `replacesKey` / `cancelKey` are translated to a submit processor
 * - any other version is copied field by field with defaults for whatever is missing; its
 *   version is kept and `lastAttempt` is reset to 0
 *
 * @param action the raw action blob to coerce
 * @returns the action in the shape of the current version
 */
function coerceVersion(action: Record<string, any>): QueuedAction {
    if (isCurrentAction(action)) {
        return action;
    }

    if (isV0Action(action)) {
        // found a v0 action to upgrade to current
        return {
            ...action,
            version: 2, // it's upgraded now, in case we persist it back to disk
            blockingKeys: action.depends,
            opName: action.operation?.operationName || 'unknown',
            rejectionLog: { records: {} },
            submitProcessor: undefined,
        };
    }

    if (isV1Action(action)) {
        // found a v1 action to upgrade to current.  replacesKey takes precedence over cancelKey
        let submitProcessor: SubmitProcessor | undefined;
        if (action.replacesKey) {
            submitProcessor = { name: 'Replace', key: action.replacesKey };
        } else if (action.cancelKey) {
            submitProcessor = { name: 'Cancel', key: action.cancelKey };
        }

        return {
            ...action,
            version: 2, // it's upgraded now, in case we persist it back to disk
            submitProcessor,
        };
    }

    // some (future) version we don't know about.  whoever created it should have made it backwards
    // compatible, but if not, at least try to set defaults to allow this to run.
    const src: QueuedAction = action as QueuedAction;
    const rv: Required<QueuedAction> = {
        version: src.version,
        operation: src.operation,
        // fall back to 'unknown' (as the v0 upgrade does) so opName is never left undefined
        opName: src.opName || src.operation?.operationName || 'unknown',
        attempts: src.attempts || 0,
        blockingKeys: src.blockingKeys || [],
        resultProccesor: src.resultProccesor || 'Default',
        cachePolicy: src.cachePolicy || 'no-cache',
        id: src.id,
        uuid: src.uuid || 'invalid',
        lastAttempt: 0,
        submitProcessor: src.submitProcessor,
        rejectionLog: src.rejectionLog || { records: {} },
    };

    return rv;
}

/**
 * Type guard for v0 actions: a blob is treated as v0 when its `version` is 0 or is
 * missing/falsy.
 */
function isV0Action(action: Record<string, any>): action is QueuedActionV0 {
    const { version } = action;
    return version === 0 || !version;
}

/** Type guard for v1 actions: matches only when `version` is exactly the number 1. */
function isV1Action(action: Record<string, any>): action is QueuedActionV1 {
    const { version } = action;
    return version === 1;
}

/** Type guard for actions of the current version: matches only when `version` is exactly the number 2. */
function isCurrentAction(action: Record<string, any>): action is QueuedAction {
    const { version } = action;
    return version === 2;
}

/**
 * Emits a sync event about a single action, prefixed with `actionQueue:`.
 *
 * The payload carries the action's uuid and operation name; any entries in `data` are spread
 * last, so they override those two fields on a key clash.
 *
 * @param msg event name suffix
 * @param action the action the event is about
 * @param data optional extra fields for the payload
 */
function logActionEvent(msg: string, action: Omit<QueuedAction, 'id'>, data?: LogData) {
    const { uuid, opName } = action;
    emitSyncEvent(`actionQueue:${msg}`, { uuid, operation: opName, ...data });
}

/**
 * Emits a sync event about the queue as a whole, prefixed with `actionQueue:`.
 *
 * @param msg event name suffix
 * @param data optional payload; a shallow copy is emitted (an empty object when omitted)
 */
function logQueueEvent(msg: string, data?: LogData) {
    const payload = { ...data };
    emitSyncEvent(`actionQueue:${msg}`, payload);
}

// a bit of a corner case, but if there's a worker leader who is doing action queue work and gets deposed
// we should clean up the in-memory queue state in case it's ever activated as leader again.
// the reset only happens once nothing is running and there is actually something left in the queue.
function checkObsoleteLeaderStatus() {
    if (isLeader()) {
        // still the leader, so the in-memory state is current
        return;
    }

    const hasQueuedActions = queue.length > 0;
    const hasRunningActions = runningActions.size !== 0;
    if (!hasQueuedActions || hasRunningActions) {
        return;
    }

    logQueueEvent('queue_obsolete_leader', { size: queue.length });
    queueReset();
}

/**
 * Resets the in-memory queue state: empties the action queue and the submit processor queue
 * (both truncated in place), clears the dependency map, and puts both action id cursors back
 * to -1.
 */
function queueReset() {
    // truncate the arrays in place rather than replacing them
    submitProcessorQueue.length = 0;
    queue.length = 0;

    queueDependencies.clear();

    // both cursors go back to -1
    lastActionIdVisited = -1;
    maxActionIdLoadedSoFar = -1;
}

/**
 * Test-only helper that returns the module-level queue state to its initial condition:
 * callbacks and execute link are unset, the id remapping maps are recreated, the network-change
 * subscription (if any) is released, the queue is marked as not started, running actions are
 * cleared and the queue itself is reset.
 */
export function testReset() {
    /* eslint-disable-next-line owa-custom-rules/no-recasting-null-undefined  -- (https://aka.ms/OWALintWiki)
     * Justification: test code
     */
    queueCallbacks = undefined as any;
    /* eslint-disable-next-line owa-custom-rules/no-recasting-null-undefined  -- (https://aka.ms/OWALintWiki)
     * Justification: test code
     */
    executeLink = undefined as any;

    // start over with two empty remapping maps
    recentIdRemappings = [new Map(), new Map()];

    // release the network-change subscription when one exists
    unsubscribeFromNetworkChange?.();

    isQueueStarted = false;
    runningActions.clear();

    // finally drop the queue, submit processor queue, dependency map and id cursors
    queueReset();
}

/**
 * Builds a snapshot of the action queue state.
 *
 * @returns whether the queue is started, whether this worker is the leader, one entry per queued
 * action (identity, attempt info with `lastAttempt` rendered as a UTC string, whether it is
 * blocked by an earlier action, its latest rejection code/count, blocking keys and operation
 * variables), and the keys of the currently running actions.
 */
function getActionQueueSummary() {
    const isStarted = isQueueStarted;
    const leader = isLeader();

    const queueEntries = queue.map(action => {
        const { id, opName, uuid, attempts, blockingKeys, rejectionLog } = action;
        const latestRejection = rejectionLog.latest;

        return {
            id,
            opName,
            uuid,
            attempts,
            lastAttempt: new Date(action.lastAttempt).toUTCString(),
            isBlocked: queueDependencies.isBlockedByEarlierAction(action),
            lastError: latestRejection?.code,
            lastErrorCount: latestRejection?.totalCount,
            blockingKeys,
            variables: action.operation.variables,
        };
    });

    const running = Array.from(runningActions.keys());

    return {
        isStarted,
        isLeader: leader,
        queue: queueEntries,
        running,
    };
}

/**
 * Checks that an action is usable: it must exist and carry an operation.
 *
 * Always returns a real boolean, including `false` (not `null`/`undefined`) when the action
 * itself is missing.
 *
 * @param action the action to check; may be nullish at runtime despite its type
 * @returns true when the action is present and has a truthy `operation`
 */
function isWellFormed(action: QueuedAction): boolean {
    return !!action?.operation;
}
