How to Integrate RabbitMQ with webMethods?

Data Engineering & AI
Integration & API
January 14, 2019
Posted By: Nageswara Reddy Chintakuntla

RabbitMQ is an open-source, enterprise-grade message broker. It offers features for reliable delivery, routing, and federation, so it can serve business requirements that go beyond raw throughput. RabbitMQ currently powers 35,000+ projects at startups and large enterprises. Much of its popularity comes from its implementation of AMQP, an open wire protocol for messaging with powerful routing features. It was also one of the earliest enterprise-grade message brokers to combine a mature feature set with developer tools, client libraries, and quality documentation.

Java has long had messaging standards such as JMS. Non-Java applications that needed distributed messaging, whether monolithic or microservice-based, had a much harder time finding a suitable message broker, and their integration options were limited. With the advent of AMQP, open-source message brokers can serve clients written in many languages.

Guide: RabbitMQ Message Broker Integration with Software AG's webMethods

RabbitMQ can be integrated with webMethods using Java client programs. Here is a step-by-step guide to a smooth integration.

Technologies used

  • webMethods Integration Server 9.X
  • RabbitMQ Server Version 3.7.4 with Erlang 20.3
  • Jar files to place in Integration Server classpath (rabbitmq-client.jar & amqp-client-5.5.1.jar)
     


Once both webMethods Integration Server and RabbitMQ Server are installed, create a consumer on the queue. The consumer listens for messages from RabbitMQ and invokes a flow service, passing the data received in the message payload so that the flow service can perform the business logic.

Receiving messages from RabbitMQ

Follow these steps to integrate RabbitMQ through a Java service:

Step 1: Create RabbitMQ Connection
We need to create a connection to RabbitMQ so that webMethods can receive messages from it.

com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();

// RabbitMQ's default account is lowercase guest/guest; user names are case-sensitive.
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672); // default AMQP port
Connection connection = factory.newConnection();
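
The connection values above are hard-coded for the walkthrough. As a minimal sketch of one way to keep them outside the code, the class below reads them from a `java.util.Properties` object and falls back to the values used in this guide. The class name `RabbitSettings` and the property keys (`rabbitmq.host` and so on) are assumptions made for illustration; they are not part of webMethods or the RabbitMQ client.

```java
import java.util.Properties;

/** Hypothetical holder for the broker settings used in Step 1. */
final class RabbitSettings {
    final String host;
    final int port;
    final String username;
    final String password;

    private RabbitSettings(String host, int port, String username, String password) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
    }

    /** Reads the settings, falling back to localhost:5672 and the guest account. */
    static RabbitSettings from(Properties props) {
        // The property keys below are illustrative names, not a standard.
        String host = props.getProperty("rabbitmq.host", "localhost");
        int port = Integer.parseInt(props.getProperty("rabbitmq.port", "5672"));
        String username = props.getProperty("rabbitmq.username", "guest");
        String password = props.getProperty("rabbitmq.password", "guest");
        return new RabbitSettings(host, port, username, password);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("rabbitmq.host", "broker.example.internal"); // hypothetical host
        RabbitSettings s = RabbitSettings.from(props);
        System.out.println(s.host + ":" + s.port + " as " + s.username);
    }
}
```

The resulting fields would then be passed to `factory.setHost(...)`, `factory.setPort(...)`, `factory.setUsername(...)`, and `factory.setPassword(...)` in place of the literals.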

Step 2: Create Channel
Channel channel = connection.createChannel();
// Arguments: queue name, durable, exclusive, autoDelete, extra arguments
channel.queueDeclare("RMQ_Out_Queue", true, false, false, null);

Step 3: Create Consumer
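Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        String message = new String(body, "UTF-8");
        // Headers are optional; avoid a NullPointerException when none were sent.
        String headers = properties.getHeaders() == null ? "" : properties.getHeaders().toString();
    }
};
// The second argument (true) turns on automatic acknowledgement.
channel.basicConsume("RMQ_Out_Queue", true, consumer);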

Step 4: Invoke flow service by passing inputs from Receive Client code
// idata is the IData document built from the received message (see the complete receiver code).
NSName nsName = NSName.create( "RabbitMQTest.services:printMessage" );
com.wm.app.b2b.server.User user = com.wm.app.b2b.server.UserManager.getUser("Administrator");
Session s = StateManager.createContext(0x7fffffffL, "system", user);
s.setUser(user);
s.clearModified();
Service.doThreadInvoke("RabbitMQTest.services","printMessage", s, idata);
StateManager.deleteContext(s.getSessionID());

Step 5: Implement flow service 
The flow service can perform the actual business logic with the inputs passed from the receiver Java client program.
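
In the complete receiver code, the flow service input is a single string named `jsonString`, built as the headers, then `::`, then the message body. The sketch below shows one way that string could be split back into its two parts. The class `PayloadSplitter` is a hypothetical helper, not part of the original services, and it assumes the header text itself contains no `::`.

```java
/** Hypothetical helper: splits the "headers::message" string built by the receiver. */
final class PayloadSplitter {
    static final String DELIMITER = "::";

    /** Returns {headers, message}; headers is empty when no delimiter is present. */
    static String[] split(String payload) {
        int idx = payload.indexOf(DELIMITER);
        if (idx < 0) {
            return new String[] { "", payload };
        }
        // Split on the first delimiter only, so "::" inside the body is preserved.
        String headers = payload.substring(0, idx);
        String message = payload.substring(idx + DELIMITER.length());
        return new String[] { headers, message };
    }

    public static void main(String[] args) {
        String[] parts = split("{ID=12345, Name=Kellton}::Hello!! this message is from webMethods.");
        System.out.println("headers = " + parts[0]);
        System.out.println("message = " + parts[1]);
    }
}
```

Passing headers and body as two separate pipeline fields would avoid the delimiter assumption altogether; the split is shown only because the receiver in this guide sends a single string.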


Step 6: Publish message from RabbitMQ

Once the Java client program has created the consumer for the queue, publish a message to the same queue.
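
To make the relationship between queue, consumer, and published message concrete, here is a toy in-memory model. It is not RabbitMQ and uses no RabbitMQ classes; `ToyBroker` and its methods are invented for illustration. It mimics only one behavior: a message published with a routing key equal to a queue name reaches that queue, and is handed to the queue's consumer if one is registered.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory model of queue-name routing; illustrative only. */
final class ToyBroker {
    private final Map<String, Deque<String>> queues = new HashMap<>();
    private final Map<String, Consumer<String>> consumers = new HashMap<>();

    /** Declares a queue; declaring the same name twice is harmless. */
    void queueDeclare(String queue) {
        queues.putIfAbsent(queue, new ArrayDeque<>());
    }

    /** Registers the consumer and hands it any messages already waiting. */
    void consume(String queue, Consumer<String> callback) {
        queueDeclare(queue);
        consumers.put(queue, callback);
        Deque<String> backlog = queues.get(queue);
        while (!backlog.isEmpty()) {
            callback.accept(backlog.poll());
        }
    }

    /** Routes by queue name; returns false when no such queue exists. */
    boolean publish(String routingKey, String message) {
        Deque<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false; // nothing is bound under that name
        }
        Consumer<String> callback = consumers.get(routingKey);
        if (callback != null) {
            callback.accept(message);
        } else {
            queue.add(message); // held until a consumer arrives
        }
        return true;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.consume("RMQ_Out_Queue", m -> System.out.println("received: " + m));
        broker.publish("RMQ_Out_Queue", "test message");
    }
}
```

This is why the message in this step has to be published to the same queue name the receiver declared: a message sent to any other name never reaches that consumer.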


Step 7: Once the message is published successfully in RabbitMQ, the consumer created by the receiver Java client program picks it up and passes it to the flow service. The debug message then appears in the server log of Integration Server.


Send messages to RabbitMQ

Step 1: Create RabbitMQ Connection
We need to create a connection to RabbitMQ for sending messages from webMethods to RabbitMQ.

com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();

// RabbitMQ's default account is lowercase guest/guest; user names are case-sensitive.
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672); // default AMQP port
Connection connection = factory.newConnection();

Step 2: Create Channel
Channel channel = connection.createChannel();
channel.queueDeclare("RMQ_In_Queue", true, false, false, null);

Step 3: Publish Message
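// headers is a Map<String, Object> of custom message headers (built in the complete sender code).
String message = "Hello!! this message is from webMethods.";
// An empty exchange name selects the default exchange, which routes by queue name.
channel.basicPublish("", "RMQ_In_Queue", new AMQP.BasicProperties.Builder().headers(headers).build(), message.getBytes("UTF-8"));

Once the Send client program runs successfully, you can see the message in the queue in RabbitMQ.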
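
The publish step sends the body as UTF-8 bytes and attaches a header map, and the receiver decodes the bytes with the same charset. The sketch below isolates that encode and decode pair using only the JDK. `MessageEncoding` is a hypothetical class name; the header values are the ones used in the complete sender code. It uses `StandardCharsets.UTF_8` instead of the `"UTF-8"` string, which removes the need to handle `UnsupportedEncodingException`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper showing the body encoding and headers used when publishing. */
final class MessageEncoding {

    /** Encodes the message text the same way the sender does. */
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received body the same way the receiver does. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    /** Builds the header map from the complete sender code. */
    static Map<String, Object> sampleHeaders() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("ID", 12345);
        headers.put("Name", "Kellton");
        return headers;
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello!! this message is from webMethods.");
        System.out.println(body.length + " bytes, decoded: " + decode(body));
        System.out.println("headers: " + sampleHeaders());
    }
}
```

Using the same charset on both sides matters most for non-ASCII text, where a mismatch would corrupt the message.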


Complete Java Client code 

Receiver Client Code:

package RabbitMQTest.client;

import com.wm.data.*;
import com.wm.util.Values;
import com.wm.app.b2b.server.Service;
import com.wm.app.b2b.server.ServiceException;
import com.wm.app.b2b.server.Session;
import com.wm.app.b2b.server.StateManager;
import com.wm.lang.ns.NSName;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.Connection;
import com.rabbitmq.client.Channel;
import com.wm.util.JournalLogger;

public final class receiveMessage_SVC

{

    /** 
     * The primary method for the Java service
     *
     * @param pipeline
     *            The IData pipeline
     * @throws ServiceException
     */
    public static final void receiveMessage(IData pipeline) throws ServiceException {
        String log="";
        String MsgFromQueue = "";
        String queueName = "RMQ_Out_Queue";
        try{
            com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
        
            factory.setHost("localhost");
            // RabbitMQ's default account is lowercase guest/guest.
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setPort(5672);
            com.rabbitmq.client.Connection connection = factory.newConnection();
        
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, true, false, false, null);
        
            JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"function", "[**] Waiting for messages [**]");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
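                    String message = new String(body, "UTF-8");
                    // Headers are optional; avoid a NullPointerException when none were sent.
                    Map<String, Object> headerMap = properties.getHeaders();
                    String headers = (headerMap == null) ? "" : headerMap.toString();
        
                    JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"function", message); 
                    // Build the input document for the flow service.
                    IData idata = IDataFactory.create();
                    IDataCursor idc = idata.getCursor();
                    IDataUtil.put(idc, "jsonString", headers+"::"+message);
                    idc.destroy();
                    IData pipelineIn = IDataUtil.clone(idata);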
                    JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"@@@@", "Before calling service.doInvoke()");
                    try {
                        NSName nsName = NSName.create( "RabbitMQTest.services:printMessage" );
                        com.wm.app.b2b.server.User user = com.wm.app.b2b.server.UserManager.getUser("Administrator");
                        Session s = StateManager.createContext(0x7fffffffL, "system", user);
                        s.setUser(user);
                        s.clearModified();
        
                        Service.doThreadInvoke("RabbitMQTest.services","printMessage", s, idata);
                        StateManager.deleteContext(s.getSessionID());
                        JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"@@@@", "After calling Service.doInvoke()");
                    } catch (Exception e) 
                    {
                        JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"*** EXCEPTION***", e.toString());
                        e.printStackTrace();
                    }
                }
            };
            channel.basicConsume(queueName,true,consumer);            
            log = "Consumer created Successfully for RabbitMQ Queue";
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "message",log );
            pipelineCursor.destroy();
        }
        catch(Exception e){
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "Error", e.getStackTrace());
            IDataUtil.put( pipelineCursor, "log", log);
            pipelineCursor.destroy();
        }
            
    }
    
    // --- <<IS-BEGIN-SHARED-SOURCE-AREA>> ---
    
    
    
    // --- <<IS-END-SHARED-SOURCE-AREA>> ---

    /**
     * The service implementations given below are read-only and show only the
     * method definitions and not the complete implementation.
     */
    public static final void sendMessage(IData pipeline) throws ServiceException {
    }

    final static receiveMessage_SVC _instance = new receiveMessage_SVC();

    static receiveMessage_SVC _newInstance() { return new receiveMessage_SVC(); }

    static receiveMessage_SVC _cast(Object o) { return (receiveMessage_SVC)o; }

}

Sender Client Code:

package RabbitMQTest.client;

import com.wm.data.*;
import com.wm.util.Values;
import com.wm.app.b2b.server.Service;
import com.wm.app.b2b.server.ServiceException;
import com.wm.app.b2b.server.Session;
import com.wm.app.b2b.server.StateManager;
import com.wm.lang.ns.NSName;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.Connection;
import com.rabbitmq.client.Channel;
import com.wm.util.JournalLogger;

public final class sendMessage_SVC

{

    /** 
     * The primary method for the Java service
     *
     * @param pipeline
     *            The IData pipeline
     * @throws ServiceException
     */
    public static final void sendMessage(IData pipeline) throws ServiceException {
        String queueName = "RMQ_In_Queue";
        try{
            com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
            factory.setHost("localhost");
            // RabbitMQ's default account is lowercase guest/guest.
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setPort(5672);
            com.rabbitmq.client.Connection connection = factory.newConnection();
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("ID",  12345);
            headers.put("Name", "Kellton");
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, true, false, false, null);
            String message = "Hello!! this message is from webMethods.";
            channel.basicPublish("", queueName, new AMQP.BasicProperties.Builder().headers(headers).build(), message.getBytes("UTF-8"));
            channel.close();
            connection.close();
        
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "message", "Message Sent To RabbitMQ Successfully !" );
            pipelineCursor.destroy();
        }
        catch(Exception e){
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "Error", e.getLocalizedMessage() );
            IDataUtil.put( pipelineCursor, "StackTrace", e.getStackTrace() );
            IDataUtil.put( pipelineCursor, "ErrorString", e.toString());
            pipelineCursor.destroy();
        }
            
    }
    
    // --- <<IS-BEGIN-SHARED-SOURCE-AREA>> ---
    
    
    
    // --- <<IS-END-SHARED-SOURCE-AREA>> ---

    /**
     * The service implementations given below are read-only and show only the
     * method definitions and not the complete implementation.
     */
    public static final void receiveMessage(IData pipeline) throws ServiceException {
    }

    final static sendMessage_SVC _instance = new sendMessage_SVC();

    static sendMessage_SVC _newInstance() { return new sendMessage_SVC(); }

    static sendMessage_SVC _cast(Object o) { return (sendMessage_SVC)o; }

}
