Pipelines

Register, list, detail and unregister stream processing pipelines

Overview

Streams are the backbone of all processing. The Joule Management API provides the ability to register, list, detail and unregister stream pipelines.

For further information on the structure of the content payload refer to this documentation.

Example stream content

{
  "stream": {
    "name": "basic_tumbling_window_pipeline",
    "enabled": true,
    "eventTimeType": "EVENT_TIME",
    "sources": [
      "nasdaq_quotes_stream"
    ],
    "processing unit": {
      "pipeline": [
        {
          "time window": {
            "emitting type": "tumblingQuoteAnalytics",
            "aggregate functions": {
              "FIRST": [
                "ask"
              ],
              "LAST": [
                "ask"
              ]
            },
            "policy": {
              "type": "tumblingTime",
              "window size": 1000
            }
          }
        }
      ]
    },
    "emit": {
      "select": "symbol, ask_FIRST, ask_LAST"
    },
    "group by": [
      "symbol"
    ]
  }
}

Register a stream

post

Register a use case stream and validate it before starting up

Body
name: string · Optional
enabled: boolean · Required
sources: string[] · Optional
bufferSize: integer · int32 · Required
checkPointProcessing: boolean · Required
groupByAttributes: string[] · Optional
streamTimeType: string · enum · Optional (possible values are not listed here; the example request below uses INGEST_TIME)
propertiesPath: string · Optional
consumerQueueSize: integer · int32 · Required
consumingFrequency: integer · int64 · Required
publishingQueueSize: integer · int32 · Required
publishingFrequency: integer · int64 · Required
validFrom: string · date-time · Optional
validTo: string · date-time · Optional
asyncProcessing: boolean · Required
Responses
201
Successfully registered stream
post
POST /joule/management/stream/register HTTP/1.1
Host: 
Content-Type: application/json
Accept: */*
Content-Length: 4470

{
  "name": "text",
  "enabled": true,
  "sources": [
    "text"
  ],
  "bufferSize": 1,
  "checkPointProcessing": true,
  "groupbyKey": {
    "keys": [
      "text"
    ],
    "partitionKey": 1
  },
  "groupByAttributes": [
    "text"
  ],
  "streamTimeType": "INGEST_TIME",
  "propertiesPath": "text",
  "consumerQueueSize": 1,
  "consumingFrequency": 1,
  "publishingQueueSize": 1,
  "publishingFrequency": 1,
  "telemetryAuditingSpecification": {
    "name": "text",
    "enabled": true,
    "rawTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    },
    "processedTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    }
  },
  "validFrom": "2025-06-27T06:33:13.733Z",
  "validTo": "2025-06-27T06:33:13.733Z",
  "asyncProcessing": true,
  "initialisationSpecification": {
    "tableImportProcessor": {
      "parquetImportConfigurations": [
        {
          "schema": "text",
          "files": [
            "text"
          ],
          "table": "text",
          "asView": true,
          "index": {
            "schema": "text",
            "fields": [
              "text"
            ],
            "unique": true
          }
        }
      ],
      "schema": "text",
      "name": "text",
      "enabled": true
    },
    "modelImporter": {
      "empty": true,
      "": {},
      "orDefault": {},
      "name": "text",
      "enabled": true
    },
    "name": "text",
    "enabled": true
  },
  "processingUnitSpecification": {
    "name": "text",
    "enabled": true,
    "metricsEngineSpecification": {
      "metricsDefinitionSpecification": {
        "metrics": [
          {
            "name": "text",
            "query": "text",
            "tableDefinition": "text",
            "metricKey": "text",
            "compactionSpecification": {
              "frequency": 1,
              "timeUnit": "NANOSECONDS",
              "userDefinedCompactionQuery": "text"
            },
            "cleanOnStart": true,
            "enabled": true
          }
        ],
        "name": "text",
        "enabled": true
      },
      "policy": {
        "timeUnit": "NANOSECONDS",
        "frequency": 1,
        "startupDelay": 1
      },
      "name": "text",
      "enabled": true
    },
    "processorPipeline": [
      {
        "uUID": "text",
        "metrics": {
          "metricsMap": {
            "ANY_ADDITIONAL_PROPERTY": {
              "": 1,
              "andSet": 1,
              "andIncrement": 1,
              "andDecrement": 1,
              "andAdd": 1,
              "andUpdate": 1,
              "andAccumulate": 1,
              "plain": 1,
              "opaque": 1,
              "acquire": 1
            }
          },
          "metric": 1
        }
      }
    ]
  },
  "selectProcessor": {
    "context": {
      "": {},
      "present": true,
      "empty": true
    },
    "uUID": "text",
    "name": "text",
    "properties": {
      "property": "text",
      "empty": true,
      "": {},
      "orDefault": {}
    },
    "cloneEvent": true,
    "stateful": true,
    "dataStores": {
      "ANY_ADDITIONAL_PROPERTY": {
        "name": "text",
        "enabled": true,
        "initialImage": {},
        "storeName": "text",
        "getInitialImage": true,
        "initialImageQuery": "text",
        "store": {
          "metaData": {
            "description": "text",
            "constantJoinFields": "[Circular Reference]",
            "specification": {
              "name": "text",
              "enabled": true
            }
          },
          "storeType": "KEY_VALUE",
          "initialImage": {
            "": {},
            "present": true,
            "empty": true
          }
        },
        "storeType": "KEY_VALUE"
      }
    },
    "groupbyKey": {
      "keys": [
        "text"
      ],
      "partitionKey": 1
    },
    "jouleDBQueryInterface": {
      "name": "text",
      "enabled": true,
      "initialImage": {},
      "storeName": "text",
      "getInitialImage": true,
      "initialImageQuery": "text",
      "store": {
        "metaData": {
          "description": "text",
          "constantJoinFields": "[Circular Reference]",
          "specification": {
            "name": "text",
            "enabled": true
          }
        },
        "storeType": "KEY_VALUE",
        "initialImage": {
          "": {},
          "present": true,
          "empty": true
        }
      },
      "storeType": "KEY_VALUE"
    },
    "metrics": {
      "metricsMap": {
        "ANY_ADDITIONAL_PROPERTY": {
          "": 1,
          "andSet": 1,
          "andIncrement": 1,
          "andDecrement": 1,
          "andAdd": 1,
          "andUpdate": 1,
          "andAccumulate": 1,
          "plain": 1,
          "opaque": 1,
          "acquire": 1
        }
      },
      "metric": 1
    },
    "enabled": true,
    "eventsProcessed": 1,
    "eventsDiscarded": 1,
    "eventsFailed": 1,
    "eventsReceived": 1,
    "eventsIgnored": 1,
    "eventsQueued": 1,
    "responseEventType": "text",
    "projectionExpression": "text",
    "selectAttributes": [
      {
        "originalExpression": "text",
        "metricFamily": "text",
        "metrics": "text",
        "metricKey": "text",
        "metricTokens": [
          "text"
        ],
        "alias": "text",
        "type": "SEARCH_PATH",
        "searchPath": [
          "text"
        ],
        "mathProcessor": {
          "context": {
            "": {},
            "present": true,
            "empty": true
          },
          "uUID": "text",
          "name": "text",
          "properties": {
            "property": "text",
            "empty": true,
            "": {},
            "orDefault": {}
          },
          "cloneEvent": true,
          "stateful": true,
          "dataStores": {
            "ANY_ADDITIONAL_PROPERTY": "[Circular Reference]"
          },
          "groupbyKey": {
            "keys": [
              "text"
            ],
            "partitionKey": 1
          },
          "jouleDBQueryInterface": {
            "name": "text",
            "enabled": true,
            "initialImage": {},
            "storeName": "text",
            "getInitialImage": true,
            "initialImageQuery": "text",
            "store": {
              "metaData": {
                "description": "text",
                "constantJoinFields": "[Circular Reference]",
                "specification": {
                  "name": "text",
                  "enabled": true
                }
              },
              "storeType": "KEY_VALUE",
              "initialImage": {
                "": {},
                "present": true,
                "empty": true
              }
            },
            "storeType": "KEY_VALUE"
          },
          "metrics": {
            "metricsMap": {
              "ANY_ADDITIONAL_PROPERTY": {
                "": 1,
                "andSet": 1,
                "andIncrement": 1,
                "andDecrement": 1,
                "andAdd": 1,
                "andUpdate": 1,
                "andAccumulate": 1,
                "plain": 1,
                "opaque": 1,
                "acquire": 1
              }
            },
            "metric": 1
          },
          "enabled": true,
          "eventsProcessed": 1,
          "eventsDiscarded": 1,
          "eventsFailed": 1,
          "eventsReceived": 1,
          "eventsIgnored": 1,
          "eventsQueued": 1,
          "previousComputedValue": {},
          "responseDataType": "DOUBLE",
          "expression": "text",
          "language": "text",
          "variables": {
            "ANY_ADDITIONAL_PROPERTY": {}
          }
        },
        "queryPredicate": "text",
        "parameters": [
          "text"
        ],
        "aliase": "text"
      }
    ]
  }
}

No content

List all streams

get

List all streams registered within the Joule node

Responses
201
Successfully listed streams
get
GET /joule/management/stream/list HTTP/1.1
Host: 
Accept: */*

No content

Get stream specification

get

Get the stream processing specification of a registered stream

Query parameters
name: string · Required

Name of stream configuration.

Responses
201
Stream detail successfully provided
application/json
get
GET /joule/management/stream/detail?name=text HTTP/1.1
Host: 
Accept: */*

Example response body (application/json):

{
  "name": "text",
  "enabled": true,
  "sources": [
    "text"
  ],
  "bufferSize": 1,
  "checkPointProcessing": true,
  "groupbyKey": {
    "keys": [
      "text"
    ],
    "partitionKey": 1
  },
  "groupByAttributes": [
    "text"
  ],
  "streamTimeType": "INGEST_TIME",
  "propertiesPath": "text",
  "consumerQueueSize": 1,
  "consumingFrequency": 1,
  "publishingQueueSize": 1,
  "publishingFrequency": 1,
  "telemetryAuditingSpecification": {
    "name": "text",
    "enabled": true,
    "rawTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    },
    "processedTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    }
  },
  "validFrom": "2025-06-27T06:33:13.733Z",
  "validTo": "2025-06-27T06:33:13.733Z",
  "asyncProcessing": true,
  "initialisationSpecification": {
    "tableImportProcessor": {
      "parquetImportConfigurations": [
        {
          "schema": "text",
          "files": [
            "text"
          ],
          "table": "text",
          "asView": true,
          "index": {
            "schema": "text",
            "fields": [
              "text"
            ],
            "unique": true
          }
        }
      ],
      "schema": "text",
      "name": "text",
      "enabled": true
    },
    "modelImporter": {
      "empty": true,
      "": {},
      "orDefault": {},
      "name": "text",
      "enabled": true
    },
    "name": "text",
    "enabled": true
  },
  "processingUnitSpecification": {
    "name": "text",
    "enabled": true,
    "metricsEngineSpecification": {
      "metricsDefinitionSpecification": {
        "metrics": [
          {
            "name": "text",
            "query": "text",
            "tableDefinition": "text",
            "metricKey": "text",
            "compactionSpecification": {
              "frequency": 1,
              "timeUnit": "NANOSECONDS",
              "userDefinedCompactionQuery": "text"
            },
            "cleanOnStart": true,
            "enabled": true
          }
        ],
        "name": "text",
        "enabled": true
      },
      "policy": {
        "timeUnit": "NANOSECONDS",
        "frequency": 1,
        "startupDelay": 1
      },
      "name": "text",
      "enabled": true
    },
    "processorPipeline": [
      {
        "uUID": "text",
        "metrics": {
          "metricsMap": {
            "ANY_ADDITIONAL_PROPERTY": {
              "": 1,
              "andSet": 1,
              "andIncrement": 1,
              "andDecrement": 1,
              "andAdd": 1,
              "andUpdate": 1,
              "andAccumulate": 1,
              "plain": 1,
              "opaque": 1,
              "acquire": 1
            }
          },
          "metric": 1
        }
      }
    ]
  },
  "selectProcessor": {
    "context": {
      "": {},
      "present": true,
      "empty": true
    },
    "uUID": "text",
    "name": "text",
    "properties": {
      "property": "text",
      "empty": true,
      "": {},
      "orDefault": {}
    },
    "cloneEvent": true,
    "stateful": true,
    "dataStores": {
      "ANY_ADDITIONAL_PROPERTY": {
        "name": "text",
        "enabled": true,
        "initialImage": {},
        "storeName": "text",
        "getInitialImage": true,
        "initialImageQuery": "text",
        "store": {
          "metaData": {
            "description": "text",
            "constantJoinFields": "[Circular Reference]",
            "specification": {
              "name": "text",
              "enabled": true
            }
          },
          "storeType": "KEY_VALUE",
          "initialImage": {
            "": {},
            "present": true,
            "empty": true
          }
        },
        "storeType": "KEY_VALUE"
      }
    },
    "groupbyKey": {
      "keys": [
        "text"
      ],
      "partitionKey": 1
    },
    "jouleDBQueryInterface": {
      "name": "text",
      "enabled": true,
      "initialImage": {},
      "storeName": "text",
      "getInitialImage": true,
      "initialImageQuery": "text",
      "store": {
        "metaData": {
          "description": "text",
          "constantJoinFields": "[Circular Reference]",
          "specification": {
            "name": "text",
            "enabled": true
          }
        },
        "storeType": "KEY_VALUE",
        "initialImage": {
          "": {},
          "present": true,
          "empty": true
        }
      },
      "storeType": "KEY_VALUE"
    },
    "metrics": {
      "metricsMap": {
        "ANY_ADDITIONAL_PROPERTY": {
          "": 1,
          "andSet": 1,
          "andIncrement": 1,
          "andDecrement": 1,
          "andAdd": 1,
          "andUpdate": 1,
          "andAccumulate": 1,
          "plain": 1,
          "opaque": 1,
          "acquire": 1
        }
      },
      "metric": 1
    },
    "enabled": true,
    "eventsProcessed": 1,
    "eventsDiscarded": 1,
    "eventsFailed": 1,
    "eventsReceived": 1,
    "eventsIgnored": 1,
    "eventsQueued": 1,
    "responseEventType": "text",
    "projectionExpression": "text",
    "selectAttributes": [
      {
        "originalExpression": "text",
        "metricFamily": "text",
        "metrics": "text",
        "metricKey": "text",
        "metricTokens": [
          "text"
        ],
        "alias": "text",
        "type": "SEARCH_PATH",
        "searchPath": [
          "text"
        ],
        "mathProcessor": {
          "context": {
            "": {},
            "present": true,
            "empty": true
          },
          "uUID": "text",
          "name": "text",
          "properties": {
            "property": "text",
            "empty": true,
            "": {},
            "orDefault": {}
          },
          "cloneEvent": true,
          "stateful": true,
          "dataStores": {
            "ANY_ADDITIONAL_PROPERTY": "[Circular Reference]"
          },
          "groupbyKey": {
            "keys": [
              "text"
            ],
            "partitionKey": 1
          },
          "jouleDBQueryInterface": {
            "name": "text",
            "enabled": true,
            "initialImage": {},
            "storeName": "text",
            "getInitialImage": true,
            "initialImageQuery": "text",
            "store": {
              "metaData": {
                "description": "text",
                "constantJoinFields": "[Circular Reference]",
                "specification": {
                  "name": "text",
                  "enabled": true
                }
              },
              "storeType": "KEY_VALUE",
              "initialImage": {
                "": {},
                "present": true,
                "empty": true
              }
            },
            "storeType": "KEY_VALUE"
          },
          "metrics": {
            "metricsMap": {
              "ANY_ADDITIONAL_PROPERTY": {
                "": 1,
                "andSet": 1,
                "andIncrement": 1,
                "andDecrement": 1,
                "andAdd": 1,
                "andUpdate": 1,
                "andAccumulate": 1,
                "plain": 1,
                "opaque": 1,
                "acquire": 1
              }
            },
            "metric": 1
          },
          "enabled": true,
          "eventsProcessed": 1,
          "eventsDiscarded": 1,
          "eventsFailed": 1,
          "eventsReceived": 1,
          "eventsIgnored": 1,
          "eventsQueued": 1,
          "previousComputedValue": {},
          "responseDataType": "DOUBLE",
          "expression": "text",
          "language": "text",
          "variables": {
            "ANY_ADDITIONAL_PROPERTY": {}
          }
        },
        "queryPredicate": "text",
        "parameters": [
          "text"
        ],
        "aliase": "text"
      }
    ]
  }
}

Unregister stream

delete

Unregister a stream processing specification from the platform

Query parameters
name: string · Required

Name of the stream to unregister

Example: quoteStream
Responses
201
Successfully unregistered stream
delete
DELETE /joule/management/stream/unregister?name=quoteStream HTTP/1.1
Host: 
Accept: */*

No content

Last updated

Was this helpful?