/*
 * Decompiled with CFR 0.152.
 */
package shaded.org.apache.hadoop.hdfs.server.namenode.ha;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import shaded.org.apache.hadoop.classification.InterfaceAudience;
import shaded.org.apache.hadoop.classification.InterfaceStability;
import shaded.org.apache.hadoop.conf.Configuration;
import shaded.org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import shaded.org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import shaded.org.apache.hadoop.hdfs.server.namenode.EditLogInputException;
import shaded.org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import shaded.org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import shaded.org.apache.hadoop.hdfs.server.namenode.FSImage;
import shaded.org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import shaded.org.apache.hadoop.hdfs.server.namenode.NameNode;
import shaded.org.apache.hadoop.hdfs.server.namenode.ha.RemoteNameNodeInfo;
import shaded.org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import shaded.org.apache.hadoop.ipc.RPC;
import shaded.org.apache.hadoop.security.SecurityUtil;
import shaded.org.apache.hadoop.util.ExitUtil;
import shaded.org.apache.hadoop.util.Time;

/**
 * Background worker that repeatedly loads ("tails") new transactions from an
 * {@link FSEditLog} into the local {@link FSNamesystem} and, when a log-roll
 * period is configured, periodically asks a remote NameNode to roll its edit
 * log.
 *
 * <p>The tailing loop runs on a dedicated thread started by {@link #start()}
 * and stopped by {@link #stop()}. Roll requests are issued through a
 * single-threaded executor so that they can be bounded by a timeout.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class EditLogTailer {
    public static final Log LOG = LogFactory.getLog(EditLogTailer.class);

    /**
     * Initial value of the transaction-id trackers, meaning "nothing recorded yet".
     * NOTE(review): presumably the compiler-inlined value of an HDFS
     * "invalid txid" constant - confirm against the original sources.
     */
    private static final long INVALID_TXID = -12345L;

    /** Thread that runs the tailing loop. */
    private final EditLogTailerThread tailerThread = new EditLogTailerThread();
    private final Configuration conf;
    private final FSNamesystem namesystem;
    /** Endless round-robin iterator over the configured remote NameNodes. */
    private final Iterator<RemoteNameNodeInfo> nnLookup;
    private FSEditLog editLog;
    /** Remote NameNode most recently picked from {@link #nnLookup}. */
    private RemoteNameNodeInfo currentNN;
    /** Value of {@link #lastLoadedTxnId} at the time of the last successful roll request. */
    private long lastRollTriggerTxId = INVALID_TXID;
    /** Last applied transaction id observed at the end of the last successful tail. */
    private long lastLoadedTxnId = INVALID_TXID;
    /** Monotonic time (ms) at which edits were last actually loaded. */
    private long lastLoadTimeMs;
    /** Monotonic time (ms) of the last successful roll request. */
    private long lastRollTimeMs;
    /** Roll period in ms; a negative value disables roll requests. */
    private final long logRollPeriodMs;
    /** Maximum time in ms to wait for one roll request to complete. */
    private final long rollEditsTimeoutMs;
    private final ExecutorService rollEditsRpcExecutor;
    /** Base sleep in ms between two tailing iterations. */
    private final long sleepTimeMs;
    /** Upper bound in ms for the idle backoff; 0 disables backoff. */
    private final long maxSleepTimeMs;
    /** Number of configured remote NameNodes. */
    private final int nnCount;
    private NamenodeProtocol cachedActiveProxy = null;
    /** Number of failed attempts during the current roll request. */
    private int nnLoopCount = 0;
    /** Maximum number of full passes over the remote NameNodes per roll request. */
    private int maxRetries;
    /** Passed through to {@code selectInputStreams} when choosing streams to tail. */
    private final boolean inProgressOk;

    /**
     * Reads the tailer settings from {@code conf} and prepares (but does not
     * start) the tailer thread.
     *
     * @param namesystem namesystem whose edit log is tailed and whose image receives the edits
     * @param conf configuration supplying the {@code dfs.ha.*} tailer settings
     * @throws IllegalArgumentException if the remote NameNodes cannot be determined
     *         or one of them has no positive IPC port
     */
    public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
        this.conf = conf;
        this.namesystem = namesystem;
        this.editLog = namesystem.getEditLog();
        this.lastLoadTimeMs = Time.monotonicNow();
        this.lastRollTimeMs = Time.monotonicNow();
        this.logRollPeriodMs = conf.getTimeDuration("dfs.ha.log-roll.period", 120L, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);

        // Remote NameNodes are only resolved when log rolling is enabled.
        List<RemoteNameNodeInfo> nns = Collections.emptyList();
        if (this.logRollPeriodMs >= 0L) {
            try {
                nns = RemoteNameNodeInfo.getRemoteNameNodes(conf);
            }
            catch (IOException e) {
                throw new IllegalArgumentException("Remote NameNodes not correctly configured!", e);
            }
            for (RemoteNameNodeInfo info : nns) {
                InetSocketAddress ipc = NameNode.getServiceAddress(info.getConfiguration(), true);
                Preconditions.checkArgument(ipc.getPort() > 0, "Active NameNode must have an IPC port configured. Got address '%s'", ipc);
                info.setIpcAddress(ipc);
            }
            LOG.info("Will roll logs on active node every " + this.logRollPeriodMs / 1000L + " seconds.");
        } else {
            LOG.info("Not going to trigger log rolls on active node because dfs.ha.log-roll.period is negative.");
        }

        this.sleepTimeMs = conf.getTimeDuration("dfs.ha.tail-edits.period", 60L, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
        long maxSleepTimeMsTemp = conf.getTimeDuration("dfs.ha.tail-edits.period.backoff-max", 0L, TimeUnit.MILLISECONDS);
        if (maxSleepTimeMsTemp > 0L && maxSleepTimeMsTemp < this.sleepTimeMs) {
            // A backoff ceiling below the base period makes no sense; fall back to "no backoff".
            LOG.warn("dfs.ha.tail-edits.period.backoff-max was configured to be " + maxSleepTimeMsTemp + " ms, but this is less than " + "dfs.ha.tail-edits.period" + ". Disabling backoff when tailing edit logs.");
            this.maxSleepTimeMs = 0L;
        } else {
            this.maxSleepTimeMs = maxSleepTimeMsTemp;
        }

        this.maxRetries = conf.getInt("dfs.ha.tail-edits.namenode-retries", 3);
        if (this.maxRetries <= 0) {
            LOG.error("Specified a non-positive number of retries for the number of retries for the namenode connection when manipulating the edit log (dfs.ha.tail-edits.namenode-retries), setting to default: 3");
            this.maxRetries = 3;
        }

        this.inProgressOk = conf.getBoolean("dfs.ha.tail-edits.in-progress", false);
        this.nnCount = nns.size();
        this.nnLookup = Iterators.cycle(nns);
        this.rollEditsTimeoutMs = conf.getTimeDuration("dfs.ha.tail-edits.rolledits.timeout", 60L, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
        this.rollEditsRpcExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
        LOG.debug("logRollPeriodMs=" + this.logRollPeriodMs + " sleepTime=" + this.sleepTimeMs);
    }

    /** Starts the tailer thread. */
    public void start() {
        this.tailerThread.start();
    }

    /**
     * Asks the tailer thread to stop, interrupts it and waits for it to exit.
     * The roll-edits executor is shut down whether or not the wait succeeds.
     *
     * @throws IOException wrapping the {@link InterruptedException} if the
     *         calling thread is interrupted while waiting for the tailer thread
     */
    public void stop() throws IOException {
        this.tailerThread.setShouldRun(false);
        this.tailerThread.interrupt();
        try {
            this.tailerThread.join();
        }
        catch (InterruptedException e) {
            LOG.warn("Edit log tailer thread exited with an exception");
            throw new IOException(e);
        }
        finally {
            this.rollEditsRpcExecutor.shutdown();
        }
    }

    /** @return the edit log currently being tailed */
    @VisibleForTesting
    FSEditLog getEditLog() {
        return this.editLog;
    }

    /**
     * Replaces the edit log that is tailed.
     *
     * @param editLog the edit log to tail from now on
     */
    @VisibleForTesting
    public void setEditLog(FSEditLog editLog) {
        this.editLog = editLog;
    }

    /**
     * Tails edits as the login user until a pass loads no further edits.
     * Must only be called while the tailer thread is not running.
     *
     * @throws IllegalStateException if the tailer thread is still alive
     * @throws IOException if tailing fails, or wrapping an
     *         {@link InterruptedException} raised while tailing
     */
    public void catchupDuringFailover() throws IOException {
        Preconditions.checkState(!this.tailerThread.isAlive(), (Object)"Tailer thread should not be running once failover starts");
        SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>(){

            @Override
            public Void run() throws Exception {
                long editsTailed;
                do {
                    try {
                        editsTailed = EditLogTailer.this.doTailEdits();
                    }
                    catch (InterruptedException e) {
                        throw new IOException(e);
                    }
                } while (editsTailed > 0L);
                return null;
            }
        });
    }

    /**
     * Performs one tailing pass under the namesystem write lock: selects the
     * input streams that start right after the last applied transaction and
     * loads them into the image.
     *
     * @return the number of edits loaded, or 0 if no input streams could be selected
     * @throws IOException if loading the selected streams fails
     * @throws InterruptedException if interrupted while acquiring the write lock
     */
    @VisibleForTesting
    public long doTailEdits() throws IOException, InterruptedException {
        // Acquired interruptibly, so the interrupt sent by stop() can abort a
        // tailer that is blocked waiting for the lock.
        this.namesystem.writeLockInterruptibly();
        try {
            FSImage image = this.namesystem.getFSImage();
            long lastTxnId = image.getLastAppliedTxId();
            if (LOG.isDebugEnabled()) {
                LOG.debug("lastTxnId: " + lastTxnId);
            }

            Collection<EditLogInputStream> streams;
            try {
                streams = this.editLog.selectInputStreams(lastTxnId + 1L, 0L, null, this.inProgressOk, true);
            }
            catch (IOException ioe) {
                LOG.warn("Edits tailer failed to find any streams. Will try again later.", ioe);
                // The enclosing finally releases the write lock exactly once;
                // unlocking here as well would release it twice.
                return 0L;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("edit streams to load from: " + streams.size());
            }

            long editsLoaded = 0L;
            try {
                editsLoaded = image.loadEdits(streams, this.namesystem);
            }
            catch (EditLogInputException elie) {
                // Record the partial progress so the log line below reports it.
                editsLoaded = elie.getNumEditsLoaded();
                throw elie;
            }
            finally {
                if (editsLoaded > 0L || LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Loaded %d edits starting from txid %d ", editsLoaded, lastTxnId));
                }
            }

            if (editsLoaded > 0L) {
                this.lastLoadTimeMs = Time.monotonicNow();
            }
            this.lastLoadedTxnId = image.getLastAppliedTxId();
            return editsLoaded;
        }
        finally {
            this.namesystem.writeUnlock();
        }
    }

    /** @return the monotonic time in ms at which edits were last loaded */
    public long getLastLoadTimeMs() {
        return this.lastLoadTimeMs;
    }

    /**
     * Tells whether a roll request is due. Despite the method name, the
     * comparison is made against the time of the last successful roll
     * request, not the time of the last load.
     *
     * @return true if rolling is enabled and more than the roll period has
     *         elapsed since the last successful roll request
     */
    private boolean tooLongSinceLastLoad() {
        if (this.logRollPeriodMs < 0L) {
            return false;
        }
        return Time.monotonicNow() - this.lastRollTimeMs > this.logRollPeriodMs;
    }

    /**
     * @return a task that calls {@code rollEditLog()} on a reachable remote
     *         NameNode, moving on to the next one on failure
     */
    @VisibleForTesting
    Callable<Void> getNameNodeProxy() {
        return new MultipleNameNodeProxy<Void>(){

            @Override
            protected Void doWork() throws IOException {
                EditLogTailer.this.cachedActiveProxy.rollEditLog();
                return null;
            }
        };
    }

    /**
     * Submits a roll request to the roll-edits executor and waits for it for
     * at most {@code rollEditsTimeoutMs}. On success the roll time and roll
     * trigger txid are updated. Failures are logged and not propagated; on
     * timeout the pending request is cancelled. An interruption while waiting
     * is logged and the interrupt status is not restored.
     */
    @VisibleForTesting
    void triggerActiveLogRoll() {
        LOG.info("Triggering log roll on remote NameNode");
        Future<Void> future = null;
        try {
            future = this.rollEditsRpcExecutor.submit(this.getNameNodeProxy());
            future.get(this.rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
            this.lastRollTimeMs = Time.monotonicNow();
            this.lastRollTriggerTxId = this.lastLoadedTxnId;
        }
        catch (ExecutionException e) {
            LOG.warn("Unable to trigger a roll of the active NN", e);
        }
        catch (TimeoutException e) {
            if (future != null) {
                future.cancel(true);
            }
            LOG.warn(String.format("Unable to finish rolling edits in %d ms", this.rollEditsTimeoutMs));
        }
        catch (InterruptedException e) {
            LOG.warn("Unable to trigger a roll of the active NN", e);
        }
    }

    /**
     * Sleeps for the given time; a separate method so tests can override it.
     *
     * @param sleepTimeMillis time to sleep in milliseconds
     * @throws InterruptedException if the sleep is interrupted
     */
    @VisibleForTesting
    void sleep(long sleepTimeMillis) throws InterruptedException {
        Thread.sleep(sleepTimeMillis);
    }

    /**
     * Task template that runs {@link #doWork()} against a remote NameNode
     * proxy, cycling through the configured NameNodes until the work succeeds
     * or the retry budget is used up.
     *
     * @param <T> result type of the work
     */
    @VisibleForTesting
    abstract class MultipleNameNodeProxy<T>
    implements Callable<T> {

        /**
         * The operation to run; implementations use the enclosing tailer's
         * cached proxy.
         *
         * @return the result of the operation
         * @throws IOException if the operation fails against the current NameNode
         */
        protected abstract T doWork() throws IOException;

        /**
         * Runs {@link #doWork()}, switching to the next remote NameNode after
         * each {@link IOException}.
         *
         * @return the result of the first successful {@link #doWork()}
         * @throws IOException if no remote NameNode could service the request
         */
        @Override
        public T call() throws IOException {
            EditLogTailer.this.nnLoopCount = 0;
            while ((EditLogTailer.this.cachedActiveProxy = this.getActiveNodeProxy()) != null) {
                try {
                    return this.doWork();
                }
                catch (IOException e) {
                    LOG.warn("Exception from remote name node " + EditLogTailer.this.currentNN + ", try next.", e);
                    // Drop the proxy so the next iteration connects to another NameNode.
                    EditLogTailer.this.cachedActiveProxy = null;
                    EditLogTailer.this.nnLoopCount++;
                }
            }
            throw new IOException("Cannot find any valid remote NN to service request!");
        }

        /**
         * Returns the cached proxy, or connects to the next remote NameNode(s)
         * in round-robin order until one can be reached.
         *
         * @return a usable proxy, or null if no remote NameNodes are configured
         *         or the retry budget ({@code maxRetries} passes over all
         *         NameNodes) is exhausted
         * @throws IOException declared for callers; connection failures are
         *         caught, logged and counted against the retry budget
         */
        private NamenodeProtocol getActiveNodeProxy() throws IOException {
            if (EditLogTailer.this.cachedActiveProxy != null) {
                return EditLogTailer.this.cachedActiveProxy;
            }
            // Without any remote NameNode the retry computation below would
            // divide by zero and the cycling iterator would have no element.
            if (EditLogTailer.this.nnCount == 0) {
                return null;
            }
            while (EditLogTailer.this.cachedActiveProxy == null) {
                if (EditLogTailer.this.nnLoopCount / EditLogTailer.this.nnCount >= EditLogTailer.this.maxRetries) {
                    return null;
                }
                EditLogTailer.this.currentNN = EditLogTailer.this.nnLookup.next();
                try {
                    int rpcTimeout = EditLogTailer.this.conf.getInt("dfs.ha.log-roll.rpc.timeout", 20000);
                    NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class, RPC.getProtocolVersion(NamenodeProtocolPB.class), EditLogTailer.this.currentNN.getIpcAddress(), EditLogTailer.this.conf, rpcTimeout, Long.MAX_VALUE);
                    EditLogTailer.this.cachedActiveProxy = new NamenodeProtocolTranslatorPB(proxy);
                }
                catch (IOException e) {
                    LOG.info("Failed to reach " + EditLogTailer.this.currentNN, e);
                    EditLogTailer.this.nnLoopCount++;
                }
            }
            assert (EditLogTailer.this.cachedActiveProxy != null);
            return EditLogTailer.this.cachedActiveProxy;
        }
    }

    /**
     * Thread that alternates between tailing edits and sleeping until told to
     * stop through {@link #setShouldRun(boolean)}.
     */
    private class EditLogTailerThread
    extends Thread {
        /** Loop flag; volatile because it is written by the thread calling stop(). */
        private volatile boolean shouldRun;

        private EditLogTailerThread() {
            super("Edit log tailer");
            this.shouldRun = true;
        }

        private void setShouldRun(boolean shouldRun) {
            this.shouldRun = shouldRun;
        }

        /** Runs the tailing loop as the login user. */
        @Override
        public void run() {
            SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Object>(){

                @Override
                public Object run() {
                    EditLogTailerThread.this.doWork();
                    return null;
                }
            });
        }

        /**
         * Tailing loop. Each iteration optionally triggers a log roll, tails
         * edits while holding the checkpoint lock, then sleeps. Errors reading
         * edits are logged and retried; any other unexpected error terminates
         * the process.
         */
        private void doWork() {
            long currentSleepTimeMs = EditLogTailer.this.sleepTimeMs;
            while (this.shouldRun) {
                long editsTailed = 0L;
                try {
                    boolean triggeredLogRoll = false;
                    // Only request a roll when it is due and edits were loaded
                    // since the last successful roll request.
                    if (EditLogTailer.this.tooLongSinceLastLoad() && EditLogTailer.this.lastRollTriggerTxId < EditLogTailer.this.lastLoadedTxnId) {
                        EditLogTailer.this.triggerActiveLogRoll();
                        triggeredLogRoll = true;
                    }
                    // stop() may have been called while the roll request was in flight.
                    if (!this.shouldRun) {
                        break;
                    }
                    EditLogTailer.this.namesystem.cpLockInterruptibly();
                    try {
                        editsTailed = EditLogTailer.this.doTailEdits();
                    }
                    finally {
                        EditLogTailer.this.namesystem.cpUnlock();
                    }
                    if (triggeredLogRoll) {
                        EditLogTailer.this.namesystem.getFSImage().getStorage().updateNameDirSize();
                    }
                }
                catch (EditLogInputException elie) {
                    LOG.warn("Error while reading edits from disk. Will try again.", elie);
                }
                catch (InterruptedException ie) {
                    // Skip the sleep and re-check shouldRun right away.
                    continue;
                }
                catch (Throwable t) {
                    LOG.fatal("Unknown error encountered while tailing edits. Shutting down standby NN.", t);
                    ExitUtil.terminate(1, t);
                }

                // Idle passes double the sleep up to maxSleepTimeMs when backoff
                // is enabled; any other pass resets it to the base period.
                if (editsTailed == 0L && EditLogTailer.this.maxSleepTimeMs > 0L) {
                    long base = currentSleepTimeMs == 0L ? 1L : currentSleepTimeMs;
                    currentSleepTimeMs = Math.min(EditLogTailer.this.maxSleepTimeMs, base * 2L);
                } else {
                    currentSleepTimeMs = EditLogTailer.this.sleepTimeMs;
                }
                try {
                    EditLogTailer.this.sleep(currentSleepTimeMs);
                }
                catch (InterruptedException e) {
                    LOG.warn("Edit log tailer interrupted", e);
                }
            }
        }
    }
}

