Time based metrics
The metrics engine can be leveraged to provide greater insights over a larger data set. These metrics can be used within custom processors to drive complex use case
This use case provides a simple demonstration on how metrics generated by the internal metrics engine can be emitted to connected publisher sinks using the select projection process.
Use case configuration
File: app-metrics-file.env
SOURCEFILE=conf/sources/stockQuoteStream.yaml
ENGINEFILE=conf/usecases/metricsAnalyticsExample.yaml
PUBLISHFILE=conf/publishers/fileStandardAnalytics.yaml
Pipeline configuration
The pipeline configures the metrics computation cycle of 1 minute, metrics to be generated, filtering and the select expression which emits the group by metrics to the connected publishing sink.
stream:
name: standardQuoteAnalyticsStream
enabled: true
eventTimeType: EVENT_TIME
sources: [ nasdaq_quotes_stream ]
processing unit:
metrics engine:
runtime policy:
frequency: 1
startup delay: 1
time unit: MINUTES
foreach metric compute:
metrics:
-
name: BidMovingAverage
metric key: symbol
table definition: bid_moving_averages
(symbol VARCHAR, avg_bid_min FLOAT,
avg_bid_avg FLOAT,avg_bid_max FLOAT,
createdTimestamp TIMESTAMP)
query:
SELECT symbol,
MIN(bid) AS 'avg_bid_min',
AVG(bid) AS 'avg_bid_avg',
MAX(bid) AS 'avg_bid_max'
FROM quotes.nasdaq
WHERE
ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
GROUP BY symbol
ORDER BY 1;
truncate on start: true
compaction policy:
frequency: 8
time unit: HOURS
pipeline:
- tap:
target schema: quotes
flush frequency: 5
index:
unique: false
fields:
- symbol
emit:
select: "symbol, BidMovingAverage.avg_bid_max;WHERE symbol=${symbol} 'avg_bid_max'"
group by:
- symbol
Last updated
Was this helpful?