Fanout Exchange in AMQP
Description

- One or more Queues bind to the Fanout exchange with no routing keys.
- A publisher sends the Exchange a message.
- The Exchange then forwards the message to the Queues unconditionally.
Requirements
- JDK 17+
- RabbitMQ on Docker
Install RabbitMQ on Docker
docker-compose.yml:
version: "3.6"
# https://docs.docker.com/compose/compose-file/
services:
rabbitmq:
image: 'rabbitmq:3.6-management-alpine'
ports:
# The standard AMQP protocol port
- '5672:5672'
# HTTP management UI
- '15672:15672'
environment:
# The location of the RabbitMQ server. "amqp" is the protocol;
# "rabbitmq" is the hostname. Note that there is not a guarantee
# that the server will start first! Telling the pika client library
# to try multiple times gets around this ordering issue.
AMQP_URL: 'amqp://rabbitmq?connection_attempts=5&retry_delay=5'
RABBITMQ_DEFAULT_USER: "guest"
RABBITMQ_DEFAULT_PASS: "guest"
networks:
- network
# volumes:
# - ./.docker/rabbitmq/etc/:/etc/rabbitmq/
# - ./.docker/rabbitmq/data/:/var/lib/rabbitmq/
# - ./.docker/rabbitmq/logs/:/var/log/rabbitmq/
networks:
# Declare our private network. We must declare one for the magic
# Docker DNS to work, but otherwise its default settings are fine.
network: {}
http://localhost:15672/
Project Structure
.
├── pom.xml
├── rabbitmq-docker
│ └── docker-compose.yml
├── readme.md
└── src
└── main
└── java
└── com
└── amqp
├── basic
│ └── queue
│ └── CommonConfigs.java
└── exchanges
├── ConnectionManager.java
└── FanoutExchange.java
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>rabbitmq-examples</groupId>
<artifactId>rabbitmq-fanout-exchange</artifactId>
<version>0.0.1-SNAPSHOT</version>
<description>RabbitMQ code examples using the amqp-client</description>
<properties>
<maven.compiler.target>22</maven.compiler.target>
<maven.compiler.source>22</maven.compiler.source>
<java.version>22</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.24.0</version>
</dependency>
</dependencies>
</project>
CommonConfigs.java
package com.amqp.basic.queue;
public class CommonConfigs {
public static final String DEFAULT_QUEUE = "Queue-1";
public static final String AMQP_URL = "amqp://guest:guest@localhost:5672/";
}
ConnectionManager.java
package com.amqp.exchanges;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionManager {
private static Connection connection = null;
/**
* Create RabbitMQ Connection
*
* @return Connection
*/
public static Connection getConnection() {
if (connection == null) {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connection = connectionFactory.newConnection("amqp://guest:guest@localhost:5672/");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
return connection;
}
}
FanoutExchange.java
package com.amqp.exchanges;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Unconditional message broadcast.
*/
public class FanoutExchange {
public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare my-fanout-exchange
channel.exchangeDeclare("my-fanout-exchange", BuiltinExchangeType.FANOUT, true);
channel.close();
}
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do not share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//Create the Queues
channel.queueDeclare("MobileQ", true, false, false, null);
channel.queueDeclare("ACQ", true, false, false, null);
channel.queueDeclare("LightQ", true, false, false, null);
channel.close();
}
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey) - routingKey != null
channel.queueBind("MobileQ", "my-fanout-exchange", "");
channel.queueBind("ACQ", "my-fanout-exchange", "");
channel.queueBind("LightQ", "my-fanout-exchange", "");
channel.close();
}
public static void subscribeMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("LightQ", true, ((consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("LightQ: " + new String(message.getBody()));
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("ACQ", true, ((consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("ACQ: " + new String(message.getBody()));
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("MobileQ", true, ((consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("MobileQ: " + new String(message.getBody()));
}), consumerTag -> {
System.out.println(consumerTag);
});
}
public static void publishMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
String message = "Main Power is ON";
channel.basicPublish("my-fanout-exchange", "", null, message.getBytes());
channel.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
FanoutExchange.declareQueues();
FanoutExchange.declareExchange();
FanoutExchange.declareBindings();
//Threads created to publish-subscribe asynchronously
Thread subscribe = new Thread() {
@Override
public void run() {
try {
FanoutExchange.subscribeMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
};
Thread publish = new Thread() {
@Override
public void run() {
try {
FanoutExchange.publishMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
};
subscribe.start();
publish.start();
}
}
Run
public static void main(String[] args) throws IOException, TimeoutException { ...
Output
amq.ctag-m8cjdQniE8tXYQjHw44f7Q
LightQ: Main Power is ON
amq.ctag-1rsJPVZkXgwMBvTBEJQJ6w
ACQ: Main Power is ON
amq.ctag-6X_dOQzw7rjiPOUQDK1bfg
MobileQ: Main Power is ON
Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/reactive-messaging/rabbitmq/rabbitmq-fanout-exchange