java - RabbitMQ: Few messages remain unacknowledged -
i have consumer reads rabbitmq , writes database. consumer class use, user defined, , implements callable.
manual consumer method:
public static void consume(queue queue, string consumername, int threadcount, int prefetchcount) throws exception { dbutil.initializeconnectionpool(); connection connection = rabbitmqutils.getconnection(); channel channel = connection.createchannel(); queueingconsumer consumer = new queueingconsumer(channel); channel.basicqos(threadcount * prefetchcount); queue.initiateconsumer(channel, consumer); system.out.println("mq server: " + connection.getaddress() + ":" + connection.getport()); system.out.println("queue: " + queue.name); // multi-threading code. threadpool = executors.newfixedthreadpool(threadcount); class c = class.forname(consumername); while (true) { consumer consumerinstance = (consumer) c.newinstance(); java.sql.connection dbconnection = null; queueingconsumer.delivery delivery = null; try { delivery = consumer.nextdelivery(); dbconnection = dbutil.getconnectionfrompool(); consumerinstance.setconsumerdetails(new string(delivery.getbody()), dbconnection, channel, queue, delivery); threadpool.submit(consumerinstance); } catch (exception e) { if (delivery != null) { consumer.requeuemessage(delivery, new string(delivery.getbody()), channel, queue, e); } if (dbconnection != null) { dbconnection.close(); } } } }
call method:
@override public object call() throws exception { try { dowork(this.message, this.dbconnection); channel.basicack(this.delivery.getenvelope().getdeliverytag(), false); } catch (exception e) { consumer.dumpmsg(e.getmessage(), "error msg"); consumer.requeuemessage(this.delivery, new string(this.delivery.getbody()), this.channel, this.queue, e); } { java.sql.connection dbconnection = this.dbconnection; if (dbconnection != null) { dbconnection.close(); } } return null; }
the dowork
method overridden in children of consumer
class. issue 1 in 1000 messages stuck in queue in unacked
state. can them out of unacked
state stopping , starting consumer. guess acknowledgement gets lost in network traffic. want know if there way cause messages remain in unacked
state sometime, before going queue, re-consumed.
Comments
Post a Comment