package org.apache.zookeeper.server.quorum;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.log4j.Logger;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.Leader;

/* loaded from: input_file:org/apache/zookeeper/server/quorum/FollowerHandler.class */
public class FollowerHandler extends Thread {
    private static final Logger LOG = Logger.getLogger(FollowerHandler.class);
    public final Socket sock;
    final Leader leader;
    long tickOfLastAck;
    protected long sid;
    final LinkedBlockingQueue<QuorumPacket> queuedPackets;
    private BinaryInputArchive ia;
    private BinaryOutputArchive oa;
    private BufferedOutputStream bufferedOutput;
    final QuorumPacket proposalOfDeath;

    /**
     * Returns the server id recorded for the follower on the other end of this connection.
     *
     * @return the current value of {@code sid}; it is {@code 0} until {@link #run()} assigns it
     *         while processing the follower's first packet
     */
    public long getSid() {
        return sid;
    }

    /**
     * Creates a handler thread for one follower connection and registers it with the leader.
     * The thread is named after the socket's remote address; it is not started here.
     *
     * @param socket the accepted connection to the follower
     * @param leader the leader this handler reports to; the new handler is passed to
     *               {@code leader.addFollowerHandler} as the last step of construction
     * @throws IOException declared by the signature; nothing visible in this constructor throws it
     */
    public FollowerHandler(Socket socket, Leader leader) throws IOException {
        super("FollowerHandler-" + socket.getRemoteSocketAddress());
        this.sock = socket;
        this.leader = leader;
        this.queuedPackets = new LinkedBlockingQueue<>();
        // Sentinel instance: sendPackets() compares dequeued packets against it by identity.
        this.proposalOfDeath = new QuorumPacket();
        this.sid = 0L;
        // Registration happens last so every field is assigned before the leader sees the handler.
        leader.addFollowerHandler(this);
    }

    /**
     * Describes this handler for logs: the socket, the tick of the last packet received,
     * whether the follower currently counts as synced, and the number of packets waiting
     * to be sent.
     *
     * @return a single-line, human-readable summary of this handler's state
     */
    @Override // java.lang.Thread
    public String toString() {
        // The builder never leaves this method, so the unsynchronized StringBuilder suffices.
        StringBuilder description = new StringBuilder();
        description.append("FollowerHandler ").append(this.sock);
        description.append(" tickOfLastAck:").append(tickOfLastAck());
        description.append(" synced?:").append(synced());
        description.append(" queuedPacketLength:").append(this.queuedPackets.size());
        return description.toString();
    }

    /**
     * Writes queued packets to the follower until the {@code proposalOfDeath} sentinel is
     * dequeued or a write fails.
     *
     * <p>Whenever the queue is momentarily empty the buffered output is flushed before blocking
     * for the next packet, so packets are batched while traffic is flowing and pushed out as
     * soon as it pauses.
     *
     * <p>The trace mask is chosen per packet (128 for type-5 packets, the type that
     * {@link #ping()} queues; 16 otherwise), mirroring what {@link #run()} does for inbound
     * packets. Previously the mask was kept across iterations, so once a single ping had been
     * sent every later packet was traced under the ping mask.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for a packet
     */
    public void sendPackets() throws InterruptedException {
        while (true) {
            try {
                QuorumPacket packet = this.queuedPackets.poll();
                if (packet == null) {
                    // Nothing pending: push out what is buffered, then wait for more work.
                    this.bufferedOutput.flush();
                    packet = this.queuedPackets.take();
                }
                if (packet == this.proposalOfDeath) {
                    // Identity comparison on purpose: this exact instance is the stop signal.
                    return;
                }
                if (LOG.isTraceEnabled()) {
                    // NOTE(review): 16 / 128 look like ZooTrace's packet and ping trace masks — confirm.
                    long traceMask = packet.getType() == 5 ? 128L : 16L;
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'o', packet);
                }
                this.oa.writeRecord(packet, "packet");
            } catch (IOException e) {
                // A failure on an already-closed socket is expected during shutdown; stay quiet.
                if (!this.sock.isClosed()) {
                    LOG.warn("Unexpected exception", e);
                }
                return;
            }
        }
    }

    /**
     * Placeholder for rendering a quorum packet as text. The argument is ignored.
     *
     * @param quorumPacket the packet to describe; never inspected
     * @return always {@code null}
     */
    public static String packetToString(final QuorumPacket quorumPacket) {
        return null;
    }

    /* JADX INFO: Infinite loop detected, blocks: 50, insns: 0 */
    /* JADX WARN: Failed to find 'out' block for switch in B:64:0x02fb. Please report as an issue. */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v43, types: [java.util.LinkedList<org.apache.zookeeper.server.quorum.Leader$Proposal>] */
    /* JADX WARN: Type inference failed for: r0v44, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v51 */
    /* JADX WARN: Type inference failed for: r0v73, types: [org.apache.zookeeper.server.quorum.FollowerHandler$1] */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            try {
                try {
                    this.ia = BinaryInputArchive.getArchive(new BufferedInputStream(this.sock.getInputStream()));
                    this.bufferedOutput = new BufferedOutputStream(this.sock.getOutputStream());
                    this.oa = BinaryOutputArchive.getArchive(this.bufferedOutput);
                    QuorumPacket quorumPacket = new QuorumPacket();
                    this.ia.readRecord(quorumPacket, "packet");
                    if (quorumPacket.getType() != 11) {
                        LOG.error("First packet " + quorumPacket.toString() + " is not FOLLOWERINFO!");
                        LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
                        try {
                            this.queuedPackets.put(this.proposalOfDeath);
                        } catch (InterruptedException e) {
                            LOG.warn("Ignoring unexpected exception", e);
                        }
                        shutdown();
                        return;
                    }
                    if (quorumPacket.getData() != null) {
                        this.sid = ByteBuffer.wrap(quorumPacket.getData()).getLong();
                    } else {
                        this.sid = this.leader.followerCounter.getAndDecrement();
                    }
                    LOG.info("The follower sid: " + this.sid);
                    long zxid = quorumPacket.getZxid();
                    int i = 15;
                    boolean z = true;
                    long j = 0;
                    ?? r0 = this.leader.zk.committedLog;
                    synchronized (r0) {
                        if (this.leader.zk.committedLog.size() == 0) {
                            z = false;
                        } else if (this.leader.zk.maxCommittedLog >= zxid && this.leader.zk.minCommittedLog <= zxid) {
                            i = 13;
                            j = this.leader.zk.maxCommittedLog;
                            Iterator<Leader.Proposal> it = this.leader.zk.committedLog.iterator();
                            while (it.hasNext()) {
                                Leader.Proposal next = it.next();
                                if (next.packet.getZxid() > zxid) {
                                    queuePacket(next.packet);
                                    queuePacket(new QuorumPacket(4, next.packet.getZxid(), null, null));
                                }
                            }
                        }
                        r0 = r0;
                        long startForwarding = this.leader.startForwarding(this, zxid);
                        this.oa.writeRecord(new QuorumPacket(10, startForwarding, null, null), "packet");
                        this.bufferedOutput.flush();
                        if (zxid == startForwarding) {
                            i = 13;
                            j = startForwarding;
                        }
                        if (z && zxid > this.leader.zk.maxCommittedLog) {
                            i = 14;
                            j = this.leader.zk.maxCommittedLog;
                        }
                        this.oa.writeRecord(new QuorumPacket(i, j, null, null), "packet");
                        this.bufferedOutput.flush();
                        if (i == 15) {
                            LOG.warn("Sending snapshot last zxid of peer is 0x" + Long.toHexString(zxid) + "  zxid of leader is 0x" + Long.toHexString(startForwarding));
                            this.leader.zk.serializeSnapshot(this.oa);
                            this.oa.writeString("BenWasHere", "signature");
                        }
                        this.bufferedOutput.flush();
                        this.queuedPackets.add(new QuorumPacket(12, -1L, null, null));
                        new Thread() { // from class: org.apache.zookeeper.server.quorum.FollowerHandler.1
                            @Override // java.lang.Thread, java.lang.Runnable
                            public void run() {
                                Thread.currentThread().setName("Sender-" + FollowerHandler.this.sock.getRemoteSocketAddress());
                                try {
                                    FollowerHandler.this.sendPackets();
                                } catch (InterruptedException e2) {
                                    FollowerHandler.LOG.warn("Unexpected interruption", e2);
                                }
                            }
                        }.start();
                        while (true) {
                            QuorumPacket quorumPacket2 = new QuorumPacket();
                            this.ia.readRecord(quorumPacket2, "packet");
                            long j2 = 16;
                            if (quorumPacket2.getType() == 5) {
                                j2 = 128;
                            }
                            if (LOG.isTraceEnabled()) {
                                ZooTrace.logQuorumPacket(LOG, j2, 'i', quorumPacket2);
                            }
                            this.tickOfLastAck = this.leader.self.tick;
                            switch (quorumPacket2.getType()) {
                                case 1:
                                    ByteBuffer wrap = ByteBuffer.wrap(quorumPacket2.getData());
                                    long j3 = wrap.getLong();
                                    int i2 = wrap.getInt();
                                    int i3 = wrap.getInt();
                                    ByteBuffer slice = wrap.slice();
                                    if (i3 == 9) {
                                        this.leader.zk.submitRequest(new FollowerSyncRequest(this, j3, i2, i3, slice, quorumPacket2.getAuthinfo()));
                                    } else {
                                        this.leader.zk.submitRequest(null, j3, i3, i2, slice, quorumPacket2.getAuthinfo());
                                    }
                                case 3:
                                    this.leader.processAck(this.sid, quorumPacket2.getZxid(), this.sock.getLocalSocketAddress());
                                case ZooDefs.OpCode.setData /* 5 */:
                                    DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(quorumPacket2.getData()));
                                    while (dataInputStream.available() > 0) {
                                        this.leader.zk.touch(dataInputStream.readLong(), dataInputStream.readInt());
                                    }
                                case ZooDefs.OpCode.getACL /* 6 */:
                                    DataInputStream dataInputStream2 = new DataInputStream(new ByteArrayInputStream(quorumPacket2.getData()));
                                    long readLong = dataInputStream2.readLong();
                                    int readInt = dataInputStream2.readInt();
                                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                                    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
                                    dataOutputStream.writeLong(readLong);
                                    boolean z2 = this.leader.zk.touch(readLong, readInt);
                                    if (LOG.isTraceEnabled()) {
                                        ZooTrace.logTraceMessage(LOG, 32L, "Session 0x" + Long.toHexString(readLong) + " is valid: " + z2);
                                    }
                                    dataOutputStream.writeBoolean(z2);
                                    quorumPacket2.setData(byteArrayOutputStream.toByteArray());
                                    this.queuedPackets.add(quorumPacket2);
                            }
                        }
                    }
                } catch (IOException e2) {
                    if (this.sock != null && !this.sock.isClosed()) {
                        LOG.error("Unexpected exception causing shutdown while sock still open", e2);
                    }
                    LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
                    try {
                        this.queuedPackets.put(this.proposalOfDeath);
                    } catch (InterruptedException e3) {
                        LOG.warn("Ignoring unexpected exception", e3);
                    }
                    shutdown();
                }
            } catch (InterruptedException e4) {
                LOG.error("Unexpected exception causing shutdown", e4);
                LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
                try {
                    this.queuedPackets.put(this.proposalOfDeath);
                } catch (InterruptedException e5) {
                    LOG.warn("Ignoring unexpected exception", e5);
                }
                shutdown();
            }
        } catch (Throwable th) {
            LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
            try {
                this.queuedPackets.put(this.proposalOfDeath);
            } catch (InterruptedException e6) {
                LOG.warn("Ignoring unexpected exception", e6);
            }
            shutdown();
            throw th;
        }
    }

    /**
     * Closes the follower socket if it is still open and unregisters this handler from the
     * leader. A failure while closing is logged and otherwise ignored, so the handler is
     * always removed from the leader.
     */
    public void shutdown() {
        final Socket socket = this.sock;
        final boolean stillOpen = socket != null && !socket.isClosed();
        if (stillOpen) {
            try {
                socket.close();
            } catch (IOException closeFailure) {
                LOG.warn("Ignoring unexpected exception during socket close", closeFailure);
            }
        }
        this.leader.removeFollowerHandler(this);
    }

    /**
     * Returns the leader tick that was current when the most recent packet from this follower
     * was read in {@link #run()}.
     *
     * @return the stored tick; {@code 0} if no packet has been processed yet
     */
    public long tickOfLastAck() {
        return tickOfLastAck;
    }

    /**
     * Queues a type-5 packet carrying the leader's {@code lastProposed} value for delivery to
     * the follower.
     */
    public void ping() {
        final QuorumPacket pingPacket = new QuorumPacket(5, this.leader.lastProposed, null, null);
        queuePacket(pingPacket);
    }

    /**
     * Appends a packet to the outbound queue that {@link #sendPackets()} drains.
     *
     * @param quorumPacket the packet to send to the follower
     */
    public void queuePacket(QuorumPacket quorumPacket) {
        queuedPackets.add(quorumPacket);
    }

    /**
     * Reports whether the follower is considered in sync with the leader.
     *
     * @return {@code true} when this handler thread is alive and the last packet from the
     *         follower arrived no earlier than {@code tick - syncLimit} of the leader's peer
     */
    public boolean synced() {
        if (!isAlive()) {
            return false;
        }
        final long oldestAcceptableTick = (long) (this.leader.self.tick - this.leader.self.syncLimit);
        return this.tickOfLastAck >= oldestAcceptableTick;
    }
}
