Metrics API

Tracks component events and data flow with counters

Overview

Joule provides a simple Monitoring API, which tracks various operational metrics by incrementing or decrementing counters for different components, including custom processors, connectors and storage components.

Currently, the Monitoring API is limited to publishing counter metrics through the methods described below.

Metrics API

Every component within the Joule system has the ability to be monitored. This means any custom processor, transport or storage component inherits this feature within the component context.

The API includes methods to adjust counters for metrics (i.e., increment or decrement a value, set an initial value).

Available Methods

// Increment a metric by 1
void incrementMetric(Metric metric)
// Increment a metric by a delta
void incrementMetric(Metric metric, int delta)
// Decrement a metric by 1
void decrementMetric(Metric metric)
// Decrement a metric by a delta
void decrementMetric(Metric metric, int delta)
// Initialises a metric to a given start value
void setMetric(Metric metric, long value)
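
To illustrate how these five methods behave together, here is a minimal in-memory sketch. It is not Joule's implementation: the `Metric` enum is a cut-down stand-in, and `SketchMetrics`, its `value` accessor, and `SketchMetricsDemo` are hypothetical names introduced only for this example.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Joule's Metric enum; only a few constants are shown.
enum Metric { RECEIVED, PROCESSED, DISCARDED, QUEUED_EVENT }

// Hypothetical in-memory holder mirroring the five method signatures above.
class SketchMetrics {
    private final Map<Metric, AtomicLong> counters = new EnumMap<>(Metric.class);

    SketchMetrics() {
        // Pre-populate every counter at zero so lookups never return null.
        for (Metric m : Metric.values()) {
            counters.put(m, new AtomicLong());
        }
    }

    void incrementMetric(Metric metric) { incrementMetric(metric, 1); }

    void incrementMetric(Metric metric, int delta) { counters.get(metric).addAndGet(delta); }

    void decrementMetric(Metric metric) { decrementMetric(metric, 1); }

    void decrementMetric(Metric metric, int delta) { counters.get(metric).addAndGet(-delta); }

    void setMetric(Metric metric, long value) { counters.get(metric).set(value); }

    // Hypothetical read accessor, added only so the sketch can be inspected.
    long value(Metric metric) { return counters.get(metric).get(); }
}

class SketchMetricsDemo {
    public static void main(String[] args) {
        SketchMetrics metrics = new SketchMetrics();
        metrics.setMetric(Metric.QUEUED_EVENT, 10);      // start at 10
        metrics.incrementMetric(Metric.QUEUED_EVENT, 5); // 15
        metrics.decrementMetric(Metric.QUEUED_EVENT);    // 14
        System.out.println(metrics.value(Metric.QUEUED_EVENT)); // prints 14
    }
}
```

In this sketch the single-argument overloads delegate to the delta overloads with a delta of 1, which keeps the counting logic in one place.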

Metric types

Metrics cover different system aspects:

  1. Components: tracks counts for events received, processed, failed, discarded, etc.

  2. Data stores: records data read / write operations and byte volume.

  3. Processing stream: logs start, shutdown, suspension and resumption times.

Metric is an enum data type whose constants represent the classes of metrics that can be tracked over time.

// Component
RECEIVED("received"),
PROCESSED("processed"),
PROCESS_FAILED("process_failed"),
DISCARDED("discard"), QUEUED_EVENT("queued_event"),
AVG_LATENCY("averageLatency"),
IGNORED("ignored"),

// Data stores
READ_STORE("read"), WRITE_STORE("write"),
BYTES_READ("bytes_read"), BYTES_WRITE("bytes_write"),

// Processing stream
START_TIME("start_time"), SHUTDOWN_TIME("stop_time"),
SUSPENDED_TIME("suspended_time"), RESUMED_TIME("resumed_time")
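
Each constant in the listing above is paired with a string label, and the label does not always match the constant name (for example, DISCARDED carries "discard" and SHUTDOWN_TIME carries "stop_time"). The sketch below shows one common way an enum can hold such labels. `SketchMetric`, its `label()` accessor, and `SketchMetricDemo` are hypothetical; how Joule exposes the label is not documented here.

```java
// Hypothetical reconstruction: each constant carries the string label from the listing.
enum SketchMetric {
    RECEIVED("received"),
    DISCARDED("discard"),
    BYTES_READ("bytes_read"),
    SHUTDOWN_TIME("stop_time");

    private final String label;

    SketchMetric(String label) {
        this.label = label;
    }

    // Hypothetical accessor name; returns the label passed to the constructor.
    String label() {
        return label;
    }
}

class SketchMetricDemo {
    public static void main(String[] args) {
        // Print each constant next to its label to show where the two differ.
        for (SketchMetric m : SketchMetric.values()) {
            System.out.println(m.name() + " -> " + m.label());
        }
    }
}
```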

Example

This example demonstrates how to apply metrics in a filter processor, incrementing specific counters based on processing outcomes.

try(var filterContext = prepareEnvironment(event, context)){
    var val = filterContext.eval(expressionSource);
    if (!val.isNull() && val.isBoolean() && val.asBoolean()) {
        // PROCESSED METRIC
        metrics.incrementMetric(Metric.PROCESSED);
    } else {
        // DISCARDED METRIC    
        metrics.incrementMetric(Metric.DISCARDED);
        return null;
    }
} catch (Exception e) {
    // FAILED METRIC
    metrics.incrementMetric(Metric.PROCESS_FAILED);
    throw new StreamsException("Failed processing ....", e);
}
return event;
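
The delta overload is useful for data store metrics, where one operation may account for many bytes. The following is a sketch under stated assumptions rather than Joule code: `StoreMetricsSketch`, its `write` and `value` methods, and the cut-down `Metric` enum are hypothetical stand-ins, and a real component would use the metrics object provided by its context.

```java
import java.nio.charset.StandardCharsets;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for the data store constants of Joule's Metric enum.
enum Metric { READ_STORE, WRITE_STORE, BYTES_READ, BYTES_WRITE }

// Hypothetical storage component that records its own counters in memory.
class StoreMetricsSketch {
    private final Map<Metric, Long> counters = new EnumMap<>(Metric.class);

    void incrementMetric(Metric metric) { incrementMetric(metric, 1); }

    void incrementMetric(Metric metric, int delta) {
        // Add delta to the existing count, or start from delta if none exists.
        counters.merge(metric, (long) delta, Long::sum);
    }

    // Hypothetical read accessor, added only so the sketch can be inspected.
    long value(Metric metric) { return counters.getOrDefault(metric, 0L); }

    // Hypothetical write path: count one operation plus the payload size in bytes.
    void write(String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        incrementMetric(Metric.WRITE_STORE);
        incrementMetric(Metric.BYTES_WRITE, bytes.length);
    }

    public static void main(String[] args) {
        StoreMetricsSketch store = new StoreMetricsSketch();
        store.write("hello");
        store.write("joule");
        System.out.println(store.value(Metric.WRITE_STORE)); // prints 2
        System.out.println(store.value(Metric.BYTES_WRITE)); // prints 10
    }
}
```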
