API


Currently, the monitoring API is limited to publishing counter metrics using the methods described in this section.

Metrics API

Every component within the Joule system can be monitored. Any custom processor, transport or storage component inherits this capability through its component context.

Available Methods

// Increment a metric by 1
void incrementMetric(Metric metric)
// Increment a metric by a delta
void incrementMetric(Metric metric, int delta)
// Decrement a metric by 1
void decrementMetric(Metric metric)
// Decrement a metric by a delta
void decrementMetric(Metric metric, int delta)
// Initialises a metric to a given start value
void setMetric(Metric metric, long value)
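
To make the semantics of these methods concrete, the following is a minimal in-memory sketch, not Joule's actual implementation. The names SketchMetric, SketchMetrics and get are hypothetical and exist only for illustration. The sketch assumes that the one-argument overloads behave like a delta of 1 and that setMetric overwrites the current value.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical subset of the Metric enum, redeclared so the sketch compiles on its own.
enum SketchMetric { RECEIVED, PROCESSED, DISCARDED, QUEUED_EVENT }

// Hypothetical counter store (an assumption, not the Joule implementation).
class SketchMetrics {
    // One atomic counter per metric; the map is fully populated up front
    // and never structurally modified afterwards.
    private final Map<SketchMetric, AtomicLong> counters = new EnumMap<>(SketchMetric.class);

    SketchMetrics() {
        for (SketchMetric m : SketchMetric.values()) {
            counters.put(m, new AtomicLong());
        }
    }

    // Increment a metric by 1
    void incrementMetric(SketchMetric metric) { incrementMetric(metric, 1); }

    // Increment a metric by a delta
    void incrementMetric(SketchMetric metric, int delta) { counters.get(metric).addAndGet(delta); }

    // Decrement a metric by 1
    void decrementMetric(SketchMetric metric) { decrementMetric(metric, 1); }

    // Decrement a metric by a delta
    void decrementMetric(SketchMetric metric, int delta) { counters.get(metric).addAndGet(-delta); }

    // Overwrite a metric with a given value
    void setMetric(SketchMetric metric, long value) { counters.get(metric).set(value); }

    // Hypothetical read accessor, added only so the sketch can be inspected.
    long get(SketchMetric metric) { return counters.get(metric).get(); }
}
```

In this sketch, a queue-depth style metric such as QUEUED_EVENT could be seeded with setMetric and then moved up and down with the increment and decrement calls as events enter and leave the queue.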

Metric

Metric is an enum that represents the classes of metrics that can be tracked over time.

// Component
RECEIVED("received"),
PROCESSED("processed"),
PROCESS_FAILED("process_failed"),
DISCARDED("discard"), QUEUED_EVENT("queued_event"),
AVG_LATENCY("averageLatency"),
IGNORED("ignored"),

// Data stores
READ_STORE("read"), WRITE_STORE("write"),
BYTES_READ("bytes_read"), BYTES_WRITE("bytes_write"),

// Processing stream
START_TIME("start_time"), SHUTDOWN_TIME("stop_time"),
SUSPENDED_TIME("suspended_time"), RESUMED_TIME("resumed_time")
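
Each constant above carries a string label, and the label does not always match the constant name (for example, DISCARDED uses "discard" and SHUTDOWN_TIME uses "stop_time"). The sketch below shows one way such a labelled enum might be declared and looked up by label. LabelledMetric, getLabel and fromLabel are hypothetical names chosen for illustration; the accessors that the real Metric enum exposes are not documented here.

```java
import java.util.Optional;

// Hypothetical stand-in for a few Metric constants; labels are copied from the listing above.
enum LabelledMetric {
    PROCESSED("processed"),
    DISCARDED("discard"),
    AVG_LATENCY("averageLatency"),
    SHUTDOWN_TIME("stop_time");

    private final String label;

    LabelledMetric(String label) {
        this.label = label;
    }

    // Hypothetical accessor for the published label.
    String getLabel() {
        return label;
    }

    // Resolve a constant from its label; empty if no constant uses that label.
    static Optional<LabelledMetric> fromLabel(String label) {
        for (LabelledMetric m : values()) {
            if (m.label.equals(label)) {
                return Optional.of(m);
            }
        }
        return Optional.empty();
    }
}
```

Matching on the label rather than on the constant name avoids mismatches such as searching for "discarded" when the label is "discard".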

Example

The following excerpt from the filter processor shows how metrics are applied.

try(var filterContext = prepareEnvironment(event, context)){
    var val = filterContext.eval(expressionSource);
    if (!val.isNull() && val.isBoolean() && val.asBoolean()) {
        metrics.incrementMetric(Metric.PROCESSED);
    } else {
        metrics.incrementMetric(Metric.DISCARDED);
        return null;
    }
} catch (Exception e) {
    metrics.incrementMetric(Metric.PROCESS_FAILED);
    throw new StreamsException("Failed processing ....", e);
}
return event;
