package com.kfyty.loveqq.framework.boot.data.redis.redisson.autoconfig.mq;

import com.kfyty.loveqq.framework.core.autoconfig.DestroyBean;
import com.kfyty.loveqq.framework.core.support.Triple;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;

/* loaded from: input_file:com/kfyty/loveqq/framework/boot/data/redis/redisson/autoconfig/mq/DefaultRedisMessageQueue.class */
public class DefaultRedisMessageQueue implements RedisMessageQueue, DestroyBean {
    private final RedissonClient redissonClient;
    private final ExecutorService executorService;
    private final Map<String, Triple<RBlockingQueue<Object>, RDelayedQueue<Object>, Queue<MessageListener>>> queueMap = new ConcurrentHashMap();

    public DefaultRedisMessageQueue(RedissonClient redissonClient, ExecutorService executorService) {
        this.redissonClient = redissonClient;
        this.executorService = executorService;
    }

    @Override // com.kfyty.loveqq.framework.boot.data.redis.redisson.autoconfig.mq.RedisMessageQueue
    public void send(String str, Object obj) {
        ((RDelayedQueue) obtainQueue(str).getValue()).offer(obj, 0L, TimeUnit.MILLISECONDS);
    }

    @Override // com.kfyty.loveqq.framework.boot.data.redis.redisson.autoconfig.mq.RedisMessageQueue
    public void send(String str, Object obj, long j, TimeUnit timeUnit) {
        ((RDelayedQueue) obtainQueue(str).getValue()).offer(obj, j, timeUnit);
    }

    @Override // com.kfyty.loveqq.framework.boot.data.redis.redisson.autoconfig.mq.RedisMessageQueue
    public void registryMessageListener(String str, MessageListener messageListener) {
        Triple<RBlockingQueue<Object>, RDelayedQueue<Object>, Queue<MessageListener>> obtainQueue = obtainQueue(str);
        synchronized (obtainQueue) {
            Queue queue = (Queue) obtainQueue.getTriple();
            queue.offer(messageListener);
            if (((Queue) obtainQueue.getTriple()).size() == 1) {
                ((RBlockingQueue) obtainQueue.getKey()).subscribeOnElements(obj -> {
                    return CompletableFuture.allOf((CompletableFuture[]) queue.stream().map(messageListener2 -> {
                        return CompletableFuture.runAsync(() -> {
                            messageListener2.onMessage(obj);
                        }, this.executorService);
                    }).toArray(i -> {
                        return new CompletableFuture[i];
                    }));
                });
            }
        }
    }

    public void destroy() {
        Iterator<Map.Entry<String, Triple<RBlockingQueue<Object>, RDelayedQueue<Object>, Queue<MessageListener>>>> it = this.queueMap.entrySet().iterator();
        while (it.hasNext()) {
            ((RDelayedQueue) it.next().getValue().getValue()).destroy();
        }
    }

    protected Triple<RBlockingQueue<Object>, RDelayedQueue<Object>, Queue<MessageListener>> obtainQueue(String str) {
        return this.queueMap.computeIfAbsent(str, str2 -> {
            RBlockingQueue blockingQueue = this.redissonClient.getBlockingQueue(str2);
            return new Triple(blockingQueue, this.redissonClient.getDelayedQueue(blockingQueue), new LinkedBlockingQueue());
        });
    }
}
