Headers Exchange in AMQP
Description

- One or more Queues bind (linked) to a Headers Exchange using header properties( H ).
- A Producer sends a message to this Exchange with a Header property (MH).
- If MH matches with H, the message is forwarded to the Queue. The Headers matching algorithm is discussed next.
- The consumers listening to the Queue receives the message and processes it.
Requirements
- JDK 17+
- RabbitMQ on Docker
Install RabbitMQ on Docker
docker-compose.yml:
version: "3.6"
# https://docs.docker.com/compose/compose-file/
services:
rabbitmq:
image: 'rabbitmq:3.6-management-alpine'
ports:
# The standard AMQP protocol port
- '5672:5672'
# HTTP management UI
- '15672:15672'
environment:
# The location of the RabbitMQ server. "amqp" is the protocol;
# "rabbitmq" is the hostname. Note that there is not a guarantee
# that the server will start first! Telling the pika client library
# to try multiple times gets around this ordering issue.
AMQP_URL: 'amqp://rabbitmq?connection_attempts=5&retry_delay=5'
RABBITMQ_DEFAULT_USER: "guest"
RABBITMQ_DEFAULT_PASS: "guest"
networks:
- network
# volumes:
# - ./.docker/rabbitmq/etc/:/etc/rabbitmq/
# - ./.docker/rabbitmq/data/:/var/lib/rabbitmq/
# - ./.docker/rabbitmq/logs/:/var/log/rabbitmq/
networks:
# Declare our private network. We must declare one for the magic
# Docker DNS to work, but otherwise its default settings are fine.
network: {}
http://localhost:15672/
Project Structure
.
├── pom.xml
├── rabbitmq-docker
│ └── docker-compose.yml
├── readme.md
└── src
└── main
└── java
└── com
└── amqp
├── basic
│ └── queue
│ └── CommonConfigs.java
└── exchanges
├── ConnectionManager.java
└ ── HeadersExchange.java
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>rabbitmq-examples</groupId>
<artifactId>rabbitmq-headers-exchange</artifactId>
<version>0.0.1-SNAPSHOT</version>
<description>RabbitMQ code examples using the amqp-client</description>
<properties>
<maven.compiler.target>22</maven.compiler.target>
<maven.compiler.source>22</maven.compiler.source>
<java.version>22</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.24.0</version>
</dependency>
</dependencies>
</project>
CommonConfigs.java
package com.amqp.basic.queue;
public class CommonConfigs {
public static final String DEFAULT_QUEUE = "Queue-1";
public static final String AMQP_URL = "amqp://guest:guest@localhost:5672/";
}
ConnectionManager.java
package com.amqp.exchanges;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionManager {
private static Connection connection = null;
/**
* Create RabbitMQ Connection
*
* @return Connection
*/
public static Connection getConnection() {
if (connection == null) {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connection = connectionFactory.newConnection("amqp://guest:guest@localhost:5672/");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
return connection;
}
}
HeadersExchange.java
package com.amqp.exchanges;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class HeadersExchange {
/**
* Declare a Headers Exchange.
*
* @throws IOException
* @throws TimeoutException
*/
public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare my-header-exchange
channel.exchangeDeclare("my-header-exchange", BuiltinExchangeType.HEADERS, true);
channel.close();
}
/**
* Declare 3 Queues to demonstrate the example.
*
* @throws TimeoutException
*/
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do not share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//Create the Queues
channel.queueDeclare("HealthQ", true, false, false, null);
channel.queueDeclare("SportsQ", true, false, false, null);
channel.queueDeclare("EducationQ", true, false, false, null);
channel.close();
}
/**
* Set the Bindings between Exchange and Queues.
*
* @throws IOException
* @throws TimeoutException
*/
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey, headers) - routingKey != null
Map<String, Object> healthArgs = new HashMap<>();
healthArgs.put("x-match", "any"); //Match any of the header
healthArgs.put("h1", "Header1");
healthArgs.put("h2", "Header2");
channel.queueBind("HealthQ", "my-header-exchange", "", healthArgs);
Map<String, Object> sportsArgs = new HashMap<>();
sportsArgs.put("x-match", "all"); //Match all of the header
sportsArgs.put("h1", "Header1");
sportsArgs.put("h2", "Header2");
channel.queueBind("SportsQ", "my-header-exchange", "", sportsArgs);
Map<String, Object> educationArgs = new HashMap<>();
educationArgs.put("x-match", "any"); //Match any of the header
educationArgs.put("h1", "Header1");
educationArgs.put("h2", "Header2");
channel.queueBind("EducationQ", "my-header-exchange", "", educationArgs);
channel.close();
}
/**
* Subscribe the Queues.
*
* @throws IOException
* @throws TimeoutException
*/
public static void subscribeMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("HealthQ", true, ((consumerTag, message) -> {
System.out.println("\n\n=========== Health Queue ==========");
System.out.println(consumerTag);
System.out.println("HealthQ: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("SportsQ", true, ((consumerTag, message) -> {
System.out.println("\n\n ============ Sports Queue ==========");
System.out.println(consumerTag);
System.out.println("SportsQ: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("EducationQ", true, ((consumerTag, message) -> {
System.out.println("\n\n ============ Education Queue ==========");
System.out.println(consumerTag);
System.out.println("EducationQ: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
}
public static void publishMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
String message = "Header Exchange example 1";
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("h1", "Header1");
headerMap.put("h3", "Header3");
BasicProperties properties = new BasicProperties()
.builder().headers(headerMap).build();
channel.basicPublish("my-header-exchange", "", properties, message.getBytes());
message = "Header Exchange example 2";
headerMap.put("h2", "Header2");
properties = new BasicProperties()
.builder().headers(headerMap).build();
channel.basicPublish("my-header-exchange", "", properties, message.getBytes());
channel.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
HeadersExchange.declareQueues();
HeadersExchange.declareExchange();
HeadersExchange.declareBindings();
//Threads created to publish-subscribe asynchronously
Thread subscribe = new Thread() {
@Override
public void run() {
try {
HeadersExchange.subscribeMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
};
Thread publish = new Thread() {
@Override
public void run() {
try {
HeadersExchange.publishMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
};
subscribe.start();
publish.start();
}
}
Run
public static void main(String[] args) throws IOException, TimeoutException { ...
Output
=========== Health Queue ==========
amq.ctag-OGzUfuJGaGMSSwlhBDrsKg
HealthQ: Header Exchange example 1
Envelope(deliveryTag=1, redeliver=false, exchange=my-header-exchange, routingKey=)
=========== Health Queue ==========
amq.ctag-OGzUfuJGaGMSSwlhBDrsKg
HealthQ: Header Exchange example 2
Envelope(deliveryTag=2, redeliver=false, exchange=my-header-exchange, routingKey=)
============ Sports Queue ==========
amq.ctag-nAg7T-JAqEgSPFihJNKQwQ
SportsQ: Header Exchange example 2
Envelope(deliveryTag=3, redeliver=false, exchange=my-header-exchange, routingKey=)
============ Education Queue ==========
amq.ctag-vovwDsMTh2_SGTM1ykY42A
EducationQ: Header Exchange example 1
Envelope(deliveryTag=4, redeliver=false, exchange=my-header-exchange, routingKey=)
============ Education Queue ==========
amq.ctag-vovwDsMTh2_SGTM1ykY42A
EducationQ: Header Exchange example 2
Envelope(deliveryTag=5, redeliver=false, exchange=my-header-exchange, routingKey=)
Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/reactive-messaging/rabbitmq/rabbitmq-headers-exchange