Joule
  • Welcome to Joule's Docs
  • Why Joule?
    • Joule capabilities
  • What is Joule?
    • Key features
    • The tech stack
  • Use case enablement
    • Use case building framework
  • Concepts
    • Core concepts
    • Low code development
    • Unified execution engine
    • Batch and stream processing
    • Continuous metrics
    • Key Joule data types
      • StreamEvent object
      • Contextual data
      • GeoNode
  • Tutorials
    • Getting started
    • Build your first use case
    • Stream sliding window quote analytics
    • Advanced tutorials
      • Custom missing value processor
      • Stateless Bollinger band analytics
      • IoT device control
  • FAQ
  • Glossary
  • Components
    • Pipelines
      • Use case anatomy
      • Data priming
        • Types of import
      • Processing unit
      • Group by
      • Emit computed events
      • Telemetry auditing
    • Processors
      • Common attributes
      • Filters
        • By type
        • By expression
        • Send on delta
        • Remove attributes
        • Drop all events
      • Enrichment
        • Key concepts
          • Anatomy of enrichment DSL
          • Banking example
        • Metrics
        • Dynamic contextual data
          • Caching architecture
        • Static contextual data
      • Transformation
        • Field Tokeniser
        • Obfuscation
          • Encryption
          • Masking
          • Bucketing
          • Redaction
      • Triggers
        • Change Data Capture
        • Business rules
      • Stream join
        • Inner stream joins
        • Outer stream joins
        • Join attributes & policy
      • Event tap
        • Anatomy of a Tap
        • SQL Queries
    • Analytics
      • Analytic tools
        • User defined analytics
          • Streaming analytics example
          • User defined analytics
          • User defined scripts
          • User defined functions
            • Average function library
        • Window analytics
          • Tumbling window
          • Sliding window
          • Aggregate functions
        • Analytic functions
          • Stateful
            • Exponential moving average
            • Rolling Sum
          • Stateless
            • Normalisation
              • Absolute max
              • Min max
              • Standardisation
              • Mean
              • Log
              • Z-Score
            • Scaling
              • Unit scale
              • Robust Scale
            • Statistics
              • Statistic summaries
              • Weighted moving average
              • Simple moving average
              • Count
            • General
              • Euclidean
        • Advanced analytics
          • Geospatial
            • Entity geo tracker
            • Geofence occupancy trigger
            • Geo search
            • IP address resolver
            • Reverse geocoding
            • Spatial Index
          • HyperLogLog
          • Distinct counter
      • ML inferencing
        • Feature engineering
          • Scripting
          • Scaling
          • Transform
        • Online predictive analytics
        • Model audit
        • Model management
      • Metrics engine
        • Create metrics
        • Apply metrics
        • Manage metrics
        • Priming metrics
    • Contextual data
      • Architecture
      • Configuration
      • MinIO S3
      • Apache Geode
    • Connectors
      • Sources
        • Kafka
          • Ingestion
        • RabbitMQ
          • Further RabbitMQ configurations
        • MQTT
          • Topic wildcards
          • Session management
          • Last Will and Testament
        • Rest endpoints
        • MinIO S3
        • File watcher
      • Sinks
        • Kafka
        • RabbitMQ
          • Further configurations
        • MQTT
          • Persistent messaging
          • Last Will and Testament
        • SQL databases
        • InfluxDB
        • MongoDB
        • Geode
        • WebSocket endpoint
        • MinIO S3
        • File transport
        • Slack
        • Email
      • Serialisers
        • Serialisation
          • Custom transform example
          • Formatters
        • Deserialisers
          • Custom parsing example
    • Observability
      • Enabling JMX for Joule
      • Meters
      • Metrics API
  • DEVELOPER GUIDES
    • Setting up developer environment
      • Environment setup
      • Build and deploy
      • Install Joule
        • Install Docker demo environment
        • Install with Docker
        • Install from source
        • Install Joule examples
    • Joulectl CLI
    • API Endpoints
      • Mangement API
        • Use case
        • Pipelines
        • Data connectors
        • Contextual data
      • Data access API
        • Query
        • Upload
        • WebSocket
      • SQL support
    • Builder SDK
      • Connector API
        • Sources
          • StreamEventParser API
        • Sinks
          • CustomTransformer API
      • Processor API
      • Analytics API
        • Create custom metrics
        • Define analytics
        • Windows API
        • SQL queries
      • Transformation API
        • Obfuscation API
        • FieldTokenizer API
      • File processing
      • Data types
        • StreamEvent
        • ReferenceDataObject
        • GeoNode
    • System configuration
      • System properties
  • Deployment strategies
    • Deployment Overview
    • Single Node
    • Cluster
    • GuardianDB
    • Packaging
      • Containers
      • Bare metal
  • Product updates
    • Public Roadmap
    • Release Notes
      • v1.2.0 Join Streams with stateful analytics
      • v1.1.0 Streaming analytics enhancements
      • v1.0.4 Predictive stream processing
      • v1.0.3 Contextual SQL based metrics
    • Change history
Powered by GitBook
On this page
  • Overview
  • Examples & DSL attributes
  • Attributes schema
  • Exchange attributes schema
  • Exchange example
  • Routing attributes schema
  • Routing example
  • Serialisation attributes schema
  • Serialiser example
  • Additional resources

Was this helpful?

  1. Components
  2. Connectors
  3. Sinks

RabbitMQ

RabbitMQ is lightweight and easy to deploy messaging platform for event-based data integration

PreviousKafkaNextFurther configurations

Last updated 6 months ago

Was this helpful?

Overview

RabbitMQ is a lightweight and easy-to-deploy messaging platform that supports multiple messaging protocols. It can be configured in distributed and federated setups to meet high-scale and high-availability requirements.

Joule enables publishing to RabbitMQ queues, exchanges and topics, providing flexible event-driven architectures.

This page covers how to configure RabbitMQ publishers within Joule, including queue-based eventing, work queues for task distribution, publish / subscribe models and routing with dynamic keys.

The examples demonstrate how to serialise events into JSON, configure exchanges and use routing keys for targeted message delivery.

Additionally, it provides configuration details for common patterns like worker queues and topic-based exchanges, offering scalability and efficiency in event processing.

Client library:

Examples & DSL attributes

This example configures the RabbitMQ Publisher to send StreamEvents as JSON-formatted messages to a queue named quotes_queue.

Here's a breakdown:

  1. queue The events will be published to the quotes_queue, a named queue in RabbitMQ.

  2. serializer The events are serialized using the jsonFormatter with UTF-8 encoding, ensuring the data is formatted as JSON.

This setup sends JSON-encoded events to the quotes_queue in RabbitMQ, with the message content encoded in UTF-8.

rabbitPublisher:
  queue: quotes_queue
  
  serializer:
    formatter:
      jsonFormatter:
        encoding: UTF-8

Attributes schema

Attribute
Description
Data Type
Required

host

RabbitMQ server hostname or Ip address

String

Default: localhost

port

RabbitMQ server port, must be greater than 0.

Integer

Default: 5672

username

User name

String

Default: guest

password

password

String

Default: guest

virtualHost

Creates a logical group of connections, exchanges, queues, bindings, user permissions, etc. within an instance.

String

Default: /

autoAck

Flag to consider if server should message acknowledged once messages have been delivered. False will provide explicit message acknowledgment.

Boolean

Default: true

global

QoS - true if the settings should be applied to the entire channel rather than each consumer

Boolean

Default: true

durable

Set a durable queue (queue will survive a server restart)

Boolean

Default: true

autoDelete

Server deletes queue when not in use

Boolean

Default: true

exclusive

Exclusive queue to this connection

Boolean

Default: false

arguments

Additional queue properties

Map<String, Object>

Default: null

exchange

Exchange configuration

routing

Routing configuration

serializer

Serialisation configuration

Exchange attributes schema

Attribute
Description
Data Type
Required

name

Name of exchange

String

type

In RabbitMQ, messages are published to an exchange and, depending on the type of exchange, the message gets routed to one or more queues

TOPIC, DIRECT, FANOUT, HEADERS

Default: TOPIC

arguments

Additional binding properties

Map<String, Object>

Default: null

Exchange example

This example configures the RabbitMQ Publisher to send events to an exchange instead of a queue:

  1. exchange The events are published to an exchange named marketQuotes.

    1. type The exchange type is set to TOPIC, meaning events can be routed based on routing keys that include wildcards. This allows consumers to subscribe to specific types of messages using patterns like market.IBM or market.*.

This setup sends events to the marketQuotes topic exchange, enabling more flexible routing of messages based on the routing keys used by consumers.

rabbitPublisher:
    ...

    exchange:
        name: marketQuotes
        type: TOPIC

Routing attributes schema

Attribute
Description
Data Type
Required

event fields

List of event fields to create dynamic routing keys

String

Default: None

Routing example

In this example, the RabbitMQ Publisher is configured to dynamically create a routing key based on the market and symbol fields from the StreamEvent:

  1. routing The routing key is constructed using the values of the market and symbol fields in the event. For example, if the market is nasdaq and the symbol is IBM, the routing key nasdaq.IBM will be created.

This enables fine-grained control over which events are routed to specific consumers, as they can subscribe to events using specific routing keys like nasdaq.IBM or nasdaq.*.

rabbitPublisher:
  ...

    routing:
        event fields:
          - market
          - symbol

Serialisation attributes schema

Attribute
Description
Data Type
Required

transform

User provided implementation that transforms the StreamEvent in to a domain class. The resulting domain class must implement serialisable interface

Implementation of StreamEventParser

formatter

If a custom transformer has not been provided the for formatter setting will be applied to the processed StreamEvent object

Default: jsonFormatter

compressed

Not implemented

Boolean

Default: false

Serialiser example

This example demonstrates how to configure the serialiser for the RabbitMQ Publisher:

  1. serializer The jsonFormatter is used to serialise the events.

    1. encoding The events are encoded in UTF-8, ensuring that the message content is in a standard JSON format that can be easily consumed by downstream applications.

This setup serialises the events into JSON format with UTF-8 encoding before publishing them to RabbitMQ.

rabbitPublisher:
  ...

  serializer:
    formatter:
      jsonFormatter:
        encoding: UTF-8

Additional resources

See for further information

See section

See section

See section

Offical

CloudAMQP

com.rabbitmq:amqp-client:5.16.0
RabbitMQ documentation
documentation
vhost documentation
Exchange attributes
Routing attributes
Serialization attributes