Hi,
I decided to write this blog because I could not find one simple example in combination to Spring, ActiveMQ and Email the message from queue.
Problem taken
What I am going to do in this example is:
- Create one Spring application, that has one POJO i.e. Entity, I took Mobile as my pojo with three attributes as Company, Model and Price.
- There will be one message producer which will post one message (as entity Mobile) to queue-one.
- There will be a listener to queue-one and on receiving a message it will de-queue the message. Further after processing Listener post message to queue-two.
- This queue-two has another listener which de-queue the message from queue-two and process it to send a mail.
Pictorial Presentation
Technologies Used
Below is the stack of technologies
- Spring 3
- ActiveMQ 5.6.0
- Java Mail
- Tomcat 7
- Maven
How to execute and flow explanation
- You need to hit url http://host:port/app/main/amqmobile/form.html
- Based on given configuration this url will search for file form.jsp inside "WEB-INF/amqmobile" folder in your web application.
- As this is a GET request method in getForm of AMQMobileController will be executed.
- From getForm method AMQMsgProducer.produce method will be called that in turn creates
- AMQMobile object
- Convert it into ObjectMessage
- Send it to queue-one namely "amqMobileQueue"
- As soon as the message send to "amqMobileQueue", listener 1 i.e. AMQMobileListener will get active and its method onMessage will be executed.
- This is the place to perform biz logic. Now that message is send to AMQMsgSender.sendMessage method. This method sends the message to queue-two i.e. amqMobileEmailQueue.
- Now Listener 2 i.e. AMQMobileEmailListener will act and its method onMessage sends the email.
Config File - applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:intg="http://www.springframework.org/schema/integration"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:mail="http://www.springframework.org/schema/integration/mail"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
http://www.springframework.org/schema/integration/mail
http://www.springframework.org/schema/integration/mail/spring-integration-mail-2.1.xsd">
<bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl"
p:host="domain-address"
p:port="587"
p:username="username"
p:password="password"
>
<property name="javaMailProperties">
<props>
<prop key="mail.smtp.auth">true</prop>
<prop key="mail.smtp.starttls.enable">true</prop>
<prop key="mail.smtp.auth.ntlm.domain" >DOMAIN</prop>
</props>
</property>
</bean>
<bean id="reqAMQMobileQueue" class="org.apache.activemq.command.ActiveMQQueue">
<property name="physicalName" value="amqMobileQueue" />
</bean>
<bean id="reqAMQMobileEmailQueue" class="org.apache.activemq.command.ActiveMQQueue">
<property name="physicalName" value="amqMobileEmailQueue" />
</bean>
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="amqMobileListener" class="com.examples.amqmobile.AMQMobileListener" />
<bean id="container" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref ="activeMQConnectionFactory" />
<property name="destination" ref ="reqAMQMobileQueue"/>
<property name="messageListener" ref ="amqMobileListener"/>
<property name="concurrentConsumers" value="1" />
</bean>
<bean id="amqMobileEmailListener" class="com.examples.amqmobile.AMQMobileEmailListener" />
<bean id="emailContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref ="activeMQConnectionFactory" />
<property name="destination" ref ="reqAMQMobileEmailQueue"/>
<property name="messageListener" ref ="amqMobileEmailListener"/>
<property name="concurrentConsumers" value="1" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="activeMQConnectionFactory" />
<property name="defaultDestination" ref="reqAMQMobileQueue" />
</bean>
</beans>
Config File - main-servlet.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
  http://www.springframework.org/schema/context
  http://www.springframework.org/schema/context/spring-context-2.5.xsd">
    <context:component-scan base-package="com" />
    <bean class="org.springframework.web.servlet.mvc.annotation.DefaultAnnotationHandlerMapping" />
    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver" p:prefix="/WEB-INF/jsp/" p:suffix=".jsp" />
</beans>
Config File - web.xml
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
id="WebApp_ID" version="2.5">
<display-name>Spring examples</display-name>
<description>Spring examples</description>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>
/WEB-INF/classes/applicationContext.xml
</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<servlet>
<servlet-name>main</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>main</servlet-name>
<url-pattern>/main/*</url-pattern>
</servlet-mapping>
</web-app>
JSP File - form.jsp (to be placed inside WEB-INF/amqmobile folder)
// This file does not contains any logic, it is just a hook point to controller
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<%@ taglib prefix="form" uri="http://www.springframework.org/tags/form" %>
<html>
<head>
<title>Create AMQMobile</title>
</head>
<body>
<h1>Create AMQMobile</h1>
<p><b>You have successfully created a amqmobile.</b></p>
</body>
</html>
package com.examples.amqmobile;
import java.io.Serializable;
/**
* Created with IntelliJ IDEA.
* User: sverma
* Date: 18/10/12
* Time: 3:06 PM
*/
public class AMQMobile implements Serializable {
private String company;
private String model;
private int price;
public String getCompany() {
return company;
}
public void setCompany(String company) {
this.company = company;
}
public String getModel() {
return model;
}
public void setModel(String model) {
this.model = model;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
public String toString () {
StringBuilder sb = new StringBuilder().append("Company: ").append(company)
.append(", Model: ").append(model)
.append(", Price: ").append(price);
return sb.toString();
}
}
Java File - AMQMsgProducer.java
package com.examples.amqmobile;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.jms.*;
/**
 * Created with IntelliJ IDEA.
 * User: sverma
 * Date: 5/12/12
 * Time: 10:59 AM
 */
@Service("amqMsgProducer")
public class AMQMsgProducer {
    @Autowired
    JmsTemplate jmsTemplate;
    public void produce() {
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                AMQMobile amqMobile = new AMQMobile();
                amqMobile.setCompany("newCompany");
                amqMobile.setModel("iPhone5");
                amqMobile.setPrice(45000);
                ObjectMessage message = session.createObjectMessage(amqMobile);
                System.out.println("Sending on ["+jmsTemplate.getDefaultDestination()+"] object as " + amqMobile);
                return message;
            }
        });
    }
}
Java File - AMQMobileListener.java
package com.examples.amqmobile;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.springframework.beans.factory.annotation.Autowired;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
/**
 * Created with IntelliJ IDEA.
 * User: sverma
 * Date: 4/12/12
 * Time: 5:18 PM
 */
public class AMQMobileListener implements MessageListener {
    @Autowired
    AMQMsgSender amqMsgSender;
    public void onMessage(Message message) {
        System.out.println("OnMessage: Listener working... " + message);
        System.out.println("OnMessage: message instance matched = " + (message instanceof ActiveMQObjectMessage));
        if (message instanceof ActiveMQObjectMessage)
        {
            try
            {
                System.out.println("Converting object to AMQMobile class.");
                AMQMobile amqMobile = (AMQMobile) ((ActiveMQObjectMessage) message).getObject();
                System.out.println("OnMessage: Object received in Listener as " + amqMobile);
                /* call message sender to put message onto second queue */
                amqMsgSender.sendMessage(amqMobile);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    }
}
Java File - AMQMsgSender.java
package com.examples.amqmobile;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import javax.jms.Queue;
/**
 * Created with IntelliJ IDEA.
 * User: sverma
 * Date: 4/12/12
 * Time: 5:21 PM
 */
@Service ("amqMsgSender")
public class AMQMsgSender {
    @Autowired
    private JmsTemplate jmsTemplate;
    @Autowired
    private Queue reqAMQMobileEmailQueue;
    /**
     * Sends message using JMS Template.
     *
     *
     * @param msg the msg
     * @throws JMSException the jMS exception
     */
    public void sendMessage(AMQMobile msg) throws JMSException
    {
        System.out.println("About to put message on queue. Queue[" + reqAMQMobileEmailQueue.toString() + "] Message[" + msg + "]");
        jmsTemplate.convertAndSend(reqAMQMobileEmailQueue, msg);
    }
    /**
     * Sets the jms template.
     *
     * @param template the jms template
     */
    public void setJmsTemplate(JmsTemplate template)
    {
        this.jmsTemplate = template;
    }
}
File - AMQMobileEmailListener.java
package com.examples.amqmobile;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.MailSender;
import org.springframework.mail.SimpleMailMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Date;
/**
 * Created with IntelliJ IDEA.
 * User: sverma
 * Date: 4/12/12
 * Time: 5:18 PM
 */
public class AMQMobileEmailListener implements MessageListener {
    @Autowired
    MailSender mailSender;
    public void onMessage(Message message) {
        System.out.println("OnMessage: Email Listener working... " + message);
        System.out.println("OnMessage: Email message instance matched = " + (message instanceof ActiveMQObjectMessage));
        if (message instanceof ActiveMQObjectMessage)
        {
            try
            {
                System.out.println("Converting object to AMQMobile class for email.");
                AMQMobile amqMobile = (AMQMobile) ((ActiveMQObjectMessage) message).getObject();
                System.out.println("OnMessage: Email Object received in Listener as " + amqMobile);
                /* call message sender to put message onto second queue */
                System.out.println("Sending mail for amqMobile " + amqMobile);
                SimpleMailMessage msg = new SimpleMailMessage();
                msg.setTo("sverma@impetus.co.in");
                msg.setFrom("sverma@impetus.co.in");
                msg.setSubject("Transforming from activemq");
                msg.setSentDate(new Date());
                msg.setText("Transforming using spring for mobile " + amqMobile);
                System.out.println("Waiting for 10 sec intentionally bf sending mail...");
                Thread.sleep(10000); // Mail server was throwing error of message sending limit in given time frame exceeds
                mailSender.send(msg);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    }
}
Java File - AMQMobileController.java
package com.examples.amqmobile;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
/**
 * Created with IntelliJ IDEA.
 * User: sverma
 * Date: 18/10/12
 * Time: 7:38 PM
 */
@Controller
public class AMQMobileController {
    @Autowired
    AMQMsgProducer amqMsgProducer;
    @RequestMapping (value = "/amqmobile/form.html", method = RequestMethod.GET)
    public void getForm(Model model) {
        System.out.println("AMQController: Adding amq mobile in a model");
        amqMsgProducer.produce();
        System.out.println("AMQController: Msg send");
    }
}
Although I tried to keep my logic focus, but definitely I need to work more on formatting skills :)
Above Code is at GitHub, click here.
- Shailendra Verma
Note: I tested this application; after commenting Thread.sleep and mail.send command to save mailbox; with jMeter for 5000 simultaneous threads and there is no exception. All threads executed successfully. Hurrah !! Infact jMeter give up by outOfMemory exception :)

