Metrics API
Tracks component events and data flow with counters
Overview
Joule provides a simple Monitoring API, which tracks various operational metrics by incrementing or decrementing counters for different components, including custom processors, connectors and storage components.
Metrics API
Every component within the Joule system has the ability to be monitored. This means any custom processor, transport or storage component inherits this feature within the component context.
The API includes methods to adjust counters for metrics (i.e., increment or decrement a value, set an initial value).
Available Methods
// Increment a metric by 1
void incrementMetric(Metric metric)
// Increment a metric by a delta
void incrementMetric(Metric metric, int delta)
// Decrement a metric by 1
void decrementMetric(Metric metric)
// Increment a metric by a delta
void decrementMetric(Metric metric, int delta)
// Initialises a metric to a given start value
void setMetric(Metric metric, long value)
Metric types
Metrics cover different system aspects:
Components Tracks counts for events received, processed, failed, discarded, etc.
Data stores Records data read / write operations and byte volume.
Processing stream Logs start, shutdown, suspension and resumption times.
This is an enum
data type which represents various classes of metrics which can be tracked over time.
// Component
RECEIVED("received"),
PROCESSED("processed"),
PROCESS_FAILED("process_failed"),
DISCARDED("discard"), QUEUED_EVENT("queued_event"),
AVG_LATENCY("averageLatency"),
IGNORED("ignored"),
// Data stores
READ_STORE("read"), WRITE_STORE("write"),
BYTES_READ("bytes_read"), BYTES_WRITE("bytes_write"),
// Processing stream
START_TIME("start_time"), SHUTDOWN_TIME("stop_time")
SUSPENDED_TIME("suspended_time"), RESUMED_TIME("resumed_time")
Example
This example demonstrates how to apply metrics in a filter processor, incrementing specific counters based on processing outcomes.
try(var filterContext = prepareEnvironment(event, context)){
var val = filterContext.eval(expressionSource);
if (!val.isNull() && val.isBoolean() && val.asBoolean()) {
// PROCESSED METRIC
metrics.incrementMetric(Metric.PROCESSED);
} else {
// DISCARDED METRIC
metrics.incrementMetric(Metric.DISCARDED);
return null;
}
} catch (Exception e) {
// FAILED METRIC
metrics.incrementMetric(Metric.PROCESS_FAILED);
throw new StreamsException("Failed processing ....", e);
}
return event;
Last updated
Was this helpful?