RabbitMQ Client Java - Erro Broken Pipe
Se você tem uma aplicação java que fica por muito tempo ligado consumindo de um broker RabbitMQ, eventualmente essa exception pode acontecer:
ERROR IOException java.net.SocketException: Broken pipe at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) at java.net.SocketOutputStream.write(SocketOutputStream.java:153) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at java.io.DataOutputStream.flush(DataOutputStream.java:123) at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:142) at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:488) at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:125) at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:316) at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:292) at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:285) at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:383) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516) at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533) ...
Essa exception basicamente acontece quando a sua aplicação tenta enviar algum dado de volta ao servidor RabbitMQ e o servidor encerrou a sua conexão por vontade própria.
Uma das possíveis causas desse erro (foi o que me deu dor de cabeça).
A minha forma de resolver foi que ao criar minha conexão, eu atribuísse o parâmetro "max prefetch count" para um número razoável ao criar um channel do rabbitmq. O número que escolhi foi 800 para meu programa que consome mensagens do RabbitMQ e salva no PostgreSQL.
rabbitMQChannel = this.rabbitMQConnection.createChannel() rabbitMQChannel.basicQos(productionLineMetadata.rabbitmqPrefetchSize, //max prefetch sizeproductionLineMetadata.rabbitmqPrefetchCount,//max prefetch countfalse) rabbitMQChannel.basicConsume( productionLineMetadata.getMainQueueName(), false, deliverCallback, cancelCallback )
Referências
Discussão da exception - Mulesoft
https://help.mulesoft.com/s/article/RabbitMQ-resets-the-connection-and-messages-are-resent-to-AMQP-inbound-endpoint
Discussão - Lukasmestan
https://lukasmestan.com/rabbitmq-broken-pipe-or-closed-connection/