Topic Exchange in AMQP
Description

- A message Queue binds to an Exchange with a routing key pattern (P).
- A publisher sends a message with a routing key (K) to the Topic Exchange.
- The message is passed to the Queue if P matches with K. The routing key matching is decided as discussed below.
- The consumer subscribing the Queue receives the message.
Requirements
- JDK 17+
- RabbitMQ on Docker
Install RabbitMQ on Docker
docker-compose.yml:
version: "3.6"
# https://docs.docker.com/compose/compose-file/
services:
rabbitmq:
image: 'rabbitmq:3.6-management-alpine'
ports:
# The standard AMQP protocol port
- '5672:5672'
# HTTP management UI
- '15672:15672'
environment:
# The location of the RabbitMQ server. "amqp" is the protocol;
# "rabbitmq" is the hostname. Note that there is not a guarantee
# that the server will start first! Telling the pika client library
# to try multiple times gets around this ordering issue.
AMQP_URL: 'amqp://rabbitmq?connection_attempts=5&retry_delay=5'
RABBITMQ_DEFAULT_USER: "guest"
RABBITMQ_DEFAULT_PASS: "guest"
networks:
- network
# volumes:
# - ./.docker/rabbitmq/etc/:/etc/rabbitmq/
# - ./.docker/rabbitmq/data/:/var/lib/rabbitmq/
# - ./.docker/rabbitmq/logs/:/var/log/rabbitmq/
networks:
# Declare our private network. We must declare one for the magic
# Docker DNS to work, but otherwise its default settings are fine.
network: {}
http://localhost:15672/
Project Structure
.
├── pom.xml
├── rabbitmq-docker
│ └── docker-compose.yml
├── readme.md
└── src
└── main
└── java
└── com
└── amqp
├── basic
│ └── queue
│ └── CommonConfigs.java
└── exchanges
├── ConnectionManager.java
└── TopicExchange.java
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>rabbitmq-examples</groupId>
<artifactId>rabbitmq-topic-exchange</artifactId>
<version>0.0.1-SNAPSHOT</version>
<description>RabbitMQ code examples using the amqp-client</description>
<properties>
<maven.compiler.target>22</maven.compiler.target>
<maven.compiler.source>22</maven.compiler.source>
<java.version>22</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.24.0</version>
</dependency>
</dependencies>
</project>
CommonConfigs.java
package com.amqp.basic.queue;
public class CommonConfigs {
public static final String DEFAULT_QUEUE = "Queue-1";
public static final String AMQP_URL = "amqp://guest:guest@localhost:5672/";
}
ConnectionManager.java
package com.amqp.exchanges;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionManager {
private static Connection connection = null;
/**
* Create RabbitMQ Connection
*
* @return Connection
*/
public static Connection getConnection() {
if (connection == null) {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connection = connectionFactory.newConnection("amqp://guest:guest@localhost:5672/");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
return connection;
}
}
TopicExchange.java
package com.amqp.exchanges;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicExchange {
/**
* Declare a Topic Exchange with the name my-topic-exchange.
*
* @throws IOException
* @throws TimeoutException
*/
public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create Topic Exchange
channel.exchangeDeclare("my-topic-exchange", BuiltinExchangeType.TOPIC, true);
channel.close();
}
/**
* Declare Queues to receive respective interested messages.
*
* @throws IOException
* @throws TimeoutException
*/
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do not share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//Create the Queues
channel.queueDeclare("HealthQ", true, false, false, null);
channel.queueDeclare("SportsQ", true, false, false, null);
channel.queueDeclare("EducationQ", true, false, false, null);
channel.close();
}
/**
* Declare Bindings - register interests using routing key patterns.
*
* @throws IOException
* @throws TimeoutException
*/
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey) - routingKey != null
channel.queueBind("HealthQ", "my-topic-exchange", "health.*");
channel.queueBind("SportsQ", "my-topic-exchange", "#.sports.*");
channel.queueBind("EducationQ", "my-topic-exchange", "#.education");
channel.close();
}
/**
* Assign Consumers to each of the Queue.
*
* @throws IOException
* @throws TimeoutException
*/
public static void subscribeMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("HealthQ", true, ((consumerTag, message) -> {
System.out.println("\n\n=========== Health Queue ==========");
System.out.println(consumerTag);
System.out.println("HealthQ: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("SportsQ", true, ((consumerTag, message) -> {
System.out.println("\n\n ============ Sports Queue ==========");
System.out.println(consumerTag);
System.out.println("SportsQ: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("EducationQ", true, ((consumerTag, message) -> {
System.out.println("\n\n ============ Education Queue ==========");
System.out.println(consumerTag);
System.out.println("EducationQ: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
}
/**
* Publish Messages with different routing keys.
*
* @throws IOException
* @throws TimeoutException
*/
public static void publishMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
String message = "Drink a lot of Water and stay Healthy!";
//channel.basicPublish("my-topic-exchange", "sports.sports.sports", null, message.getBytes());
channel.basicPublish("my-topic-exchange", "health.education", null, message.getBytes());
message = "Learn something new everyday";
channel.basicPublish("my-topic-exchange", "education", null, message.getBytes());
message = "Stay fit in Mind and Body";
channel.basicPublish("my-topic-exchange", "education.health", null, message.getBytes());
channel.close();
}
/**
* Execute the methods.
*
* @param args
* @throws IOException
* @throws TimeoutException
*/
public static void main(String[] args) throws IOException, TimeoutException {
TopicExchange.declareExchange();
TopicExchange.declareQueues();
TopicExchange.declareBindings();
Thread subscribe = new Thread(() -> {
try {
TopicExchange.subscribeMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
Thread publish = new Thread(() -> {
try {
TopicExchange.publishMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
subscribe.start();
publish.start();
}
}
Run
public static void main(String[] args) throws IOException, TimeoutException { ...
Output
=========== Health Queue ==========
amq.ctag-dfqDl2dbY6pw85w7wgpqGA
HealthQ: Drink a lot of Water and stay Healthy!
Envelope(deliveryTag=1, redeliver=false, exchange=my-topic-exchange, routingKey=health.education)
============ Education Queue ==========
amq.ctag-5O8xw4-vhPh0oK1VX9_DRw
EducationQ: Drink a lot of Water and stay Healthy!
Envelope(deliveryTag=2, redeliver=false, exchange=my-topic-exchange, routingKey=health.education)
============ Education Queue ==========
amq.ctag-5O8xw4-vhPh0oK1VX9_DRw
EducationQ: Learn something new everyday
Envelope(deliveryTag=3, redeliver=false, exchange=my-topic-exchange, routingKey=education)
Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/reactive-messaging/rabbitmq/rabbitmq-topic-exchange