/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.mirror.MirrorSourceTaskConfig;
import org.apache.kafka.connect.mirror.MirrorUtils;
import org.apache.kafka.connect.mirror.OffsetSync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers {@link OffsetSync} records per topic-partition and publishes them to the
 * offset-syncs topic through a dedicated producer.
 *
 * <p>Two queues are kept, each holding at most one (the latest) sync per partition:
 * <ul>
 *   <li><b>pending</b> syncs are sent by {@link #firePendingOffsetSyncs()};</li>
 *   <li><b>delayed</b> syncs are held back until {@link #promoteDelayedOffsetSyncs()}
 *       moves them into the pending queue.</li>
 * </ul>
 *
 * <p>Both queues are only mutated by this class while holding this instance's monitor.
 * The number of in-flight sends is bounded by the {@code outstandingOffsetSyncs} semaphore:
 * one permit is acquired per dispatched sync and released from the producer callback.
 * {@code partitionStates} is accessed without synchronization.
 */
class OffsetSyncWriter implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(OffsetSyncWriter.class);
    /** Number of permits used for the in-flight semaphore by the config-based constructor. */
    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;

    // Insertion-ordered so that syncs are promoted/dispatched in the order they were queued.
    private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new LinkedHashMap<>();
    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>();
    private final Semaphore outstandingOffsetSyncs;
    private final KafkaProducer<byte[], byte[]> offsetProducer;
    private final String offsetSyncsTopic;
    private final long maxOffsetLag;
    private final Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();

    /**
     * Creates a writer from task configuration, building its own producer and allowing
     * {@link #MAX_OUTSTANDING_OFFSET_SYNCS} concurrent in-flight syncs.
     *
     * @param config source of the offset-syncs topic name, producer settings and max offset lag
     */
    public OffsetSyncWriter(MirrorSourceTaskConfig config) {
        this.outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
        this.offsetSyncsTopic = config.offsetSyncsTopic();
        this.offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig());
        this.maxOffsetLag = config.maxOffsetLag();
    }

    /**
     * Creates a writer from explicitly supplied collaborators.
     *
     * @param producer               producer used to publish offset syncs; closed by {@link #close()}
     * @param offsetSyncsTopic       topic that offset syncs are written to
     * @param outstandingOffsetSyncs semaphore bounding the number of in-flight syncs
     * @param maxOffsetLag           lag threshold handed to every new {@link PartitionState}
     */
    public OffsetSyncWriter(KafkaProducer<byte[], byte[]> producer, String offsetSyncsTopic, Semaphore outstandingOffsetSyncs, long maxOffsetLag) {
        this.offsetProducer = producer;
        this.offsetSyncsTopic = offsetSyncsTopic;
        this.outstandingOffsetSyncs = outstandingOffsetSyncs;
        this.maxOffsetLag = maxOffsetLag;
    }

    /** Closes the offset producer via {@code Utils.closeQuietly}. */
    @Override
    public void close() {
        Utils.closeQuietly(offsetProducer, "offset producer");
    }

    /** @return the max offset lag this writer was configured with */
    public long maxOffsetLag() {
        return maxOffsetLag;
    }

    /** @return the live (not copied) per-partition state map */
    public Map<TopicPartition, PartitionState> partitionStates() {
        return partitionStates;
    }

    /**
     * Sends one offset sync to partition 0 of the offset-syncs topic. The caller must already
     * hold a permit from {@code outstandingOffsetSyncs}; the producer callback releases it
     * whether the send succeeded or failed.
     *
     * <p>NOTE(review): if {@code send} throws synchronously, the callback presumably never runs
     * and the permit would not be returned -- confirm against the producer's contract.
     */
    private void sendOffsetSync(OffsetSync offsetSync) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                offsetSyncsTopic, 0, offsetSync.recordKey(), offsetSync.recordValue());
        offsetProducer.send(record, (metadata, error) -> {
            if (error != null) {
                LOG.error("Failure sending offset sync.", error);
            } else {
                LOG.trace("Sync'd offsets for {}: {}=={}",
                        offsetSync.topicPartition(), offsetSync.upstreamOffset(), offsetSync.downstreamOffset());
            }
            outstandingOffsetSyncs.release();
        });
    }

    /**
     * Dispatches pending offset syncs in queue order until either the pending queue is empty
     * or no in-flight permit is available. A sync is removed from the queue only after a
     * permit has been acquired for it, so syncs that cannot be sent now stay queued.
     */
    void firePendingOffsetSyncs() {
        while (true) {
            OffsetSync pendingOffsetSync;
            synchronized (this) {
                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
                if (!syncIterator.hasNext()) {
                    LOG.trace("No more pending offset syncs");
                    return;
                }
                pendingOffsetSync = syncIterator.next();
                if (!outstandingOffsetSyncs.tryAcquire()) {
                    LOG.trace("Too many in-flight offset syncs; will try to send remaining offset syncs later");
                    return;
                }
                syncIterator.remove();
            }
            // The actual send happens outside the monitor so the lock is not held during producer calls.
            sendOffsetSync(pendingOffsetSync);
            LOG.trace("Dispatched offset sync for {}", pendingOffsetSync.topicPartition());
        }
    }

    /** Moves every delayed offset sync into the pending queue and empties the delayed queue. */
    synchronized void promoteDelayedOffsetSyncs() {
        pendingOffsetSyncs.putAll(delayedOffsetSyncs);
        delayedOffsetSyncs.clear();
    }

    /**
     * Records the latest upstream/downstream offset pair for a partition and queues a sync for it.
     *
     * <p>If the partition's {@link PartitionState} reports that a sync is due, the new sync
     * replaces any delayed sync for the partition and goes into the pending queue, and the
     * state's sync flag is reset. Otherwise the new sync becomes the partition's delayed sync.
     *
     * @param topicPartition   partition the offsets belong to
     * @param upstreamOffset   offset of the record in the upstream partition
     * @param downstreamOffset offset of the record in the downstream partition
     */
    void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
        PartitionState partitionState =
                partitionStates.computeIfAbsent(topicPartition, tp -> new PartitionState(maxOffsetLag));
        OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
        if (partitionState.update(upstreamOffset, downstreamOffset)) {
            synchronized (this) {
                delayedOffsetSyncs.remove(topicPartition);
                pendingOffsetSyncs.put(topicPartition, offsetSync);
            }
            partitionState.reset();
        } else {
            synchronized (this) {
                delayedOffsetSyncs.put(topicPartition, offsetSync);
            }
        }
    }

    /** @return the live delayed-sync map; returned without synchronization */
    protected Map<TopicPartition, OffsetSync> getDelayedOffsetSyncs() {
        return delayedOffsetSyncs;
    }

    /** @return the live pending-sync map; returned without synchronization */
    protected Map<TopicPartition, OffsetSync> getPendingOffsetSyncs() {
        return pendingOffsetSyncs;
    }

    /**
     * Tracks, for a single partition, the offsets last seen and last synced, and decides
     * whether a new offset sync should be emitted.
     */
    static class PartitionState {
        long previousUpstreamOffset = -1L;
        long previousDownstreamOffset = -1L;
        // -1 means no sync has been requested by this instance yet.
        long lastSyncDownstreamOffset = -1L;
        long maxOffsetLag;
        // Sticky: once set by update() it stays true until reset() is called.
        boolean shouldSyncOffsets;

        PartitionState(long maxOffsetLag) {
            this.maxOffsetLag = maxOffsetLag;
        }

        /**
         * Feeds the next observed offset pair into the state.
         *
         * <p>A sync is requested when any of the following holds:
         * <ul>
         *   <li>no sync has been requested by this instance yet;</li>
         *   <li>{@code downstreamOffset - (lastSyncDownstreamOffset + 1) >= maxOffsetLag};</li>
         *   <li>{@code upstreamOffset} is not exactly one past the previous upstream offset;</li>
         *   <li>{@code downstreamOffset} is lower than the previous downstream offset.</li>
         * </ul>
         *
         * @return {@code true} if a sync is due, including one requested by an earlier call
         *         that has not been cleared with {@link #reset()}
         */
        boolean update(long upstreamOffset, long downstreamOffset) {
            boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L;
            boolean translatedOffsetTooStale = downstreamOffset - (lastSyncDownstreamOffset + 1L) >= maxOffsetLag;
            boolean skippedUpstreamRecord = upstreamOffset - previousUpstreamOffset != 1L;
            boolean truncatedDownstreamTopic = downstreamOffset < previousDownstreamOffset;
            if (noPreviousSyncThisLifetime || translatedOffsetTooStale || skippedUpstreamRecord || truncatedDownstreamTopic) {
                lastSyncDownstreamOffset = downstreamOffset;
                shouldSyncOffsets = true;
            }
            previousUpstreamOffset = upstreamOffset;
            previousDownstreamOffset = downstreamOffset;
            return shouldSyncOffsets;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof PartitionState)) {
                return false;
            }
            PartitionState that = (PartitionState) o;
            return previousUpstreamOffset == that.previousUpstreamOffset
                    && previousDownstreamOffset == that.previousDownstreamOffset
                    && lastSyncDownstreamOffset == that.lastSyncDownstreamOffset
                    && maxOffsetLag == that.maxOffsetLag
                    && shouldSyncOffsets == that.shouldSyncOffsets;
        }

        @Override
        public int hashCode() {
            return Objects.hash(previousUpstreamOffset, previousDownstreamOffset, lastSyncDownstreamOffset, maxOffsetLag, shouldSyncOffsets);
        }

        /** Clears the sync-due flag; the tracked offsets are left untouched. */
        void reset() {
            shouldSyncOffsets = false;
        }
    }
}

