Pipelines
Register, list, detail and unregister stream processing pipelines
Overview
Streams are the backbone of all processing. Joule Management API provides the ability to register, list, detail and undeploy stream pipelines.
For further information on the structure of the content payload refer to this documentation.
Example stream content
{
"stream": {
"name": "basic_tumbling_window_pipeline",
"enabled": true,
"eventTimeType": "EVENT_TIME",
"sources": [
"nasdaq_quotes_stream"
],
"processing unit": {
"pipeline": [
{
"time window": {
"emitting type": "tumblingQuoteAnalytics",
"aggregate functions": {
"FIRST": [
"ask"
],
"LAST": [
"ask"
]
},
"policy": {
"type": "tumblingTime",
"window size": 1000
}
}
}
]
},
"emit": {
"select": "symbol, ask_FIRST, ask_LAST"
},
"group by": [
"symbol"
]
}
}
Register a use case stream and validate it before starting up
Successful registered stream
Failed to register stream due to malformed or missing DSL
Invalid stream definition
Stream has already been registered
POST /joule/management/stream/register HTTP/1.1
Host:
Content-Type: application/json
Accept: */*
Content-Length: 4470
{
"name": "text",
"enabled": true,
"sources": [
"text"
],
"bufferSize": 1,
"checkPointProcessing": true,
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"groupByAttributes": [
"text"
],
"streamTimeType": "INGEST_TIME",
"propertiesPath": "text",
"consumerQueueSize": 1,
"consumingFrequency": 1,
"publishingQueueSize": 1,
"publishingFrequency": 1,
"telemetryAuditingSpecification": {
"name": "text",
"enabled": true,
"rawTelemetrySpecification": {
"cloneEvents": true,
"auditFrequency": 1
},
"processedTelemetrySpecification": {
"cloneEvents": true,
"auditFrequency": 1
}
},
"validFrom": "2025-08-22T09:54:09.450Z",
"validTo": "2025-08-22T09:54:09.450Z",
"asyncProcessing": true,
"initialisationSpecification": {
"tableImportProcessor": {
"parquetImportConfigurations": [
{
"schema": "text",
"files": [
"text"
],
"table": "text",
"asView": true,
"index": {
"schema": "text",
"fields": [
"text"
],
"unique": true
}
}
],
"schema": "text",
"name": "text",
"enabled": true
},
"modelImporter": {
"empty": true,
"": {},
"orDefault": {},
"name": "text",
"enabled": true
},
"name": "text",
"enabled": true
},
"processingUnitSpecification": {
"name": "text",
"enabled": true,
"metricsEngineSpecification": {
"metricsDefinitionSpecification": {
"metrics": [
{
"name": "text",
"query": "text",
"tableDefinition": "text",
"metricKey": "text",
"compactionSpecification": {
"frequency": 1,
"timeUnit": "NANOSECONDS",
"userDefinedCompactionQuery": "text"
},
"cleanOnStart": true,
"enabled": true
}
],
"name": "text",
"enabled": true
},
"policy": {
"timeUnit": "NANOSECONDS",
"frequency": 1,
"startupDelay": 1
},
"name": "text",
"enabled": true
},
"processorPipeline": [
{
"uUID": "text",
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
}
}
]
},
"selectProcessor": {
"context": {
"": {},
"present": true,
"empty": true
},
"uUID": "text",
"name": "text",
"properties": {
"property": "text",
"empty": true,
"": {},
"orDefault": {}
},
"cloneEvent": true,
"stateful": true,
"dataStores": {
"ANY_ADDITIONAL_PROPERTY": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
}
},
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"jouleDBQueryInterface": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
},
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
},
"enabled": true,
"eventsProcessed": 1,
"eventsDiscarded": 1,
"eventsFailed": 1,
"eventsReceived": 1,
"eventsIgnored": 1,
"eventsQueued": 1,
"responseEventType": "text",
"projectionExpression": "text",
"selectAttributes": [
{
"originalExpression": "text",
"metricFamily": "text",
"metrics": "text",
"metricKey": "text",
"metricTokens": [
"text"
],
"alias": "text",
"type": "SEARCH_PATH",
"searchPath": [
"text"
],
"mathProcessor": {
"context": {
"": {},
"present": true,
"empty": true
},
"uUID": "text",
"name": "text",
"properties": {
"property": "text",
"empty": true,
"": {},
"orDefault": {}
},
"cloneEvent": true,
"stateful": true,
"dataStores": {
"ANY_ADDITIONAL_PROPERTY": "[Circular Reference]"
},
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"jouleDBQueryInterface": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
},
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
},
"enabled": true,
"eventsProcessed": 1,
"eventsDiscarded": 1,
"eventsFailed": 1,
"eventsReceived": 1,
"eventsIgnored": 1,
"eventsQueued": 1,
"previousComputedValue": {},
"responseDataType": "DOUBLE",
"expression": "text",
"language": "text",
"variables": {
"ANY_ADDITIONAL_PROPERTY": {}
}
},
"queryPredicate": "text",
"parameters": [
"text"
],
"aliase": "text"
}
]
}
}
No content
Get the stream processing specification of a registered stream
Name of stream configuration.
Successful detail provided for stream
Stream not found
Internal Joule error. Check log files.
Unknown stream.
GET /joule/management/stream/detail?name=text HTTP/1.1
Host:
Accept: */*
{
"name": "text",
"enabled": true,
"sources": [
"text"
],
"bufferSize": 1,
"checkPointProcessing": true,
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"groupByAttributes": [
"text"
],
"streamTimeType": "INGEST_TIME",
"propertiesPath": "text",
"consumerQueueSize": 1,
"consumingFrequency": 1,
"publishingQueueSize": 1,
"publishingFrequency": 1,
"telemetryAuditingSpecification": {
"name": "text",
"enabled": true,
"rawTelemetrySpecification": {
"cloneEvents": true,
"auditFrequency": 1
},
"processedTelemetrySpecification": {
"cloneEvents": true,
"auditFrequency": 1
}
},
"validFrom": "2025-08-22T09:54:09.450Z",
"validTo": "2025-08-22T09:54:09.450Z",
"asyncProcessing": true,
"initialisationSpecification": {
"tableImportProcessor": {
"parquetImportConfigurations": [
{
"schema": "text",
"files": [
"text"
],
"table": "text",
"asView": true,
"index": {
"schema": "text",
"fields": [
"text"
],
"unique": true
}
}
],
"schema": "text",
"name": "text",
"enabled": true
},
"modelImporter": {
"empty": true,
"": {},
"orDefault": {},
"name": "text",
"enabled": true
},
"name": "text",
"enabled": true
},
"processingUnitSpecification": {
"name": "text",
"enabled": true,
"metricsEngineSpecification": {
"metricsDefinitionSpecification": {
"metrics": [
{
"name": "text",
"query": "text",
"tableDefinition": "text",
"metricKey": "text",
"compactionSpecification": {
"frequency": 1,
"timeUnit": "NANOSECONDS",
"userDefinedCompactionQuery": "text"
},
"cleanOnStart": true,
"enabled": true
}
],
"name": "text",
"enabled": true
},
"policy": {
"timeUnit": "NANOSECONDS",
"frequency": 1,
"startupDelay": 1
},
"name": "text",
"enabled": true
},
"processorPipeline": [
{
"uUID": "text",
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
}
}
]
},
"selectProcessor": {
"context": {
"": {},
"present": true,
"empty": true
},
"uUID": "text",
"name": "text",
"properties": {
"property": "text",
"empty": true,
"": {},
"orDefault": {}
},
"cloneEvent": true,
"stateful": true,
"dataStores": {
"ANY_ADDITIONAL_PROPERTY": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
}
},
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"jouleDBQueryInterface": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
},
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
},
"enabled": true,
"eventsProcessed": 1,
"eventsDiscarded": 1,
"eventsFailed": 1,
"eventsReceived": 1,
"eventsIgnored": 1,
"eventsQueued": 1,
"responseEventType": "text",
"projectionExpression": "text",
"selectAttributes": [
{
"originalExpression": "text",
"metricFamily": "text",
"metrics": "text",
"metricKey": "text",
"metricTokens": [
"text"
],
"alias": "text",
"type": "SEARCH_PATH",
"searchPath": [
"text"
],
"mathProcessor": {
"context": {
"": {},
"present": true,
"empty": true
},
"uUID": "text",
"name": "text",
"properties": {
"property": "text",
"empty": true,
"": {},
"orDefault": {}
},
"cloneEvent": true,
"stateful": true,
"dataStores": {
"ANY_ADDITIONAL_PROPERTY": "[Circular Reference]"
},
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"jouleDBQueryInterface": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
},
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
},
"enabled": true,
"eventsProcessed": 1,
"eventsDiscarded": 1,
"eventsFailed": 1,
"eventsReceived": 1,
"eventsIgnored": 1,
"eventsQueued": 1,
"previousComputedValue": {},
"responseDataType": "DOUBLE",
"expression": "text",
"language": "text",
"variables": {
"ANY_ADDITIONAL_PROPERTY": {}
}
},
"queryPredicate": "text",
"parameters": [
"text"
],
"aliase": "text"
}
]
}
}
Unregister a stream processing specification from platform
Name of stream to unregistered
quoteStream
Successfully unregistered stream
Failed to unregister stream . Check Joule service log files.
Unknown stream.
Stream name must be provided.
DELETE /joule/management/stream/unregister?name=quoteStream HTTP/1.1
Host:
Accept: */*
No content
Last updated
Was this helpful?