Sending JMS messages in a Transaction

In this post under JMS, I will explain how to send JMS messages in a Transaction.

If your code runs in a Java SE environment or in a Java EE application client, you create a transacted session by calling the method below on the ConnectionFactory interface and passing “JMSContext.SESSION_TRANSACTED” as the session mode.


public JMSContext createContext(int sessionMode)

In a Java EE web or EJB container with an active JTA transaction in progress, the sessionMode argument is ignored. The session participates in the JTA transaction and is committed or rolled back when that transaction is committed or rolled back, not by calling the JMSContext’s commit or rollback methods. Since the argument is ignored, developers are advised to use createContext() instead of this method.

In a Java EE web or EJB container with no active JTA transaction in progress, the sessionMode argument must be set to either JMSContext.AUTO_ACKNOWLEDGE or JMSContext.DUPS_OK_ACKNOWLEDGE. The session is non-transacted, and messages received by it are acknowledged automatically according to the value of sessionMode.
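
The three cases above can be summarised as a small lookup. The sketch below is only an illustration of the rules as stated in this post: the class SessionModeRules, its nested enums and the describe method are made-up names, not part of the JMS API, and the returned strings are my own wording. For a Java SE or application client with a mode other than SESSION_TRANSACTED, it assumes the session is simply non-transacted.

```java
// Illustrative only: this class, its enums and describe() are NOT part of the JMS API.
class SessionModeRules {
    enum Environment { SE_OR_APP_CLIENT, CONTAINER_WITH_JTA, CONTAINER_WITHOUT_JTA }

    enum Mode { SESSION_TRANSACTED, AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE }

    // Returns a short description of how the session behaves for the given
    // environment and requested session mode, following the rules in the text.
    static String describe(Environment env, Mode mode) {
        switch (env) {
            case SE_OR_APP_CLIENT:
                return mode == Mode.SESSION_TRANSACTED
                        ? "local transaction, ended by JMSContext commit() or rollback()"
                        : "non-transacted, acknowledgement mode " + mode;
            case CONTAINER_WITH_JTA:
                // The requested mode does not matter here.
                return "sessionMode ignored, session joins the JTA transaction";
            case CONTAINER_WITHOUT_JTA:
                if (mode == Mode.AUTO_ACKNOWLEDGE || mode == Mode.DUPS_OK_ACKNOWLEDGE) {
                    return "non-transacted, automatic acknowledgement using " + mode;
                }
                return "outside the rule above, which expects AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE";
            default:
                throw new IllegalArgumentException("unknown environment: " + env);
        }
    }

    public static void main(String[] args) {
        // Show what requesting SESSION_TRANSACTED means in each environment.
        for (Environment env : Environment.values()) {
            System.out.println(env + " -> " + describe(env, Mode.SESSION_TRANSACTED));
        }
    }
}
```

Only the first case gives the application control over commit and rollback, which is the case the rest of this post uses.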

Below is the complete code of the Producer class.

Producer


1  package package9;
2  
3  import javax.jms.ConnectionFactory;
4  import javax.jms.Destination;
5  import javax.jms.JMSContext;
6  import javax.jms.JMSProducer;
7  
8  public class Producer implements Runnable {
9  	private Destination destination;
10 	private ConnectionFactory connectionFactory;
11 	
12 	public Producer(Destination destination, ConnectionFactory connectionFactory) {
13 		this.destination = destination;
14 		this.connectionFactory = connectionFactory;
15 	}
16 	
17 	@Override
18 	public void run() {
19 		JMSContext jmsContext =  null;
20 		
21 		try {
22 			jmsContext = connectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
23 			System.out.println("Producer:" + jmsContext.getTransacted());
24 			JMSProducer jmsProducer = jmsContext.createProducer();
25 			jmsProducer.send(destination, "Hello World 1");
26 			jmsContext.commit();
27 			jmsProducer.send(destination, "Hello World 2");
28 		} catch(Exception excep) {
29 			excep.printStackTrace();
30 		} finally {
31 			if(jmsContext != null) {
32 				jmsContext.close();
33 			}
34 		}
35 	}
36 }

At line 22, we create an instance of JMSContext. Because we pass JMSContext.SESSION_TRANSACTED as the session mode, the context internally creates a transacted session.

At line 23, we call the “getTransacted” method on the JMSContext instance. It returns true if the session is transacted and false otherwise.

At line 25, we send a message, and at line 26 we finalize the send by calling the “commit” method on JMSContext.

Messages sent by the JMSProducer are held by the JMS provider until commit is called. Once commit is called, the JMS provider sends the messages to the destination.

We can also undo the sends by calling the “rollback” method on JMSContext. When rollback is called, the JMS provider discards the messages sent in the current transaction.

Between the start of the transaction and call of commit, we can send any number of messages.

The completion of a session’s current transaction (by calling commit) automatically begins the next. The result is that a transacted session always has a current transaction within which its work is done.

At line 27, we send another message, which belongs to a new transaction. This time we do not call commit before the JMSContext is closed, so the JMS provider discards the message.
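
To make the commit, rollback and close behaviour described above easier to picture, here is a toy in-memory model. It is not JMS code and does not talk to a provider: ToyTransactedSession and its methods are hypothetical names, and a plain List stands in for the queue. It only mimics the idea that sends are held until commit and dropped on rollback or close.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a transacted session; NOT the JMS API.
class ToyTransactedSession implements AutoCloseable {
    private final List<String> destination;                  // what consumers could see
    private final List<String> pending = new ArrayList<>();  // sends in the current transaction

    ToyTransactedSession(List<String> destination) {
        this.destination = destination;
    }

    // A send is only recorded; nothing reaches the destination yet.
    void send(String body) {
        pending.add(body);
    }

    // Publishes everything sent since the last commit or rollback.
    // The now-empty pending list plays the role of the next transaction.
    void commit() {
        destination.addAll(pending);
        pending.clear();
    }

    // Drops everything sent in the current transaction.
    void rollback() {
        pending.clear();
    }

    // Closing with uncommitted sends drops them, as with rollback.
    @Override
    public void close() {
        pending.clear();
    }

    // Mirrors the flow of the Producer class: send, commit, send, close.
    static List<String> demo() {
        List<String> queue = new ArrayList<>();
        try (ToyTransactedSession session = new ToyTransactedSession(queue)) {
            session.send("Hello World 1");
            session.commit();
            session.send("Hello World 2"); // never committed
        }
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [Hello World 1]
    }
}
```

Only the first message ends up in the list, which matches what the Consumer receives in the output at the end of this post.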

Below is the complete code of the Consumer class.

Consumer


package package9;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.TextMessage;

public class Consumer implements Runnable {
	private Destination destination;
	private ConnectionFactory connectionFactory;
	
	public Consumer(Destination destination, ConnectionFactory connectionFactory) {
		this.destination = destination;
		this.connectionFactory = connectionFactory;
	}
	
	@Override
	public void run() {
		JMSContext jmsContext =  null;
		
		try {
			jmsContext = connectionFactory.createContext();
			JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
			Message message = jmsConsumer.receive();
			System.out.println(((TextMessage)message).getText());
			jmsConsumer.close();
		} catch(Exception excep) {
			excep.printStackTrace();
		} finally {
			if(jmsContext != null) {
				jmsContext.close();
			}
		}		
	}
}

Below is the complete code of the Main class.

Main Class


package package9;
import java.util.Properties;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class Example9 {
	public static void main(String[] args) throws NamingException, JMSException {
		Properties env = new Properties();
		env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory");
		env.put(Context.PROVIDER_URL, "file:///C:/openmq5_1_1/temp");
		InitialContext initialContext = new InitialContext(env);
		ConnectionFactory connectionFactory = (ConnectionFactory)initialContext.lookup("MyConnectionFactory");
		Destination destination = (Destination)initialContext.lookup("MyQueue");
		
		Producer producer = new Producer(destination, connectionFactory);
		Consumer consumer = new Consumer(destination, connectionFactory);
		
		Thread producerThread = new Thread(producer);
		Thread consumerThread = new Thread(consumer);
		
		producerThread.start();
		consumerThread.start();
		
		initialContext.close();
	}
}

Below is the output

Output

Producer:true
Hello World 1
