package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.cache.PathChildrenCacheFactory;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Duration;
import org.joda.time.Period;

/* loaded from: input_file:org/apache/druid/indexing/overlord/RemoteTaskRunner.class */
public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer {
    // Class-wide logger; also used to raise alerts via makeAlert(...).emit().
    private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
    // Joins ZooKeeper path segments with "/".
    private static final Joiner JOINER = Joiner.on("/");
    // Deserializes Worker announcements read from the worker path cache.
    private final ObjectMapper jsonMapper;
    private final RemoteTaskRunnerConfig config;
    // Timeout applied to the HTTP shutdown request sent to a worker (see shutdown()).
    private final Duration shutdownTimeout;
    // Source of the announcements and status ZooKeeper paths.
    private final IndexerZkConfig indexerZkConfig;
    private final CuratorFramework cf;
    // Factory configured with workerStatusPathChildrenCacheExecutor; presumably used for per-worker status caches -- confirm in addWorker.
    private final PathChildrenCacheFactory workerStatusPathChildrenCacheFactory;
    // Cache over the announcements path; its events drive worker add/update/remove handling in start().
    private final PathChildrenCache workerPathCache;
    private final HttpClient httpClient;
    // Supplies the current WorkerBehaviorConfig; may yield null, in which case the default select strategy is used.
    private final Supplier<WorkerBehaviorConfig> workerConfigRef;
    // Executor on which runPendingTasks() submits assignment passes.
    private final ExecutorService runPendingTasksExec;
    private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
    // Assigned in start(); null before that.
    private ProvisioningService provisioningService;
    // Known workers, keyed by worker host.
    private final ConcurrentMap<String, ZkWorker> zkWorkers = new ConcurrentHashMap();
    // Task payloads for tasks that are still pending, keyed by task id.
    private final ConcurrentMap<String, Task> pendingTaskPayloads = new ConcurrentHashMap();
    // Work items by lifecycle stage, each keyed by task id.
    private final RemoteTaskRunnerWorkQueue pendingTasks = new RemoteTaskRunnerWorkQueue();
    private final RemoteTaskRunnerWorkQueue runningTasks = new RemoteTaskRunnerWorkQueue();
    private final RemoteTaskRunnerWorkQueue completeTasks = new RemoteTaskRunnerWorkQueue();
    // NOTE(review): not referenced in this part of the file; presumably workers marked for termination -- confirm.
    private final ConcurrentMap<String, ZkWorker> lazyWorkers = new ConcurrentHashMap();
    // NOTE(review): presumably maintained by checkBlackListedNodes(), which start() schedules -- confirm.
    private final Set<ZkWorker> blackListedWorkers = Collections.synchronizedSet(new HashSet());
    // Registered listeners paired with the executor supplied at registration.
    private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
    // Worker host -> id of the task currently being announced to it; also used as the lock for worker selection.
    private final ConcurrentMap<String, String> workersWithUnacknowledgedTask = new ConcurrentHashMap();
    // Task ids with an assignment attempt in flight (key and value are both the task id).
    private final ConcurrentMap<String, String> tryAssignTasks = new ConcurrentHashMap();
    // Guards listener registration against concurrent running-task changes (see registerListener()).
    private final Object statusLock = new Object();
    private final LifecycleLock lifecycleLock = new LifecycleLock();
    // NOTE(review): not referenced in this part of the file; presumably worker id -> scheduled cleanup future -- confirm.
    private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap();
    // Shared executor handed to workerStatusPathChildrenCacheFactory; shut down explicitly in stop().
    private final ExecutorService workerStatusPathChildrenCacheExecutor = PathChildrenCacheFactory.Builder.createDefaultExecutor();
    // Single-threaded scheduler used for the periodic blacklist check scheduled in start().
    private final ListeningScheduledExecutorService cleanupExec = MoreExecutors.listeningDecorator(ScheduledExecutors.fixed(1, "RemoteTaskRunner-Scheduled-Cleanup--%d"));

    /*
     * Decompiler rendering of the compiler-generated lookup table behind a `switch` on
     * PathChildrenCacheEvent.Type. The array is indexed by the enum constant's ordinal() and holds
     * the case number (1..7) that the listener installed by start() switches on.
     */
    /* renamed from: org.apache.druid.indexing.overlord.RemoteTaskRunner$5, reason: invalid class name */
    /* loaded from: input_file:org/apache/druid/indexing/overlord/RemoteTaskRunner$5.class */
    static /* synthetic */ class AnonymousClass5 {
        // One slot per enum constant; slots left at 0 match no case label and hit the switch's default.
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) only skips its own slot.
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_UPDATED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.INITIALIZED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CONNECTION_LOST.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /**
     * Stores the collaborators and creates the caches and executors the runner needs.
     * The worker path cache is only created here; it is started in {@link #start()}.
     *
     * @param objectMapper           JSON mapper used to decode worker announcements
     * @param remoteTaskRunnerConfig runner configuration (shutdown timeout, pending-task thread count, ...)
     * @param indexerZkConfig        provider of the announcements and status ZooKeeper paths
     * @param curatorFramework       Curator client
     * @param builder                cache factory builder; it is used twice below, so statement order matters
     * @param httpClient             client used to talk to workers over HTTP
     * @param supplier               supplier of the current worker behavior config
     * @param provisioningStrategy   strategy that creates the provisioning service in {@link #start()}
     */
    public RemoteTaskRunner(ObjectMapper objectMapper, RemoteTaskRunnerConfig remoteTaskRunnerConfig, IndexerZkConfig indexerZkConfig, CuratorFramework curatorFramework, PathChildrenCacheFactory.Builder builder, HttpClient httpClient, Supplier<WorkerBehaviorConfig> supplier, ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy) {
        this.jsonMapper = objectMapper;
        this.config = remoteTaskRunnerConfig;
        this.shutdownTimeout = remoteTaskRunnerConfig.getTaskShutdownLinkTimeout().toStandardDuration();
        this.indexerZkConfig = indexerZkConfig;
        this.cf = curatorFramework;
        // Built from the builder as passed in, i.e. before the executor settings on the next line are applied.
        this.workerPathCache = builder.build().make(curatorFramework, indexerZkConfig.getAnnouncementsPath());
        // The status-cache factory uses the runner-owned executor and leaves it open on cache close;
        // stop() shuts that executor down explicitly.
        this.workerStatusPathChildrenCacheFactory = builder.withExecutorService(this.workerStatusPathChildrenCacheExecutor).withShutdownExecutorOnClose(false).build();
        this.httpClient = httpClient;
        this.workerConfigRef = supplier;
        this.provisioningStrategy = provisioningStrategy;
        this.runPendingTasksExec = Execs.multiThreaded(remoteTaskRunnerConfig.getPendingTasksRunnerNumThreads(), "rtr-pending-tasks-runner-%d");
    }

    /**
     * Starts the runner. Installs a listener on the worker announcements cache, starts the cache,
     * and blocks until the cache's INITIALIZED event has been handled and every worker announced
     * so far has finished (or failed) being added. It then schedules the periodic blacklist check
     * and creates the provisioning service.
     *
     * <p>Does nothing when the lifecycle lock reports that the runner cannot be started.
     *
     * @throws RuntimeException wrapping any checked failure; if the calling thread is interrupted
     *                          while waiting, its interrupt flag is restored before throwing
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    @LifecycleStart
    public void start() {
        if (!this.lifecycleLock.canStart()) {
            return;
        }
        try {
            // Number of outstanding things start() waits for: 1 for the INITIALIZED event,
            // plus 1 per worker whose addWorker() future has not completed yet.
            final MutableInt waitingFor = new MutableInt(1);
            final Object waitingForMonitor = new Object();
            final Runnable markOneDone = () -> {
                synchronized (waitingForMonitor) {
                    waitingFor.decrement();
                    waitingForMonitor.notifyAll();
                }
            };

            this.workerPathCache.getListenable().addListener((client, event) -> {
                // Switch on the enum itself rather than on an ordinal-indexed lookup table.
                switch (event.getType()) {
                    case CHILD_ADDED: {
                        final Worker addedWorker = this.jsonMapper.readValue(event.getData().getData(), Worker.class);
                        synchronized (waitingForMonitor) {
                            waitingFor.increment();
                        }
                        Futures.addCallback(addWorker(addedWorker), new FutureCallback<ZkWorker>() {
                            @Override
                            public void onSuccess(ZkWorker zkWorker) {
                                markOneDone.run();
                            }

                            @Override
                            public void onFailure(Throwable th) {
                                markOneDone.run();
                            }
                        });
                        break;
                    }
                    case CHILD_UPDATED:
                        updateWorker(this.jsonMapper.readValue(event.getData().getData(), Worker.class));
                        break;
                    case CHILD_REMOVED:
                        removeWorker(this.jsonMapper.readValue(event.getData().getData(), Worker.class));
                        break;
                    case INITIALIZED:
                        try {
                            List<String> workerIds;
                            try {
                                workerIds = this.cf.getChildren().forPath(this.indexerZkConfig.getStatusPath());
                            } catch (KeeperException.NoNodeException e) {
                                workerIds = ImmutableList.of();
                            }
                            // Workers that still have a status node but are neither known nor announced
                            // get their tasks scheduled for cleanup.
                            for (String workerId : workerIds) {
                                final String announcementPath = JOINER.join(this.indexerZkConfig.getAnnouncementsPath(), workerId);
                                final String statusPath = JOINER.join(this.indexerZkConfig.getStatusPath(), workerId);
                                if (!this.zkWorkers.containsKey(workerId) && this.cf.checkExists().forPath(announcementPath) == null) {
                                    try {
                                        scheduleTasksCleanupForWorker(workerId, this.cf.getChildren().forPath(statusPath));
                                    } catch (Exception e) {
                                        log.warn(e, "Could not schedule cleanup for worker[%s] during startup (maybe someone removed the status znode[%s]?). Skipping.", workerId, statusPath);
                                    }
                                }
                            }
                        } finally {
                            // Always release the INITIALIZED slot: if a ZooKeeper call above fails,
                            // start() would otherwise wait forever on a counter nobody decrements.
                            markOneDone.run();
                        }
                        break;
                    case CONNECTION_SUSPENDED:
                    case CONNECTION_RECONNECTED:
                    case CONNECTION_LOST:
                    default:
                        break;
                }
            });
            this.workerPathCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

            synchronized (waitingForMonitor) {
                while (waitingFor.intValue() > 0) {
                    waitingForMonitor.wait();
                }
            }

            ScheduledExecutors.scheduleAtFixedRate(this.cleanupExec, Period.ZERO.toStandardDuration(), this.config.getWorkerBlackListCleanupPeriod().toStandardDuration(), this::checkBlackListedNodes);
            this.provisioningService = this.provisioningStrategy.makeProvisioningService(this);
            this.lifecycleLock.started();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            this.lifecycleLock.exitStart();
        }
    }

    /**
     * Stops the runner: closes the provisioning service, every known worker and the worker path
     * cache, then shuts down the executors owned by this runner.
     *
     * <p>The executors are shut down even when one of the close calls fails, so a failed stop does
     * not leave their threads behind. {@code lifecycleLock.exitStop()} is always invoked on the way
     * out, including when the lifecycle lock reports that the runner cannot be stopped.
     *
     * @throws RuntimeException wrapping any checked failure raised while closing resources
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    @LifecycleStop
    public void stop() {
        try {
            if (!this.lifecycleLock.canStop()) {
                return;
            }
            try {
                this.provisioningService.close();
                final Closer closer = Closer.create();
                for (ZkWorker zkWorker : this.zkWorkers.values()) {
                    closer.register(zkWorker);
                }
                closer.register(this.workerPathCache);
                closer.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // Release every executor regardless of whether closing succeeded.
                this.workerStatusPathChildrenCacheExecutor.shutdown();
                if (this.runPendingTasksExec != null) {
                    this.runPendingTasksExec.shutdown();
                }
                if (this.cleanupExec != null) {
                    this.cleanupExec.shutdown();
                }
            }
        } finally {
            this.lifecycleLock.exitStop();
        }
    }

    /**
     * This runner restores nothing.
     *
     * @return always an empty immutable list
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public List<Pair<Task, ListenableFuture<TaskStatus>>> restore() {
        final List<Pair<Task, ListenableFuture<TaskStatus>>> nothingToRestore = ImmutableList.of();
        return nothingToRestore;
    }

    /**
     * Registers a listener and immediately replays the location of every running task to it.
     *
     * @param taskRunnerListener listener to add; its id must not already be registered
     * @param executor           executor paired with the listener for notifications
     * @throws ISE if a listener with the same id is already registered
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public void registerListener(TaskRunnerListener taskRunnerListener, Executor executor) {
        for (Pair<TaskRunnerListener, Executor> registered : this.listeners) {
            if (registered.lhs.getListenerId().equals(taskRunnerListener.getListenerId())) {
                throw new ISE("Listener [%s] already registered", taskRunnerListener.getListenerId());
            }
        }

        final Pair<TaskRunnerListener, Executor> newcomer = Pair.of(taskRunnerListener, executor);
        synchronized (this.statusLock) {
            // Replay current locations to the new listener only, then publish it.
            for (Map.Entry<String, RemoteTaskRunnerWorkItem> running : this.runningTasks.entrySet()) {
                TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(newcomer), running.getKey(), running.getValue().getLocation());
            }
            log.info("Registered listener [%s]", taskRunnerListener.getListenerId());
            this.listeners.add(newcomer);
        }
    }

    /**
     * Removes the first registered listener whose id equals the given one; a no-op when none matches.
     *
     * @param str id of the listener to remove
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public void unregisterListener(String str) {
        for (Pair<TaskRunnerListener, Executor> registered : this.listeners) {
            if (!registered.lhs.getListenerId().equals(str)) {
                continue;
            }
            this.listeners.remove(registered);
            log.info("Unregistered listener [%s]", str);
            return;
        }
    }

    /**
     * @return immutable views of all workers currently tracked in {@code zkWorkers}
     */
    @Override // org.apache.druid.indexing.overlord.WorkerTaskRunner
    public Collection<ImmutableWorkerInfo> getWorkers() {
        final Collection<ZkWorker> tracked = zkWorkers.values();
        return getImmutableWorkerFromZK(tracked);
    }

    /**
     * @return an immutable snapshot of the work items in the running queue
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public Collection<RemoteTaskRunnerWorkItem> getRunningTasks() {
        return ImmutableList.copyOf(runningTasks.values());
    }

    /**
     * @return an immutable snapshot of the work items in the pending queue
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public Collection<RemoteTaskRunnerWorkItem> getPendingTasks() {
        return ImmutableList.copyOf(pendingTasks.values());
    }

    /**
     * @return an immutable snapshot of the payloads of tasks that are still pending
     */
    @Override // org.apache.druid.indexing.overlord.WorkerTaskRunner
    public Collection<Task> getPendingTaskPayloads() {
        final Collection<Task> payloads = pendingTaskPayloads.values();
        return ImmutableList.copyOf(payloads);
    }

    /**
     * @return the configuration this runner was constructed with
     */
    @Override // org.apache.druid.indexing.overlord.WorkerTaskRunner
    public RemoteTaskRunnerConfig getConfig() {
        return config;
    }

    /**
     * @return an immutable snapshot of pending, then running, then complete work items
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public Collection<RemoteTaskRunnerWorkItem> getKnownTasks() {
        final Iterable<RemoteTaskRunnerWorkItem> allQueues =
            Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values());
        return ImmutableList.copyOf(allQueues);
    }

    /**
     * Reports which queue currently holds the task, checking pending, then running, then complete.
     *
     * @param str task id
     * @return PENDING, RUNNING, NONE (for a complete task), or {@code null} when the task is unknown
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    @Nullable
    public RunnerTaskState getRunnerTaskState(String str) {
        final RunnerTaskState state;
        if (pendingTasks.containsKey(str)) {
            state = RunnerTaskState.PENDING;
        } else if (runningTasks.containsKey(str)) {
            state = RunnerTaskState.RUNNING;
        } else if (completeTasks.containsKey(str)) {
            state = RunnerTaskState.NONE;
        } else {
            state = null;
        }
        return state;
    }

    /**
     * Looks up the location of a task, checking the pending, running and complete queues in that order.
     *
     * <p>Each queue is read with a single {@code get} instead of {@code containsKey} followed by
     * {@code get}: the queues are modified by other threads, so an entry may vanish between the two
     * calls and the second one would then return {@code null}.
     *
     * @param str task id
     * @return the location recorded on the task's work item, or {@link TaskLocation#unknown()} when
     *         the task is in none of the queues
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public TaskLocation getTaskLocation(String str) {
        RemoteTaskRunnerWorkItem workItem = this.pendingTasks.get(str);
        if (workItem == null) {
            workItem = this.runningTasks.get(str);
        }
        if (workItem == null) {
            workItem = this.completeTasks.get(str);
        }
        return workItem == null ? TaskLocation.unknown() : workItem.getLocation();
    }

    /**
     * Returns the provisioning service's scaling statistics.
     *
     * <p>The provisioning service is only assigned in {@link #start()}, so a call made before the
     * runner has started yields an absent result rather than a {@link NullPointerException}.
     *
     * @return the stats, or absent when there is no provisioning service yet or it reports none
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public Optional<ScalingStats> getScalingStats() {
        // Read the non-final field once so the null check and the call see the same value.
        final ProvisioningService service = this.provisioningService;
        if (service == null) {
            return Optional.absent();
        }
        return Optional.fromNullable(service.getStats());
    }

    /**
     * Finds the first tracked worker that reports the given task as running.
     *
     * @param str task id
     * @return the worker, or {@code null} when no tracked worker is running the task
     */
    public ZkWorker findWorkerRunningTask(String str) {
        return zkWorkers.values()
                        .stream()
                        .filter(candidate -> candidate.isRunningTask(str))
                        .findFirst()
                        .orElse(null);
    }

    /**
     * Tells whether the given worker reports the task as running.
     *
     * @param zkWorker worker to ask; must not be null
     * @param str      task id
     * @return the worker's own answer for that task id
     * @throws NullPointerException if {@code zkWorker} is null
     */
    public boolean isWorkerRunningTask(ZkWorker zkWorker, String str) {
        final ZkWorker checked = Preconditions.checkNotNull(zkWorker, "worker");
        return checked.isRunningTask(str);
    }

    /**
     * Submits a task, or returns the existing result future if the task is already known.
     *
     * <p>Lookup order is pending, running, complete. A pending task triggers another assignment pass.
     * A running task whose worker already announces a complete status is completed on the spot.
     * An unknown task is queued as pending and an assignment pass is kicked off.
     *
     * @param task task to run
     * @return future for the task's final status
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public ListenableFuture<TaskStatus> run(Task task) {
        final String taskId = task.getId();

        final RemoteTaskRunnerWorkItem pendingItem = this.pendingTasks.get(taskId);
        if (pendingItem != null) {
            log.info("Assigned a task[%s] that is already pending!", taskId);
            runPendingTasks();
            return pendingItem.getResult();
        }

        final RemoteTaskRunnerWorkItem runningItem = this.runningTasks.get(taskId);
        if (runningItem != null) {
            final ZkWorker owner = findWorkerRunningTask(taskId);
            if (owner != null) {
                log.info("Task[%s] already running on %s.", taskId, owner.getWorker().getHost());
                final TaskAnnouncement announcement = owner.getRunningTasks().get(taskId);
                if (announcement.getTaskStatus().isComplete()) {
                    taskComplete(runningItem, owner, announcement.getTaskStatus());
                }
            } else {
                log.warn("Told to run task[%s], but no worker has started running it yet.", taskId);
            }
            return runningItem.getResult();
        }

        final RemoteTaskRunnerWorkItem completedItem = this.completeTasks.get(taskId);
        if (completedItem != null) {
            return completedItem.getResult();
        }

        final RemoteTaskRunnerWorkItem queuedItem = addPendingTask(task);
        runPendingTasks();
        return queuedItem.getResult();
    }

    /**
     * Shuts a task down, depending on where it currently is:
     * a pending task is dropped from the queue, a complete task is cleaned up, and a task running
     * on a worker gets an HTTP POST to that worker's shutdown endpoint.
     *
     * <p>The call is ignored when the runner does not reach the started state within one second,
     * or when no worker is running the task.
     *
     * @param str  task id
     * @param str2 reason, used for logging only
     * @throws RE if the shutdown request is interrupted or fails
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public void shutdown(String str, String str2) {
        log.info("Shutdown [%s] because: [%s]", str, str2);

        final boolean started = this.lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS);
        if (!started) {
            log.info("This TaskRunner is stopped or not yet started. Ignoring shutdown command for task: %s", str);
            return;
        }

        final boolean wasPending = this.pendingTasks.remove(str) != null;
        if (wasPending) {
            this.pendingTaskPayloads.remove(str);
            log.info("Removed task from pending queue: %s", str);
            return;
        }

        if (this.completeTasks.containsKey(str)) {
            cleanup(str);
            return;
        }

        final ZkWorker owner = findWorkerRunningTask(str);
        if (owner == null) {
            log.info("Can't shutdown! No worker running task %s", str);
            return;
        }

        // Declared outside the try so the interrupt handler can report it (null if URL creation failed).
        URL shutdownUrl = null;
        try {
            shutdownUrl = TaskRunnerUtils.makeWorkerURL(owner.getWorker(), "/druid/worker/v1/task/%s/shutdown", str);
            final Request post = new Request(HttpMethod.POST, shutdownUrl);
            final StatusResponseHolder response = (StatusResponseHolder) this.httpClient.go(post, StatusResponseHandler.getInstance(), this.shutdownTimeout).get();
            log.info("Sent shutdown message to worker: %s, status %s, response: %s", owner.getWorker().getHost(), response.getStatus(), response.getContent());
            final boolean accepted = HttpResponseStatus.OK.equals(response.getStatus());
            if (!accepted) {
                log.error("Shutdown failed for %s! Are you sure the task was running?", str);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RE(interrupted, "Interrupted posting shutdown to [%s] for task [%s]", shutdownUrl, str);
        } catch (Exception failure) {
            throw new RE(failure, "Error in handling post to [%s] for task [%s]", owner.getWorker().getHost(), str);
        }
    }

    /**
     * Provides a lazily opened stream over the log of a task that a tracked worker is running.
     *
     * @param str task id
     * @param j   offset passed to the worker's log endpoint
     * @return a byte source that issues an HTTP GET to the worker when opened, or absent when no
     *         tracked worker is running the task
     */
    public Optional<ByteSource> streamTaskLog(String str, long j) {
        ZkWorker findWorkerRunningTask = findWorkerRunningTask(str);
        if (findWorkerRunningTask == null) {
            return Optional.absent();
        }
        final URL makeWorkerURL = TaskRunnerUtils.makeWorkerURL(findWorkerRunningTask.getWorker(), "/druid/worker/v1/task/%s/log?offset=%s", str, Long.toString(j));
        return Optional.of(new ByteSource() { // from class: org.apache.druid.indexing.overlord.RemoteTaskRunner.2
            public InputStream openStream() throws IOException {
                try {
                    return (InputStream) RemoteTaskRunner.this.httpClient.go(new Request(HttpMethod.GET, makeWorkerURL), new InputStreamResponseHandler()).get();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller, as shutdown() does.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e2) {
                    // Surface I/O failures as IOException; anything else is wrapped.
                    Throwables.propagateIfPossible(e2.getCause(), IOException.class);
                    throw new RuntimeException(e2);
                }
            }
        });
    }

    /**
     * Provides a lazily opened stream over the live reports of a running task.
     *
     * @param str task id
     * @return a byte source that issues an HTTP GET to the task's location when opened, or absent
     *         when no tracked worker is running the task or the task has no entry in the running queue
     */
    public Optional<ByteSource> streamTaskReports(String str) {
        if (findWorkerRunningTask(str) == null) {
            return Optional.absent();
        }
        // A worker may report the task while the running queue has no (or no longer an) entry for it;
        // treat that as "no reports available" instead of dereferencing null.
        final RemoteTaskRunnerWorkItem runningItem = this.runningTasks.get(str);
        if (runningItem == null) {
            return Optional.absent();
        }
        final URL makeTaskLocationURL = TaskRunnerUtils.makeTaskLocationURL(runningItem.getLocation(), "/druid/worker/v1/chat/%s/liveReports", str);
        return Optional.of(new ByteSource() { // from class: org.apache.druid.indexing.overlord.RemoteTaskRunner.3
            public InputStream openStream() throws IOException {
                try {
                    return (InputStream) RemoteTaskRunner.this.httpClient.go(new Request(HttpMethod.GET, makeTaskLocationURL), new InputStreamResponseHandler()).get();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller, as shutdown() does.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e2) {
                    // Surface I/O failures as IOException; anything else is wrapped.
                    Throwables.propagateIfPossible(e2.getCause(), IOException.class);
                    throw new RuntimeException(e2);
                }
            }
        });
    }

    /**
     * Records a task as pending: stores its payload, then enqueues a fresh work item for it.
     *
     * @param task task to enqueue
     * @return the newly created work item
     */
    @VisibleForTesting
    RemoteTaskRunnerWorkItem addPendingTask(Task task) {
        log.info("Added pending task %s", task.getId());
        final RemoteTaskRunnerWorkItem workItem =
            new RemoteTaskRunnerWorkItem(task.getId(), task.getType(), null, null, task.getDataSource());
        // Payload goes in first so that it is present once the work item is visible in the pending queue.
        pendingTaskPayloads.put(task.getId(), task);
        pendingTasks.put(task.getId(), workItem);
        return workItem;
    }

    /**
     * Submits one assignment pass to {@code runPendingTasksExec}: a snapshot of the pending queue is
     * sorted by insertion time and each item is handed to {@link #runPendingTask}. Any exception in
     * the pass is reported as an alert rather than propagated.
     */
    @VisibleForTesting
    void runPendingTasks() {
        this.runPendingTasksExec.submit(() -> {
            try {
                final List<RemoteTaskRunnerWorkItem> snapshot = Lists.newArrayList(this.pendingTasks.values());
                sortByInsertionTime(snapshot);
                for (RemoteTaskRunnerWorkItem pendingItem : snapshot) {
                    runPendingTask(pendingItem);
                }
            } catch (Exception e) {
                log.makeAlert(e, "Exception in running pending tasks").emit();
            }
            return null;
        });
    }

    /**
     * Attempts to assign a single pending work item to a worker.
     *
     * <p>Only one attempt per task id runs at a time: the id is claimed in {@code tryAssignTasks}
     * and the call returns immediately when another attempt already holds the claim. On success the
     * task's payload is dropped. If the attempt throws, an alert is emitted and the task, if still
     * pending, is completed as failed. The claim is released in every case.
     *
     * @param remoteTaskRunnerWorkItem pending work item to assign
     */
    @VisibleForTesting
    void runPendingTask(RemoteTaskRunnerWorkItem remoteTaskRunnerWorkItem) {
        final String taskId = remoteTaskRunnerWorkItem.getTaskId();
        final boolean claimed = this.tryAssignTasks.putIfAbsent(taskId, taskId) == null;
        if (!claimed) {
            return;
        }
        try {
            final Task payload = this.pendingTaskPayloads.get(taskId);
            if (payload != null && tryAssignTask(payload, remoteTaskRunnerWorkItem)) {
                this.pendingTaskPayloads.remove(taskId);
            }
        } catch (Exception e) {
            log.makeAlert(e, "Exception while trying to assign task").addData("taskId", remoteTaskRunnerWorkItem.getTaskId()).emit();
            final RemoteTaskRunnerWorkItem stillPending = this.pendingTasks.remove(taskId);
            if (stillPending != null) {
                taskComplete(stillPending, null, TaskStatus.failure(taskId, StringUtils.format("Failed to assign this task. See overlord logs for more details.")));
            }
        } finally {
            this.tryAssignTasks.remove(taskId);
        }
    }

    /**
     * Sorts the list in place by ascending queue insertion time.
     *
     * @param list work items to sort
     */
    @VisibleForTesting
    static void sortByInsertionTime(List<RemoteTaskRunnerWorkItem> list) {
        Collections.sort(list, Comparator.comparing(RemoteTaskRunnerWorkItem::getQueueInsertionTime));
    }

    /**
     * Removes a completed task from the complete queue and deletes its status node in ZooKeeper.
     *
     * <p>Does nothing when the runner does not reach the started state within one second. Emits an
     * alert and returns when the task is not in the complete queue or has no worker recorded.
     * A status node that is already gone is logged and otherwise ignored.
     *
     * @param str task id
     * @throws RuntimeException wrapping any other failure while deleting the status node
     */
    private void cleanup(String str) {
        if (!this.lifecycleLock.awaitStarted(1L, TimeUnit.SECONDS)) {
            return;
        }
        final RemoteTaskRunnerWorkItem removed = this.completeTasks.remove(str);
        final Worker worker = removed == null ? null : removed.getWorker();
        if (worker == null) {
            log.makeAlert("Asked to cleanup nonexistent task").addData("taskId", str).emit();
            return;
        }
        final String host = worker.getHost();
        log.info("Cleaning up task[%s] on worker[%s]", str, host);
        final String statusPath = JOINER.join(this.indexerZkConfig.getStatusPath(), host, str);
        try {
            ((ChildrenDeletable) this.cf.delete().guaranteed()).forPath(statusPath);
        } catch (KeeperException.NoNodeException e) {
            // The specific handler must come before the general one: listed after catch (Exception)
            // it is unreachable and the code does not compile.
            log.info("Tried to delete status path[%s] that didn't exist! Must've gone away already?", statusPath);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tries to place a task on a worker picked by the configured selection strategy.
     *
     * <p>A task that is already running (per the running queue or any tracked worker) counts as
     * assigned. Otherwise a worker is selected and reserved in {@code workersWithUnacknowledgedTask}
     * while holding that map's monitor, and the task is announced to it. Whenever a worker was
     * reserved, the reservation is released and another assignment pass is requested, whether the
     * announcement succeeded, failed or threw.
     *
     * @param task                     task to assign; must not be null
     * @param remoteTaskRunnerWorkItem work item of the same task; must not be null
     * @return {@code true} if the task is already running or the announcement reported success,
     *         {@code false} if no worker could be reserved or the announcement reported failure
     * @throws Exception propagated from worker selection or from announcing the task
     */
    private boolean tryAssignTask(Task task, RemoteTaskRunnerWorkItem remoteTaskRunnerWorkItem) throws Exception {
        Preconditions.checkNotNull(task, "task");
        Preconditions.checkNotNull(remoteTaskRunnerWorkItem, "taskRunnerWorkItem");
        Preconditions.checkArgument(task.getId().equals(remoteTaskRunnerWorkItem.getTaskId()), "task id != workItem id");

        if (this.runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null) {
            log.info("Task[%s] already running.", task.getId());
            return true;
        }

        final WorkerBehaviorConfig behaviorConfig = (WorkerBehaviorConfig) this.workerConfigRef.get();
        final WorkerSelectStrategy strategy;
        if (behaviorConfig != null && behaviorConfig.getSelectStrategy() != null) {
            strategy = behaviorConfig.getSelectStrategy();
        } else {
            strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
            log.debug("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName());
        }

        ZkWorker reservedWorker = null;
        try {
            // Selection and reservation happen under one lock so two tasks cannot pick the same worker.
            synchronized (this.workersWithUnacknowledgedTask) {
                final ImmutableWorkerInfo chosen = strategy.findWorkerForTask(this.config, ImmutableMap.copyOf(getWorkersEligibleToRunTasks()), task);
                if (chosen != null && this.workersWithUnacknowledgedTask.putIfAbsent(chosen.getWorker().getHost(), task.getId()) == null) {
                    reservedWorker = this.zkWorkers.get(chosen.getWorker().getHost());
                }
            }

            if (reservedWorker == null) {
                log.debug("Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].", task.getId(), this.zkWorkers.values(), this.workersWithUnacknowledgedTask);
                return false;
            }
            return announceTask(task, reservedWorker, remoteTaskRunnerWorkItem);
        } finally {
            if (reservedWorker != null) {
                this.workersWithUnacknowledgedTask.remove(reservedWorker.getWorker().getHost());
                // The worker is free again, so give other pending tasks a chance.
                runPendingTasks();
            }
        }
    }

    /**
     * Returns a view of the known workers that may currently receive a task: those that are
     * neither lazy, nor holding an unacknowledged task, nor blacklisted. Values are converted to
     * immutable worker info.
     */
    Map<String, ImmutableWorkerInfo> getWorkersEligibleToRunTasks() {
        return Maps.transformEntries(
            Maps.filterEntries(
                this.zkWorkers,
                candidate -> !this.lazyWorkers.containsKey(candidate.getKey())
                    && !this.workersWithUnacknowledgedTask.containsKey(candidate.getKey())
                    && !this.blackListedWorkers.contains(candidate.getValue())
            ),
            (host, eligible) -> eligible.toImmutable()
        );
    }

    /**
     * Writes the task under the worker's tasks path, moves it from pending to running, and then
     * waits on {@code statusLock} until the worker is seen running it or the assignment timeout
     * elapses (in which case the task is completed as failed).
     *
     * @param task                     task to announce
     * @param zkWorker                 worker that should run the task
     * @param remoteTaskRunnerWorkItem work item for the task; its id must equal the task's id
     * @return {@code false} if the worker is gone or lazy, or the task was not in the pending
     *         queue; {@code true} otherwise (including the timed-out case)
     * @throws Exception on ZooKeeper, serialization or interruption failures
     */
    private boolean announceTask(Task task, ZkWorker zkWorker, RemoteTaskRunnerWorkItem remoteTaskRunnerWorkItem) throws Exception {
        Preconditions.checkArgument(task.getId().equals(remoteTaskRunnerWorkItem.getTaskId()), "task id != workItem id");
        final String workerHost = zkWorker.getWorker().getHost();
        synchronized (this.statusLock) {
            final boolean workerUsable = this.zkWorkers.containsKey(workerHost) && !this.lazyWorkers.containsKey(workerHost);
            if (!workerUsable) {
                log.info("Not assigning task to already removed worker[%s]", workerHost);
                return false;
            }
            log.info("Coordinator asking Worker[%s] to add task[%s]", workerHost, task.getId());

            final String taskPath = JOINER.join(this.indexerZkConfig.getTasksPath(), workerHost, task.getId());
            final byte[] payload = this.jsonMapper.writeValueAsBytes(task);
            CuratorUtils.createIfNotExists(this.cf, taskPath, CreateMode.EPHEMERAL, payload, this.config.getMaxZnodeBytes());

            final RemoteTaskRunnerWorkItem pendingItem = this.pendingTasks.remove(task.getId());
            if (pendingItem == null) {
                log.makeAlert("Ignoring null work item from pending task queue").addData("taskId", task.getId()).emit();
                return false;
            }

            final RemoteTaskRunnerWorkItem runningItem = pendingItem.withWorker(zkWorker.getWorker(), null);
            this.runningTasks.put(task.getId(), runningItem);
            log.info("Task %s switched from pending to running (on [%s])", task.getId(), runningItem.getWorker().getHost());
            TaskRunnerUtils.notifyStatusChanged(this.listeners, task.getId(), TaskStatus.running(task.getId()));

            // The status listener calls notifyAll() on statusLock whenever a worker writes a
            // status, which wakes this loop so it can re-check the worker.
            final Stopwatch waited = Stopwatch.createStarted();
            while (!isWorkerRunningTask(zkWorker, task.getId())) {
                final long timeoutMillis = this.config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
                this.statusLock.wait(timeoutMillis);
                final long elapsedMillis = waited.elapsed(TimeUnit.MILLISECONDS);
                if (elapsedMillis >= timeoutMillis) {
                    log.makeAlert("Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!", workerHost, task.getId(), elapsedMillis, this.config.getTaskAssignmentTimeout()).emit();
                    taskComplete(remoteTaskRunnerWorkItem, zkWorker, TaskStatus.failure(task.getId(), StringUtils.format("The worker that this task is assigned did not start it in timeout[%s]. See overlord logs for more details.", this.config.getTaskAssignmentTimeout())));
                    break;
                }
            }
            return true;
        }
    }

    /**
     * Cancels (without interrupting) the scheduled task cleanup registered for a worker, if any.
     *
     * @param str worker host
     * @return {@code true} if a scheduled cleanup was registered and has been cancelled
     */
    private boolean cancelWorkerCleanup(String str) {
        final ScheduledFuture pendingCleanup = this.removedWorkerCleanups.remove(str);
        if (pendingCleanup == null) {
            return false;
        }
        log.info("Cancelling Worker[%s] scheduled task cleanup", str);
        pendingCleanup.cancel(false);
        return true;
    }

    /**
     * Starts tracking a newly announced worker: cancels any cleanup still scheduled for its host,
     * creates a {@link ZkWorker} backed by a cache on the worker's status path, attaches the
     * status listener and starts it.
     *
     * @param worker the worker that appeared
     * @return the future handed to the status listener, which completes it with the new
     *         {@link ZkWorker} (or fails it) later
     * @throws RuntimeException wrapping any failure during setup
     */
    private ListenableFuture<ZkWorker> addWorker(Worker worker) {
        log.info("Worker[%s] reportin' for duty!", worker.getHost());
        try {
            cancelWorkerCleanup(worker.getHost());

            final String workerStatusPath = JOINER.join(this.indexerZkConfig.getStatusPath(), worker.getHost());
            final PathChildrenCache statusCache = this.workerStatusPathChildrenCacheFactory.make(this.cf, workerStatusPath);
            final SettableFuture<ZkWorker> registration = SettableFuture.create();
            final ZkWorker tracked = new ZkWorker(worker, statusCache, this.jsonMapper);

            tracked.addListener(getStatusListener(worker, tracked, registration));
            tracked.start();
            return registration;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the listener that reacts to events on one worker's status path.
     *
     * <p>All handling happens while holding {@code statusLock}. Any exception thrown while
     * handling an event is reported as an alert ("Failed to handle new worker status") rather than
     * propagated; in the decompiled form the {@code try} block was empty, so that handler was
     * unreachable.
     *
     * @param worker         the worker whose status path is being watched
     * @param zkWorker       the tracking object for that worker
     * @param settableFuture completed with {@code zkWorker} once it is registered in
     *                       {@code zkWorkers}, or failed if the host was already registered
     * @return the listener to attach to the worker's status cache
     */
    @VisibleForTesting
    PathChildrenCacheListener getStatusListener(Worker worker, ZkWorker zkWorker, SettableFuture<ZkWorker> settableFuture) {
        return (curatorFramework, pathChildrenCacheEvent) -> {
            synchronized (this.statusLock) {
                try {
                    // NOTE(review): decompiler-generated switch map. Judging by the branch bodies,
                    // 1/2 are child added/updated, 3 is child removed, 4 is the cache's
                    // "initialized" event and 5-7 are connection events. Confirm against the
                    // synthetic class before rewriting this as an enum switch.
                    switch (AnonymousClass5.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                        case 1:
                        case 2: {
                            if (pathChildrenCacheEvent.getData() == null) {
                                log.error("Unexpected null for event.getData() in handle new worker status for [%s]", pathChildrenCacheEvent.getType().toString());
                                log.makeAlert("Unexpected null for event.getData() in handle new worker status").addData("worker", zkWorker.getWorker().getHost()).addData("eventType", pathChildrenCacheEvent.getType().toString()).emit();
                                return;
                            }
                            String taskId = ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath());
                            TaskAnnouncement announcement = (TaskAnnouncement) this.jsonMapper.readValue(pathChildrenCacheEvent.getData().getData(), TaskAnnouncement.class);
                            log.info("Worker[%s] wrote %s status for task [%s] on [%s]", zkWorker.getWorker().getHost(), announcement.getTaskStatus().getStatusCode(), taskId, announcement.getTaskLocation());

                            // Wake any thread waiting in announceTask for a worker to pick up its task.
                            this.statusLock.notifyAll();

                            RemoteTaskRunnerWorkItem workItem = this.runningTasks.get(taskId);
                            if (workItem == null) {
                                RemoteTaskRunnerWorkItem discovered = new RemoteTaskRunnerWorkItem(taskId, announcement.getTaskType(), zkWorker.getWorker(), TaskLocation.unknown(), announcement.getTaskDataSource());
                                RemoteTaskRunnerWorkItem raced = this.runningTasks.putIfAbsent(taskId, discovered);
                                if (raced == null) {
                                    log.warn("Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s", zkWorker.getWorker().getHost(), taskId);
                                    workItem = discovered;
                                } else {
                                    workItem = raced;
                                }
                            }

                            if (!announcement.getTaskLocation().equals(workItem.getLocation())) {
                                workItem.setLocation(announcement.getTaskLocation());
                                TaskRunnerUtils.notifyLocationChanged(this.listeners, taskId, announcement.getTaskLocation());
                            }
                            if (announcement.getTaskStatus().isComplete()) {
                                taskComplete(workItem, zkWorker, announcement.getTaskStatus());
                                runPendingTasks();
                            }
                            return;
                        }
                        case 3: {
                            if (pathChildrenCacheEvent.getData() == null) {
                                log.error("Unexpected null for event.getData() in handle new worker status for [%s]", pathChildrenCacheEvent.getType().toString());
                                log.makeAlert("Unexpected null for event.getData() in handle new worker status").addData("worker", zkWorker.getWorker().getHost()).addData("eventType", pathChildrenCacheEvent.getType().toString()).emit();
                                return;
                            }
                            String removedTaskId = ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath());
                            RemoteTaskRunnerWorkItem stillRunning = this.runningTasks.remove(removedTaskId);
                            if (stillRunning != null) {
                                // The status node vanished while the task was still considered running.
                                log.warn("Task[%s] just disappeared!", removedTaskId);
                                TaskStatus failure = TaskStatus.failure(removedTaskId, "The worker that this task was assigned disappeared. See overlord logs for more details.");
                                stillRunning.setResult(failure);
                                TaskRunnerUtils.notifyStatusChanged(this.listeners, removedTaskId, failure);
                            } else {
                                log.info("Task[%s] went bye bye.", removedTaskId);
                            }
                            return;
                        }
                        case 4: {
                            if (this.zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
                                settableFuture.set(zkWorker);
                            } else {
                                String message = StringUtils.format("This should not happen...tried to add already-existing worker[%s]", worker.getHost());
                                log.makeAlert(message).addData("workerHost", worker.getHost()).addData("workerIp", worker.getIp()).emit();
                                settableFuture.setException(new IllegalStateException(message));
                            }
                            runPendingTasks();
                            return;
                        }
                        case 5:
                        case 6:
                        case 7:
                        default:
                            // Nothing to do for the remaining event types.
                            return;
                    }
                } catch (Exception e) {
                    String znode = null;
                    if (pathChildrenCacheEvent.getData() != null) {
                        znode = pathChildrenCacheEvent.getData().getPath();
                    }
                    log.makeAlert(e, "Failed to handle new worker status").addData("worker", zkWorker.getWorker().getHost()).addData("znode", znode).addData("eventType", pathChildrenCacheEvent.getType().toString()).emit();
                }
            }
        };
    }

    /**
     * Replaces the {@link Worker} stored on the tracked {@link ZkWorker} for the same host. Logs a
     * warning and does nothing when no {@link ZkWorker} is registered for that host.
     *
     * @param worker the updated worker announcement
     */
    private void updateWorker(Worker worker) {
        final ZkWorker tracked = this.zkWorkers.get(worker.getHost());
        if (tracked != null) {
            log.info("Worker[%s] updated its announcement from[%s] to[%s].", worker.getHost(), tracked.getWorker(), worker);
            tracked.setWorker(worker);
            return;
        }
        log.warn("Worker[%s] updated its announcement but we didn't have a ZkWorker for it. Ignoring.", worker.getHost());
    }

    /**
     * Handles the disappearance of a worker: schedules cleanup of the tasks assigned to it, closes
     * and unregisters its {@link ZkWorker}, re-evaluates the blacklist, and drops the host from
     * {@code lazyWorkers}.
     *
     * <p>The close/unregister/blacklist steps run exactly once, whether or not scheduling the
     * cleanup fails. Nothing but the {@code lazyWorkers} removal happens when the host has no
     * registered {@link ZkWorker}.
     *
     * @param worker the worker that went away
     * @throws RuntimeException wrapping a failure to determine or schedule the task cleanup
     */
    private void removeWorker(Worker worker) {
        log.info("Kaboom! Worker[%s] removed!", worker.getHost());
        final ZkWorker zkWorker = this.zkWorkers.get(worker.getHost());
        if (zkWorker != null) {
            try {
                scheduleTasksCleanupForWorker(worker.getHost(), getAssignedTasks(worker));
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                // A real finally replaces the decompiler's duplicated cleanup, which closed the
                // worker a second time on failure and dereferenced a possibly-null zkWorker.
                try {
                    zkWorker.close();
                } catch (Exception e) {
                    log.error(e, "Exception closing worker[%s]!", worker.getHost());
                }
                this.zkWorkers.remove(worker.getHost());
                checkBlackListedNodes();
            }
        }
        this.lazyWorkers.remove(worker.getHost());
    }

    /**
     * Schedules, after the configured task cleanup timeout, removal of a vanished worker's task
     * and status znodes and failure of the tasks that were assigned to it.
     *
     * <p>Any cleanup already scheduled for the same host is cancelled first. The new future is
     * registered in {@code removedWorkerCleanups} and unregisters itself when it finishes.
     *
     * @param str  host of the worker that was removed
     * @param list ids of the tasks that were assigned to that worker
     */
    private void scheduleTasksCleanupForWorker(final String str, List<String> list) {
        cancelWorkerCleanup(str);
        // NOTE(review): Futures.addCallback below needs a ListenableFuture; the raw ScheduledFuture
        // type here is what the decompiler produced. Confirm the declared type of cleanupExec.
        final ScheduledFuture schedule = this.cleanupExec.schedule(() -> {
            log.info("Running scheduled cleanup for Worker[%s]", str);
            try {
                for (String taskId : list) {
                    String taskPath = JOINER.join(this.indexerZkConfig.getTasksPath(), str, taskId);
                    String statusPath = JOINER.join(this.indexerZkConfig.getStatusPath(), str, taskId);
                    if (this.cf.checkExists().forPath(taskPath) != null) {
                        ((ChildrenDeletable) this.cf.delete().guaranteed()).forPath(taskPath);
                    }
                    if (this.cf.checkExists().forPath(statusPath) != null) {
                        ((ChildrenDeletable) this.cf.delete().guaranteed()).forPath(statusPath);
                    }
                    log.info("Failing task[%s]", taskId);
                    RemoteTaskRunnerWorkItem running = this.runningTasks.remove(taskId);
                    if (running != null) {
                        TaskStatus failure = TaskStatus.failure(taskId, StringUtils.format("Canceled for worker cleanup. See overlord logs for more details."));
                        running.setResult(failure);
                        TaskRunnerUtils.notifyStatusChanged(this.listeners, taskId, failure);
                    } else {
                        log.warn("RemoteTaskRunner has no knowledge of task[%s]", taskId);
                    }
                }
                // Finally remove the worker's own (now childless) status node.
                String workerStatusPath = JOINER.join(this.indexerZkConfig.getStatusPath(), str);
                if (this.cf.checkExists().forPath(workerStatusPath) != null) {
                    ((ChildrenDeletable) this.cf.delete().guaranteed()).forPath(workerStatusPath);
                }
            } catch (Exception e) {
                // Attach the cause: the callback below ignores the failure, so this alert is the
                // only place the exception is reported.
                log.makeAlert(e, "Exception while cleaning up worker[%s]", str).emit();
                throw new RuntimeException(e);
            }
        }, this.config.getTaskCleanupTimeout().toStandardDuration().getMillis(), TimeUnit.MILLISECONDS);
        this.removedWorkerCleanups.put(str, schedule);
        // Drop the registry entry when the cleanup finishes, but only if it is still this future.
        Futures.addCallback(schedule, new FutureCallback<Object>() {
            @Override
            public void onSuccess(Object obj) {
                RemoteTaskRunner.this.removedWorkerCleanups.remove(str, schedule);
            }

            @Override
            public void onFailure(Throwable th) {
                RemoteTaskRunner.this.removedWorkerCleanups.remove(str, schedule);
            }
        });
    }

    /**
     * Records a task as complete: moves it from {@code runningTasks} to {@code completeTasks} and,
     * on the first completion event for that task, updates the worker's failure counter and
     * blacklist state, sets the work item's result and notifies listeners.
     *
     * <p>A repeated completion event only logs (comparing against the previously recorded state)
     * and changes neither counters nor the result.
     *
     * @param remoteTaskRunnerWorkItem work item of the finished task; must not be null
     * @param zkWorker                 worker that ran the task, or {@code null} if there was none
     * @param taskStatus               final status; must not be null
     */
    private void taskComplete(RemoteTaskRunnerWorkItem remoteTaskRunnerWorkItem, @Nullable ZkWorker zkWorker, TaskStatus taskStatus) {
        Preconditions.checkNotNull(remoteTaskRunnerWorkItem, "taskRunnerWorkItem");
        Preconditions.checkNotNull(taskStatus, "taskStatus");
        if (zkWorker == null) {
            log.info("Workerless task[%s] completed with status[%s]", taskStatus.getId(), taskStatus.getStatusCode());
        } else {
            log.info("Worker[%s] completed task[%s] with status[%s]", zkWorker.getWorker().getHost(), taskStatus.getId(), taskStatus.getStatusCode());
            zkWorker.setLastCompletedTaskTime(DateTimes.nowUtc());
        }

        final RemoteTaskRunnerWorkItem previousComplete = this.completeTasks.put(taskStatus.getId(), remoteTaskRunnerWorkItem);
        final RemoteTaskRunnerWorkItem removedRunning = this.runningTasks.remove(taskStatus.getId());

        if (previousComplete != null) {
            // Not the first complete event for this task: only compare and log.
            if (removedRunning != null) {
                log.warn("This is not the first complete event for task[%s], but it was still known as running. Ignoring the previously known running status.", taskStatus.getId());
            }
            try {
                final TaskState lastKnownState = ((TaskStatus) previousComplete.getResult().get(1L, TimeUnit.MILLISECONDS)).getStatusCode();
                if (taskStatus.getStatusCode() != lastKnownState) {
                    log.warn("The state of the new task complete event is different from its last known state. New state[%s], last known state[%s]", taskStatus.getStatusCode(), lastKnownState);
                }
            } catch (InterruptedException e) {
                log.warn(e, "Interrupted while getting the last known task status.");
                Thread.currentThread().interrupt();
            } catch (ExecutionException | TimeoutException e) {
                log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
            }
        } else {
            // First complete event: update the worker's success/failure bookkeeping.
            if (zkWorker != null) {
                if (taskStatus.isSuccess()) {
                    zkWorker.resetContinuouslyFailedTasksCount();
                    if (this.blackListedWorkers.remove(zkWorker)) {
                        zkWorker.setBlacklistedUntil(null);
                        log.info("[%s] removed from blacklist because a task finished with SUCCESS", zkWorker.getWorker());
                    }
                } else if (taskStatus.isFailure()) {
                    zkWorker.incrementContinuouslyFailedTasksCount();
                }
                synchronized (this.blackListedWorkers) {
                    final boolean tooManyFailures = zkWorker.getContinuouslyFailedTasksCount() > this.config.getMaxRetriesBeforeBlacklist();
                    if (tooManyFailures && this.blackListedWorkers.size() <= (this.zkWorkers.size() * (this.config.getMaxPercentageBlacklistWorkers() / 100.0d)) - 1.0d) {
                        zkWorker.setBlacklistedUntil(DateTimes.nowUtc().plus(this.config.getWorkerBlackListBackoffTime()));
                        if (this.blackListedWorkers.add(zkWorker)) {
                            log.info("Blacklisting [%s] until [%s] after [%,d] failed tasks in a row.", zkWorker.getWorker(), zkWorker.getBlacklistedUntil(), Integer.valueOf(zkWorker.getContinuouslyFailedTasksCount()));
                        }
                    }
                }
            }
            // Publish the result to whoever is waiting on the work item and to listeners.
            remoteTaskRunnerWorkItem.setResult(taskStatus);
            TaskRunnerUtils.notifyStatusChanged(this.listeners, taskStatus.getId(), taskStatus);
        }
    }

    /**
     * Marks idle workers that satisfy {@code predicate} as lazy, until {@code lazyWorkers} holds
     * at least {@code i} entries, and returns the workers that are lazy afterwards.
     *
     * <p>The limit is checked before each worker is considered. The previous {@code size() == i}
     * test after insertion never fired when {@code lazyWorkers} already held {@code i} or more
     * entries, so every matching worker was marked and the limit was overshot.
     *
     * @param predicate decides whether an idle worker may be marked lazy
     * @param i         upper bound on the number of lazy workers; values below 1 mark nothing
     * @return an empty list when {@code i < 1}, otherwise the workers currently in
     *         {@code lazyWorkers}
     * @throws RuntimeException wrapping any failure while inspecting a worker
     */
    @Override // org.apache.druid.indexing.overlord.WorkerTaskRunner
    public Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> predicate, int i) {
        if (i < 1) {
            return Collections.emptyList();
        }
        // announceTask also takes statusLock, so no task is assigned while workers are being marked.
        synchronized (this.statusLock) {
            for (Map.Entry<String, ZkWorker> entry : this.zkWorkers.entrySet()) {
                if (this.lazyWorkers.size() >= i) {
                    break;
                }
                ZkWorker candidate = entry.getValue();
                try {
                    if (getAssignedTasks(candidate.getWorker()).isEmpty() && predicate.apply(candidate.toImmutable())) {
                        log.info("Adding Worker[%s] to lazySet!", candidate.getWorker().getHost());
                        this.lazyWorkers.put(entry.getKey(), candidate);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return getWorkerFromZK(this.lazyWorkers.values());
        }
    }

    /**
     * Lists the ids of tasks associated with a worker: the children of the worker's tasks path in
     * ZooKeeper, followed by every entry of {@code runningTasks} whose worker host matches
     * (case-insensitively). An id present in both sources appears twice.
     *
     * @param worker the worker to inspect
     * @return a new mutable list of task ids
     * @throws Exception if the ZooKeeper children cannot be read
     */
    protected List<String> getAssignedTasks(Worker worker) throws Exception {
        final String workerTasksPath = JOINER.join(this.indexerZkConfig.getTasksPath(), worker.getHost());
        final List<String> assigned = Lists.newArrayList(this.cf.getChildren().forPath(workerTasksPath));

        for (Map.Entry<String, RemoteTaskRunnerWorkItem> running : this.runningTasks.entrySet()) {
            final RemoteTaskRunnerWorkItem item = running.getValue();
            if (item == null) {
                log.error("Huh? null work item for [%s]", running.getKey());
                continue;
            }
            if (item.getWorker() == null) {
                log.error("Huh? no worker for [%s]", running.getKey());
                continue;
            }
            if (item.getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
                log.info("[%s]: Found [%s] running", worker.getHost(), running.getKey());
                assigned.add(running.getKey());
            }
        }

        log.info("[%s]: Found %d tasks assigned", worker.getHost(), assigned.size());
        return assigned;
    }

    /**
     * Returns the {@link Worker} of every entry currently in {@code lazyWorkers}, copied into an
     * immutable list.
     */
    @Override // org.apache.druid.indexing.overlord.WorkerTaskRunner
    public Collection<Worker> getLazyWorkers() {
        return getWorkerFromZK(this.lazyWorkers.values());
    }

    /**
     * Copies the immutable info of each given {@link ZkWorker} into an immutable list.
     *
     * @param collection workers to convert
     * @return one {@link ImmutableWorkerInfo} per worker, in iteration order
     */
    private static ImmutableList<ImmutableWorkerInfo> getImmutableWorkerFromZK(Collection<ZkWorker> collection) {
        final Collection<ImmutableWorkerInfo> infos = Collections2.transform(collection, ZkWorker::toImmutable);
        return ImmutableList.copyOf(infos);
    }

    /**
     * Copies the {@link Worker} of each given {@link ZkWorker} into an immutable list.
     *
     * @param collection workers to convert
     * @return one {@link Worker} per entry, in iteration order
     */
    private static ImmutableList<Worker> getWorkerFromZK(Collection<ZkWorker> collection) {
        final Collection<Worker> workers = Collections2.transform(collection, ZkWorker::getWorker);
        return ImmutableList.copyOf(workers);
    }

    /**
     * Returns an immutable copy of the currently blacklisted workers, taken while holding the
     * monitor of {@code blackListedWorkers}.
     */
    public Collection<ImmutableWorkerInfo> getBlackListedWorkers() {
        synchronized (this.blackListedWorkers) {
            return getImmutableWorkerFromZK(this.blackListedWorkers);
        }
    }

    /**
     * Decides whether a blacklisted worker should be released: either because the blacklist has
     * grown beyond the configured percentage of known workers, or because the worker's backoff
     * time has elapsed.
     *
     * @param zkWorker a blacklisted worker
     * @return {@code true} if the worker should be removed from the blacklist
     */
    private boolean shouldRemoveNodeFromBlackList(ZkWorker zkWorker) {
        final boolean blacklistOverCapacity =
            this.blackListedWorkers.size() > this.zkWorkers.size() * (this.config.getMaxPercentageBlacklistWorkers() / 100.0d);
        if (blacklistOverCapacity) {
            log.info("Removing [%s] from blacklist because percentage of blacklisted workers exceeds [%d]", zkWorker.getWorker(), Integer.valueOf(this.config.getMaxPercentageBlacklistWorkers()));
            return true;
        }

        final long remainingMillis = zkWorker.getBlacklistedUntil().getMillis() - getCurrentTimeMillis();
        final boolean backoffElapsed = remainingMillis <= 0;
        if (backoffElapsed) {
            log.info("Removing [%s] from blacklist because backoff time elapsed", zkWorker.getWorker());
        } else {
            log.info("[%s] still blacklisted for [%,ds]", zkWorker.getWorker(), remainingMillis / 1000);
        }
        return backoffElapsed;
    }

    /**
     * Releases every blacklisted worker that {@code shouldRemoveNodeFromBlackList} approves,
     * resetting its failure counter and blacklist deadline, and re-runs pending tasks if at least
     * one worker was released.
     */
    @VisibleForTesting
    void checkBlackListedNodes() {
        boolean anyReleased = false;
        synchronized (this.blackListedWorkers) {
            for (Iterator<ZkWorker> blacklisted = this.blackListedWorkers.iterator(); blacklisted.hasNext(); ) {
                final ZkWorker candidate = blacklisted.next();
                if (!shouldRemoveNodeFromBlackList(candidate)) {
                    continue;
                }
                blacklisted.remove();
                candidate.resetContinuouslyFailedTasksCount();
                candidate.setBlacklistedUntil(null);
                anyReleased = true;
            }
        }
        // Run outside the blacklist monitor, as the original did.
        if (anyReleased) {
            runPendingTasks();
        }
    }

    /**
     * Returns the current wall-clock time in milliseconds. Kept as an overridable method
     * (protected, {@code @VisibleForTesting}); it is read by the blacklist backoff check.
     */
    @VisibleForTesting
    protected long getCurrentTimeMillis() {
        return System.currentTimeMillis();
    }

    /**
     * Exposes the live map of scheduled worker cleanups (worker host to scheduled future) for
     * tests. The map itself is returned, not a copy.
     */
    @VisibleForTesting
    ConcurrentMap<String, ScheduledFuture> getRemovedWorkerCleanups() {
        return this.removedWorkerCleanups;
    }

    /** Exposes this runner's configuration object for tests. */
    @VisibleForTesting
    RemoteTaskRunnerConfig getRemoteTaskRunnerConfig() {
        return this.config;
    }

    /**
     * Exposes the live map of workers that currently hold an unacknowledged task (worker host to
     * task id) for tests. The map itself is returned, not a copy.
     */
    @VisibleForTesting
    Map<String, String> getWorkersWithUnacknowledgedTask() {
        return this.workersWithUnacknowledgedTask;
    }

    /**
     * Sums the capacity of every worker returned by {@code getWorkers()}.
     *
     * <p>Replaces a decompiled loop that built a new iterator on every test and referenced an
     * undefined variable, which neither compiled nor could terminate.
     *
     * @return total number of task slots across all known workers
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public long getTotalTaskSlotCount() {
        long totalSlots = 0;
        // NOTE(review): assumes getWorkers() yields ImmutableWorkerInfo, as the accessors used here suggest.
        for (ImmutableWorkerInfo workerInfo : getWorkers()) {
            totalSlots += workerInfo.getWorker().getCapacity();
        }
        return totalSlots;
    }

    /**
     * Sums the available capacity of every worker that is currently eligible to run tasks.
     *
     * <p>Replaces a decompiled loop that built a new iterator on every test and referenced an
     * undefined variable, which neither compiled nor could terminate.
     *
     * @return number of idle task slots on eligible workers
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public long getIdleTaskSlotCount() {
        long idleSlots = 0;
        for (ImmutableWorkerInfo workerInfo : getWorkersEligibleToRunTasks().values()) {
            idleSlots += workerInfo.getAvailableCapacity();
        }
        return idleSlots;
    }

    /**
     * Sums the capacity currently in use on every worker returned by {@code getWorkers()}.
     *
     * <p>Replaces a decompiled loop that built a new iterator on every test and referenced an
     * undefined variable, which neither compiled nor could terminate.
     *
     * @return number of task slots currently in use
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public long getUsedTaskSlotCount() {
        long usedSlots = 0;
        // NOTE(review): assumes getWorkers() yields ImmutableWorkerInfo, as the accessor used here suggests.
        for (ImmutableWorkerInfo workerInfo : getWorkers()) {
            usedSlots += workerInfo.getCurrCapacityUsed();
        }
        return usedSlots;
    }

    /**
     * Sums the capacity of every worker currently marked lazy.
     *
     * <p>Replaces a decompiled loop that built a new iterator on every test and referenced an
     * undefined variable, which neither compiled nor could terminate.
     *
     * @return number of task slots on lazy workers
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public long getLazyTaskSlotCount() {
        long lazySlots = 0;
        for (Worker lazyWorker : getLazyWorkers()) {
            lazySlots += lazyWorker.getCapacity();
        }
        return lazySlots;
    }

    /**
     * Sums the capacity of every worker currently on the blacklist.
     *
     * <p>Replaces a decompiled loop that built a new iterator on every test and referenced an
     * undefined variable, which neither compiled nor could terminate.
     *
     * @return number of task slots on blacklisted workers
     */
    @Override // org.apache.druid.indexing.overlord.TaskRunner
    public long getBlacklistedTaskSlotCount() {
        long blacklistedSlots = 0;
        for (ImmutableWorkerInfo workerInfo : getBlackListedWorkers()) {
            blacklistedSlots += workerInfo.getWorker().getCapacity();
        }
        return blacklistedSlots;
    }
}
