package org.apache.iotdb.db.engine.trigger.sink.mqtt;

import org.apache.iotdb.db.engine.trigger.sink.api.Handler;
import org.apache.iotdb.db.engine.trigger.sink.exception.SinkException;
import org.apache.iotdb.tsfile.utils.Binary;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;

/* loaded from: input_file:org/apache/iotdb/db/engine/trigger/sink/mqtt/MQTTHandler.class */
public class MQTTHandler implements Handler<MQTTConfiguration, MQTTEvent> {
    private BlockingConnection connection;
    private String payloadFormatter;

    @Override // org.apache.iotdb.db.engine.trigger.sink.api.Handler
    public void open(MQTTConfiguration mQTTConfiguration) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost(mQTTConfiguration.getHost(), mQTTConfiguration.getPort());
        mqtt.setUserName(mQTTConfiguration.getUsername());
        mqtt.setPassword(mQTTConfiguration.getPassword());
        this.connection = mqtt.blockingConnection();
        this.connection.connect();
        this.payloadFormatter = generatePayloadFormatter(mQTTConfiguration);
    }

    private static String generatePayloadFormatter(MQTTConfiguration mQTTConfiguration) throws SinkException {
        return String.format("{\"device\":\"%s\",\"measurements\":[%s]%s", mQTTConfiguration.getDevice(), arrayToJson(mQTTConfiguration.getMeasurements()), ",\"timestamp\":%d,\"values\":[%s]}");
    }

    private static String arrayToJson(Object[] objArr) throws SinkException {
        if (objArr.length <= 0) {
            throw new SinkException("The number of measurements should be positive.");
        }
        StringBuilder sb = new StringBuilder(objectToJson(objArr[0]));
        for (int i = 1; i < objArr.length; i++) {
            sb.append(',').append(objectToJson(objArr[i]));
        }
        return sb.toString();
    }

    private static String objectToJson(Object obj) {
        return ((obj instanceof String) || (obj instanceof Binary)) ? '\"' + obj.toString() + '\"' : obj.toString();
    }

    @Override // org.apache.iotdb.db.engine.trigger.sink.api.Handler
    public void close() throws Exception {
        this.connection.disconnect();
    }

    @Override // org.apache.iotdb.db.engine.trigger.sink.api.Handler
    public void onEvent(MQTTEvent mQTTEvent) throws Exception {
        this.connection.publish(mQTTEvent.getTopic(), String.format(this.payloadFormatter, Long.valueOf(mQTTEvent.getTimestamp()), arrayToJson(mQTTEvent.getValues())).getBytes(), mQTTEvent.getQoS(), mQTTEvent.retain());
    }
}
