Fazendo processamento de batches assíncrono em tempo real com Kotlin/Java
Tenho um consumidor/subscriber AMQP/MQTT (RabbitMQ, Mosquitto, ActiveMQ Artemis) que recebe mensagens em tempo real de forma assíncrona contendo números. Quando uma mensagem chegar quero ir acumulando estes números em uma fila/array. Depois de determinados segundos, pegarei toda esta fila, farei a média de todos esses números, e esvaziarei-a para que novas mensagens cheguem com uma fila vazia.
Para implementar isso usaremos as seguintes estruturas de dados nativas do Java:
Classe para lidar com concorrência:
ReentrantReadWriteLock
Estrutura para guardar as filas para cada routing key/tópico:
HashMap
Estruturas para guardar as mensagens vindas do RabbitMQ/MQTT:
ArrayBlockingQueue (thread safe) (performance ++)LinkedList (não thread safe) (performance ++)
LinkedBlockingQueue (thread safe) (performance +) não precisamos de thread safe
LinkedTransferQueue (?)
Bônus
Aqui vai uma lista de coisas interessantes que ainda não testei mas vale a pena dar uma olhada.
Striped Lock do Guava
É possível aumentar a performance de acesso ao HashMap usando um Lock especial chamado StripedLock que está disponível na biblioteca Guava.
Confira esse tutorial do Baeldung
https://www.baeldung.com/java-lock-stripping
Biblioteca Disruptor para filas de alta performance
Website do Disruptor
https://lmax-exchange.github.io/disruptor/
Referências
LinkedTransferQueue
https://www.baeldung.com/java-transfer-queue
Comparando Queues
https://stackoverflow.com/questions/35967792/when-to-prefer-linkedblockingqueue-over-arrayblockingqueue
Documentação da Oracle sobre Queues
https://docs.oracle.com/javase/tutorial/collections/implementations/queue.html