java - RabbitMQ: Few messages remain unacknowledged -


i have consumer reads rabbitmq , writes database. consumer class use, user defined, , implements callable.

manual consumer method:

public static void consume(queue queue, string consumername, int threadcount, int prefetchcount) throws exception {     dbutil.initializeconnectionpool();      connection connection = rabbitmqutils.getconnection();     channel channel = connection.createchannel();     queueingconsumer consumer = new queueingconsumer(channel);      channel.basicqos(threadcount * prefetchcount);     queue.initiateconsumer(channel, consumer);      system.out.println("mq server: " + connection.getaddress() + ":" + connection.getport());     system.out.println("queue: " + queue.name);      // multi-threading code.     threadpool = executors.newfixedthreadpool(threadcount);     class c = class.forname(consumername);      while (true) {         consumer consumerinstance = (consumer) c.newinstance();         java.sql.connection dbconnection = null;         queueingconsumer.delivery delivery = null;          try {             delivery = consumer.nextdelivery();             dbconnection = dbutil.getconnectionfrompool();             consumerinstance.setconsumerdetails(new string(delivery.getbody()), dbconnection, channel, queue, delivery);             threadpool.submit(consumerinstance);         } catch (exception e) {             if (delivery != null) {                 consumer.requeuemessage(delivery, new string(delivery.getbody()), channel, queue, e);             }             if (dbconnection != null) {                 dbconnection.close();             }          }     } } 

call method:

@override public object call() throws exception {     try {         dowork(this.message, this.dbconnection);         channel.basicack(this.delivery.getenvelope().getdeliverytag(), false);     } catch (exception e) {         consumer.dumpmsg(e.getmessage(), "error msg");         consumer.requeuemessage(this.delivery, new string(this.delivery.getbody()), this.channel, this.queue, e);     } {         java.sql.connection dbconnection = this.dbconnection;         if (dbconnection != null) {             dbconnection.close();         }     }      return null; } 

the dowork method overridden in children of consumer class. issue 1 in 1000 messages stuck in queue in unacked state. can them out of unacked state stopping , starting consumer. guess acknowledgement gets lost in network traffic. want know if there way cause messages remain in unacked state sometime, before going queue, re-consumed.


Comments

Popular posts from this blog

html - Outlook 2010 Anchor (url/address/link) -

javascript - Why does running this loop 9 times take 100x longer than running it 8 times? -

Getting gateway time-out Rails app with Nginx + Puma running on Digital Ocean -