package org.apache.iotdb.session.subscription.consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.class */
public class SubscriptionPushConsumer extends SubscriptionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPushConsumer.class);
    private final AckStrategy ackStrategy;
    private final ConsumeListener consumeListener;
    private final long autoPollIntervalMs;
    private final long autoPollTimeoutMs;
    private final AtomicBoolean isClosed;

    /* loaded from: input_file:org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer$AutoPollWorker.class */
    class AutoPollWorker implements Runnable {
        AutoPollWorker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (SubscriptionPushConsumer.this.isClosed() || SubscriptionPushConsumer.this.subscribedTopics.isEmpty()) {
                return;
            }
            try {
                List<SubscriptionMessage> multiplePoll = SubscriptionPushConsumer.this.multiplePoll(SubscriptionPushConsumer.this.subscribedTopics.keySet(), SubscriptionPushConsumer.this.autoPollTimeoutMs);
                if (multiplePoll.isEmpty()) {
                    SubscriptionPushConsumer.LOGGER.info("SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s)", new Object[]{this, SubscriptionPushConsumer.this.subscribedTopics.keySet(), Long.valueOf(SubscriptionPushConsumer.this.autoPollTimeoutMs)});
                    return;
                }
                if (SubscriptionPushConsumer.this.ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) {
                    SubscriptionPushConsumer.this.ack(multiplePoll);
                }
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                for (SubscriptionMessage subscriptionMessage : multiplePoll) {
                    try {
                        if (Objects.equals(ConsumeResult.SUCCESS, SubscriptionPushConsumer.this.consumeListener.onReceive(subscriptionMessage))) {
                            arrayList.add(subscriptionMessage);
                        } else {
                            SubscriptionPushConsumer.LOGGER.warn("Consumer listener result failure when consuming message: {}", subscriptionMessage);
                            arrayList2.add(subscriptionMessage);
                        }
                    } catch (Exception e) {
                        SubscriptionPushConsumer.LOGGER.warn("Consumer listener raised an exception while consuming message: {}", subscriptionMessage, e);
                        arrayList2.add(subscriptionMessage);
                    }
                }
                if (SubscriptionPushConsumer.this.ackStrategy.equals(AckStrategy.AFTER_CONSUME)) {
                    SubscriptionPushConsumer.this.ack(arrayList);
                    SubscriptionPushConsumer.this.nack((Iterable<SubscriptionMessage>) arrayList2);
                }
            } catch (Exception e2) {
                SubscriptionPushConsumer.LOGGER.warn("something unexpected happened when auto poll messages...", e2);
            }
        }
    }

    /* loaded from: input_file:org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer$Builder.class */
    public static class Builder extends SubscriptionConsumer.Builder {
        private AckStrategy ackStrategy = AckStrategy.defaultValue();
        private ConsumeListener consumeListener = subscriptionMessage -> {
            return ConsumeResult.SUCCESS;
        };
        private long autoPollIntervalMs = 100;
        private long autoPollTimeoutMs = 10000;

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder host(String str) {
            super.host(str);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder port(int i) {
            super.port(i);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder nodeUrls(List<String> list) {
            super.nodeUrls(list);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder username(String str) {
            super.username(str);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder password(String str) {
            super.password(str);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder consumerId(String str) {
            super.consumerId(str);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder consumerGroupId(String str) {
            super.consumerGroupId(str);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder heartbeatIntervalMs(long j) {
            super.heartbeatIntervalMs(j);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder endpointsSyncIntervalMs(long j) {
            super.endpointsSyncIntervalMs(j);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder fileSaveDir(String str) {
            super.fileSaveDir(str);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder fileSaveFsync(boolean z) {
            super.fileSaveFsync(z);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder thriftMaxFrameSize(int i) {
            super.thriftMaxFrameSize(i);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public Builder maxPollParallelism(int i) {
            super.maxPollParallelism(i);
            return this;
        }

        public Builder ackStrategy(AckStrategy ackStrategy) {
            this.ackStrategy = ackStrategy;
            return this;
        }

        public Builder consumeListener(ConsumeListener consumeListener) {
            this.consumeListener = consumeListener;
            return this;
        }

        public Builder autoPollIntervalMs(long j) {
            this.autoPollIntervalMs = Math.max(j, 1L);
            return this;
        }

        public Builder autoPollTimeoutMs(long j) {
            this.autoPollTimeoutMs = Math.max(j, 1000L);
            return this;
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public SubscriptionPullConsumer buildPullConsumer() {
            throw new SubscriptionException("SubscriptionPushConsumer.Builder do not support build pull consumer.");
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public SubscriptionPushConsumer buildPushConsumer() {
            return new SubscriptionPushConsumer(this);
        }

        @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.Builder
        public /* bridge */ /* synthetic */ SubscriptionConsumer.Builder nodeUrls(List list) {
            return nodeUrls((List<String>) list);
        }
    }

    protected SubscriptionPushConsumer(Builder builder) {
        super(builder);
        this.isClosed = new AtomicBoolean(true);
        this.ackStrategy = builder.ackStrategy;
        this.consumeListener = builder.consumeListener;
        this.autoPollIntervalMs = builder.autoPollIntervalMs;
        this.autoPollTimeoutMs = builder.autoPollTimeoutMs;
    }

    public SubscriptionPushConsumer(Properties properties) {
        this(properties, (AckStrategy) properties.getOrDefault("ack-strategy", AckStrategy.defaultValue()), (ConsumeListener) properties.getOrDefault("consume-listener", subscriptionMessage -> {
            return ConsumeResult.SUCCESS;
        }), ((Long) properties.getOrDefault("auto-poll-interval-ms", 100L)).longValue(), ((Long) properties.getOrDefault("auto-poll-timeout-ms", 10000L)).longValue());
    }

    private SubscriptionPushConsumer(Properties properties, AckStrategy ackStrategy, ConsumeListener consumeListener, long j, long j2) {
        super(new Builder().ackStrategy(ackStrategy).consumeListener(consumeListener).autoPollIntervalMs(j).autoPollTimeoutMs(j2), properties);
        this.isClosed = new AtomicBoolean(true);
        this.ackStrategy = ackStrategy;
        this.consumeListener = consumeListener;
        this.autoPollIntervalMs = Math.max(j, 1L);
        this.autoPollTimeoutMs = Math.max(j2, 1000L);
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    public synchronized void open() throws SubscriptionException {
        if (this.isClosed.get()) {
            super.open();
            this.isClosed.set(false);
            submitAutoPollWorker();
        }
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.isClosed.get()) {
            return;
        }
        super.close();
        this.isClosed.set(true);
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    boolean isClosed() {
        return this.isClosed.get();
    }

    private void submitAutoPollWorker() {
        ScheduledFuture[] scheduledFutureArr = {SubscriptionExecutorServiceManager.submitAutoPollWorker(() -> {
            if (!isClosed()) {
                new AutoPollWorker().run();
            } else if (Objects.nonNull(scheduledFutureArr[0])) {
                scheduledFutureArr[0].cancel(false);
                LOGGER.info("SubscriptionPushConsumer {} cancel auto poll worker", this);
            }
        }, this.autoPollIntervalMs)};
        LOGGER.info("SubscriptionPushConsumer {} submit auto poll worker", this);
    }

    public String toString() {
        return "SubscriptionPushConsumer" + coreReportMessage();
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    protected Map<String, String> coreReportMessage() {
        Map<String, String> coreReportMessage = super.coreReportMessage();
        coreReportMessage.put("ackStrategy", this.ackStrategy.toString());
        return coreReportMessage;
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    protected Map<String, String> allReportMessage() {
        Map<String, String> allReportMessage = super.allReportMessage();
        allReportMessage.put("ackStrategy", this.ackStrategy.toString());
        allReportMessage.put("autoPollIntervalMs", String.valueOf(this.autoPollIntervalMs));
        allReportMessage.put("autoPollTimeoutMs", String.valueOf(this.autoPollTimeoutMs));
        return allReportMessage;
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    public /* bridge */ /* synthetic */ void unsubscribe(Set set) throws SubscriptionException {
        super.unsubscribe((Set<String>) set);
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    public /* bridge */ /* synthetic */ void unsubscribe(String[] strArr) throws SubscriptionException {
        super.unsubscribe(strArr);
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    public /* bridge */ /* synthetic */ void unsubscribe(String str) throws SubscriptionException {
        super.unsubscribe(str);
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    public /* bridge */ /* synthetic */ void subscribe(Set set) throws SubscriptionException {
        super.subscribe((Set<String>) set);
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    public /* bridge */ /* synthetic */ void subscribe(String[] strArr) throws SubscriptionException {
        super.subscribe(strArr);
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    public /* bridge */ /* synthetic */ void subscribe(String str) throws SubscriptionException {
        super.subscribe(str);
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    public /* bridge */ /* synthetic */ String getConsumerGroupId() {
        return super.getConsumerGroupId();
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    public /* bridge */ /* synthetic */ String getConsumerId() {
        return super.getConsumerId();
    }

    @Override // org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer
    public /* bridge */ /* synthetic */ boolean allSnapshotTopicMessagesHaveBeenConsumed() {
        return super.allSnapshotTopicMessagesHaveBeenConsumed();
    }
}
