RabbitMQ
RabbitMQ is a lightweight, easy-to-deploy messaging platform that supports multiple messaging protocols. It can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements. Joule can consume events through a RabbitMQ messaging subscription.
com.rabbitmq:amqp-client:5.16.0
rabbitConsumer:
  host: localhost
  exchange:
    name: marketQuotes
    type: TOPIC
  routing:
    keys:
      - NASDAQ.IBM
      - NASDAQ.MSFT
      - NASDAQ.GOOG
  deserializer:
    transform: com.fractalworks.streams.examples.banking.data.QuoteToStreamEventTransformer
This example configures a RabbitMQ consumer to receive three event types, using the routing keys element to bind to a topic exchange. Quote events are deserialised from a StreamEvent JSON object to a Joule StreamEvent object.
Available configuration parameters
Attribute | Description | Data Type | Required |
---|---|---|---|
host | RabbitMQ server hostname or IP address | String Default: localhost | |
port | RabbitMQ server port, must be greater than 0. | Integer Default: 5672 | |
username | User name | String Default: guest | |
password | password | String Default: guest | |
virtualHost | Creates a logical group of connections, exchanges, queues, bindings, user permissions, etc. within an instance. | String Default: / | |
autoAck | Whether the server should consider messages acknowledged as soon as they are delivered. Set to false to use explicit message acknowledgment. | Boolean Default: true | |
prefetchCount | QoS - maximum number of messages that the server will deliver, 0 if unlimited | Integer Default: 1 | |
global | QoS - true if the settings should be applied to the entire channel rather than each consumer | Boolean Default: true | |
durable | Set a durable queue (queue will survive a server restart) | Boolean Default: true | |
autoDelete | Server deletes queue when not in use | Boolean Default: true | |
exclusive | Exclusive queue to this connection | Boolean Default: false | |
arguments | Additional queue properties | Map<String, Object> Default: null | |
exchange | Exchange configuration | See Exchange Attributes section | |
routing | Routing configuration | See Routing Attributes section | |
deserializer | Deserialization configuration | See Deserialization Attributes section |
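The defaults listed in the table above can be summarised as a small sketch. This is only an illustration of how omitted attributes fall back to their documented defaults; `resolve_consumer_config` is a hypothetical helper and is not part of Joule. The only validation shown is the documented rule that the port must be greater than 0.

```python
# Defaults copied from the attribute table above.
DEFAULTS = {
    "host": "localhost",
    "port": 5672,
    "username": "guest",
    "password": "guest",
    "virtualHost": "/",
    "autoAck": True,
    "prefetchCount": 1,
    "global": True,
    "durable": True,
    "autoDelete": True,
    "exclusive": False,
    "arguments": None,
}


def resolve_consumer_config(user_config):
    """Hypothetical helper: overlay user-supplied attributes on the defaults."""
    merged = {**DEFAULTS, **user_config}
    # The table states that the port must be greater than 0.
    if merged["port"] <= 0:
        raise ValueError("port must be greater than 0")
    return merged


# Only the attributes that differ from the defaults need to be supplied.
config = resolve_consumer_config({"host": "broker.example.com", "autoAck": False})
print(config["host"], config["port"], config["autoAck"])
```

Under these assumptions, a configuration that sets only `host` and `autoAck` still resolves `port` to 5672 and `prefetchCount` to 1.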
Exchange Attributes
Attribute | Description | Data Type | Required |
---|---|---|---|
name | Name of exchange | String | |
type | In RabbitMQ, messages are published to an exchange and, depending on the type of exchange, the message gets routed to one or more queues. | TOPIC, DIRECT, FANOUT, HEADERS Default: TOPIC | |
arguments | Additional binding properties | Map<String, Object> Default: null |
exchange:
  name: marketQuotes
  type: DIRECT
Routing Attributes
Attribute | Description | Data Type | Required |
---|---|---|---|
keys | Consumers can selectively consume events sent to a topic exchange by specifying one or more words delimited by dots. Valid key format examples: 'nasdaq.ibm', 'dept.inventory.bolts'. There are two important special cases when defining keys: '*' substitutes for exactly one word and '#' substitutes for zero or more words. | String Default: All Constraints: Any number of dot-separated words, up to a limit of 255 bytes | |
This example subscribes to all events whose key has 'NASDAQ' as its first component.
routing:
  keys:
    - NASDAQ.#
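The wildcard behaviour of topic routing keys can be sketched in a few lines. This is a minimal model of the standard AMQP topic rules ('*' matches exactly one word, '#' matches zero or more words), not the code RabbitMQ or Joule actually runs; `topic_matches` is a name chosen for illustration.

```python
def _match(pattern_words, key_words):
    """Recursively match a list of pattern words against routing key words."""
    if not pattern_words:
        # The pattern is exhausted: it matches only if the key is too.
        return not key_words
    head, rest = pattern_words[0], pattern_words[1:]
    if head == "#":
        # '#' may absorb zero or more words, so try every split point.
        return any(_match(rest, key_words[i:]) for i in range(len(key_words) + 1))
    if not key_words:
        return False
    if head == "*" or head == key_words[0]:
        # '*' consumes exactly one word; a literal must match exactly.
        return _match(rest, key_words[1:])
    return False


def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key."""
    return _match(pattern.split("."), routing_key.split("."))


print(topic_matches("NASDAQ.#", "NASDAQ.IBM"))   # True
print(topic_matches("*.IBM", "NASDAQ.IBM"))      # True
print(topic_matches("*.IBM", "NASDAQ.MSFT"))     # False
```

Note that `NASDAQ.#` also matches the bare key `NASDAQ`, because '#' can stand for zero words, whereas `NASDAQ.*` requires exactly one further word.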
Deserialization Attributes
This section describes the configuration parameters available for the object deserialization process.
Attribute | Description | Data Type | Required |
---|---|---|---|
transform | User provided implementation | Implementation of StreamEventParser | |
compressed | Not implemented | Boolean Default: false |
deserializer:
  transform: com.fractalworks.streams.examples.banking.data.QuoteToStreamEventTransformer
  compressed: false
All examples assume default values when not applied.
Sets up a simple consumer that subscribes to events directly from a queue.
rabbitConsumer:
  queue: point_to_point_queue
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer
Subscribe to a Worker Queue that is used to distribute time-consuming tasks among multiple Joule workers.
rabbitConsumer:
  queue: worker_queue
  autoAck: false
  durable: true
  prefetchCount: 1
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer
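The effect of combining `autoAck: false` with `prefetchCount: 1` on a worker queue can be modelled with a short simulation. This is a simplified sketch, not RabbitMQ or Joule code: it assumes each worker acknowledges a task when it finishes, so the broker hands the next task to whichever worker becomes free first. The task durations and function names are hypothetical. Plain round-robin dispatch is included for contrast.

```python
import heapq


def fair_dispatch(durations, workers):
    """Model prefetchCount=1 with explicit acks: next task goes to the first free worker."""
    # Heap of (time_when_free, worker_id); ties go to the lowest worker id.
    free_at = [(0, w) for w in range(workers)]
    heapq.heapify(free_at)
    assignment = []
    for duration in durations:
        time_free, worker = heapq.heappop(free_at)
        assignment.append(worker)
        # The worker acknowledges only after finishing the task.
        heapq.heappush(free_at, (time_free + duration, worker))
    return assignment


def round_robin(durations, workers):
    """Model dispatch without a prefetch limit: tasks alternate regardless of load."""
    return [i % workers for i in range(len(durations))]


tasks = [5, 1, 1, 1]  # hypothetical task durations, first task is slow
print(fair_dispatch(tasks, 2))  # [0, 1, 1, 1]
print(round_robin(tasks, 2))    # [0, 1, 0, 1]
```

In this model the slow first task keeps worker 0 busy, so the three short tasks all go to worker 1 instead of queueing behind the slow one.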
In this example, each consumer process receives the same message. Because the exchange is configured as a fanout, we do not need the routing element; routing is covered in the next example.
rabbitConsumer:
  exchange:
    name: quotes_exchange
    type: FANOUT
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer
This time we make it possible for a single consumer to subscribe to only a subset of the messages. Each Joule consumer process has its own routing keys that it subscribes to. In the case below, a process using this configuration will receive only IBM and MSFT quotes.
rabbitConsumer:
  host: localhost
  exchange:
    name: quotes_exchange
    type: DIRECT
  routing:
    keys:
      - NASDAQ.IBM
      - NASDAQ.MSFT
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer
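The difference between the fanout and direct exchange types can be sketched as follows. This is an illustrative model only: it assumes each consumer has its own queue bound to the exchange, and the `deliver` function and consumer names are hypothetical rather than part of RabbitMQ or Joule.

```python
def deliver(exchange_type, bindings, routing_key):
    """Return the sorted names of consumers that receive a message.

    bindings maps a consumer name to the set of routing keys it bound.
    """
    if exchange_type == "FANOUT":
        # A fanout exchange ignores routing keys: every bound consumer gets a copy.
        return sorted(bindings)
    if exchange_type == "DIRECT":
        # A direct exchange requires an exact routing key match.
        return sorted(name for name, keys in bindings.items() if routing_key in keys)
    raise ValueError(f"unsupported exchange type in this sketch: {exchange_type}")


bindings = {
    "consumer_a": {"NASDAQ.IBM", "NASDAQ.MSFT"},
    "consumer_b": {"NASDAQ.GOOG"},
}

print(deliver("FANOUT", bindings, "NASDAQ.GOOG"))  # ['consumer_a', 'consumer_b']
print(deliver("DIRECT", bindings, "NASDAQ.IBM"))   # ['consumer_a']
```

With a direct exchange, a message whose key no consumer has bound is delivered to nobody in this model.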
This example is very similar to the previous case, in that we subscribe only to the events we are interested in. However, here we use a topic exchange, which enables us to use dot notation and wildcards to subscribe to a wider set of events. In the case below we would receive quotes for both IBM and MSFT from all market venues.
rabbitConsumer:
  host: localhost
  exchange:
    name: quotes_exchange
    type: TOPIC
  routing:
    keys:
      - "*.IBM"
      - "*.MSFT"
  deserializer:
    transform: com.fractalworks.streams.transport.serializers.QuoteToStreamEventTransformer