In this post in the JMS series, I will explain how to receive messages asynchronously.
Note: I will be using the JMS 2.0 API for this and future examples.
In the examples in all my previous posts, I used the “receive” method of the “JMSConsumer” interface to receive messages synchronously.
To receive messages asynchronously, we will use the “javax.jms.MessageListener” interface.
When an implementation of “javax.jms.MessageListener” is set as the message listener on an instance of JMSConsumer, the JMS provider calls it whenever a message is available for delivery.
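To make the callback idea concrete without needing a broker, here is a minimal sketch that uses only the Java standard library. It is not JMS code: `Listener` and `MiniProvider` are hypothetical stand-ins for `javax.jms.MessageListener` and the JMS provider, and the thread name "delivery-thread" is made up for illustration. The point it tries to show is that registering a listener returns immediately and the callback later runs on a thread owned by the provider, not on the thread that registered it.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class ListenerSketch {

    // Hypothetical stand-in for javax.jms.MessageListener (not the real interface).
    interface Listener {
        void onMessage(String text);
    }

    // Hypothetical stand-in for a JMS provider: a queue drained by one delivery thread.
    static class MiniProvider {
        // Sentinel compared by identity, so a real message "STOP" is still delivered.
        private static final String STOP = new String("STOP");
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private Thread deliveryThread;

        void send(String text) {
            queue.add(text);
        }

        // Returns immediately; the listener is invoked later on the delivery thread.
        void setListener(Listener listener) {
            deliveryThread = new Thread(() -> {
                try {
                    for (String text = queue.take(); text != STOP; text = queue.take()) {
                        listener.onMessage(text);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "delivery-thread");
            deliveryThread.start();
        }

        // Delivers everything already queued, then stops the delivery thread.
        void close() {
            queue.add(STOP);
            try {
                if (deliveryThread != null) {
                    deliveryThread.join();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Sends the texts through the sketch provider and returns "<thread>: <text>" per delivery.
    public static List<String> deliver(String... texts) {
        MiniProvider provider = new MiniProvider();
        List<String> received = new CopyOnWriteArrayList<>();
        provider.setListener(text ->
                received.add(Thread.currentThread().getName() + ": " + text));
        for (String text : texts) {
            provider.send(text);
        }
        provider.close();
        return received;
    }

    public static void main(String[] args) {
        // Prints "delivery-thread: Hello World"
        deliver("Hello World").forEach(System.out::println);
    }
}
```

The recorded thread name shows that the callback did not run on the calling thread, which is the same division of labour the JMS provider applies when it invokes “onMessage”.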
Below is the code of the Consumer class, which implements this interface.
Consumer
1 package package6;
2
3 import javax.jms.ConnectionFactory;
4 import javax.jms.Destination;
5 import javax.jms.JMSConsumer;
6 import javax.jms.JMSContext;
7 import javax.jms.JMSException;
8 import javax.jms.Message;
9 import javax.jms.MessageListener;
10 import javax.jms.TextMessage;
11
12 public class Consumer implements Runnable, MessageListener {
13     private Destination destination;
14     private ConnectionFactory connectionFactory;
15
16     public Consumer(Destination destination, ConnectionFactory connectionFactory) {
17         this.destination = destination;
18         this.connectionFactory = connectionFactory;
19     }
20
21     @Override
22     public void run() {
23         JMSContext jmsContext = null;
24         try {
25             jmsContext = connectionFactory.createContext();
26             JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
27             jmsConsumer.setMessageListener(this);
28             Thread.sleep(10000);
29             jmsConsumer.close();
30         } catch(Exception excep) {
31             excep.printStackTrace();
32         } finally {
33             if(jmsContext != null) {
34                 jmsContext.close();
35             }
36         }
37     }
38
39     @Override
40     public void onMessage(Message message) {
41         try {
42             System.out.println("MessageListener: " + ((TextMessage)message).getText());
43         } catch(JMSException exception) {
44             exception.printStackTrace();
45         }
46     }
47 }
When implementing the javax.jms.MessageListener interface, we need to implement only one method, “onMessage”, as shown in the code above.
Next, we tell the JMSConsumer instance to deliver messages asynchronously by calling its “setMessageListener” method and passing an implementation of the javax.jms.MessageListener interface, which in this case is the Consumer instance itself. Refer to line 27.
Now, when the provider has a message to deliver, it calls the “onMessage” method of the Consumer class and passes the message as an argument.
In this way we can configure the consumer to receive messages asynchronously.
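The Consumer above keeps its JMSContext open by sleeping for a fixed ten seconds (line 28), so it always waits the full time even if the message arrives at once. One possible alternative, sketched here with the standard library only, is to have the callback count down a `java.util.concurrent.CountDownLatch` and let the consumer thread wait on it with a timeout. The class `LatchWaitSketch`, the method `awaitMessages` and its parameters are hypothetical names for illustration, and the single-thread executor merely stands in for the provider's delivery thread; in real JMS code the `countDown()` call would sit inside “onMessage”.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    // Waits until `expected` callbacks have fired or the timeout elapses.
    // Returns true only if all expected callbacks arrived in time.
    public static boolean awaitMessages(int expected, int actuallySent, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        // Assumption: this executor stands in for the provider's delivery thread.
        ExecutorService providerThread = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < actuallySent; i++) {
                // In real JMS code this would be the last statement of onMessage(Message).
                providerThread.execute(latch::countDown);
            }
            // Returns as soon as the count reaches zero, or false once the timeout expires.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            providerThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitMessages(1, 1, 2000)); // prints "true" without waiting 2 seconds
        System.out.println(awaitMessages(2, 1, 200));  // prints "false" after about 200 ms
    }
}
```

With this approach the timeout becomes an upper bound instead of a fixed delay, and the boolean result tells the consumer whether it is closing because the work is done or because it gave up waiting.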
Below is the complete code of the Producer class for your reference.
Producer
package package6;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSContext;
import javax.jms.JMSProducer;

public class Producer implements Runnable {
    private Destination destination;
    private ConnectionFactory connectionFactory;

    public Producer(Destination destination, ConnectionFactory connectionFactory) {
        this.destination = destination;
        this.connectionFactory = connectionFactory;
    }

    @Override
    public void run() {
        JMSContext jmsContext = null;
        try {
            jmsContext = connectionFactory.createContext();
            JMSProducer jmsProducer = jmsContext.createProducer();
            jmsProducer.send(destination, "Hello World");
        } catch(Exception excep) {
            excep.printStackTrace();
        } finally {
            if(jmsContext != null) {
                jmsContext.close();
            }
        }
    }
}
Below is the complete code of the main class, Example6, for your reference.
package package6;

import java.util.Properties;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class Example6 {
    public static void main(String[] args) throws NamingException, JMSException {
        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory");
        env.put(Context.PROVIDER_URL, "file:///C:/openmq5_1_1/temp");
        InitialContext initialContext = new InitialContext(env);

        ConnectionFactory connectionFactory = (ConnectionFactory)initialContext.lookup("MyConnectionFactory");
        Destination destination = (Destination)initialContext.lookup("MyQueue");

        Producer producer = new Producer(destination, connectionFactory);
        Consumer consumer = new Consumer(destination, connectionFactory);

        Thread producerThread = new Thread(producer);
        Thread consumerThread = new Thread(consumer);
        producerThread.start();
        consumerThread.start();

        initialContext.close();
    }
}