/*
 * Decompiled with CFR 0.152.
 */
package org.sourceid.saml20.service.impl.grouprpc.adaptive;

import com.pingidentity.common.util.consistent.Range;
import com.pingidentity.common.util.consistent.RangeList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.util.RspList;
import org.sourceid.config.ConfigStore;
import org.sourceid.saml20.service.RangeRecords;
import org.sourceid.saml20.service.StateService;
import org.sourceid.saml20.service.StateServiceId;
import org.sourceid.saml20.service.StateServiceRegistry;
import org.sourceid.saml20.service.impl.grouprpc.BaseGroupRpc;
import org.sourceid.saml20.service.impl.grouprpc.adaptive.AddressNode;
import org.sourceid.saml20.service.impl.grouprpc.adaptive.DownloadBatch;
import org.sourceid.saml20.service.impl.grouprpc.adaptive.DownloadServiceStateResult;
import org.sourceid.saml20.service.impl.grouprpc.adaptive.PurgeBatch;
import org.sourceid.saml20.service.impl.grouprpc.adaptive.RangeAndReplicas;
import org.sourceid.saml20.service.impl.grouprpc.adaptive.RebalanceState;
import org.sourceid.saml20.service.impl.grouprpc.adaptive.RebalanceTask;
import org.sourceid.saml20.state.AdaptiveClusteringConfig;

public class RebalanceWorker
extends Thread {
    private static final Log log = LogFactory.getLog(RebalanceWorker.class);
    private static final Class[] RETRIEVE_STATE_RECORDS_SIG = new Class[]{StateServiceId.class, Collection.class};
    private static final String RETRIEVE_STATE_RECORDS = "retrieveStateRecords";
    private static final String RECORD_IMPORT_DELAY_MILLIS = "RecordImportDelayMillis";
    private static final long REBALANCE_DELAY_MILLIS = 2000L;
    private RebalanceState rebalanceState = null;
    private BlockingQueue<RebalanceTask> taskQueue = new LinkedBlockingQueue<RebalanceTask>();
    private boolean initialRebalanceStarted = false;
    private boolean initialRebalanceComplete = false;
    private Object initialRebalanceSignal = new Object();
    private BaseGroupRpc groupRpc;
    private AdaptiveClusteringConfig configProps;
    private ConfigStore configStore;
    private StateServiceRegistry stateServiceRegistry;

    public RebalanceWorker(BaseGroupRpc groupRpc, StateServiceRegistry serviceRegistry, AdaptiveClusteringConfig configProps, ConfigStore configStore) {
        super("AdaptiveClusteringRebalanceThread");
        this.setDaemon(true);
        this.groupRpc = groupRpc;
        this.configProps = configProps;
        this.configStore = configStore;
        this.stateServiceRegistry = serviceRegistry;
        this.rebalanceState = new RebalanceState(serviceRegistry);
    }

    @Override
    public void run() {
        log.debug((Object)"Rebalance worker thread starting up");
        while (true) {
            this.doRebalance(true);
        }
    }

    public void submit(RebalanceTask task) {
        try {
            this.taskQueue.put(task);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    public boolean isRebalanceInProgress() {
        return this.rebalanceState.hasCurrentTask() || !this.taskQueue.isEmpty();
    }

    public boolean hasRange(StateServiceId serviceId, Range range) {
        return this.rebalanceState.hasRange(serviceId, range);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitForInitialRebalanceToComplete() {
        try {
            Object object = this.initialRebalanceSignal;
            synchronized (object) {
                long timeout = System.currentTimeMillis() + 10000L;
                while (!this.initialRebalanceStarted && System.currentTimeMillis() < timeout) {
                    this.initialRebalanceSignal.wait(1000L);
                }
                if (!this.initialRebalanceStarted) {
                    log.warn((Object)"Initial rebalance did not start within the timeout period");
                } else {
                    log.debug((Object)"Waiting for initial rebalance to complete");
                    while (!this.initialRebalanceComplete) {
                        this.initialRebalanceSignal.wait();
                    }
                }
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    protected boolean doRebalance(boolean canBlock) {
        if (this.checkForRebalanceTask(canBlock)) {
            this.setInitialRebalanceStarted();
            boolean done = false;
            try {
                done = this.doRebalanceBatch();
            }
            catch (Throwable t) {
                log.error((Object)"Unexpected error in rebalance worker", t);
                done = true;
            }
            if (done) {
                this.rebalanceState.handleRebalanceComplete();
                log.debug((Object)"Completed rebalance task");
                this.setInitialRebalanceComplete();
            }
            return done;
        }
        return true;
    }

    protected void onRestartRebalance() {
    }

    protected DownloadServiceStateResult downloadServiceState(Address address, StateService service, Collection<Range> ranges) {
        log.debug((Object)("Downloading state records from " + address + " for service " + service.getServiceId() + (String)(log.isTraceEnabled() ? " and ranges " + ranges.toString() : "")));
        Collection rangeRecordsList = null;
        try {
            RspList<Collection<RangeRecords>> rspList = this.retrieveStateRecords(address, service.getServiceId(), ranges);
            rangeRecordsList = (Collection)rspList.getFirst();
            if (rangeRecordsList == null) {
                throw new IOException("No response to download request within the timeout period");
            }
        }
        catch (Exception e) {
            log.error((Object)"Error downloading state records", (Throwable)e);
            return new DownloadServiceStateResult(Collections.emptyList(), ranges, true);
        }
        this.doImportDelay();
        Collection rangesNotFound = rangeRecordsList.stream().filter(rr -> !rr.isRangeFound()).map(RangeRecords::getRange).collect(Collectors.toList());
        Collection receivedRangeRecords = rangeRecordsList.stream().filter(RangeRecords::isRangeFound).collect(Collectors.toList());
        Collection receivedRanges = receivedRangeRecords.stream().map(RangeRecords::getRange).collect(Collectors.toList());
        log.debug((Object)("Importing state records into " + service.getServiceId() + " for " + receivedRanges.size() + " received ranges" + (String)(log.isTraceEnabled() ? ": " + receivedRanges : "")));
        for (RangeRecords rangeRecords : receivedRangeRecords) {
            if (rangeRecords.getRecords().size() <= 0) continue;
            service.importRecords(rangeRecords.getRecords());
            log.trace((Object)("Imported " + rangeRecords.getRecords().size() + " records for range " + rangeRecords.getRange()));
        }
        this.rebalanceState.addDownloadedRanges(service.getServiceId(), new RangeList(receivedRanges));
        ArrayList<Range> remainingRanges = new ArrayList<Range>(ranges);
        remainingRanges.removeAll(receivedRanges);
        remainingRanges.removeAll(rangesNotFound);
        return new DownloadServiceStateResult(remainingRanges, rangesNotFound, false);
    }

    protected RspList<Collection<RangeRecords>> retrieveStateRecords(Address address, StateServiceId serviceId, Collection<Range> ranges) {
        Vector<Address> addresses = new Vector<Address>(Collections.singleton(address));
        return this.groupRpc.callRemoteMethods(addresses, RETRIEVE_STATE_RECORDS, RETRIEVE_STATE_RECORDS_SIG, true, this.configProps.getRebalanceRpcTimeout(), new Object[]{serviceId, ranges});
    }

    protected long getRebalanceDelayMillis() {
        return 2000L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setInitialRebalanceStarted() {
        Object object = this.initialRebalanceSignal;
        synchronized (object) {
            this.initialRebalanceStarted = true;
            this.initialRebalanceSignal.notify();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setInitialRebalanceComplete() {
        Object object = this.initialRebalanceSignal;
        synchronized (object) {
            this.initialRebalanceComplete = true;
            this.initialRebalanceSignal.notify();
        }
    }

    private boolean checkForRebalanceTask(boolean canBlock) {
        RebalanceTask task = null;
        if (canBlock && !this.rebalanceState.hasCurrentTask()) {
            try {
                task = this.taskQueue.take();
            }
            catch (InterruptedException interruptedException) {}
        } else {
            task = (RebalanceTask)this.taskQueue.poll();
        }
        if (task != null) {
            try {
                Thread.sleep(this.getRebalanceDelayMillis());
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            if (this.rebalanceState.hasCurrentTask()) {
                this.onRestartRebalance();
            }
            this.rebalanceState.handleNewTask(task);
        }
        return this.rebalanceState.hasCurrentTask();
    }

    private boolean doRebalanceBatch() {
        Collection ranges;
        PurgeBatch purgeBatch = this.rebalanceState.removePurgeBatch();
        while (purgeBatch != null) {
            this.purgeRanges(purgeBatch.getServiceId(), purgeBatch.getRangeList());
            purgeBatch = this.rebalanceState.removePurgeBatch();
        }
        DownloadBatch downloadBatch = this.rebalanceState.removeDownloadBatch();
        if (downloadBatch == null) {
            return true;
        }
        StateService<?> service = this.stateServiceRegistry.getService(downloadBatch.getServiceId());
        Address address = ((AddressNode)downloadBatch.getNode()).getAddress();
        DownloadServiceStateResult result = this.downloadServiceState(address, service, ranges = (Collection)downloadBatch.getRangesAndReplicas().stream().map(RangeAndReplicas::getRange).collect(Collectors.toList()));
        if (!result.getMissingRanges().isEmpty()) {
            log.trace((Object)("The following ranges for service " + service.getServiceId() + " could not be retrieved from " + address + ": " + result.getMissingRanges()));
            Collection missingRangesAndReplicas = downloadBatch.getRangesAndReplicas().stream().filter(rnr -> result.getMissingRanges().contains(rnr.getRange())).collect(Collectors.toList());
            this.rebalanceState.addRangesToDownload(downloadBatch.getServiceId(), missingRangesAndReplicas);
        }
        if (result.isErrorOccurred()) {
            log.error((Object)("Error while downloading state records from " + address + ", ranges will be reallocated to a different node"));
            this.rebalanceState.reallocateRanges(downloadBatch.getNode());
        } else {
            Collection remainingRangesAndReplicas = downloadBatch.getRangesAndReplicas().stream().filter(rnr -> result.getRemainingRanges().contains(rnr.getRange())).collect(Collectors.toList());
            if (!remainingRangesAndReplicas.isEmpty()) {
                this.rebalanceState.addRangesToDownload(downloadBatch.getNode(), downloadBatch.getServiceId(), remainingRangesAndReplicas);
            }
        }
        return false;
    }

    private void purgeRanges(StateServiceId serviceId, RangeList toPurge) {
        log.debug((Object)("Purging ranges for service " + serviceId + (String)(log.isTraceEnabled() ? ", ranges=" + toPurge : "")));
        StateService<?> service = this.stateServiceRegistry.getService(serviceId);
        for (Range range : toPurge.getRanges()) {
            service.purgeRecords(range.getStart(), range.getEnd());
        }
        this.rebalanceState.removeDownloadedRanges(serviceId, toPurge);
        log.debug((Object)("Finished purging ranges for service " + serviceId));
    }

    private void doImportDelay() {
        long importDelayMillis = this.configStore.getLongValue(RECORD_IMPORT_DELAY_MILLIS, 0L);
        if (importDelayMillis > 0L) {
            try {
                log.debug((Object)("Sleeping " + importDelayMillis + " milliseconds before importing records"));
                Thread.sleep(importDelayMillis);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }
}

