Pipelines

Register, list, detail and unregister stream processing pipelines

Overview

Streams are the backbone of all processing. The Joule Management API provides the ability to register, list, detail and unregister stream pipelines.

For further information on the structure of the content payload refer to this documentation.

Example stream content

{
  "stream": {
    "name": "basic_tumbling_window_pipeline",
    "enabled": true,
    "eventTimeType": "EVENT_TIME",
    "sources": [
      "nasdaq_quotes_stream"
    ],
    "processing unit": {
      "pipeline": [
        {
          "time window": {
            "emitting type": "tumblingQuoteAnalytics",
            "aggregate functions": {
              "FIRST": [
                "ask"
              ],
              "LAST": [
                "ask"
              ]
            },
            "policy": {
              "type": "tumblingTime",
              "window size": 1000
            }
          }
        }
      ]
    },
    "emit": {
      "select": "symbol, ask_FIRST, ask_LAST"
    },
    "group by": [
      "symbol"
    ]
  }
}

Register a stream

post

Register a use case stream and validate it before starting up

Body
name · string · Optional
enabled · boolean · Required
sources · string[] · Optional
bufferSize · integer · int32 · Required
checkPointProcessing · boolean · Required
groupByAttributes · string[] · Optional
streamTimeType · string · enum · Optional · Possible values: not listed here (the example request below uses INGEST_TIME)
propertiesPath · string · Optional
consumerQueueSize · integer · int32 · Required
consumingFrequency · integer · int64 · Required
publishingQueueSize · integer · int32 · Required
publishingFrequency · integer · int64 · Required
validFrom · string · date-time · Optional
validTo · string · date-time · Optional
asyncProcessing · boolean · Required
Responses
201

Successfully registered stream

No content

post
/joule/management/stream/register
POST /joule/management/stream/register HTTP/1.1
Host: 
Content-Type: application/json
Accept: */*
Content-Length: 4470

{
  "name": "text",
  "enabled": true,
  "sources": [
    "text"
  ],
  "bufferSize": 1,
  "checkPointProcessing": true,
  "groupbyKey": {
    "keys": [
      "text"
    ],
    "partitionKey": 1
  },
  "groupByAttributes": [
    "text"
  ],
  "streamTimeType": "INGEST_TIME",
  "propertiesPath": "text",
  "consumerQueueSize": 1,
  "consumingFrequency": 1,
  "publishingQueueSize": 1,
  "publishingFrequency": 1,
  "telemetryAuditingSpecification": {
    "name": "text",
    "enabled": true,
    "rawTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    },
    "processedTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    }
  },
  "validFrom": "2025-10-23T21:25:14.507Z",
  "validTo": "2025-10-23T21:25:14.507Z",
  "asyncProcessing": true,
  "initialisationSpecification": {
    "tableImportProcessor": {
      "parquetImportConfigurations": [
        {
          "schema": "text",
          "files": [
            "text"
          ],
          "table": "text",
          "asView": true,
          "index": {
            "schema": "text",
            "fields": [
              "text"
            ],
            "unique": true
          }
        }
      ],
      "schema": "text",
      "name": "text",
      "enabled": true
    },
    "modelImporter": {
      "empty": true,
      "": {},
      "orDefault": {},
      "name": "text",
      "enabled": true
    },
    "name": "text",
    "enabled": true
  },
  "processingUnitSpecification": {
    "name": "text",
    "enabled": true,
    "metricsEngineSpecification": {
      "metricsDefinitionSpecification": {
        "metrics": [
          {
            "name": "text",
            "query": "text",
            "tableDefinition": "text",
            "metricKey": "text",
            "compactionSpecification": {
              "frequency": 1,
              "timeUnit": "NANOSECONDS",
              "userDefinedCompactionQuery": "text"
            },
            "cleanOnStart": true,
            "enabled": true
          }
        ],
        "name": "text",
        "enabled": true
      },
      "policy": {
        "timeUnit": "NANOSECONDS",
        "frequency": 1,
        "startupDelay": 1
      },
      "name": "text",
      "enabled": true
    },
    "processorPipeline": [
      {
        "uUID": "text",
        "metrics": {
          "metricsMap": {
            "ANY_ADDITIONAL_PROPERTY": {
              "": 1,
              "andSet": 1,
              "andIncrement": 1,
              "andDecrement": 1,
              "andAdd": 1,
              "andUpdate": 1,
              "andAccumulate": 1,
              "plain": 1,
              "opaque": 1,
              "acquire": 1
            }
          },
          "metric": 1
        }
      }
    ]
  },
  "selectProcessor": {
    "context": {
      "": {},
      "present": true,
      "empty": true
    },
    "uUID": "text",
    "name": "text",
    "properties": {
      "property": "text",
      "empty": true,
      "": {},
      "orDefault": {}
    },
    "cloneEvent": true,
    "stateful": true,
    "dataStores": {
      "ANY_ADDITIONAL_PROPERTY": {
        "name": "text",
        "enabled": true,
        "initialImage": {},
        "storeName": "text",
        "getInitialImage": true,
        "initialImageQuery": "text",
        "store": {
          "metaData": {
            "description": "text",
            "constantJoinFields": "[Circular Reference]",
            "specification": {
              "name": "text",
              "enabled": true
            }
          },
          "storeType": "KEY_VALUE",
          "initialImage": {
            "": {},
            "present": true,
            "empty": true
          }
        },
        "storeType": "KEY_VALUE"
      }
    },
    "groupbyKey": {
      "keys": [
        "text"
      ],
      "partitionKey": 1
    },
    "jouleDBQueryInterface": {
      "name": "text",
      "enabled": true,
      "initialImage": {},
      "storeName": "text",
      "getInitialImage": true,
      "initialImageQuery": "text",
      "store": {
        "metaData": {
          "description": "text",
          "constantJoinFields": "[Circular Reference]",
          "specification": {
            "name": "text",
            "enabled": true
          }
        },
        "storeType": "KEY_VALUE",
        "initialImage": {
          "": {},
          "present": true,
          "empty": true
        }
      },
      "storeType": "KEY_VALUE"
    },
    "metrics": {
      "metricsMap": {
        "ANY_ADDITIONAL_PROPERTY": {
          "": 1,
          "andSet": 1,
          "andIncrement": 1,
          "andDecrement": 1,
          "andAdd": 1,
          "andUpdate": 1,
          "andAccumulate": 1,
          "plain": 1,
          "opaque": 1,
          "acquire": 1
        }
      },
      "metric": 1
    },
    "enabled": true,
    "eventsProcessed": 1,
    "eventsDiscarded": 1,
    "eventsFailed": 1,
    "eventsReceived": 1,
    "eventsIgnored": 1,
    "eventsQueued": 1,
    "responseEventType": "text",
    "projectionExpression": "text",
    "selectAttributes": [
      {
        "originalExpression": "text",
        "metricFamily": "text",
        "metrics": "text",
        "metricKey": "text",
        "metricTokens": [
          "text"
        ],
        "alias": "text",
        "type": "SEARCH_PATH",
        "searchPath": [
          "text"
        ],
        "mathProcessor": {
          "context": {
            "": {},
            "present": true,
            "empty": true
          },
          "uUID": "text",
          "name": "text",
          "properties": {
            "property": "text",
            "empty": true,
            "": {},
            "orDefault": {}
          },
          "cloneEvent": true,
          "stateful": true,
          "dataStores": {
            "ANY_ADDITIONAL_PROPERTY": "[Circular Reference]"
          },
          "groupbyKey": {
            "keys": [
              "text"
            ],
            "partitionKey": 1
          },
          "jouleDBQueryInterface": {
            "name": "text",
            "enabled": true,
            "initialImage": {},
            "storeName": "text",
            "getInitialImage": true,
            "initialImageQuery": "text",
            "store": {
              "metaData": {
                "description": "text",
                "constantJoinFields": "[Circular Reference]",
                "specification": {
                  "name": "text",
                  "enabled": true
                }
              },
              "storeType": "KEY_VALUE",
              "initialImage": {
                "": {},
                "present": true,
                "empty": true
              }
            },
            "storeType": "KEY_VALUE"
          },
          "metrics": {
            "metricsMap": {
              "ANY_ADDITIONAL_PROPERTY": {
                "": 1,
                "andSet": 1,
                "andIncrement": 1,
                "andDecrement": 1,
                "andAdd": 1,
                "andUpdate": 1,
                "andAccumulate": 1,
                "plain": 1,
                "opaque": 1,
                "acquire": 1
              }
            },
            "metric": 1
          },
          "enabled": true,
          "eventsProcessed": 1,
          "eventsDiscarded": 1,
          "eventsFailed": 1,
          "eventsReceived": 1,
          "eventsIgnored": 1,
          "eventsQueued": 1,
          "previousComputedValue": {},
          "responseDataType": "DOUBLE",
          "expression": "text",
          "language": "text",
          "variables": {
            "ANY_ADDITIONAL_PROPERTY": {}
          }
        },
        "queryPredicate": "text",
        "parameters": [
          "text"
        ],
        "aliase": "text"
      }
    ]
  }
}

No content

List all streams

get

List all streams registered within the Joule node

Responses
201

Successfully listed streams

No content

get
/joule/management/stream/list
GET /joule/management/stream/list HTTP/1.1
Host: 
Accept: */*

No content

Get stream specification

get

Get the stream processing specification of a registered stream

Query parameters
name · string · Required

Name of stream configuration.

Responses
201

Stream detail successfully provided

application/json
get
/joule/management/stream/detail
GET /joule/management/stream/detail?name=text HTTP/1.1
Host: 
Accept: */*

Response body (201, application/json):
{
  "name": "text",
  "enabled": true,
  "sources": [
    "text"
  ],
  "bufferSize": 1,
  "checkPointProcessing": true,
  "groupbyKey": {
    "keys": [
      "text"
    ],
    "partitionKey": 1
  },
  "groupByAttributes": [
    "text"
  ],
  "streamTimeType": "INGEST_TIME",
  "propertiesPath": "text",
  "consumerQueueSize": 1,
  "consumingFrequency": 1,
  "publishingQueueSize": 1,
  "publishingFrequency": 1,
  "telemetryAuditingSpecification": {
    "name": "text",
    "enabled": true,
    "rawTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    },
    "processedTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    }
  },
  "validFrom": "2025-10-23T21:25:14.507Z",
  "validTo": "2025-10-23T21:25:14.507Z",
  "asyncProcessing": true,
  "initialisationSpecification": {
    "tableImportProcessor": {
      "parquetImportConfigurations": [
        {
          "schema": "text",
          "files": [
            "text"
          ],
          "table": "text",
          "asView": true,
          "index": {
            "schema": "text",
            "fields": [
              "text"
            ],
            "unique": true
          }
        }
      ],
      "schema": "text",
      "name": "text",
      "enabled": true
    },
    "modelImporter": {
      "empty": true,
      "": {},
      "orDefault": {},
      "name": "text",
      "enabled": true
    },
    "name": "text",
    "enabled": true
  },
  "processingUnitSpecification": {
    "name": "text",
    "enabled": true,
    "metricsEngineSpecification": {
      "metricsDefinitionSpecification": {
        "metrics": [
          {
            "name": "text",
            "query": "text",
            "tableDefinition": "text",
            "metricKey": "text",
            "compactionSpecification": {
              "frequency": 1,
              "timeUnit": "NANOSECONDS",
              "userDefinedCompactionQuery": "text"
            },
            "cleanOnStart": true,
            "enabled": true
          }
        ],
        "name": "text",
        "enabled": true
      },
      "policy": {
        "timeUnit": "NANOSECONDS",
        "frequency": 1,
        "startupDelay": 1
      },
      "name": "text",
      "enabled": true
    },
    "processorPipeline": [
      {
        "uUID": "text",
        "metrics": {
          "metricsMap": {
            "ANY_ADDITIONAL_PROPERTY": {
              "": 1,
              "andSet": 1,
              "andIncrement": 1,
              "andDecrement": 1,
              "andAdd": 1,
              "andUpdate": 1,
              "andAccumulate": 1,
              "plain": 1,
              "opaque": 1,
              "acquire": 1
            }
          },
          "metric": 1
        }
      }
    ]
  },
  "selectProcessor": {
    "context": {
      "": {},
      "present": true,
      "empty": true
    },
    "uUID": "text",
    "name": "text",
    "properties": {
      "property": "text",
      "empty": true,
      "": {},
      "orDefault": {}
    },
    "cloneEvent": true,
    "stateful": true,
    "dataStores": {
      "ANY_ADDITIONAL_PROPERTY": {
        "name": "text",
        "enabled": true,
        "initialImage": {},
        "storeName": "text",
        "getInitialImage": true,
        "initialImageQuery": "text",
        "store": {
          "metaData": {
            "description": "text",
            "constantJoinFields": "[Circular Reference]",
            "specification": {
              "name": "text",
              "enabled": true
            }
          },
          "storeType": "KEY_VALUE",
          "initialImage": {
            "": {},
            "present": true,
            "empty": true
          }
        },
        "storeType": "KEY_VALUE"
      }
    },
    "groupbyKey": {
      "keys": [
        "text"
      ],
      "partitionKey": 1
    },
    "jouleDBQueryInterface": {
      "name": "text",
      "enabled": true,
      "initialImage": {},
      "storeName": "text",
      "getInitialImage": true,
      "initialImageQuery": "text",
      "store": {
        "metaData": {
          "description": "text",
          "constantJoinFields": "[Circular Reference]",
          "specification": {
            "name": "text",
            "enabled": true
          }
        },
        "storeType": "KEY_VALUE",
        "initialImage": {
          "": {},
          "present": true,
          "empty": true
        }
      },
      "storeType": "KEY_VALUE"
    },
    "metrics": {
      "metricsMap": {
        "ANY_ADDITIONAL_PROPERTY": {
          "": 1,
          "andSet": 1,
          "andIncrement": 1,
          "andDecrement": 1,
          "andAdd": 1,
          "andUpdate": 1,
          "andAccumulate": 1,
          "plain": 1,
          "opaque": 1,
          "acquire": 1
        }
      },
      "metric": 1
    },
    "enabled": true,
    "eventsProcessed": 1,
    "eventsDiscarded": 1,
    "eventsFailed": 1,
    "eventsReceived": 1,
    "eventsIgnored": 1,
    "eventsQueued": 1,
    "responseEventType": "text",
    "projectionExpression": "text",
    "selectAttributes": [
      {
        "originalExpression": "text",
        "metricFamily": "text",
        "metrics": "text",
        "metricKey": "text",
        "metricTokens": [
          "text"
        ],
        "alias": "text",
        "type": "SEARCH_PATH",
        "searchPath": [
          "text"
        ],
        "mathProcessor": {
          "context": {
            "": {},
            "present": true,
            "empty": true
          },
          "uUID": "text",
          "name": "text",
          "properties": {
            "property": "text",
            "empty": true,
            "": {},
            "orDefault": {}
          },
          "cloneEvent": true,
          "stateful": true,
          "dataStores": {
            "ANY_ADDITIONAL_PROPERTY": "[Circular Reference]"
          },
          "groupbyKey": {
            "keys": [
              "text"
            ],
            "partitionKey": 1
          },
          "jouleDBQueryInterface": {
            "name": "text",
            "enabled": true,
            "initialImage": {},
            "storeName": "text",
            "getInitialImage": true,
            "initialImageQuery": "text",
            "store": {
              "metaData": {
                "description": "text",
                "constantJoinFields": "[Circular Reference]",
                "specification": {
                  "name": "text",
                  "enabled": true
                }
              },
              "storeType": "KEY_VALUE",
              "initialImage": {
                "": {},
                "present": true,
                "empty": true
              }
            },
            "storeType": "KEY_VALUE"
          },
          "metrics": {
            "metricsMap": {
              "ANY_ADDITIONAL_PROPERTY": {
                "": 1,
                "andSet": 1,
                "andIncrement": 1,
                "andDecrement": 1,
                "andAdd": 1,
                "andUpdate": 1,
                "andAccumulate": 1,
                "plain": 1,
                "opaque": 1,
                "acquire": 1
              }
            },
            "metric": 1
          },
          "enabled": true,
          "eventsProcessed": 1,
          "eventsDiscarded": 1,
          "eventsFailed": 1,
          "eventsReceived": 1,
          "eventsIgnored": 1,
          "eventsQueued": 1,
          "previousComputedValue": {},
          "responseDataType": "DOUBLE",
          "expression": "text",
          "language": "text",
          "variables": {
            "ANY_ADDITIONAL_PROPERTY": {}
          }
        },
        "queryPredicate": "text",
        "parameters": [
          "text"
        ],
        "aliase": "text"
      }
    ]
  }
}

Unregister stream

delete

Unregister a stream processing specification from the platform

Query parameters
name · string · Required

Name of the stream to unregister

Example: quoteStream
Responses
201

Successfully unregistered stream

No content

delete
/joule/management/stream/unregister
DELETE /joule/management/stream/unregister?name=quoteStream HTTP/1.1
Host: 
Accept: */*

No content

Last updated

Was this helpful?