package ai.vital.prime.service.queue;

import ai.vital.vitalservice.QueueConsumer;
import ai.vital.vitalservice.VitalStatus;
import ai.vital.vitalservice.query.ResultList;
import ai.vital.vitalsigns.java.VitalJavaSerializationUtils;
import ai.vital.vitalsigns.model.GraphObject;
import ai.vital.vitalsigns.model.VITAL_GraphContainerObject;
import ai.vital.vitalsigns.model.VitalApp;
import ai.vital.vitalsigns.model.VitalOrganization;
import ai.vital.vitalsigns.model.properties.Property_hasAppID;
import ai.vital.vitalsigns.model.properties.Property_hasOrganizationID;
import ai.vital.vitalsigns.model.property.IntegerProperty;
import ai.vital.vitalsigns.model.property.StringProperty;
import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/vital/prime/service/queue/KafkaQueueInterface.class */
/**
 * {@link QueueInterface} implementation that sends and receives lists of graph objects
 * through Kafka topics.
 *
 * <p>A single shared {@link KafkaProducer} is created in the constructor and used for all
 * outgoing messages. Each registered queue consumer is served by one or more
 * {@link ConsumerWrapper} instances, each running its own daemon thread.
 *
 * <p>Kafka topic names are built as {@code <organizationID>__<appID>__<queueName>}.
 *
 * <p>The consumer registry is guarded by this object's monitor: {@link #queueConsumer},
 * {@link #queueRemoveConsumer} and {@link #close()} are all {@code synchronized}.
 */
public class KafkaQueueInterface implements QueueInterface {
    private static final Logger log = LoggerFactory.getLogger(KafkaQueueInterface.class);

    /** Message size limit, in bytes, used when the configuration has no {@code maxMessageBytesLength} property. */
    public static final Integer DEFAULT_maxMessageBytesLength = 990000;

    /** Separator between the organization ID, app ID and queue name in a full Kafka topic name. */
    private static final String NAME_SEPARATOR = "__";

    private KafkaProducer<String, byte[]> producer;
    private int maxMessageBytesLength;
    private final Properties producerProperties = new Properties();
    private final Properties consumerProperties = new Properties();

    /** Registered consumers, keyed by the comma-joined, sorted list of full topic names. */
    private final Map<String, List<ConsumerWrapper>> consumers = new HashMap<>();

    /**
     * Owns one Kafka consumer and the daemon thread that polls it, forwarding every received
     * message to a {@link QueueConsumer}.
     */
    static class ConsumerWrapper {
        int num;
        KafkaConsumer<String, byte[]> kConsumer;
        QueueConsumer vConsumer;

        /**
         * Stop flag shared between the controlling thread and the worker thread; volatile so
         * that a stop request made in {@link #close(long)} is visible to the polling loop.
         */
        volatile boolean running = false;

        // NOTE(review): never read or written anywhere in this file - kept in case other
        // classes in the package reference it.
        boolean threadStopped = false;
        Thread thread = null;
        private List<String> topicNames;
        private List<String> fullTopicNames;
        private Properties properties;

        /**
         * @param num            ordinal of this worker, used only in log messages
         * @param topicNames     queue names as given by the caller
         * @param fullTopicNames full Kafka topic names the consumer subscribes to
         * @param properties     Kafka consumer configuration
         * @param queueConsumer  callback that receives the deserialized messages
         */
        public ConsumerWrapper(int num, List<String> topicNames, List<String> fullTopicNames, Properties properties, QueueConsumer queueConsumer) {
            this.num = num;
            this.properties = properties;
            this.topicNames = topicNames;
            this.fullTopicNames = fullTopicNames;
            this.vConsumer = queueConsumer;
        }

        /**
         * Requests the worker thread to stop and waits for it to terminate.
         *
         * @param j maximum time to wait in milliseconds; when not positive the stop is only
         *          requested and the method does not wait
         * @return {@code true} if the worker thread is no longer alive (or was never started),
         *         {@code false} if it is still running when the wait ends
         */
        public boolean close(long j) {
            this.running = false;
            Thread worker = this.thread;
            if (worker == null) {
                // start() was never called, so there is nothing to wait for.
                return true;
            }
            long waitStart = System.currentTimeMillis();
            if (j > 0) {
                try {
                    worker.join(j);
                } catch (InterruptedException e) {
                    // Restore the flag so the caller can observe the interruption.
                    Thread.currentThread().interrupt();
                    log.warn("Interrupted while waiting for consumer #{} thread to stop", this.num);
                }
            }
            if (worker.isAlive()) {
                log.error("Couldn't stop the consumer thread within {}ms", j);
                return false;
            }
            log.info("Consumer thread stopped successfully after {}ms", System.currentTimeMillis() - waitStart);
            return true;
        }

        /**
         * Starts the daemon worker thread.
         *
         * @throws RuntimeException if the wrapper is already marked as running
         */
        public void start() {
            if (this.running) {
                throw new RuntimeException("Consumer already started");
            }
            this.running = true;
            this.thread = new Thread() {
                @Override
                public void run() {
                    consumeLoop();
                }
            };
            this.thread.setDaemon(true);
            this.thread.start();
        }

        /**
         * Worker thread body: creates the Kafka consumer, subscribes to the full topic names
         * and polls until {@link #running} is cleared. The consumer is closed even when the
         * loop ends with an exception.
         */
        private void consumeLoop() {
            KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(this.properties, new StringDeserializer(), new ByteArrayDeserializer());
            this.kConsumer = consumer;
            try {
                consumer.subscribe(this.fullTopicNames);
                while (this.running) {
                    for (ConsumerRecord<String, byte[]> record : consumer.poll(100L)) {
                        handleRecord(record);
                    }
                }
            } finally {
                try {
                    consumer.close();
                } catch (RuntimeException e) {
                    // Logged rather than thrown so it cannot mask an exception from the loop.
                    log.error("Consumer #" + this.num + ", error when closing kafka consumer: " + e.getLocalizedMessage(), e);
                }
            }
        }

        /**
         * Deserializes a single record into a {@link ResultList} and passes it to the queue
         * consumer. Any exception is logged and the record is skipped, so that one bad message
         * does not terminate the worker thread.
         */
        private void handleRecord(ConsumerRecord<String, byte[]> record) {
            long startedAt = System.currentTimeMillis();
            String kafkaTopic = record.topic();
            try {
                // Limit of 3 keeps any "__" inside the queue name itself intact.
                String[] topicParts = kafkaTopic.split(NAME_SEPARATOR, 3);
                if (topicParts.length < 3) {
                    log.error("Consumer #{}, unexpected kafka topic name: {} - skipping", this.num, kafkaTopic);
                    return;
                }
                String queueName = topicParts[2];
                byte[] payload = record.value();
                if (payload == null) {
                    log.error("Consumer #{}, null record value, kafka topic = {} partition = {} offset = {} - skipping", this.num, kafkaTopic, record.partition(), record.offset());
                    return;
                }
                log.info("Consumer #{}, kafka topic = {} record partition = {} offset = {}, timestamp = {} key = {}, value = {}", this.num, kafkaTopic, record.partition(), record.offset(), record.timestamp(), record.key(), payload.length);
                List<?> graphObjects = (List<?>) VitalJavaSerializationUtils.deserialize(payload);
                if (graphObjects == null) {
                    log.error("Null graph objects list message - skipping");
                    return;
                }
                ResultList resultList = new ResultList();
                for (Object graphObject : graphObjects) {
                    resultList.addResult((GraphObject) graphObject);
                }
                this.vConsumer.messageReceived(queueName, resultList);
                log.info("Consumer #{}, processed kafka record partition = {} offset = {}, timestamp = {} key = {} time {}ms", this.num, record.partition(), record.offset(), record.timestamp(), record.key(), System.currentTimeMillis() - startedAt);
            } catch (Exception e) {
                log.error("Error when deserializing kafka message: " + e.getLocalizedMessage(), e);
            }
        }
    }

    /**
     * Reads the producer and consumer configuration from the given container object and
     * starts the shared Kafka producer.
     *
     * <p>The {@code producer} and {@code consumer} properties are parsed as
     * {@link Properties} text. The optional {@code maxMessageBytesLength} property overrides
     * {@link #DEFAULT_maxMessageBytesLength}.
     *
     * @param vITAL_GraphContainerObject configuration holder
     * @throws Exception if the {@code producer} or {@code consumer} property is missing, or
     *                   the configuration cannot be loaded
     */
    public KafkaQueueInterface(VITAL_GraphContainerObject vITAL_GraphContainerObject) throws Exception {
        this.maxMessageBytesLength = DEFAULT_maxMessageBytesLength.intValue();
        StringProperty producerConfig = (StringProperty) vITAL_GraphContainerObject.getProperty("producer");
        StringProperty consumerConfig = (StringProperty) vITAL_GraphContainerObject.getProperty("consumer");
        if (producerConfig == null) {
            throw new Exception("No producer config property");
        }
        if (consumerConfig == null) {
            throw new Exception("No consumer config property");
        }
        IntegerProperty maxLengthConfig = (IntegerProperty) vITAL_GraphContainerObject.getProperty("maxMessageBytesLength");
        if (maxLengthConfig != null) {
            this.maxMessageBytesLength = maxLengthConfig.intValue();
            log.info("maxMessageBytesLength: {} bytes", this.maxMessageBytesLength);
        } else {
            // Report the effective default, not the (null) missing property.
            log.warn("No maxMessageBytesLength config, default value: {}", this.maxMessageBytesLength);
        }
        this.producerProperties.load(new StringReader(producerConfig.asString()));
        this.consumerProperties.load(new StringReader(consumerConfig.asString()));
        log.info("Starting kafka producer...");
        this.producer = new KafkaProducer<>(this.producerProperties, new StringSerializer(), new ByteArraySerializer());
        log.info("Shared producer started successfully");
    }

    /** Builds the full Kafka topic name: {@code <organizationID>__<appID>__<queueName>}. */
    private String getFullQueueName(VitalOrganization organization, VitalApp app, String queueName) {
        return organization.get(Property_hasOrganizationID.class).toString() + NAME_SEPARATOR + app.get(Property_hasAppID.class).toString() + NAME_SEPARATOR + queueName;
    }

    /**
     * Converts queue names into a sorted list of full Kafka topic names.
     *
     * @throws Exception if any of the queue names is empty
     */
    private List<String> toFullTopicNames(VitalOrganization organization, VitalApp app, List<String> queueNames) throws Exception {
        List<String> fullNames = new ArrayList<>(queueNames.size());
        for (String queueName : queueNames) {
            if (queueName.isEmpty()) {
                throw new Exception("Topic name cannot be empty");
            }
            fullNames.add(getFullQueueName(organization, app, queueName));
        }
        Collections.sort(fullNames);
        return fullNames;
    }

    /**
     * Serializes the graph objects and hands them to the shared producer.
     *
     * <p>The send is asynchronous: the returned status only means the record was passed to
     * the producer; the delivery outcome is logged by the completion callback.
     *
     * @param vitalOrganization organization used to build the topic name
     * @param vitalApp          app used to build the topic name
     * @param str               queue name
     * @param str2              Kafka record key
     * @param list              graph objects to send
     * @return OK status once the record has been handed to the producer
     * @throws Exception if {@code list} is null or the serialized message is longer than the
     *                   configured maximum
     */
    @Override
    public VitalStatus queueSend(VitalOrganization vitalOrganization, VitalApp vitalApp, String str, String str2, List<GraphObject> list) throws Exception {
        if (list == null) {
            throw new Exception("Graph objects list cannot be null");
        }
        List<GraphObject> serializableList;
        if (list instanceof Serializable) {
            serializableList = list;
        } else {
            // Copy into a list implementation that is known to be serializable.
            serializableList = new ArrayList<GraphObject>(list);
        }
        // URI of the first object is used only to identify the message in the logs.
        final String uri = list.size() > 0 ? list.get(0).getURI() : null;
        String fullQueueName = getFullQueueName(vitalOrganization, vitalApp, str);
        byte[] payload = SerializationUtils.serialize((Serializable) serializableList);
        if (payload.length > this.maxMessageBytesLength) {
            throw new Exception("Serialized message length " + payload.length + " exceeds max allowed value: " + this.maxMessageBytesLength + " bytes");
        }
        this.producer.send(new ProducerRecord<String, byte[]>(fullQueueName, str2, payload), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                String metadataText = recordMetadata != null ? recordMetadata.toString() : "";
                if (exc != null) {
                    log.error("Error when sending kafka message URI " + uri + " : " + exc.getLocalizedMessage() + " - " + metadataText, exc);
                } else {
                    log.info("Kafka message URI {} sent - {}", uri, metadataText);
                }
            }
        });
        return VitalStatus.withOKMessage("Message sent");
    }

    /**
     * Registers a consumer for one or more queues and starts its worker threads.
     *
     * <p>The number of workers is taken from the
     * {@code QueueFunctions.property_vital_workers_threads} entry of {@code map}; when that
     * entry is absent, the partition count of the first (sorted) full topic name is used.
     * All entries of {@code map} are also copied over the configured consumer properties.
     *
     * @param vitalOrganization organization used to build the topic names
     * @param vitalApp          app used to build the topic names
     * @param str               comma-separated queue names
     * @param queueConsumer     callback that receives the messages
     * @param map               optional consumer property overrides, may be null
     * @return OK status whose successes count is the number of workers started
     * @throws Exception if a queue name is empty, a consumer is already registered for the
     *                   same set of topics, or the workers count is lower than 1
     */
    @Override
    public synchronized VitalStatus queueConsumer(VitalOrganization vitalOrganization, VitalApp vitalApp, String str, QueueConsumer queueConsumer, Map<String, Object> map) throws Exception {
        if (map == null) {
            map = new HashMap<>();
        }
        List<String> topicNames = new ArrayList<>(Arrays.asList(str.split(",")));
        List<String> fullTopicNames = toFullTopicNames(vitalOrganization, vitalApp, topicNames);
        String registryKey = StringUtils.join(fullTopicNames, ",");
        if (this.consumers.get(registryKey) != null) {
            throw new Exception("A consumer already registered to topic: " + str + " (" + registryKey + ")");
        }

        Properties effectiveProperties = new Properties();
        effectiveProperties.putAll(this.consumerProperties);
        effectiveProperties.putAll(map);

        int workersCount;
        Integer requestedWorkers = (Integer) map.get(QueueFunctions.property_vital_workers_threads);
        if (requestedWorkers == null) {
            String firstTopic = fullTopicNames.get(0);
            log.info("Getting partitions for topic: " + firstTopic);
            workersCount = this.producer.partitionsFor(firstTopic).size();
            log.info("Topic: {} partitions count: {}", firstTopic, workersCount);
        } else {
            if (requestedWorkers.intValue() < 1) {
                throw new Exception("vital.workers.threads count must be > 0: " + requestedWorkers);
            }
            log.info("Using custom workers count: {}", requestedWorkers);
            workersCount = requestedWorkers.intValue();
        }
        log.info("Effective consumer properties: {}", effectiveProperties);

        List<ConsumerWrapper> workers = new ArrayList<>(workersCount);
        for (int i = 0; i < workersCount; i++) {
            ConsumerWrapper worker = new ConsumerWrapper(i + 1, topicNames, fullTopicNames, effectiveProperties, queueConsumer);
            worker.start();
            workers.add(worker);
        }
        this.consumers.put(registryKey, workers);

        VitalStatus status = VitalStatus.withOKMessage("Consumer registered");
        status.setSuccesses(Integer.valueOf(workersCount));
        return status;
    }

    /**
     * Requests all consumer workers to stop (without waiting for their threads), clears the
     * consumer registry and closes the shared producer. Errors are logged, not thrown.
     */
    @Override
    public synchronized void close() {
        for (Map.Entry<String, List<ConsumerWrapper>> entry : this.consumers.entrySet()) {
            for (ConsumerWrapper worker : entry.getValue()) {
                try {
                    // Zero timeout: only signal the worker to stop, do not block shutdown.
                    worker.close(0L);
                } catch (Exception e) {
                    log.error("Error when closing consumer, queue: " + entry.getKey() + ": " + e.getLocalizedMessage(), e);
                }
            }
        }
        this.consumers.clear();
        if (this.producer != null) {
            try {
                this.producer.close();
                log.info("Producer shut down successfully");
            } catch (Exception e) {
                log.error("Error when shutting down producer: " + e.getLocalizedMessage(), e);
            }
            this.producer = null;
        }
    }

    /**
     * Unregisters the consumer of the given queues and stops its workers, waiting up to
     * 500ms for each worker thread.
     *
     * @param vitalOrganization organization used to build the topic names
     * @param vitalApp          app used to build the topic names
     * @param str               comma-separated queue names, as used at registration
     * @return OK status on success; an error status (instead of an exception) when a queue
     *         name is empty, no consumer is registered for the queues, or any other
     *         exception occurs
     */
    @Override
    public synchronized VitalStatus queueRemoveConsumer(VitalOrganization vitalOrganization, VitalApp vitalApp, String str) throws Exception {
        try {
            List<String> topicNames = new ArrayList<>(Arrays.asList(str.split(",")));
            List<String> fullTopicNames = toFullTopicNames(vitalOrganization, vitalApp, topicNames);
            List<ConsumerWrapper> removed = this.consumers.remove(StringUtils.join(fullTopicNames, ","));
            if (removed == null) {
                throw new Exception("no consumer registered for queue: " + str);
            }
            for (ConsumerWrapper worker : removed) {
                worker.close(500L);
            }
            return VitalStatus.withOKMessage("Consumer removed from queue: " + str);
        } catch (Exception e) {
            return VitalStatus.withError(e.getLocalizedMessage());
        }
    }
}
