Sending JMS messages asynchronously

In this post, I will explain how to send JMS messages asynchronously.

Note: I will be using the JMS 2.0 API for this and future examples.

When messages are sent asynchronously, the producer of the message doesn’t wait for an acknowledgement from the provider.

In this case, the producer uses a callback so that it can be notified about the status of the message.

For this callback approach, we need to implement the interface javax.jms.CompletionListener.

The code below shows a Producer class implementing the interface.

Producer


1  package package5;
2  
3  import javax.jms.CompletionListener;
4  import javax.jms.ConnectionFactory;
5  import javax.jms.Destination;
6  import javax.jms.JMSContext;
7  import javax.jms.JMSProducer;
8  import javax.jms.Message;
9  
10 public class Producer implements Runnable, CompletionListener {
11  private Destination destination;
12  private ConnectionFactory connectionFactory;
13  
14  public Producer(Destination destination, ConnectionFactory connectionFactory) {
15      this.destination = destination;
16      this.connectionFactory = connectionFactory;
17  }
18  
19  @Override
20  public void run() {
21      JMSContext jmsContext =  null;
22      
23      try {
24          jmsContext = connectionFactory.createContext();
25          JMSProducer jmsProducer = jmsContext.createProducer();
26          jmsProducer.setAsync(this);
27          jmsProducer.send(destination, "Hello World");
28      } catch(Exception excep) {
29          excep.printStackTrace();
30      } finally {
31          if(jmsContext != null) {
32              jmsContext.close();
33          }
34      }
35  }
36 
37  @Override
38  public void onCompletion(Message message) {
39      System.out.println("Message was sent");
40  }
41 
42  @Override
43  public void onException(Message message, Exception exception) {
44      System.out.println("Error happened when sending the message: " + exception);
45  }
46 }

As shown in the above code, the Producer class has to provide implementations for the two interface methods, “onCompletion” and “onException”.

When the message is sent successfully, the onCompletion method is called; if an error occurs, onException is called instead.

At line 26, we register this Producer instance with the JMSProducer instance by passing it as an argument to the “setAsync” method.

In other words, we are telling the JMSProducer instance to invoke the callback methods of the interface on this Producer instance.

In this way, we are telling the JMSProducer instance to send messages asynchronously.

Note that closing the JMSContext in the finally block does not cut the send short: in JMS 2.0, close() does not return until incomplete asynchronous sends have completed and their callbacks have returned.
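To see the callback flow in isolation, without a broker, here is a minimal JDK-only sketch. SendCallback and FakeAsyncSender are hypothetical stand-ins that I made up for javax.jms.CompletionListener and for a JMS provider; they are not part of the JMS API. The sketch only illustrates that send returns immediately and that the outcome is reported later, on another thread, through one of the two callback methods.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncSendSketch {

    // Hypothetical stand-in for javax.jms.CompletionListener (NOT the JMS interface).
    interface SendCallback {
        void onCompletion(String message);
        void onException(String message, Exception exception);
    }

    // Hypothetical stand-in for a provider that processes sends on its own thread.
    static class FakeAsyncSender implements AutoCloseable {
        private final ExecutorService providerThread = Executors.newSingleThreadExecutor();

        // Returns immediately; the callback fires later on the provider thread.
        void send(String message, SendCallback callback) {
            providerThread.submit(() -> {
                try {
                    // Assumption for the demo: an empty message counts as a failed send.
                    if (message.isEmpty()) {
                        throw new IllegalArgumentException("empty message");
                    }
                    callback.onCompletion(message);
                } catch (Exception e) {
                    callback.onException(message, e);
                }
            });
        }

        @Override
        public void close() {
            // Already submitted sends still run; no new ones are accepted.
            providerThread.shutdown();
        }
    }

    // Sends every message and returns the events reported through the callback.
    static List<String> sendAll(String... messages) throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.length);

        SendCallback callback = new SendCallback() {
            @Override
            public void onCompletion(String message) {
                events.add("sent:" + message);
                done.countDown();
            }

            @Override
            public void onException(String message, Exception exception) {
                events.add("failed:" + exception.getMessage());
                done.countDown();
            }
        };

        try (FakeAsyncSender sender = new FakeAsyncSender()) {
            for (String message : messages) {
                sender.send(message, callback); // does not block
            }
            // Wait until every send has reported an outcome.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for callbacks");
            }
        }
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendAll("Hello World", ""));
    }
}
```

Running main prints [sent:Hello World, failed:empty message]. The latch is one simple way for the sending side to wait for the outcome; in the real Producer above, the same role is played by the JMSContext close() call.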

Below is the Consumer class for your reference.

Consumer


package package5;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.TextMessage;

public class Consumer implements Runnable {
    private Destination destination;
    private ConnectionFactory connectionFactory;
    
    public Consumer(Destination destination, ConnectionFactory connectionFactory) {
        this.destination = destination;
        this.connectionFactory = connectionFactory;
    }
    
    @Override
    public void run() {
        JMSContext jmsContext =  null;
        
        try {
            jmsContext = connectionFactory.createContext();
            JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
            Message message = jmsConsumer.receive();
            System.out.println(((TextMessage)message).getText());
            jmsConsumer.close();
        } catch(Exception excep) {
            excep.printStackTrace();
        } finally {
            if(jmsContext != null) {
                jmsContext.close();
            }
        }       
    }
}

There is no change to the Consumer class compared to the Consumer class code from previous posts.

Below is the complete main code for your reference.

Main Class


package package5;

import java.util.Properties;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class Example5 {
    public static void main(String[] args) throws NamingException, JMSException {
        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory");
        env.put(Context.PROVIDER_URL, "file:///C:/openmq5_1_1/temp");
        InitialContext initialContext = new InitialContext(env);
        ConnectionFactory connectionFactory = (ConnectionFactory)initialContext.lookup("MyConnectionFactory");
        Destination destination = (Destination)initialContext.lookup("MyQueue");

        Producer producer = new Producer(destination, connectionFactory);
        Consumer consumer = new Consumer(destination, connectionFactory);

        Thread producerThread = new Thread(producer);
        Thread consumerThread = new Thread(consumer);

        producerThread.start();
        consumerThread.start();

        initialContext.close();
    }
}

There is no change to the Main class compared to the code from previous posts.
