Pipelines
Register, list, detail and unregister stream processing pipelines
Overview
Streams are the backbone of all processing. Joule Management API provides the ability to register, list, detail and undeploy stream pipelines.
For further information on the structure of the content payload refer to this documentation.
Example stream content
{
  "stream": {
    "name": "basic_tumbling_window_pipeline",
    "enabled": true,
    "eventTimeType": "EVENT_TIME",
    "sources": [
      "nasdaq_quotes_stream"
    ],
    "processing unit": {
      "pipeline": [
        {
          "time window": {
            "emitting type": "tumblingQuoteAnalytics",
            "aggregate functions": {
              "FIRST": [
                "ask"
              ],
              "LAST": [
                "ask"
              ]
            },
            "policy": {
              "type": "tumblingTime",
              "window size": 1000
            }
          }
        }
      ]
    },
    "emit": {
      "select": "symbol, ask_FIRST, ask_LAST"
    },
    "group by": [
      "symbol"
    ]
  }
}Register a use case stream and validate it before starting up
Successful registered stream
No content
Failed to register stream due to malformed or missing DSL
Invalid stream definition
Stream has already been registered
POST /joule/management/stream/register HTTP/1.1
Host: 
Content-Type: application/json
Accept: */*
Content-Length: 4470
{
  "name": "text",
  "enabled": true,
  "sources": [
    "text"
  ],
  "bufferSize": 1,
  "checkPointProcessing": true,
  "groupbyKey": {
    "keys": [
      "text"
    ],
    "partitionKey": 1
  },
  "groupByAttributes": [
    "text"
  ],
  "streamTimeType": "INGEST_TIME",
  "propertiesPath": "text",
  "consumerQueueSize": 1,
  "consumingFrequency": 1,
  "publishingQueueSize": 1,
  "publishingFrequency": 1,
  "telemetryAuditingSpecification": {
    "name": "text",
    "enabled": true,
    "rawTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    },
    "processedTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    }
  },
  "validFrom": "2025-10-30T20:48:33.497Z",
  "validTo": "2025-10-30T20:48:33.497Z",
  "asyncProcessing": true,
  "initialisationSpecification": {
    "tableImportProcessor": {
      "parquetImportConfigurations": [
        {
          "schema": "text",
          "files": [
            "text"
          ],
          "table": "text",
          "asView": true,
          "index": {
            "schema": "text",
            "fields": [
              "text"
            ],
            "unique": true
          }
        }
      ],
      "schema": "text",
      "name": "text",
      "enabled": true
    },
    "modelImporter": {
      "empty": true,
      "": {},
      "orDefault": {},
      "name": "text",
      "enabled": true
    },
    "name": "text",
    "enabled": true
  },
  "processingUnitSpecification": {
    "name": "text",
    "enabled": true,
    "metricsEngineSpecification": {
      "metricsDefinitionSpecification": {
        "metrics": [
          {
            "name": "text",
            "query": "text",
            "tableDefinition": "text",
            "metricKey": "text",
            "compactionSpecification": {
              "frequency": 1,
              "timeUnit": "NANOSECONDS",
              "userDefinedCompactionQuery": "text"
            },
            "cleanOnStart": true,
            "enabled": true
          }
        ],
        "name": "text",
        "enabled": true
      },
      "policy": {
        "timeUnit": "NANOSECONDS",
        "frequency": 1,
        "startupDelay": 1
      },
      "name": "text",
      "enabled": true
    },
    "processorPipeline": [
      {
        "uUID": "text",
        "metrics": {
          "metricsMap": {
            "ANY_ADDITIONAL_PROPERTY": {
              "": 1,
              "andSet": 1,
              "andIncrement": 1,
              "andDecrement": 1,
              "andAdd": 1,
              "andUpdate": 1,
              "andAccumulate": 1,
              "plain": 1,
              "opaque": 1,
              "acquire": 1
            }
          },
          "metric": 1
        }
      }
    ]
  },
  "selectProcessor": {
    "context": {
      "": {},
      "present": true,
      "empty": true
    },
    "uUID": "text",
    "name": "text",
    "properties": {
      "property": "text",
      "empty": true,
      "": {},
      "orDefault": {}
    },
    "cloneEvent": true,
    "stateful": true,
    "dataStores": {
      "ANY_ADDITIONAL_PROPERTY": {
        "name": "text",
        "enabled": true,
        "initialImage": {},
        "storeName": "text",
        "getInitialImage": true,
        "initialImageQuery": "text",
        "store": {
          "metaData": {
            "description": "text",
            "constantJoinFields": "[Circular Reference]",
            "specification": {
              "name": "text",
              "enabled": true
            }
          },
          "storeType": "KEY_VALUE",
          "initialImage": {
            "": {},
            "present": true,
            "empty": true
          }
        },
        "storeType": "KEY_VALUE"
      }
    },
    "groupbyKey": {
      "keys": [
        "text"
      ],
      "partitionKey": 1
    },
    "jouleDBQueryInterface": {
      "name": "text",
      "enabled": true,
      "initialImage": {},
      "storeName": "text",
      "getInitialImage": true,
      "initialImageQuery": "text",
      "store": {
        "metaData": {
          "description": "text",
          "constantJoinFields": "[Circular Reference]",
          "specification": {
            "name": "text",
            "enabled": true
          }
        },
        "storeType": "KEY_VALUE",
        "initialImage": {
          "": {},
          "present": true,
          "empty": true
        }
      },
      "storeType": "KEY_VALUE"
    },
    "metrics": {
      "metricsMap": {
        "ANY_ADDITIONAL_PROPERTY": {
          "": 1,
          "andSet": 1,
          "andIncrement": 1,
          "andDecrement": 1,
          "andAdd": 1,
          "andUpdate": 1,
          "andAccumulate": 1,
          "plain": 1,
          "opaque": 1,
          "acquire": 1
        }
      },
      "metric": 1
    },
    "enabled": true,
    "eventsProcessed": 1,
    "eventsDiscarded": 1,
    "eventsFailed": 1,
    "eventsReceived": 1,
    "eventsIgnored": 1,
    "eventsQueued": 1,
    "responseEventType": "text",
    "projectionExpression": "text",
    "selectAttributes": [
      {
        "originalExpression": "text",
        "metricFamily": "text",
        "metrics": "text",
        "metricKey": "text",
        "metricTokens": [
          "text"
        ],
        "alias": "text",
        "type": "SEARCH_PATH",
        "searchPath": [
          "text"
        ],
        "mathProcessor": {
          "context": {
            "": {},
            "present": true,
            "empty": true
          },
          "uUID": "text",
          "name": "text",
          "properties": {
            "property": "text",
            "empty": true,
            "": {},
            "orDefault": {}
          },
          "cloneEvent": true,
          "stateful": true,
          "dataStores": {
            "ANY_ADDITIONAL_PROPERTY": "[Circular Reference]"
          },
          "groupbyKey": {
            "keys": [
              "text"
            ],
            "partitionKey": 1
          },
          "jouleDBQueryInterface": {
            "name": "text",
            "enabled": true,
            "initialImage": {},
            "storeName": "text",
            "getInitialImage": true,
            "initialImageQuery": "text",
            "store": {
              "metaData": {
                "description": "text",
                "constantJoinFields": "[Circular Reference]",
                "specification": {
                  "name": "text",
                  "enabled": true
                }
              },
              "storeType": "KEY_VALUE",
              "initialImage": {
                "": {},
                "present": true,
                "empty": true
              }
            },
            "storeType": "KEY_VALUE"
          },
          "metrics": {
            "metricsMap": {
              "ANY_ADDITIONAL_PROPERTY": {
                "": 1,
                "andSet": 1,
                "andIncrement": 1,
                "andDecrement": 1,
                "andAdd": 1,
                "andUpdate": 1,
                "andAccumulate": 1,
                "plain": 1,
                "opaque": 1,
                "acquire": 1
              }
            },
            "metric": 1
          },
          "enabled": true,
          "eventsProcessed": 1,
          "eventsDiscarded": 1,
          "eventsFailed": 1,
          "eventsReceived": 1,
          "eventsIgnored": 1,
          "eventsQueued": 1,
          "previousComputedValue": {},
          "responseDataType": "DOUBLE",
          "expression": "text",
          "language": "text",
          "variables": {
            "ANY_ADDITIONAL_PROPERTY": {}
          }
        },
        "queryPredicate": "text",
        "parameters": [
          "text"
        ],
        "aliase": "text"
      }
    ]
  }
}No content
Get the stream processing specification of a registered stream
Name of stream configuration.
Successful detail provided for stream
Stream not found
Internal Joule error. Check log files.
Unknown stream.
GET /joule/management/stream/detail?name=text HTTP/1.1
Host: 
Accept: */*
{
  "name": "text",
  "enabled": true,
  "sources": [
    "text"
  ],
  "bufferSize": 1,
  "checkPointProcessing": true,
  "groupbyKey": {
    "keys": [
      "text"
    ],
    "partitionKey": 1
  },
  "groupByAttributes": [
    "text"
  ],
  "streamTimeType": "INGEST_TIME",
  "propertiesPath": "text",
  "consumerQueueSize": 1,
  "consumingFrequency": 1,
  "publishingQueueSize": 1,
  "publishingFrequency": 1,
  "telemetryAuditingSpecification": {
    "name": "text",
    "enabled": true,
    "rawTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    },
    "processedTelemetrySpecification": {
      "cloneEvents": true,
      "auditFrequency": 1
    }
  },
  "validFrom": "2025-10-30T20:48:33.497Z",
  "validTo": "2025-10-30T20:48:33.497Z",
  "asyncProcessing": true,
  "initialisationSpecification": {
    "tableImportProcessor": {
      "parquetImportConfigurations": [
        {
          "schema": "text",
          "files": [
            "text"
          ],
          "table": "text",
          "asView": true,
          "index": {
            "schema": "text",
            "fields": [
              "text"
            ],
            "unique": true
          }
        }
      ],
      "schema": "text",
      "name": "text",
      "enabled": true
    },
    "modelImporter": {
      "empty": true,
      "": {},
      "orDefault": {},
      "name": "text",
      "enabled": true
    },
    "name": "text",
    "enabled": true
  },
  "processingUnitSpecification": {
    "name": "text",
    "enabled": true,
    "metricsEngineSpecification": {
      "metricsDefinitionSpecification": {
        "metrics": [
          {
            "name": "text",
            "query": "text",
            "tableDefinition": "text",
            "metricKey": "text",
            "compactionSpecification": {
              "frequency": 1,
              "timeUnit": "NANOSECONDS",
              "userDefinedCompactionQuery": "text"
            },
            "cleanOnStart": true,
            "enabled": true
          }
        ],
        "name": "text",
        "enabled": true
      },
      "policy": {
        "timeUnit": "NANOSECONDS",
        "frequency": 1,
        "startupDelay": 1
      },
      "name": "text",
      "enabled": true
    },
    "processorPipeline": [
      {
        "uUID": "text",
        "metrics": {
          "metricsMap": {
            "ANY_ADDITIONAL_PROPERTY": {
              "": 1,
              "andSet": 1,
              "andIncrement": 1,
              "andDecrement": 1,
              "andAdd": 1,
              "andUpdate": 1,
              "andAccumulate": 1,
              "plain": 1,
              "opaque": 1,
              "acquire": 1
            }
          },
          "metric": 1
        }
      }
    ]
  },
  "selectProcessor": {
    "context": {
      "": {},
      "present": true,
      "empty": true
    },
    "uUID": "text",
    "name": "text",
    "properties": {
      "property": "text",
      "empty": true,
      "": {},
      "orDefault": {}
    },
    "cloneEvent": true,
    "stateful": true,
    "dataStores": {
      "ANY_ADDITIONAL_PROPERTY": {
        "name": "text",
        "enabled": true,
        "initialImage": {},
        "storeName": "text",
        "getInitialImage": true,
        "initialImageQuery": "text",
        "store": {
          "metaData": {
            "description": "text",
            "constantJoinFields": "[Circular Reference]",
            "specification": {
              "name": "text",
              "enabled": true
            }
          },
          "storeType": "KEY_VALUE",
          "initialImage": {
            "": {},
            "present": true,
            "empty": true
          }
        },
        "storeType": "KEY_VALUE"
      }
    },
    "groupbyKey": {
      "keys": [
        "text"
      ],
      "partitionKey": 1
    },
    "jouleDBQueryInterface": {
      "name": "text",
      "enabled": true,
      "initialImage": {},
      "storeName": "text",
      "getInitialImage": true,
      "initialImageQuery": "text",
      "store": {
        "metaData": {
          "description": "text",
          "constantJoinFields": "[Circular Reference]",
          "specification": {
            "name": "text",
            "enabled": true
          }
        },
        "storeType": "KEY_VALUE",
        "initialImage": {
          "": {},
          "present": true,
          "empty": true
        }
      },
      "storeType": "KEY_VALUE"
    },
    "metrics": {
      "metricsMap": {
        "ANY_ADDITIONAL_PROPERTY": {
          "": 1,
          "andSet": 1,
          "andIncrement": 1,
          "andDecrement": 1,
          "andAdd": 1,
          "andUpdate": 1,
          "andAccumulate": 1,
          "plain": 1,
          "opaque": 1,
          "acquire": 1
        }
      },
      "metric": 1
    },
    "enabled": true,
    "eventsProcessed": 1,
    "eventsDiscarded": 1,
    "eventsFailed": 1,
    "eventsReceived": 1,
    "eventsIgnored": 1,
    "eventsQueued": 1,
    "responseEventType": "text",
    "projectionExpression": "text",
    "selectAttributes": [
      {
        "originalExpression": "text",
        "metricFamily": "text",
        "metrics": "text",
        "metricKey": "text",
        "metricTokens": [
          "text"
        ],
        "alias": "text",
        "type": "SEARCH_PATH",
        "searchPath": [
          "text"
        ],
        "mathProcessor": {
          "context": {
            "": {},
            "present": true,
            "empty": true
          },
          "uUID": "text",
          "name": "text",
          "properties": {
            "property": "text",
            "empty": true,
            "": {},
            "orDefault": {}
          },
          "cloneEvent": true,
          "stateful": true,
          "dataStores": {
            "ANY_ADDITIONAL_PROPERTY": "[Circular Reference]"
          },
          "groupbyKey": {
            "keys": [
              "text"
            ],
            "partitionKey": 1
          },
          "jouleDBQueryInterface": {
            "name": "text",
            "enabled": true,
            "initialImage": {},
            "storeName": "text",
            "getInitialImage": true,
            "initialImageQuery": "text",
            "store": {
              "metaData": {
                "description": "text",
                "constantJoinFields": "[Circular Reference]",
                "specification": {
                  "name": "text",
                  "enabled": true
                }
              },
              "storeType": "KEY_VALUE",
              "initialImage": {
                "": {},
                "present": true,
                "empty": true
              }
            },
            "storeType": "KEY_VALUE"
          },
          "metrics": {
            "metricsMap": {
              "ANY_ADDITIONAL_PROPERTY": {
                "": 1,
                "andSet": 1,
                "andIncrement": 1,
                "andDecrement": 1,
                "andAdd": 1,
                "andUpdate": 1,
                "andAccumulate": 1,
                "plain": 1,
                "opaque": 1,
                "acquire": 1
              }
            },
            "metric": 1
          },
          "enabled": true,
          "eventsProcessed": 1,
          "eventsDiscarded": 1,
          "eventsFailed": 1,
          "eventsReceived": 1,
          "eventsIgnored": 1,
          "eventsQueued": 1,
          "previousComputedValue": {},
          "responseDataType": "DOUBLE",
          "expression": "text",
          "language": "text",
          "variables": {
            "ANY_ADDITIONAL_PROPERTY": {}
          }
        },
        "queryPredicate": "text",
        "parameters": [
          "text"
        ],
        "aliase": "text"
      }
    ]
  }
}Unregister a stream processing specification from platform
Name of stream to unregistered
quoteStreamSuccessfully unregistered stream
No content
Failed to unregister stream . Check Joule service log files.
Unknown stream.
Stream name must be provided.
DELETE /joule/management/stream/unregister?name=quoteStream HTTP/1.1
Host: 
Accept: */*
No content
Last updated
Was this helpful?
