Direct Exchange in AMQP
Description

- A Producer sends a message to an Exchange.
- A Queue binds to an Exchange using a routing key.
- Usually, more than one Queue is bound to an Exchange, using the same or different routing keys.
- The message sent to the Exchange contains a routing key. Based on the routing key the message is forwarded to one or more Queues.
- Consumers subscribed to a Queue receive the message and process it.
Requirements
- JDK 22+ (pom.xml sets maven.compiler.source/target to 22)
- RabbitMQ on Docker
Install RabbitMQ on Docker
docker-compose.yml:
# Compose file that starts a single RabbitMQ broker with the management plugin.
version: "3.6"
# https://docs.docker.com/compose/compose-file/
services:
  rabbitmq:
    image: 'rabbitmq:3.6-management-alpine'
    ports:
      # The standard AMQP protocol port
      - '5672:5672'
      # HTTP management UI
      - '15672:15672'
    environment:
      # The location of the RabbitMQ server. "amqp" is the protocol;
      # "rabbitmq" is the hostname. Note that there is no guarantee
      # that the server will start first! Telling the pika client library
      # to try multiple times gets around this ordering issue.
      # NOTE(review): pika is a Python client; the Java examples in this
      # project connect to localhost:5672 instead, so this variable looks
      # unused by them - confirm before relying on or removing it.
      AMQP_URL: 'amqp://rabbitmq?connection_attempts=5&retry_delay=5'
      # Default account created by the broker on first start.
      RABBITMQ_DEFAULT_USER: "guest"
      RABBITMQ_DEFAULT_PASS: "guest"
    networks:
      - network
    # volumes:
    #   - ./.docker/rabbitmq/etc/:/etc/rabbitmq/
    #   - ./.docker/rabbitmq/data/:/var/lib/rabbitmq/
    #   - ./.docker/rabbitmq/logs/:/var/log/rabbitmq/
networks:
  # Declare our private network. We must declare one for the magic
  # Docker DNS to work, but otherwise its default settings are fine.
  network: {}
RabbitMQ management UI: http://localhost:15672/ (log in with guest / guest)
Project Structure
.
├── pom.xml
├── rabbitmq-docker
│   └── docker-compose.yml
├── readme.md
└── src
    └── main
        └── java
            └── com
                └── amqp
                    ├── basic
                    │   └── queue
                    │       └── CommonConfigs.java
                    └── exchanges
                        ├── ConnectionManager.java
                        └── DirectExchange.java
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the direct-exchange example; the only dependency is the RabbitMQ Java client. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>rabbitmq-examples</groupId>
    <artifactId>rabbitmq-direct-exchange</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <description>RabbitMQ code examples using the amqp-client</description>

    <properties>
        <!-- Source and target level handed to the compiler plugin. -->
        <maven.compiler.target>22</maven.compiler.target>
        <maven.compiler.source>22</maven.compiler.source>
        <java.version>22</java.version>
        <!-- Pin the source encoding so the build does not depend on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Official RabbitMQ Java client (com.rabbitmq.client.*). -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.24.0</version>
        </dependency>
    </dependencies>
</project>
CommonConfigs.java
package com.amqp.basic.queue;
/**
 * Constants shared by the RabbitMQ examples.
 *
 * <p>The class only holds constants, so it is {@code final} and cannot be instantiated.
 */
public final class CommonConfigs {

    /** Name of the queue used by default. */
    public static final String DEFAULT_QUEUE = "Queue-1";

    /**
     * AMQP URL of the broker: user {@code guest}, password {@code guest}, host
     * {@code localhost}, port {@code 5672}.
     *
     * <p>NOTE(review): in the RabbitMQ URI scheme a bare trailing {@code /} names the
     * empty virtual host; if this value is ever passed to {@code ConnectionFactory.setUri},
     * the default vhost would need to be written as {@code /%2f} - confirm against callers.
     */
    public static final String AMQP_URL = "amqp://guest:guest@localhost:5672/";

    private CommonConfigs() {
        // Constants holder - no instances.
    }
}
ConnectionManager.java
package com.amqp.exchanges;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Lazily creates and caches the single RabbitMQ {@link Connection} used by the examples.
 */
public class ConnectionManager {

    /**
     * Broker address. The path is {@code /%2f} (the URL-encoded default virtual host "/")
     * because a bare trailing "/" in an AMQP URI selects the empty-named virtual host.
     */
    private static final String AMQP_URL = "amqp://guest:guest@localhost:5672/%2f";

    private static Connection connection = null;

    /**
     * Create RabbitMQ Connection
     *
     * <p>The connection is opened on the first call and the same instance is returned
     * afterwards. The method is synchronized because the example calls it from the
     * publisher and the subscriber thread.
     *
     * @return Connection the shared connection, never {@code null}
     * @throws IllegalStateException if the broker URI is invalid or the connection cannot be opened
     */
    public static synchronized Connection getConnection() {
        if (connection == null) {
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                // newConnection(String) takes a client-provided connection *name*, not an
                // address, so the URI has to be applied to the factory explicitly.
                connectionFactory.setUri(AMQP_URL);
                connection = connectionFactory.newConnection();
            } catch (IOException | TimeoutException | java.net.URISyntaxException
                    | java.security.GeneralSecurityException e) {
                // Fail here with the real cause instead of returning null and letting
                // the caller hit a NullPointerException on createChannel().
                throw new IllegalStateException("Cannot open RabbitMQ connection to localhost:5672", e);
            }
        }
        return connection;
    }
}
DirectExchange.java
package com.amqp.exchanges;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Selective message broadcast with a routing-key filter.
 *
 * <p>Declares the durable direct exchange {@code my-direct-exchange}, three durable queues
 * and their bindings, then publishes one message with routing key {@code homeAppliance}
 * while consumers on all three queues print whatever they receive. {@code ACQ} and
 * {@code LightQ} are bound with {@code homeAppliance}; {@code MobileQ} is bound with
 * {@code personalDevice}.
 */
public class DirectExchange {

    private static final String EXCHANGE_NAME = "my-direct-exchange";

    private static final String MOBILE_QUEUE = "MobileQ";
    private static final String AC_QUEUE = "ACQ";
    private static final String LIGHT_QUEUE = "LightQ";

    private static final String PERSONAL_DEVICE_KEY = "personalDevice";
    private static final String HOME_APPLIANCE_KEY = "homeAppliance";

    /**
     * Declares the durable DIRECT exchange {@code my-direct-exchange}.
     *
     * @throws IOException if the broker rejects the declaration or the channel fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void declareExchange() throws IOException, TimeoutException {
        // try-with-resources closes the channel even when the declaration throws.
        try (Channel channel = ConnectionManager.getConnection().createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        }
    }

    /**
     * Declares the three queues as durable, non-exclusive, non-auto-delete.
     *
     * @throws IOException if the broker rejects a declaration or the channel fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void declareQueues() throws IOException, TimeoutException {
        // Create a channel - do not share the Channel instance
        try (Channel channel = ConnectionManager.getConnection().createChannel()) {
            channel.queueDeclare(MOBILE_QUEUE, true, false, false, null);
            channel.queueDeclare(AC_QUEUE, true, false, false, null);
            channel.queueDeclare(LIGHT_QUEUE, true, false, false, null);
        }
    }

    /**
     * Binds each queue to the exchange with its routing key.
     *
     * @throws IOException if the broker rejects a binding or the channel fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void declareBindings() throws IOException, TimeoutException {
        try (Channel channel = ConnectionManager.getConnection().createChannel()) {
            // Create bindings - (queue, exchange, routingKey)
            channel.queueBind(MOBILE_QUEUE, EXCHANGE_NAME, PERSONAL_DEVICE_KEY);
            channel.queueBind(AC_QUEUE, EXCHANGE_NAME, HOME_APPLIANCE_KEY);
            channel.queueBind(LIGHT_QUEUE, EXCHANGE_NAME, HOME_APPLIANCE_KEY);
        }
    }

    /**
     * Registers an auto-acknowledging consumer on each of the three queues.
     *
     * <p>The channel is deliberately left open: the consumers live on it, so closing it
     * would end the subscriptions.
     *
     * @throws IOException if a consumer cannot be registered
     */
    public static void subscribeMessage() throws IOException {
        Channel channel = ConnectionManager.getConnection().createChannel();
        subscribe(channel, LIGHT_QUEUE);
        subscribe(channel, AC_QUEUE);
        subscribe(channel, MOBILE_QUEUE);
    }

    /** Starts one auto-ack consumer that prints its tag and "&lt;queue&gt;:&lt;body&gt;" per delivery. */
    private static void subscribe(Channel channel, String queue) throws IOException {
        channel.basicConsume(queue, true,
                (consumerTag, message) -> {
                    System.out.println(consumerTag);
                    // Decode with an explicit charset that matches publishMessage().
                    System.out.println(queue + ":" + new String(message.getBody(), StandardCharsets.UTF_8));
                },
                consumerTag -> System.out.println(consumerTag));
    }

    /**
     * Publishes one message to the exchange with routing key {@code homeAppliance}.
     *
     * @throws IOException if publishing fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void publishMessage() throws IOException, TimeoutException {
        try (Channel channel = ConnectionManager.getConnection().createChannel()) {
            String message = "Direct message - Turn on the Home Appliances ";
            channel.basicPublish(EXCHANGE_NAME, HOME_APPLIANCE_KEY, null,
                    message.getBytes(StandardCharsets.UTF_8));
        }
    }

    /**
     * Sets up queues, exchange and bindings, then runs subscriber and publisher on
     * separate threads.
     *
     * @param args unused
     * @throws IOException if the topology cannot be declared
     * @throws TimeoutException if closing a channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        DirectExchange.declareQueues();
        DirectExchange.declareExchange();
        DirectExchange.declareBindings();

        // Threads created to publish-subscribe asynchronously
        Thread subscribe = new Thread(() -> {
            try {
                DirectExchange.subscribeMessage();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        Thread publish = new Thread(() -> {
            try {
                DirectExchange.publishMessage();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        });

        subscribe.start();
        publish.start();
    }
}
Run
public static void main(String[] args) throws IOException, TimeoutException { ...
Output
amq.ctag-blalUvgtHM6x9NgjQA9mEg
LightQ:Direct message - Turn on the Home Appliances
amq.ctag-Fuoi24ca2qGAALCs5geDpQ
ACQ:Direct message - Turn on the Home Appliances
Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/reactive-messaging/rabbitmq/rabbitmq-direct-exchange
Based on: