package org.springframework.cloud.stream.binder.kafka;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.DefaultPollableMessageSource;
import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.Lifecycle;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.AcknowledgmentCallback;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.StaticMessageHeaderAccessor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.class */
public class KafkaMessageChannelBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner> implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
    /** Header name constant: {@code "x-exception-stacktrace"}. */
    public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
    /** Header name constant: {@code "x-exception-message"}. */
    public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
    /** Header name constant: {@code "x-original-topic"}. */
    public static final String X_ORIGINAL_TOPIC = "x-original-topic";
    // Binder-wide configuration; read for acks, connection string, header mapper bean name, etc.
    private final KafkaBinderConfigurationProperties configurationProperties;
    // Topics bound so far, keyed by destination name; populated by the producer and consumer factory methods.
    private final Map<String, TopicInformation> topicsInUse;
    // Non-null only when a transaction id prefix is configured (see the constructor).
    private final KafkaTransactionManager<byte[], byte[]> transactionManager;
    // Optional listener applied to every KafkaTemplate created for producer bindings.
    private ProducerListener<byte[], byte[]> producerListener;
    // Source of the per-binding consumer/producer extension properties.
    private KafkaExtendedBindingProperties extendedBindingProperties;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$DlqSender.class */
    public final class DlqSender<K, V> {
        private final KafkaTemplate<K, V> kafkaTemplate;
        private final String dlqName;

        DlqSender(KafkaTemplate<K, V> kafkaTemplate, String str) {
            this.kafkaTemplate = kafkaTemplate;
            this.dlqName = str;
        }

        void sendToDlq(ConsumerRecord<?, ?> consumerRecord, Headers headers) {
            Object key = consumerRecord.key();
            Object value = consumerRecord.value();
            ProducerRecord producerRecord = new ProducerRecord(this.dlqName, Integer.valueOf(consumerRecord.partition()), key, value, headers);
            final StringBuilder append = new StringBuilder().append(" a message with key='").append(KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'").append(" and payload='").append(KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString(value), 50)).append("'").append(" received from ").append(consumerRecord.partition());
            ListenableFuture listenableFuture = null;
            try {
                listenableFuture = this.kafkaTemplate.send(producerRecord);
                listenableFuture.addCallback(new ListenableFutureCallback<SendResult<K, V>>() { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.DlqSender.1
                    public void onFailure(Throwable th) {
                        KafkaMessageChannelBinder.this.logger.error("Error sending to DLQ " + append.toString(), th);
                    }

                    public void onSuccess(SendResult<K, V> sendResult) {
                        if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
                            KafkaMessageChannelBinder.this.logger.debug("Sent to DLQ " + append.toString());
                        }
                    }
                });
            } catch (Exception e) {
                if (listenableFuture == null) {
                    KafkaMessageChannelBinder.this.logger.error("Error sending to DLQ " + append.toString(), e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$ProducerConfigurationMessageHandler.class */
    public final class ProducerConfigurationMessageHandler extends KafkaProducerMessageHandler<byte[], byte[]> implements Lifecycle {
        private boolean running;
        private final ProducerFactory<byte[], byte[]> producerFactory;

        ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String str, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties, ProducerFactory<byte[], byte[]> producerFactory) {
            super(kafkaTemplate);
            this.running = true;
            setTopicExpression(new LiteralExpression(str));
            setMessageKeyExpression(((KafkaProducerProperties) extendedProducerProperties.getExtension()).getMessageKeyExpression());
            setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
            if (extendedProducerProperties.isPartitioned()) {
                setPartitionIdExpression(new SpelExpressionParser().parseExpression("headers['scst_partition']"));
            }
            if (((KafkaProducerProperties) extendedProducerProperties.getExtension()).isSync()) {
                setSync(true);
            }
            this.producerFactory = producerFactory;
        }

        public void start() {
            try {
                super.onInit();
            } catch (Exception e) {
                this.logger.error("Initialization errors: ", e);
                throw new RuntimeException(e);
            }
        }

        public void stop() {
            if (this.producerFactory instanceof Lifecycle) {
                this.producerFactory.stop();
            }
            this.running = false;
        }

        public boolean isRunning() {
            return this.running;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder$TopicInformation.class */
    /**
     * Immutable record of a topic used by this binder: the consumer group it was bound with
     * (absent for producer-side entries) and the partitions associated with the binding.
     */
    public static class TopicInformation {
        private final Collection<PartitionInfo> partitionInfos;
        private final String consumerGroup;

        TopicInformation(String str, Collection<PartitionInfo> collection) {
            this.partitionInfos = collection;
            this.consumerGroup = str;
        }

        /**
         * @return the partitions recorded for this topic
         */
        public Collection<PartitionInfo> getPartitionInfos() {
            return this.partitionInfos;
        }

        /**
         * @return the consumer group recorded for this topic, or {@code null} when none was given
         */
        public String getConsumerGroup() {
            return this.consumerGroup;
        }

        /**
         * @return {@code true} when a consumer group was recorded for this topic
         */
        public boolean isConsumerTopic() {
            return null != this.consumerGroup;
        }
    }

    public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaTopicProvisioner kafkaTopicProvisioner) {
        super(headersToMap(kafkaBinderConfigurationProperties), kafkaTopicProvisioner);
        this.topicsInUse = new ConcurrentHashMap();
        this.extendedBindingProperties = new KafkaExtendedBindingProperties();
        this.configurationProperties = kafkaBinderConfigurationProperties;
        if (StringUtils.hasText(kafkaBinderConfigurationProperties.getTransaction().getTransactionIdPrefix())) {
            this.transactionManager = new KafkaTransactionManager<>(getProducerFactory(kafkaBinderConfigurationProperties.getTransaction().getTransactionIdPrefix(), new ExtendedProducerProperties<>(kafkaBinderConfigurationProperties.getTransaction().getProducer())));
        } else {
            this.transactionManager = null;
        }
    }

    /**
     * Builds the header-name array passed to the parent binder: the standard binder headers,
     * followed by any additional headers from the binder configuration.
     *
     * @param kafkaBinderConfigurationProperties binder configuration supplying the extra headers
     * @return {@link BinderHeaders#STANDARD_HEADERS} itself when no extra headers are configured,
     *         otherwise a new array holding the standard headers followed by the configured ones
     */
    private static String[] headersToMap(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
        if (ObjectUtils.isEmpty(kafkaBinderConfigurationProperties.getHeaders())) {
            return BinderHeaders.STANDARD_HEADERS;
        }
        String[] configuredHeaders = kafkaBinderConfigurationProperties.getHeaders();
        int standardCount = BinderHeaders.STANDARD_HEADERS.length;
        // Grow a copy of the standard headers, then fill the tail with the configured ones.
        String[] combined = Arrays.copyOf(BinderHeaders.STANDARD_HEADERS, standardCount + configuredHeaders.length);
        System.arraycopy(configuredHeaders, 0, combined, standardCount, configuredHeaders.length);
        return combined;
    }

    /**
     * Replaces the source of per-binding extended consumer/producer properties.
     *
     * @param kafkaExtendedBindingProperties the extended binding properties to use from now on
     */
    public void setExtendedBindingProperties(KafkaExtendedBindingProperties kafkaExtendedBindingProperties) {
        this.extendedBindingProperties = kafkaExtendedBindingProperties;
    }

    /**
     * Sets the listener that is attached to the {@link KafkaTemplate} of producer bindings
     * created after this call.
     *
     * @param producerListener the listener, or {@code null} for none
     */
    public void setProducerListener(ProducerListener<byte[], byte[]> producerListener) {
        this.producerListener = producerListener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the live (not copied) map of topics registered by this binder, keyed by
     * destination name.
     *
     * @return the internal topic registry
     */
    public Map<String, TopicInformation> getTopicsInUse() {
        return this.topicsInUse;
    }

    /* renamed from: getExtendedConsumerProperties, reason: merged with bridge method [inline-methods] */
    /**
     * Looks up the Kafka-specific consumer properties by delegating to the configured
     * {@link KafkaExtendedBindingProperties}.
     *
     * @param str lookup key passed through unchanged (presumably the binding/channel name — confirm against callers)
     * @return the extended consumer properties returned by the delegate
     */
    public KafkaConsumerProperties m3getExtendedConsumerProperties(String str) {
        return this.extendedBindingProperties.getExtendedConsumerProperties(str);
    }

    /* renamed from: getExtendedProducerProperties, reason: merged with bridge method [inline-methods] */
    /**
     * Looks up the Kafka-specific producer properties by delegating to the configured
     * {@link KafkaExtendedBindingProperties}.
     *
     * @param str lookup key passed through unchanged (presumably the binding/channel name — confirm against callers)
     * @return the extended producer properties returned by the delegate
     */
    public KafkaProducerProperties m2getExtendedProducerProperties(String str) {
        return this.extendedBindingProperties.getExtendedProducerProperties(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the outbound message handler for a producer destination.
     * <p>
     * Steps: pick the producer factory (the transactional one when a transaction manager exists,
     * otherwise a fresh one), look up the topic's partitions through the provisioner, record the
     * topic in {@code topicsInUse}, raise the configured partition count to the actual one when it
     * is smaller, and finally build and configure the handler (producer listener, send-failure
     * channel, header mapper).
     *
     * @param producerDestination        destination whose name is used as the topic
     * @param extendedProducerProperties producer binding properties; its partition count may be increased
     * @param messageChannel             channel receiving send failures, or {@code null} for none
     * @return the configured handler
     * @throws Exception if the partition lookup or bean resolution fails
     */
    public MessageHandler createProducerMessageHandler(ProducerDestination producerDestination, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties, MessageChannel messageChannel) throws Exception {
        final ProducerFactory<byte[], byte[]> producerFactory = this.transactionManager != null ? this.transactionManager.getProducerFactory() : getProducerFactory(null, extendedProducerProperties);
        Collection<PartitionInfo> partitionsForTopic = this.provisioningProvider.getPartitionsForTopic(extendedProducerProperties.getPartitionCount(), false, () -> {
            Producer<byte[], byte[]> probeProducer = producerFactory.createProducer();
            try {
                return probeProducer.partitionsFor(producerDestination.getName());
            } finally {
                // Release the probe producer and reset the factory even when the lookup fails,
                // so a failed metadata request does not leave a producer behind.
                try {
                    probeProducer.close();
                } finally {
                    ((DisposableBean) producerFactory).destroy();
                }
            }
        });
        this.topicsInUse.put(producerDestination.getName(), new TopicInformation(null, partitionsForTopic));
        if (extendedProducerProperties.getPartitionCount() < partitionsForTopic.size()) {
            if (this.logger.isInfoEnabled()) {
                this.logger.info("The `partitionCount` of the producer for topic " + producerDestination.getName() + " is " + extendedProducerProperties.getPartitionCount() + ", smaller than the actual partition count of " + partitionsForTopic.size() + " of the topic. The larger number will be used instead.");
            }
            extendedProducerProperties.setPartitionCount(partitionsForTopic.size());
        }
        KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        if (this.producerListener != null) {
            kafkaTemplate.setProducerListener(this.producerListener);
        }
        ProducerConfigurationMessageHandler handler = new ProducerConfigurationMessageHandler(kafkaTemplate, producerDestination.getName(), extendedProducerProperties, producerFactory);
        if (messageChannel != null) {
            handler.setSendFailureChannel(messageChannel);
        }

        // Header mapper selection: a configured bean wins; with a header mode other than
        // 'headers' no mapper is used at all; otherwise a BinderHeaderMapper is created.
        KafkaHeaderMapper headerMapper = null;
        if (this.configurationProperties.getHeaderMapperBeanName() != null) {
            headerMapper = getApplicationContext().getBean(this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
        }
        if (extendedProducerProperties.getHeaderMode() != null && !HeaderMode.headers.equals(extendedProducerProperties.getHeaderMode())) {
            headerMapper = null;
        } else if (headerMapper == null) {
            String[] headerPatterns = extendedProducerProperties.getExtension().getHeaderPatterns();
            if (headerPatterns == null || headerPatterns.length <= 0) {
                headerMapper = new BinderHeaderMapper();
            } else {
                // Make sure 'id' and 'timestamp' are excluded, with the exclusions placed first.
                List<String> patterns = new LinkedList<>(Arrays.asList(headerPatterns));
                if (!patterns.contains("!timestamp")) {
                    patterns.add(0, "!timestamp");
                }
                if (!patterns.contains("!id")) {
                    patterns.add(0, "!id");
                }
                headerMapper = new BinderHeaderMapper(patterns.toArray(new String[patterns.size()]));
            }
        }
        handler.setHeaderMapper(headerMapper);
        return handler;
    }

    /**
     * Assembles the Kafka producer configuration and wraps it in a producer factory.
     * <p>
     * Layering, lowest to highest precedence: built-in defaults (retries, buffer memory, byte-array
     * serializers, acks), the binder-level producer configuration, binding-derived fallbacks for
     * bootstrap servers / batch size / linger / compression (applied only where still empty), and
     * finally the binding-level configuration map.
     *
     * @param str                        transaction id prefix, or {@code null} for a non-transactional factory
     * @param extendedProducerProperties producer binding properties supplying the fallbacks and overrides
     * @return a new producer factory
     */
    protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(String str, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties) {
        KafkaProducerProperties extension = (KafkaProducerProperties) extendedProducerProperties.getExtension();
        Map<String, Object> producerConfig = new HashMap<>();

        // Built-in defaults.
        producerConfig.put("retries", 0);
        producerConfig.put("buffer.memory", 33554432);
        producerConfig.put("key.serializer", ByteArraySerializer.class);
        producerConfig.put("value.serializer", ByteArraySerializer.class);
        producerConfig.put("acks", String.valueOf(this.configurationProperties.getRequiredAcks()));

        // Binder-level overrides.
        boolean hasBinderOverrides = !ObjectUtils.isEmpty(this.configurationProperties.getProducerConfiguration());
        if (hasBinderOverrides) {
            producerConfig.putAll(this.configurationProperties.getProducerConfiguration());
        }

        // Fallbacks for anything still unset after the binder-level overrides.
        if (ObjectUtils.isEmpty(producerConfig.get("bootstrap.servers"))) {
            producerConfig.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        }
        if (ObjectUtils.isEmpty(producerConfig.get("batch.size"))) {
            producerConfig.put("batch.size", String.valueOf(extension.getBufferSize()));
        }
        if (ObjectUtils.isEmpty(producerConfig.get("linger.ms"))) {
            producerConfig.put("linger.ms", String.valueOf(extension.getBatchTimeout()));
        }
        if (ObjectUtils.isEmpty(producerConfig.get("compression.type"))) {
            producerConfig.put("compression.type", extension.getCompressionType().toString());
        }

        // Binding-level overrides win over everything above.
        boolean hasBindingOverrides = !ObjectUtils.isEmpty(extension.getConfiguration());
        if (hasBindingOverrides) {
            producerConfig.putAll(extension.getConfiguration());
        }

        DefaultKafkaProducerFactory<byte[], byte[]> factory = new DefaultKafkaProducerFactory<>(producerConfig);
        if (str != null) {
            factory.setTransactionIdPrefix(str);
        }
        return factory;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the message-driven inbound endpoint for a consumer destination.
     * <p>
     * The method builds a consumer factory, determines which partitions this instance listens to,
     * records the topic in {@code topicsInUse}, configures a concurrent listener container
     * (transactions, idle events, concurrency, ack mode, optional offset reset) and wraps it in a
     * channel adapter wired to the binding's error infrastructure.
     *
     * @param consumerDestination        destination whose name is used as the topic
     * @param str                        consumer group; a blank or {@code null} value makes the subscription anonymous
     * @param extendedConsumerProperties consumer binding properties
     * @return the configured channel adapter
     * @throws IllegalArgumentException (via {@code Assert}) when DLQ is enabled for an anonymous
     *                                  subscription, or when no partitions are left to listen to
     */
    public MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String str, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        Collection<PartitionInfo> collection;
        // z == true means "anonymous": no consumer group text was supplied.
        boolean z = !StringUtils.hasText(str);
        Assert.isTrue((z && ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isEnableDlq()) ? false : true, "DLQ support is not available for anonymous subscriptions");
        // Anonymous subscriptions get a generated, unique group id.
        String str2 = z ? "anonymous." + UUID.randomUUID().toString() : str;
        ConsumerFactory<?, ?> createKafkaConsumerFactory = createKafkaConsumerFactory(z, str2, extendedConsumerProperties);
        // instanceCount * concurrency is handed to the partition lookup as its count argument.
        Collection<PartitionInfo> partitionInfo = getPartitionInfo(consumerDestination, extendedConsumerProperties, createKafkaConsumerFactory, extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency());
        boolean isAutoRebalanceEnabled = ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isAutoRebalanceEnabled();
        if (isAutoRebalanceEnabled || extendedConsumerProperties.getInstanceCount() == 1) {
            // Auto-rebalancing or a single instance: all partitions are candidates.
            collection = partitionInfo;
        } else {
            // Static assignment: keep only partitions whose number modulo instanceCount equals this instance's index.
            collection = new ArrayList();
            for (PartitionInfo partitionInfo2 : partitionInfo) {
                if (partitionInfo2.partition() % extendedConsumerProperties.getInstanceCount() == extendedConsumerProperties.getInstanceIndex()) {
                    collection.add(partitionInfo2);
                }
            }
        }
        // Registered with the caller-supplied group (str), not the generated anonymous id.
        this.topicsInUse.put(consumerDestination.getName(), new TopicInformation(str, collection));
        Assert.isTrue(!CollectionUtils.isEmpty(collection), "A list of partitions must be provided");
        // Subscribe by topic name when anonymous or auto-rebalancing; otherwise assign explicit partitions.
        ContainerProperties containerProperties = (z || ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isAutoRebalanceEnabled()) ? new ContainerProperties(new String[]{consumerDestination.getName()}) : new ContainerProperties(getTopicPartitionInitialOffsets(collection));
        if (this.transactionManager != null) {
            containerProperties.setTransactionManager(this.transactionManager);
        }
        containerProperties.setIdleEventInterval(Long.valueOf(((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getIdleEventInterval()));
        // Never run more consumer threads than there are partitions to listen to.
        int min = Math.min(extendedConsumerProperties.getConcurrency(), collection.size());
        resetOffsets(extendedConsumerProperties, createKafkaConsumerFactory, isAutoRebalanceEnabled, containerProperties);
        ConcurrentMessageListenerContainer concurrentMessageListenerContainer = new ConcurrentMessageListenerContainer(createKafkaConsumerFactory, containerProperties) { // from class: org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.1
            // Pure delegation to the parent implementation.
            public void stop(Runnable runnable) {
                super.stop(runnable);
            }
        };
        concurrentMessageListenerContainer.setConcurrency(min);
        // Prefer the explicit event publisher; fall back to the application context.
        if (getApplicationEventPublisher() != null) {
            concurrentMessageListenerContainer.setApplicationEventPublisher(getApplicationEventPublisher());
        } else if (getApplicationContext() != null) {
            concurrentMessageListenerContainer.setApplicationEventPublisher(getApplicationContext());
        }
        concurrentMessageListenerContainer.setBeanName(consumerDestination.getName() + ".container");
        if (((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isAutoCommitOffset()) {
            // Auto-commit: ack-on-error follows the binding setting; optionally ack per record.
            concurrentMessageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
            if (((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isAckEachRecord()) {
                concurrentMessageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
            }
        } else {
            // No auto-commit: acknowledgments are manual and errors are never acked.
            concurrentMessageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
            concurrentMessageListenerContainer.getContainerProperties().setAckOnError(false);
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Listened partitions: " + StringUtils.collectionToCommaDelimitedString(collection));
        }
        KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(concurrentMessageListenerContainer);
        kafkaMessageDrivenChannelAdapter.setMessageConverter(getMessageConverter(extendedConsumerProperties));
        kafkaMessageDrivenChannelAdapter.setBeanFactory(getBeanFactory());
        AbstractMessageChannelBinder.ErrorInfrastructure registerErrorInfrastructure = registerErrorInfrastructure(consumerDestination, str2, extendedConsumerProperties);
        if (extendedConsumerProperties.getMaxAttempts() > 1) {
            // Retries enabled: retry in the adapter, then hand over to the recoverer.
            kafkaMessageDrivenChannelAdapter.setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
            kafkaMessageDrivenChannelAdapter.setRecoveryCallback(registerErrorInfrastructure.getRecoverer());
        } else {
            // No retries: failures go straight to the binding's error channel.
            kafkaMessageDrivenChannelAdapter.setErrorChannel(registerErrorInfrastructure.getErrorChannel());
        }
        return kafkaMessageDrivenChannelAdapter;
    }

    /**
     * Applies the binding's "reset offsets" option to the container properties.
     * <p>
     * A reset is only possible when the consumer's {@code auto.offset.reset} setting is
     * {@code earliest} or {@code latest}; for any other (or missing) value a warning is logged and
     * nothing is changed. With group management ({@code z == true}) a rebalance listener seeks to
     * the beginning/end on the first partition assignment only. Without it, the initial offset of
     * every explicitly assigned partition is replaced in the array returned by
     * {@code getTopicPartitions()}.
     *
     * @param extendedConsumerProperties consumer binding properties carrying the reset flag
     * @param consumerFactory            factory whose configuration supplies {@code auto.offset.reset}
     * @param z                          {@code true} when partitions are assigned by group management
     * @param containerProperties        container properties to modify
     */
    private void resetOffsets(ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties, ConsumerFactory<?, ?> consumerFactory, boolean z, ContainerProperties containerProperties) {
        boolean isResetOffsets = ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isResetOffsets();
        final Object resetTo = consumerFactory.getConfigurationProperties().get("auto.offset.reset");
        final AtomicBoolean initialAssignment = new AtomicBoolean(true);
        // The negation belongs on the comparison, not inside the literal: reject anything that is
        // neither "earliest" nor "latest".
        if (isResetOffsets && !"earliest".equals(resetTo) && !"latest".equals(resetTo)) {
            this.logger.warn("no (or unknown) auto.offset.reset property cannot reset");
            isResetOffsets = false;
        }
        if (!isResetOffsets) {
            return;
        }
        if (z) {
            containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
                @Override
                public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> collection) {
                }

                @Override
                public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> collection) {
                }

                @Override
                public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> collection) {
                    // Seek only on the very first assignment; later rebalances keep their positions.
                    if (initialAssignment.getAndSet(false)) {
                        if ("earliest".equals(resetTo)) {
                            consumer.seekToBeginning(collection);
                        } else if ("latest".equals(resetTo)) {
                            consumer.seekToEnd(collection);
                        }
                    }
                }
            });
            return;
        }
        TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions == null) {
            // Container subscribes by topic name (no explicit partitions), so there is nothing to rewrite.
            return;
        }
        // NOTE(review): this relies on getTopicPartitions() exposing the container's own array
        // rather than a defensive copy — confirm for the spring-kafka version in use.
        Long initialOffset = Long.valueOf("earliest".equals(resetTo) ? 0L : Long.MAX_VALUE);
        for (int i = 0; i < topicPartitions.length; i++) {
            TopicPartitionInitialOffset current = topicPartitions[i];
            topicPartitions[i] = new TopicPartitionInitialOffset(current.topic(), current.partition(), initialOffset);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the resources for a polled (pull-based) consumer binding: a Kafka message source
     * subscribed to the destination topic, plus the error infrastructure for the binding.
     *
     * @param str                        not used by this implementation
     * @param str2                       consumer group; blank or {@code null} makes the subscription anonymous
     * @param consumerDestination        destination whose name is used as the topic
     * @param extendedConsumerProperties consumer binding properties
     * @return the message source together with its error infrastructure
     * @throws IllegalArgumentException (via {@code Assert}) when DLQ is enabled for an anonymous subscription
     */
    public AbstractMessageChannelBinder.PolledConsumerResources createPolledConsumerResources(String str, String str2, ConsumerDestination consumerDestination, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        final boolean anonymous = !StringUtils.hasText(str2);
        boolean dlqOnAnonymous = anonymous && ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isEnableDlq();
        Assert.isTrue(!dlqOnAnonymous, "DLQ support is not available for anonymous subscriptions");

        // Anonymous subscriptions get a generated, unique group id.
        String effectiveGroup;
        if (anonymous) {
            effectiveGroup = "anonymous." + UUID.randomUUID().toString();
        } else {
            effectiveGroup = str2;
        }
        ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, effectiveGroup, extendedConsumerProperties);

        KafkaMessageSource messageSource = new KafkaMessageSource(consumerFactory, new String[]{consumerDestination.getName()});
        messageSource.setMessageConverter(getMessageConverter(extendedConsumerProperties));
        messageSource.setRawMessageHeader(((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isEnableDlq());

        // Registered with the caller-supplied group, not the generated anonymous id.
        Collection<PartitionInfo> partitions = getPartitionInfo(consumerDestination, extendedConsumerProperties, consumerFactory, -1);
        this.topicsInUse.put(consumerDestination.getName(), new TopicInformation(str2, partitions));

        // Rebalance events are only logged.
        messageSource.setRebalanceListener(new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                KafkaMessageChannelBinder.this.logger.info("Revoked: " + collection);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                KafkaMessageChannelBinder.this.logger.info("Assigned: " + collection);
            }
        });

        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(consumerDestination, str2, extendedConsumerProperties, true);
        return new AbstractMessageChannelBinder.PolledConsumerResources(messageSource, errorInfrastructure);
    }

    /**
     * Installs an attributes provider on the pollable source that copies the message's
     * {@code kafka_data} header, when present, into an attribute of the same name.
     *
     * @param defaultPollableMessageSource the source to post-process
     */
    protected void postProcessPollableSource(DefaultPollableMessageSource defaultPollableMessageSource) {
        defaultPollableMessageSource.setAttributesProvider((attributes, polledMessage) -> {
            Object rawRecord = polledMessage.getHeaders().get("kafka_data");
            if (rawRecord == null) {
                // Nothing to expose when the header is absent.
                return;
            }
            attributes.setAttribute("kafka_data", rawRecord);
        });
    }

    /**
     * Resolves the message converter for a consumer binding.
     * <p>
     * When a converter bean name is configured, that bean is fetched from the application context;
     * otherwise a new converter is created and set to generate the message id and/or timestamp
     * according to the binding's standard-headers setting. In both cases the binder's header
     * mapper is installed on the converter before it is returned.
     *
     * @param extendedConsumerProperties consumer binding properties
     * @return the converter to use for the binding
     * @throws IllegalStateException if the configured converter bean does not exist
     */
    private MessagingMessageConverter getMessageConverter(ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaConsumerProperties extension = (KafkaConsumerProperties) extendedConsumerProperties.getExtension();
        MessagingMessageConverter converter;
        if (extension.getConverterBeanName() != null) {
            try {
                converter = (MessagingMessageConverter) getApplicationContext().getBean(extension.getConverterBeanName(), MessagingMessageConverter.class);
            } catch (NoSuchBeanDefinitionException ex) {
                throw new IllegalStateException("Converter bean not present in application context", ex);
            }
        } else {
            converter = new MessagingMessageConverter();
            KafkaConsumerProperties.StandardHeaders requested = extension.getStandardHeaders();
            boolean wantsBoth = KafkaConsumerProperties.StandardHeaders.both.equals(requested);
            boolean wantsId = KafkaConsumerProperties.StandardHeaders.id.equals(requested);
            boolean wantsTimestamp = KafkaConsumerProperties.StandardHeaders.timestamp.equals(requested);
            converter.setGenerateMessageId(wantsId || wantsBoth);
            converter.setGenerateTimestamp(wantsTimestamp || wantsBoth);
        }
        converter.setHeaderMapper(getHeaderMapper(extendedConsumerProperties));
        return converter;
    }

    /**
     * Resolves the header mapper for a consumer binding.
     * <p>
     * A mapper bean named in the binder configuration takes precedence. Otherwise a
     * {@link BinderHeaderMapper} is created that, after mapping, flags the resulting header map
     * with {@code scst_nativeHeadersPresent} whenever it is non-empty, and that trusts the
     * packages configured on the binding.
     *
     * @param extendedConsumerProperties consumer binding properties supplying the trusted packages
     * @return the header mapper to use
     */
    private KafkaHeaderMapper getHeaderMapper(ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaHeaderMapper configuredMapper = null;
        if (this.configurationProperties.getHeaderMapperBeanName() != null) {
            configuredMapper = (KafkaHeaderMapper) getApplicationContext().getBean(this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
        }
        if (configuredMapper != null) {
            return configuredMapper;
        }
        // Declared as BinderHeaderMapper: addTrustedPackages is not part of KafkaHeaderMapper.
        BinderHeaderMapper binderMapper = new BinderHeaderMapper() {
            @Override // org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper
            public void toHeaders(Headers headers, Map<String, Object> map) {
                super.toHeaders(headers, map);
                if (map.size() > 0) {
                    map.put("scst_nativeHeadersPresent", Boolean.TRUE);
                }
            }
        };
        String[] trustedPackages = ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).getTrustedPackages();
        // ObjectUtils.isEmpty handles both null and zero-length arrays; StringUtils.isEmpty
        // only recognises null for a non-String argument.
        if (!ObjectUtils.isEmpty(trustedPackages)) {
            binderMapper.addTrustedPackages(trustedPackages);
        }
        return binderMapper;
    }

    /**
     * Asks the provisioning provider for the partitions of the destination topic.
     *
     * <p>The partition lookup itself is performed with a short-lived consumer obtained from the
     * given factory; that consumer is always closed, even when the lookup fails.
     *
     * @param consumerDestination destination whose name is the topic to inspect
     * @param extendedConsumerProperties consumer properties supplying the auto-rebalance flag
     * @param consumerFactory factory used to create the temporary consumer
     * @param i partition count forwarded unchanged to the provisioning provider
     * @return the partition metadata returned by the provisioning provider
     */
    private Collection<PartitionInfo> getPartitionInfo(ConsumerDestination consumerDestination, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties, ConsumerFactory<?, ?> consumerFactory, int i) {
        boolean autoRebalanceEnabled = ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isAutoRebalanceEnabled();
        return this.provisioningProvider.getPartitionsForTopic(i, autoRebalanceEnabled, () -> {
            // try-with-resources guarantees the consumer is released if partitionsFor throws.
            try (Consumer<?, ?> consumer = consumerFactory.createConsumer()) {
                return consumer.partitionsFor(consumerDestination.getName());
            }
        });
    }

    /**
     * Supplies the strategy used to build error messages for this binder.
     *
     * @return a new {@link RawRecordHeaderErrorMessageStrategy} on every call
     */
    protected ErrorMessageStrategy getErrorMessageStrategy() {
        ErrorMessageStrategy strategy = new RawRecordHeaderErrorMessageStrategy();
        return strategy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the handler that forwards failed records to the dead-letter topic.
     *
     * <p>Returns {@code null} when DLQ support is disabled on the consumer. Otherwise the returned
     * handler reads the raw {@link ConsumerRecord} from the {@code kafka_data} header, enriches it
     * with the original topic, exception message and stack trace (as record headers or as
     * embedded headers, depending on the header mode) and hands it to a {@code DlqSender}.
     *
     * <p>The DLQ topic is the configured DLQ name, or {@code "error." + destination + "." + str}
     * when none is configured.
     *
     * @param consumerDestination destination the failing consumer is bound to
     * @param str suffix used in the default DLQ topic name (presumably the consumer group - confirm against callers)
     * @param extendedConsumerProperties consumer properties (DLQ flag, header mode, native decoding)
     * @return the DLQ handler, or {@code null} when DLQ is disabled
     * @throws IllegalArgumentException (from the handler) when native decoding is used, the key or
     *         payload is not a {@code byte[]}, and the DLQ producer has no matching serializer
     */
    public MessageHandler getErrorMessageHandler(ConsumerDestination consumerDestination, String str, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaConsumerProperties kafkaConsumerProperties = (KafkaConsumerProperties) extendedConsumerProperties.getExtension();
        if (!kafkaConsumerProperties.isEnableDlq()) {
            return null;
        }
        KafkaProducerProperties dlqProducerProperties = kafkaConsumerProperties.getDlqProducerProperties();
        // Reuse the transactional producer factory when transactions are configured.
        DlqSender dlqSender = new DlqSender(
                new KafkaTemplate(this.transactionManager != null
                        ? this.transactionManager.getProducerFactory()
                        : getProducerFactory(null, new ExtendedProducerProperties<>(dlqProducerProperties))),
                StringUtils.hasText(kafkaConsumerProperties.getDlqName())
                        ? kafkaConsumerProperties.getDlqName()
                        : "error." + consumerDestination.getName() + "." + str);
        return message -> {
            ConsumerRecord consumerRecord = (ConsumerRecord) message.getHeaders().get("kafka_data", ConsumerRecord.class);
            if (consumerRecord == null) {
                this.logger.error("No raw record; cannot send to DLQ: " + message);
                return;
            }
            if (extendedConsumerProperties.isUseNativeDecoding()) {
                Map configuration = this.transactionManager == null ? dlqProducerProperties.getConfiguration() : this.configurationProperties.getTransaction().getProducer().getConfiguration();
                // A non-byte[] key/payload can only be re-published if the DLQ producer was
                // given an explicit serializer for it. instanceof is the correct type test:
                // Class.isInstance(byte[].class) would test the Class object itself.
                Object key = consumerRecord.key();
                if (key != null && !(key instanceof byte[])) {
                    ensureDlqMessageCanBeProperlySerialized(configuration, map -> {
                        return !map.containsKey("key.serializer");
                    }, "Key");
                }
                Object value = consumerRecord.value();
                if (value != null && !(value instanceof byte[])) {
                    ensureDlqMessageCanBeProperlySerialized(configuration, map2 -> {
                        return !map2.containsKey("value.serializer");
                    }, "Payload");
                }
            }
            RecordHeaders recordHeaders = new RecordHeaders(consumerRecord.headers().toArray());
            ConsumerRecord recordToSend = consumerRecord;
            if (message.getPayload() instanceof Throwable) {
                Throwable th = (Throwable) message.getPayload();
                HeaderMode headerMode = extendedConsumerProperties.getHeaderMode();
                if (headerMode == null || HeaderMode.headers.equals(headerMode)) {
                    recordHeaders.add(new RecordHeader(X_ORIGINAL_TOPIC, consumerRecord.topic().getBytes(StandardCharsets.UTF_8)));
                    // Throwable.getMessage() may legitimately be null; skip the header rather
                    // than fail the DLQ publication with a NullPointerException.
                    String exceptionMessage = th.getMessage();
                    if (exceptionMessage != null) {
                        recordHeaders.add(new RecordHeader(X_EXCEPTION_MESSAGE, exceptionMessage.getBytes(StandardCharsets.UTF_8)));
                    }
                    recordHeaders.add(new RecordHeader(X_EXCEPTION_STACKTRACE, getStackTraceAsString(th).getBytes(StandardCharsets.UTF_8)));
                } else if (HeaderMode.embeddedHeaders.equals(headerMode)) {
                    try {
                        // Unpack the headers embedded in the payload, add the failure details,
                        // then re-embed everything into a replacement record.
                        MessageValues extractHeaders = EmbeddedHeaderUtils.extractHeaders(MessageBuilder.withPayload((byte[]) consumerRecord.value()).build(), false);
                        extractHeaders.put(X_ORIGINAL_TOPIC, consumerRecord.topic());
                        extractHeaders.put(X_EXCEPTION_MESSAGE, th.getMessage());
                        extractHeaders.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(th));
                        String[] headerNames = extractHeaders.keySet().toArray(new String[0]);
                        recordToSend = new ConsumerRecord(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.key(), EmbeddedHeaderUtils.embedHeaders(extractHeaders, EmbeddedHeaderUtils.headersToEmbed(headerNames)));
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            dlqSender.sendToDlq(recordToSend, recordHeaders);
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the error handler used for polled consumers.
     *
     * <p>When DLQ is enabled this delegates to
     * {@link #getErrorMessageHandler(ConsumerDestination, String, ExtendedConsumerProperties)}.
     * Otherwise the returned handler:
     * <ul>
     *   <li>logs and ignores anything that is not an {@code ErrorMessage};</li>
     *   <li>passes messages without a {@code kafka_data} header to the superclass handler, if any;</li>
     *   <li>for a {@code MessagingException} payload whose failed message carries an acknowledgment
     *       callback, acknowledges with {@code REJECT} when auto-commit-on-error applies and with
     *       {@code REQUEUE} otherwise.</li>
     * </ul>
     *
     * @param consumerDestination destination the consumer is bound to
     * @param str forwarded unchanged to the delegated handler factories
     * @param extendedConsumerProperties consumer properties
     * @return the handler for errors raised by a polled consumer
     */
    public MessageHandler getPolledConsumerErrorMessageHandler(ConsumerDestination consumerDestination, String str, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        boolean dlqEnabled = ((KafkaConsumerProperties) extendedConsumerProperties.getExtension()).isEnableDlq();
        if (dlqEnabled) {
            return getErrorMessageHandler(consumerDestination, str, extendedConsumerProperties);
        }
        MessageHandler fallbackHandler = super.getErrorMessageHandler(consumerDestination, str, extendedConsumerProperties);
        return message -> {
            ConsumerRecord rawRecord = (ConsumerRecord) message.getHeaders().get("kafka_data");
            if (!(message instanceof ErrorMessage)) {
                this.logger.error("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + message);
                return;
            }
            if (rawRecord == null) {
                // No raw record to acknowledge: let the generic handler deal with it.
                if (fallbackHandler != null) {
                    fallbackHandler.handleMessage(message);
                }
                return;
            }
            Object payload = message.getPayload();
            if (!(payload instanceof MessagingException)) {
                return;
            }
            AcknowledgmentCallback ackCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(((MessagingException) payload).getFailedMessage());
            if (ackCallback == null) {
                return;
            }
            AcknowledgmentCallback.Status outcome = isAutoCommitOnError(extendedConsumerProperties)
                    ? AcknowledgmentCallback.Status.REJECT
                    : AcknowledgmentCallback.Status.REQUEUE;
            ackCallback.acknowledge(outcome);
        };
    }

    /**
     * Verifies that the DLQ producer configuration can serialize a natively decoded key or payload.
     *
     * @param map DLQ producer configuration; may be {@code null} or empty
     * @param predicate returns {@code true} when the required serializer entry is missing
     * @param str label of the offending element, used in the error message
     * @throws IllegalArgumentException when the configuration is empty or the predicate matches
     */
    private static void ensureDlqMessageCanBeProperlySerialized(Map<String, String> map, Predicate<Map<String, String>> predicate, String str) {
        boolean serializerMissing = CollectionUtils.isEmpty(map) || predicate.test(map);
        if (!serializerMissing) {
            return;
        }
        throw new IllegalArgumentException("Native decoding is used on the consumer. " + str + " is not byte[] and no serializer is set on the DLQ producer.");
    }

    /**
     * Assembles the Kafka consumer configuration and wraps it in a consumer factory.
     *
     * <p>Settings are layered in this order, later layers overriding earlier ones: built-in
     * defaults (byte-array deserializers, auto-commit disabled), binder-wide consumer
     * configuration, the binder connection string (only if no {@code bootstrap.servers} is set
     * yet), per-binding consumer configuration, and finally the binding's start offset.
     *
     * @param z when {@code true} the default {@code auto.offset.reset} is {@code latest},
     *          otherwise {@code earliest}
     * @param str value used for {@code group.id}
     * @param extendedConsumerProperties per-binding consumer properties
     * @return a {@link DefaultKafkaConsumerFactory} built from the merged configuration
     */
    protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean z, String str, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaConsumerProperties bindingProperties = (KafkaConsumerProperties) extendedConsumerProperties.getExtension();
        Map<String, Object> consumerConfig = new HashMap<>();

        // Built-in defaults.
        consumerConfig.put("key.deserializer", ByteArrayDeserializer.class);
        consumerConfig.put("value.deserializer", ByteArrayDeserializer.class);
        consumerConfig.put("enable.auto.commit", false);
        consumerConfig.put("auto.commit.interval.ms", 100);
        consumerConfig.put("auto.offset.reset", z ? "latest" : "earliest");
        consumerConfig.put("group.id", str);

        // Binder-wide overrides.
        if (!ObjectUtils.isEmpty(this.configurationProperties.getConsumerConfiguration())) {
            consumerConfig.putAll(this.configurationProperties.getConsumerConfiguration());
        }
        if (ObjectUtils.isEmpty(consumerConfig.get("bootstrap.servers"))) {
            consumerConfig.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        }

        // Per-binding overrides take precedence over everything above.
        if (!ObjectUtils.isEmpty(bindingProperties.getConfiguration())) {
            consumerConfig.putAll(bindingProperties.getConfiguration());
        }
        if (!ObjectUtils.isEmpty(bindingProperties.getStartOffset())) {
            consumerConfig.put("auto.offset.reset", bindingProperties.getStartOffset().name());
        }
        return new DefaultKafkaConsumerFactory<>(consumerConfig);
    }

    /**
     * Decides whether offsets are committed when message handling fails.
     *
     * <p>An explicitly configured {@code autoCommitOnError} value wins; when it is unset, the
     * result is {@code true} only if both auto-commit-offset and DLQ are enabled.
     *
     * @param extendedConsumerProperties consumer properties to inspect
     * @return {@code true} when failed records should be committed
     */
    private boolean isAutoCommitOnError(ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaConsumerProperties bindingProperties = (KafkaConsumerProperties) extendedConsumerProperties.getExtension();
        Boolean explicitSetting = bindingProperties.getAutoCommitOnError();
        if (explicitSetting != null) {
            return explicitSetting.booleanValue();
        }
        return bindingProperties.isAutoCommitOffset() && bindingProperties.isEnableDlq();
    }

    /**
     * Converts partition metadata into {@link TopicPartitionInitialOffset} entries.
     *
     * @param collection partitions to convert
     * @return one entry per partition, in the collection's iteration order, each carrying the
     *         partition's topic and partition number
     */
    private TopicPartitionInitialOffset[] getTopicPartitionInitialOffsets(Collection<PartitionInfo> collection) {
        return collection.stream()
                .map(partition -> new TopicPartitionInitialOffset(partition.topic(), partition.partition()))
                .toArray(TopicPartitionInitialOffset[]::new);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Shortens a string for display purposes.
     *
     * @param str text to shorten
     * @param i maximum number of characters kept from {@code str}
     * @return {@code str} unchanged when it has at most {@code i} characters; otherwise its first
     *         {@code i} characters followed by {@code "..."}
     */
    public String toDisplayString(String str, int i) {
        if (str.length() <= i) {
            return str;
        }
        String truncated = str.substring(0, i);
        return truncated + "...";
    }

    /**
     * Renders a throwable's stack trace, as produced by
     * {@link Throwable#printStackTrace(PrintWriter)}, into a string.
     *
     * @param th throwable to render
     * @return the printed stack trace
     */
    private String getStackTraceAsString(Throwable th) {
        StringWriter traceBuffer = new StringWriter();
        PrintWriter tracePrinter = new PrintWriter(traceBuffer, true);
        th.printStackTrace(tracePrinter);
        return traceBuffer.toString();
    }
}
