Skip to main content

Fanout Exchange in AMQP

Description

rabbitmq-fanout-excange-diagr-01.png
  • One or more Queues bind to the Fanout exchange with no routing keys.
  • A publisher sends the Exchange a message.
  • The Exchange then forwards the message to the Queues unconditionally.

Requirements

  • JDK 17+
  • RabbitMQ on Docker

Install RabbitMQ on Docker

docker-compose.yml:

version: "3.6"
# https://docs.docker.com/compose/compose-file/

services:
rabbitmq:
image: 'rabbitmq:3.6-management-alpine'
ports:
# The standard AMQP protocol port
- '5672:5672'
# HTTP management UI
- '15672:15672'
environment:
# The location of the RabbitMQ server. "amqp" is the protocol;
# "rabbitmq" is the hostname. Note that there is not a guarantee
# that the server will start first! Telling the pika client library
# to try multiple times gets around this ordering issue.
AMQP_URL: 'amqp://rabbitmq?connection_attempts=5&retry_delay=5'
RABBITMQ_DEFAULT_USER: "guest"
RABBITMQ_DEFAULT_PASS: "guest"
networks:
- network
# volumes:
# - ./.docker/rabbitmq/etc/:/etc/rabbitmq/
# - ./.docker/rabbitmq/data/:/var/lib/rabbitmq/
# - ./.docker/rabbitmq/logs/:/var/log/rabbitmq/

networks:
# Declare our private network. We must declare one for the magic
# Docker DNS to work, but otherwise its default settings are fine.
network: {}
http://localhost:15672/

Project Structure

.
├── pom.xml
├── rabbitmq-docker
│   └── docker-compose.yml
├── readme.md
└── src
└── main
└── java
└── com
└── amqp
├── basic
│   └── queue
│   └── CommonConfigs.java
└── exchanges
├── ConnectionManager.java
└── FanoutExchange.java

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>rabbitmq-examples</groupId>
<artifactId>rabbitmq-fanout-exchange</artifactId>
<version>0.0.1-SNAPSHOT</version>
<description>RabbitMQ code examples using the amqp-client</description>

<properties>
<maven.compiler.target>22</maven.compiler.target>
<maven.compiler.source>22</maven.compiler.source>
<java.version>22</java.version>
</properties>

<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.24.0</version>
</dependency>
</dependencies>

</project>

CommonConfigs.java

package com.amqp.basic.queue;

public class CommonConfigs {
public static final String DEFAULT_QUEUE = "Queue-1";
public static final String AMQP_URL = "amqp://guest:guest@localhost:5672/";
}

ConnectionManager.java

package com.amqp.exchanges;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

private static Connection connection = null;

/**
* Create RabbitMQ Connection
*
* @return Connection
*/
public static Connection getConnection() {
if (connection == null) {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connection = connectionFactory.newConnection("amqp://guest:guest@localhost:5672/");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
return connection;
}
}

FanoutExchange.java

package com.amqp.exchanges;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Unconditional message broadcast.
*/
public class FanoutExchange {

public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare my-fanout-exchange
channel.exchangeDeclare("my-fanout-exchange", BuiltinExchangeType.FANOUT, true);
channel.close();
}

public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do not share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();

//Create the Queues
channel.queueDeclare("MobileQ", true, false, false, null);
channel.queueDeclare("ACQ", true, false, false, null);
channel.queueDeclare("LightQ", true, false, false, null);

channel.close();
}

public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey) - routingKey != null
channel.queueBind("MobileQ", "my-fanout-exchange", "");
channel.queueBind("ACQ", "my-fanout-exchange", "");
channel.queueBind("LightQ", "my-fanout-exchange", "");
channel.close();
}

public static void subscribeMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("LightQ", true, ((consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("LightQ: " + new String(message.getBody()));
}), consumerTag -> {
System.out.println(consumerTag);
});

channel.basicConsume("ACQ", true, ((consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("ACQ: " + new String(message.getBody()));
}), consumerTag -> {
System.out.println(consumerTag);
});

channel.basicConsume("MobileQ", true, ((consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("MobileQ: " + new String(message.getBody()));
}), consumerTag -> {
System.out.println(consumerTag);
});
}

public static void publishMessage() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
String message = "Main Power is ON";
channel.basicPublish("my-fanout-exchange", "", null, message.getBytes());
channel.close();
}

public static void main(String[] args) throws IOException, TimeoutException {
FanoutExchange.declareQueues();
FanoutExchange.declareExchange();
FanoutExchange.declareBindings();

//Threads created to publish-subscribe asynchronously
Thread subscribe = new Thread() {
@Override
public void run() {
try {
FanoutExchange.subscribeMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
};

Thread publish = new Thread() {
@Override
public void run() {
try {
FanoutExchange.publishMessage();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
};

subscribe.start();
publish.start();
}
}

Run

public static void main(String[] args) throws IOException, TimeoutException { ...

Output

amq.ctag-m8cjdQniE8tXYQjHw44f7Q
LightQ: Main Power is ON
amq.ctag-1rsJPVZkXgwMBvTBEJQJ6w
ACQ: Main Power is ON
amq.ctag-6X_dOQzw7rjiPOUQDK1bfg
MobileQ: Main Power is ON

Source Code:
https://github.com/ZbCiok/zjc-examples/tree/main/reactive-messaging/rabbitmq/rabbitmq-fanout-exchange