# RabbitMQ

## Overview

RabbitMQ is a **lightweight and easy-to-deploy messaging platform** that supports multiple messaging protocols. It can be configured in distributed and federated setups to meet high-scale and high-availability requirements.

Joule enables publishing to RabbitMQ queues, exchanges and topics, providing flexible event-driven architectures.

This page covers how to configure RabbitMQ publishers within Joule, including queue-based eventing, work queues for task distribution, publish / subscribe models and routing with dynamic keys.

The examples demonstrate how to **serialise events into JSON**, configure exchanges and use routing keys for targeted message delivery.

Additionally, it provides **configuration details for common pattern**s like worker queues and topic-based exchanges, offering scalability and efficiency in event processing.

{% hint style="info" %}
**Client library**: [com.rabbitmq:amqp-client:5.16.0](https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/5.16.0)
{% endhint %}

## Examples & DSL attributes

This example configures the **RabbitMQ Publisher** to send `StreamEvents` as **JSON-formatted messages** to a queue named `quotes_queue`.

Here's a breakdown:

1. <mark style="color:green;">**queue**</mark>\
   The events will be published to the `quotes_queue`, a named queue in RabbitMQ.
2. <mark style="color:green;">**serializer**</mark>\
   The events are serialized using the `jsonFormatter` with UTF-8 encoding, ensuring the data is formatted as JSON.

This setup sends JSON-encoded events to the `quotes_queue` in RabbitMQ, with the message content encoded in UTF-8.

```yaml
rabbitPublisher:
  queue: quotes_queue
  
  serializer:
    formatter:
      jsonFormatter:
        encoding: UTF-8
```

### Attributes schema

<table><thead><tr><th width="214">Attribute</th><th width="217">Description</th><th width="215">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>host</td><td>RabbitMQ server hostname or Ip address</td><td><p>String</p><p>Default: localhost</p></td><td>false</td></tr><tr><td>port </td><td>RabbitMQ server port, must be greater than 0.</td><td><p>Integer</p><p>Default: 5672</p></td><td>false</td></tr><tr><td>username</td><td>User name</td><td><p>String</p><p>Default: guest</p></td><td>false</td></tr><tr><td>password</td><td>password</td><td><p>String</p><p>Default: guest</p></td><td>false</td></tr><tr><td>virtualHost</td><td><p>Creates a logical group of connections, exchanges, queues, bindings, user permissions, etc. within an instance.</p><p></p><p>See <a href="https://www.cloudamqp.com/blog/what-is-a-rabbitmq-vhost.html">vhost documentation</a> for further information</p></td><td><p>String</p><p>Default: /</p></td><td>false</td></tr><tr><td>autoAck</td><td>Flag to consider if server should message acknowledged once messages have been delivered. False will provide explicit  message acknowledgment.</td><td><p>Boolean</p><p>Default: true</p></td><td>false</td></tr><tr><td>global</td><td>QoS - true if the settings should be applied to the entire channel rather than each consumer</td><td><p>Boolean</p><p>Default: true</p></td><td>false</td></tr><tr><td>durable</td><td>Set a durable queue (queue will survive a server restart)</td><td><p>Boolean</p><p>Default: true</p></td><td>false</td></tr><tr><td>autoDelete</td><td>Server deletes queue when not in use</td><td><p>Boolean</p><p>Default: true</p></td><td>false</td></tr><tr><td>exclusive</td><td>Exclusive queue to this connection</td><td><p>Boolean</p><p>Default: false</p></td><td>false</td></tr><tr><td>arguments</td><td>Additional queue properties</td><td><p>Map&#x3C;String, Object></p><p>Default: null</p></td><td>false</td></tr><tr><td>exchange</td><td>Exchange configuration</td><td>See <a href="#exchange-attributes-schema">Exchange attributes</a> section</td><td>false</td></tr><tr><td>routing</td><td>Routing configuration</td><td>See <a href="#routing-attributes-schema">Routing attributes</a> section</td><td>false</td></tr><tr><td>serializer</td><td>Serialisation configuration</td><td>See <a href="#serialisation-attributes-schema">Serialization attributes</a> section</td><td>false</td></tr></tbody></table>

### **Exchange attributes schema**

<table><thead><tr><th width="200">Attribute</th><th width="220">Description</th><th width="222">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>name</td><td>Name of exchange</td><td>String</td><td>true</td></tr><tr><td>type</td><td>In RabbitMQ, messages are published to an exchange and, depending on the type of exchange, the message gets routed to one or more queues</td><td><p>TOPIC, DIRECT, FANOUT, HEADERS</p><p>Default: TOPIC</p></td><td>false</td></tr><tr><td>arguments</td><td>Additional binding properties</td><td><p>Map&#x3C;String, Object></p><p>Default: null</p></td><td>false</td></tr></tbody></table>

### **Exchange example**

This example configures the **RabbitMQ Publisher** to send events to an **exchange** instead of a queue:

1. <mark style="color:green;">**exchange**</mark>\
   The events are published to an exchange named `marketQuotes`.
   1. <mark style="color:green;">**type**</mark>\
      The exchange type is set to `TOPIC`, meaning events can be routed based on routing keys that include wildcards. This allows consumers to subscribe to specific types of messages using patterns like `market.IBM` or `market.*`.

This setup sends events to the `marketQuotes` topic exchange, enabling more flexible routing of messages based on the routing keys used by consumers.

```yaml
rabbitPublisher:
    ...

    exchange:
        name: marketQuotes
        type: TOPIC
```

### **Routing attributes schema**

<table><thead><tr><th width="200">Attribute</th><th width="220">Description</th><th width="222">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>event fields</td><td>List of event fields to create dynamic routing keys</td><td><p>String</p><p>Default: None</p></td><td>false</td></tr></tbody></table>

### **Routing example**

In this example, the **RabbitMQ Publisher** is configured to dynamically create a routing key based on the `market` and `symbol` fields from the **StreamEvent**:

1. <mark style="color:green;">**routing**</mark>\
   The routing key is constructed using the values of the `market` and `symbol` fields in the event. For example, if the `market` is `nasdaq` and the `symbol` is `IBM`, the routing key `nasdaq.IBM` will be created.

This enables fine-grained control over which events are routed to specific consumers, as they can subscribe to events using specific routing keys like `nasdaq.IBM` or `nasdaq.*`.

```yaml
rabbitPublisher:
  ...

    routing:
        event fields:
          - market
          - symbol
```

### **Serialisation** attributes schema

<table><thead><tr><th width="200">Attribute</th><th width="220">Description</th><th width="222">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>transform</td><td>User provided implementation that transforms the StreamEvent in to a domain class. The resulting domain class must implement serialisable interface </td><td>Implementation of StreamEventParser</td><td>false</td></tr><tr><td>formatter</td><td>If a custom transformer has not been provided the for formatter setting will be applied to the processed <code>StreamEvent</code> object</td><td>Default: jsonFormatter</td><td>false</td></tr><tr><td>compressed</td><td>Not implemented</td><td><p>Boolean </p><p>Default: false</p></td><td>false</td></tr></tbody></table>

### **Serialiser example**

This example demonstrates how to configure the **serialiser** for the RabbitMQ Publisher:

1. <mark style="color:green;">**serializer**</mark>\
   The `jsonFormatter` is used to serialise the events.
   1. <mark style="color:green;">**encoding**</mark>\
      The events are encoded in **UTF-8**, ensuring that the message content is in a standard JSON format that can be easily consumed by downstream applications.

This setup serialises the events into JSON format with UTF-8 encoding before publishing them to RabbitMQ.

```yaml
rabbitPublisher:
  ...

  serializer:
    formatter:
      jsonFormatter:
        encoding: UTF-8
```

## Additional resources

* Offical [RabbitMQ documentation](https://www.rabbitmq.com/documentation.html)
* CloudAMQP [documentation](https://www.cloudamqp.com/docs/index.html)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.fractalworks.io/joule/components/connectors/sinks/rabbitmq.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
