Pipelines
Register, list, detail and unregister stream processing pipelines
Overview
Streams are the backbone of all processing. The Joule Management API provides the ability to register, list, detail and unregister stream pipelines.
For further information on the structure of the content payload, refer to this documentation.
Example stream content
{
"stream": {
"name": "basic_tumbling_window_pipeline",
"enabled": true,
"eventTimeType": "EVENT_TIME",
"sources": [
"nasdaq_quotes_stream"
],
"processing unit": {
"pipeline": [
{
"time window": {
"emitting type": "tumblingQuoteAnalytics",
"aggregate functions": {
"FIRST": [
"ask"
],
"LAST": [
"ask"
]
},
"policy": {
"type": "tumblingTime",
"window size": 1000
}
}
}
]
},
"emit": {
"select": "symbol, ask_FIRST, ask_LAST"
},
"group by": [
"symbol"
]
}
}
Register a use case stream and validate it before starting up
Body
name · string · Optional
enabled · boolean · Required
sources · string[] · Optional
bufferSize · integer · int32 · Required
checkPointProcessing · boolean · Required
groupByAttributes · string[] · Optional
streamTimeType · string · enum · Optional · Possible values: not listed here (the request example below uses INGEST_TIME)
propertiesPath · string · Optional
consumerQueueSize · integer · int32 · Required
consumingFrequency · integer · int64 · Required
publishingQueueSize · integer · int32 · Required
publishingFrequency · integer · int64 · Required
validFrom · string · date-time · Optional
validTo · string · date-time · Optional
asyncProcessing · boolean · Required
Responses
201
Successfully registered stream
500
Failed to register stream due to malformed or missing DSL
501
Invalid stream definition
502
Stream has already been registered
post
POST /joule/management/stream/register HTTP/1.1
Host:
Content-Type: application/json
Accept: */*
Content-Length: 4470
{
"name": "text",
"enabled": true,
"sources": [
"text"
],
"bufferSize": 1,
"checkPointProcessing": true,
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"groupByAttributes": [
"text"
],
"streamTimeType": "INGEST_TIME",
"propertiesPath": "text",
"consumerQueueSize": 1,
"consumingFrequency": 1,
"publishingQueueSize": 1,
"publishingFrequency": 1,
"telemetryAuditingSpecification": {
"name": "text",
"enabled": true,
"rawTelemetrySpecification": {
"cloneEvents": true,
"auditFrequency": 1
},
"processedTelemetrySpecification": {
"cloneEvents": true,
"auditFrequency": 1
}
},
"validFrom": "2025-06-27T06:33:13.733Z",
"validTo": "2025-06-27T06:33:13.733Z",
"asyncProcessing": true,
"initialisationSpecification": {
"tableImportProcessor": {
"parquetImportConfigurations": [
{
"schema": "text",
"files": [
"text"
],
"table": "text",
"asView": true,
"index": {
"schema": "text",
"fields": [
"text"
],
"unique": true
}
}
],
"schema": "text",
"name": "text",
"enabled": true
},
"modelImporter": {
"empty": true,
"": {},
"orDefault": {},
"name": "text",
"enabled": true
},
"name": "text",
"enabled": true
},
"processingUnitSpecification": {
"name": "text",
"enabled": true,
"metricsEngineSpecification": {
"metricsDefinitionSpecification": {
"metrics": [
{
"name": "text",
"query": "text",
"tableDefinition": "text",
"metricKey": "text",
"compactionSpecification": {
"frequency": 1,
"timeUnit": "NANOSECONDS",
"userDefinedCompactionQuery": "text"
},
"cleanOnStart": true,
"enabled": true
}
],
"name": "text",
"enabled": true
},
"policy": {
"timeUnit": "NANOSECONDS",
"frequency": 1,
"startupDelay": 1
},
"name": "text",
"enabled": true
},
"processorPipeline": [
{
"uUID": "text",
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
}
}
]
},
"selectProcessor": {
"context": {
"": {},
"present": true,
"empty": true
},
"uUID": "text",
"name": "text",
"properties": {
"property": "text",
"empty": true,
"": {},
"orDefault": {}
},
"cloneEvent": true,
"stateful": true,
"dataStores": {
"ANY_ADDITIONAL_PROPERTY": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
}
},
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"jouleDBQueryInterface": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
},
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
},
"enabled": true,
"eventsProcessed": 1,
"eventsDiscarded": 1,
"eventsFailed": 1,
"eventsReceived": 1,
"eventsIgnored": 1,
"eventsQueued": 1,
"responseEventType": "text",
"projectionExpression": "text",
"selectAttributes": [
{
"originalExpression": "text",
"metricFamily": "text",
"metrics": "text",
"metricKey": "text",
"metricTokens": [
"text"
],
"alias": "text",
"type": "SEARCH_PATH",
"searchPath": [
"text"
],
"mathProcessor": {
"context": {
"": {},
"present": true,
"empty": true
},
"uUID": "text",
"name": "text",
"properties": {
"property": "text",
"empty": true,
"": {},
"orDefault": {}
},
"cloneEvent": true,
"stateful": true,
"dataStores": {
"ANY_ADDITIONAL_PROPERTY": "[Circular Reference]"
},
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"jouleDBQueryInterface": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
},
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
},
"enabled": true,
"eventsProcessed": 1,
"eventsDiscarded": 1,
"eventsFailed": 1,
"eventsReceived": 1,
"eventsIgnored": 1,
"eventsQueued": 1,
"previousComputedValue": {},
"responseDataType": "DOUBLE",
"expression": "text",
"language": "text",
"variables": {
"ANY_ADDITIONAL_PROPERTY": {}
}
},
"queryPredicate": "text",
"parameters": [
"text"
],
"aliase": "text"
}
]
}
}
No content
Get the stream processing specification of a registered stream
Query parameters
name · string · Required
Name of stream configuration.
Responses
201
Successful detail provided for stream
application/json
404
Stream not found
500
Internal Joule error. Check log files.
501
Unknown stream.
get
GET /joule/management/stream/detail?name=text HTTP/1.1
Host:
Accept: */*
{
"name": "text",
"enabled": true,
"sources": [
"text"
],
"bufferSize": 1,
"checkPointProcessing": true,
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"groupByAttributes": [
"text"
],
"streamTimeType": "INGEST_TIME",
"propertiesPath": "text",
"consumerQueueSize": 1,
"consumingFrequency": 1,
"publishingQueueSize": 1,
"publishingFrequency": 1,
"telemetryAuditingSpecification": {
"name": "text",
"enabled": true,
"rawTelemetrySpecification": {
"cloneEvents": true,
"auditFrequency": 1
},
"processedTelemetrySpecification": {
"cloneEvents": true,
"auditFrequency": 1
}
},
"validFrom": "2025-06-27T06:33:13.733Z",
"validTo": "2025-06-27T06:33:13.733Z",
"asyncProcessing": true,
"initialisationSpecification": {
"tableImportProcessor": {
"parquetImportConfigurations": [
{
"schema": "text",
"files": [
"text"
],
"table": "text",
"asView": true,
"index": {
"schema": "text",
"fields": [
"text"
],
"unique": true
}
}
],
"schema": "text",
"name": "text",
"enabled": true
},
"modelImporter": {
"empty": true,
"": {},
"orDefault": {},
"name": "text",
"enabled": true
},
"name": "text",
"enabled": true
},
"processingUnitSpecification": {
"name": "text",
"enabled": true,
"metricsEngineSpecification": {
"metricsDefinitionSpecification": {
"metrics": [
{
"name": "text",
"query": "text",
"tableDefinition": "text",
"metricKey": "text",
"compactionSpecification": {
"frequency": 1,
"timeUnit": "NANOSECONDS",
"userDefinedCompactionQuery": "text"
},
"cleanOnStart": true,
"enabled": true
}
],
"name": "text",
"enabled": true
},
"policy": {
"timeUnit": "NANOSECONDS",
"frequency": 1,
"startupDelay": 1
},
"name": "text",
"enabled": true
},
"processorPipeline": [
{
"uUID": "text",
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
}
}
]
},
"selectProcessor": {
"context": {
"": {},
"present": true,
"empty": true
},
"uUID": "text",
"name": "text",
"properties": {
"property": "text",
"empty": true,
"": {},
"orDefault": {}
},
"cloneEvent": true,
"stateful": true,
"dataStores": {
"ANY_ADDITIONAL_PROPERTY": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
}
},
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"jouleDBQueryInterface": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
},
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
},
"enabled": true,
"eventsProcessed": 1,
"eventsDiscarded": 1,
"eventsFailed": 1,
"eventsReceived": 1,
"eventsIgnored": 1,
"eventsQueued": 1,
"responseEventType": "text",
"projectionExpression": "text",
"selectAttributes": [
{
"originalExpression": "text",
"metricFamily": "text",
"metrics": "text",
"metricKey": "text",
"metricTokens": [
"text"
],
"alias": "text",
"type": "SEARCH_PATH",
"searchPath": [
"text"
],
"mathProcessor": {
"context": {
"": {},
"present": true,
"empty": true
},
"uUID": "text",
"name": "text",
"properties": {
"property": "text",
"empty": true,
"": {},
"orDefault": {}
},
"cloneEvent": true,
"stateful": true,
"dataStores": {
"ANY_ADDITIONAL_PROPERTY": "[Circular Reference]"
},
"groupbyKey": {
"keys": [
"text"
],
"partitionKey": 1
},
"jouleDBQueryInterface": {
"name": "text",
"enabled": true,
"initialImage": {},
"storeName": "text",
"getInitialImage": true,
"initialImageQuery": "text",
"store": {
"metaData": {
"description": "text",
"constantJoinFields": "[Circular Reference]",
"specification": {
"name": "text",
"enabled": true
}
},
"storeType": "KEY_VALUE",
"initialImage": {
"": {},
"present": true,
"empty": true
}
},
"storeType": "KEY_VALUE"
},
"metrics": {
"metricsMap": {
"ANY_ADDITIONAL_PROPERTY": {
"": 1,
"andSet": 1,
"andIncrement": 1,
"andDecrement": 1,
"andAdd": 1,
"andUpdate": 1,
"andAccumulate": 1,
"plain": 1,
"opaque": 1,
"acquire": 1
}
},
"metric": 1
},
"enabled": true,
"eventsProcessed": 1,
"eventsDiscarded": 1,
"eventsFailed": 1,
"eventsReceived": 1,
"eventsIgnored": 1,
"eventsQueued": 1,
"previousComputedValue": {},
"responseDataType": "DOUBLE",
"expression": "text",
"language": "text",
"variables": {
"ANY_ADDITIONAL_PROPERTY": {}
}
},
"queryPredicate": "text",
"parameters": [
"text"
],
"aliase": "text"
}
]
}
}
Unregister a stream processing specification from platform
Query parameters
name · string · Required
Name of stream to unregister
Example: quoteStream
Responses
201
Successfully unregistered stream
500
Failed to unregister stream. Check Joule service log files.
501
Unknown stream.
502
Stream name must be provided.
delete
DELETE /joule/management/stream/unregister?name=quoteStream HTTP/1.1
Host:
Accept: */*
No content
Last updated
Was this helpful?