package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;

/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.class */
/**
 * Base task that fetches the partial segment files described by the {@link PartitionLocation}s in its IO config,
 * merges the files that share the same (interval, bucket id) into a single segment, pushes every merged segment,
 * and reports the pushed segments to the supervisor task.
 *
 * @param <S> type of shard spec that subclasses create for each merged segment
 */
abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollupWorkerTask {
    private static final Logger LOG = new Logger(PartialSegmentMergeTask.class);

    private final PartialSegmentMergeIOConfig ioConfig;
    private final int numAttempts;
    private final String supervisorTaskId;
    private final String subtaskSpecId;

    /**
     * Creates the task.
     *
     * @param id               task id, may be null; passed to the superclass as-is
     * @param groupId          passed to the superclass as-is
     * @param taskResource     passed to the superclass as-is
     * @param supervisorTaskId id of the supervisor task whose locks are used and to which results are reported
     * @param subtaskSpecId    may be null
     * @param dataSchema       must declare at least one input interval in its granularity spec
     * @param ioConfig         supplies the partition locations to fetch and merge
     * @param tuningConfig     merge/push tuning parameters
     * @param numAttempts      exposed through {@link #getNumAttempts()}
     * @param context          passed to the superclass as-is
     * @throws IllegalArgumentException if the granularity spec has no input intervals
     */
    public PartialSegmentMergeTask(
            @Nullable String id,
            String groupId,
            TaskResource taskResource,
            String supervisorTaskId,
            @Nullable String subtaskSpecId,
            DataSchema dataSchema,
            PartialSegmentMergeIOConfig ioConfig,
            ParallelIndexTuningConfig tuningConfig,
            int numAttempts,
            Map<String, Object> context) {
        super(id, groupId, taskResource, dataSchema, tuningConfig, context);
        Preconditions.checkArgument(
                !dataSchema.getGranularitySpec().inputIntervals().isEmpty(),
                "Missing intervals in granularitySpec");
        this.supervisorTaskId = supervisorTaskId;
        this.subtaskSpecId = subtaskSpecId;
        this.ioConfig = ioConfig;
        this.numAttempts = numAttempts;
    }

    @JsonProperty
    public int getNumAttempts() {
        return numAttempts;
    }

    @JsonProperty
    public String getSupervisorTaskId() {
        return supervisorTaskId;
    }

    @Override
    @JsonProperty
    public String getSubtaskSpecId() {
        return subtaskSpecId;
    }

    /**
     * Always ready: this task acquires no locks of its own; {@link #runTask} lists the supervisor's locks instead.
     */
    @Override
    public boolean isReady(TaskActionClient taskActionClient) {
        return true;
    }

    /**
     * Fetches all partial segment files, merges them per (interval, bucket id), pushes the merged segments and
     * reports them to the supervisor task.
     *
     * @throws ISE if a supervisor lock is revoked or two locks carry versions for the same interval
     */
    @Override
    public TaskStatus runTask(TaskToolbox taskToolbox) throws Exception {
        // Group the partition locations by interval, then by bucket id.
        final Map<Interval, Int2ObjectMap<List<PartitionLocation>>> intervalToBucketLocations = new HashMap<>();
        for (PartitionLocation location : ioConfig.getPartitionLocations()) {
            intervalToBucketLocations
                    .computeIfAbsent(location.getInterval(), k -> new Int2ObjectOpenHashMap<>())
                    .computeIfAbsent(location.getBucketId(), k -> new ArrayList<>())
                    .add(location);
        }

        // Segment versions come from the locks held by the supervisor task (listed on its behalf through a
        // SurrogateAction). Every interval must map to exactly one version of a non-revoked lock.
        final Map<Interval, String> intervalToVersion = new HashMap<>();
        taskToolbox.getTaskActionClient()
                .submit(new SurrogateAction<>(supervisorTaskId, new LockListAction()))
                .forEach(lock -> {
                    if (lock.isRevoked()) {
                        throw new ISE("Lock[%s] is revoked", lock);
                    }
                    final String previousVersion = intervalToVersion.put(lock.getInterval(), lock.getVersion());
                    if (previousVersion != null) {
                        throw new ISE(
                                "Unexpected state: Two versions([%s], [%s]) for the same interval[%s]",
                                lock.getVersion(),
                                previousVersion,
                                lock.getInterval());
                    }
                });

        final Stopwatch fetchStopwatch = Stopwatch.createStarted();
        final Map<Interval, Int2ObjectMap<List<File>>> intervalToFiles =
                fetchSegmentFiles(taskToolbox, intervalToBucketLocations);
        final long fetchSeconds = fetchStopwatch.elapsed(TimeUnit.SECONDS);
        fetchStopwatch.stop();
        LOG.info("Fetch took [%s] seconds", fetchSeconds);

        final ParallelIndexSupervisorTaskClient taskClient = taskToolbox.getSupervisorTaskClientFactory().build(
                new ClientBasedTaskInfoProvider(taskToolbox.getIndexingServiceClient()),
                getId(),
                1, // NOTE(review): presumably the client's thread count — confirm against the factory signature
                getTuningConfig().getChatHandlerTimeout(),
                getTuningConfig().getChatHandlerNumRetries());

        // Merged segments are written into a freshly emptied persist directory.
        final File persistDir = taskToolbox.getPersistDir();
        FileUtils.deleteQuietly(persistDir);
        FileUtils.forceMkdir(persistDir);

        final Set<DataSegment> pushedSegments = mergeAndPushSegments(
                taskToolbox,
                getDataSchema(),
                getTuningConfig(),
                persistDir,
                intervalToVersion,
                intervalToFiles);
        taskClient.report(
                supervisorTaskId,
                new PushedSegmentsReport(getId(), Collections.emptySet(), pushedSegments));
        return TaskStatus.success(getId());
    }

    /**
     * Fetches every partition location through the shuffle client into a freshly emptied indexing temp directory.
     * Each bucket gets its own sub-directory named {@code <interval start>/<interval end>/<bucket id>}.
     *
     * @return fetched files, grouped by interval and then by bucket id
     */
    private Map<Interval, Int2ObjectMap<List<File>>> fetchSegmentFiles(
            TaskToolbox taskToolbox,
            Map<Interval, Int2ObjectMap<List<PartitionLocation>>> intervalToBucketLocations) throws IOException {
        final File tempDir = taskToolbox.getIndexingTmpDir();
        FileUtils.deleteQuietly(tempDir);
        FileUtils.forceMkdir(tempDir);

        final Map<Interval, Int2ObjectMap<List<File>>> intervalToFiles = new HashMap<>();
        for (Map.Entry<Interval, Int2ObjectMap<List<PartitionLocation>>> intervalEntry
                : intervalToBucketLocations.entrySet()) {
            final Interval interval = intervalEntry.getKey();
            for (Int2ObjectMap.Entry<List<PartitionLocation>> bucketEntry
                    : intervalEntry.getValue().int2ObjectEntrySet()) {
                final int bucketId = bucketEntry.getIntKey();
                final File partitionDir = FileUtils.getFile(
                        tempDir,
                        interval.getStart().toString(),
                        interval.getEnd().toString(),
                        Integer.toString(bucketId));
                FileUtils.forceMkdir(partitionDir);
                for (PartitionLocation location : bucketEntry.getValue()) {
                    final File fetched =
                            taskToolbox.getShuffleClient().fetchSegmentFile(partitionDir, supervisorTaskId, location);
                    intervalToFiles
                            .computeIfAbsent(interval, k -> new Int2ObjectOpenHashMap<>())
                            .computeIfAbsent(bucketId, k -> new ArrayList<>())
                            .add(fetched);
                }
            }
        }
        return intervalToFiles;
    }

    /**
     * Creates the shard spec for the merged segment of the given interval and bucket id.
     */
    abstract S createShardSpec(TaskToolbox taskToolbox, Interval interval, int bucketId);

    /**
     * Merges the files of every (interval, bucket id) into one segment and pushes it. Each push is retried up to
     * 5 times on any {@link Exception} except {@link NullPointerException}. A missing version for an interval
     * surfaces as a non-retried NPE.
     *
     * @return the segments returned by the pusher
     */
    private Set<DataSegment> mergeAndPushSegments(
            TaskToolbox taskToolbox,
            DataSchema dataSchema,
            ParallelIndexTuningConfig tuningConfig,
            File persistDir,
            Map<Interval, String> intervalToVersion,
            Map<Interval, Int2ObjectMap<List<File>>> intervalToFiles) throws Exception {
        final DataSegmentPusher segmentPusher = taskToolbox.getSegmentPusher();
        // Identical for every segment, so compute it once instead of once per bucket.
        final List<String> metricNames = Arrays.stream(dataSchema.getAggregators())
                .map(aggregator -> aggregator.getName())
                .collect(Collectors.toList());
        final Set<DataSegment> pushedSegments = new HashSet<>();
        for (Map.Entry<Interval, Int2ObjectMap<List<File>>> intervalEntry : intervalToFiles.entrySet()) {
            final Interval interval = intervalEntry.getKey();
            for (Int2ObjectMap.Entry<List<File>> bucketEntry : intervalEntry.getValue().int2ObjectEntrySet()) {
                final int bucketId = bucketEntry.getIntKey();
                final Pair<File, List<String>> merged = mergeSegmentsInSamePartition(
                        dataSchema,
                        tuningConfig,
                        taskToolbox.getIndexIO(),
                        taskToolbox.getIndexMergerV9(),
                        bucketEntry.getValue(),
                        tuningConfig.getMaxNumSegmentsToMerge(),
                        persistDir,
                        0);
                // NOTE(review): loadSpec, binaryVersion and size are left null/0 here; presumably the pusher
                // fills them in — confirm against DataSegmentPusher.
                final DataSegment pushed = RetryUtils.retry(
                        () -> segmentPusher.push(
                                merged.lhs,
                                new DataSegment(
                                        getDataSource(),
                                        interval,
                                        Preconditions.checkNotNull(
                                                ParallelIndexSupervisorTask.findVersion(intervalToVersion, interval),
                                                "version for interval[%s]",
                                                interval),
                                        (Map<String, Object>) null,
                                        merged.rhs,
                                        metricNames,
                                        createShardSpec(taskToolbox, interval, bucketId),
                                        (Integer) null,
                                        0L),
                                false),
                        t -> t instanceof Exception && !(t instanceof NullPointerException),
                        5);
                pushedSegments.add(pushed);
            }
        }
        return pushedSegments;
    }

    /**
     * Merges {@code indexes} in rounds of at most {@code maxNumSegmentsToMerge} files until a single file remains.
     * Each round writes to {@code baseOutDir/merged_<suffix>}, starting at {@code outDirSuffix}. The input files of
     * a round are closed and deleted when the round ends, whether it succeeds or fails.
     *
     * @return the final merged file and the merged dimension names (computed in the round that covers all inputs)
     * @throws IllegalArgumentException if {@code indexes} is empty or {@code maxNumSegmentsToMerge} is too small to
     *                                  ever reduce the file count to one
     */
    private static Pair<File, List<String>> mergeSegmentsInSamePartition(
            DataSchema dataSchema,
            ParallelIndexTuningConfig tuningConfig,
            IndexIO indexIO,
            IndexMergerV9 merger,
            List<File> indexes,
            int maxNumSegmentsToMerge,
            File baseOutDir,
            int outDirSuffix) throws IOException {
        Preconditions.checkArgument(!indexes.isEmpty(), "No segment files to merge");
        // Every round must shrink the file list; otherwise the loop or the recursion below never terminates.
        Preconditions.checkArgument(
                maxNumSegmentsToMerge >= 2 || (maxNumSegmentsToMerge == 1 && indexes.size() == 1),
                "maxNumSegmentsToMerge[%s] is too small to merge [%s] segment files",
                maxNumSegmentsToMerge,
                indexes.size());

        int suffix = outDirSuffix;
        final List<File> mergedFiles = new ArrayList<>();
        List<String> dimensionNames = null;
        int start = 0;
        while (start < indexes.size()) {
            // start + min(...) cannot overflow, unlike start + maxNumSegmentsToMerge.
            final int end = start + Math.min(maxNumSegmentsToMerge, indexes.size() - start);
            final List<File> filesToMerge = indexes.subList(start, end);
            // try-with-resources closes loaded indexes (and deletes their files) even if loading/merging throws.
            try (Closer indexCleaner = Closer.create()) {
                final List<QueryableIndex> indexesToMerge = new ArrayList<>(filesToMerge.size());
                for (File file : filesToMerge) {
                    final QueryableIndex queryableIndex = indexIO.loadIndex(file);
                    indexesToMerge.add(queryableIndex);
                    indexCleaner.register(() -> {
                        queryableIndex.close();
                        file.delete();
                    });
                }
                // Only a round that sees every input can report the final dimension names.
                if (maxNumSegmentsToMerge >= indexes.size()) {
                    dimensionNames = IndexMerger.getMergedDimensionsFromQueryableIndexes(
                            indexesToMerge,
                            dataSchema.getDimensionsSpec());
                }
                final File outDir = new File(baseOutDir, StringUtils.format("merged_%d", suffix++));
                mergedFiles.add(merger.mergeQueryableIndex(
                        indexesToMerge,
                        dataSchema.getGranularitySpec().isRollup(),
                        dataSchema.getAggregators(),
                        outDir,
                        tuningConfig.getIndexSpec(),
                        tuningConfig.getSegmentWriteOutMediumFactory(),
                        tuningConfig.getMaxColumnsToMerge()));
            }
            start = end;
        }

        if (mergedFiles.size() == 1) {
            return Pair.of(mergedFiles.get(0), Preconditions.checkNotNull(dimensionNames, "dimensionNames"));
        }
        return mergeSegmentsInSamePartition(
                dataSchema,
                tuningConfig,
                indexIO,
                merger,
                mergedFiles,
                maxNumSegmentsToMerge,
                baseOutDir,
                suffix);
    }
}
