Time-based metrics

The metrics engine can be used to derive insights across a larger data set. These metrics can then be used within custom processors to drive complex use cases.


This use case gives a simple demonstration of how metrics generated by the internal metrics engine can be emitted to connected publisher sinks using the select projection process.

Use case configuration

File: app-metrics-file.env

SOURCEFILE=conf/sources/stockQuoteStream.yaml
ENGINEFILE=conf/usecases/metricsAnalyticsExample.yaml
PUBLISHFILE=conf/publishers/fileStandardAnalytics.yaml

Pipeline configuration

The pipeline configures a metrics computation cycle of 1 minute, the metrics to be generated, the filtering, and the select expression that emits the metrics, grouped by symbol, to the connected publishing sink.

stream:
  name: standardQuoteAnalyticsStream
  enabled: true
  eventTimeType: EVENT_TIME
  sources: [ nasdaq_quotes_stream ]

  processing unit:
    metrics engine:
      runtime policy:
        frequency: 1
        startup delay: 1
        time unit: MINUTES

      foreach metric compute:
        metrics:
          -
            name: BidMovingAverage
            metric key: symbol
            table definition: bid_moving_averages 
                              (symbol VARCHAR, avg_bid_min FLOAT, 
                              avg_bid_avg FLOAT,avg_bid_max FLOAT,
                              createdTimestamp TIMESTAMP)
            query:
              SELECT symbol,
              MIN(bid) AS 'avg_bid_min',
              AVG(bid) AS 'avg_bid_avg',
              MAX(bid) AS 'avg_bid_max'
              FROM quotes.nasdaq
              WHERE
              ingestTime >= epoch_ms(date_trunc('minutes',now() - INTERVAL 3 MINUTES)) AND ingestTime <= epoch_ms(now())
              GROUP BY symbol
              ORDER BY 1;
            truncate on start: true
            compaction policy:
              frequency: 8
              time unit: HOURS

    pipeline:
      - tap:
          target schema: quotes
          flush frequency: 5
          index:
            unique: false
            fields:
              - symbol

  emit:
    select: "symbol, BidMovingAverage.avg_bid_max;WHERE symbol=${symbol} 'avg_bid_max'"

  group by:
    - symbol
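
The time window and aggregation in the BidMovingAverage query can be sketched in plain Python. This is only an illustration of the query's semantics, not the engine's implementation: the function names (to_epoch_ms, window_bounds_ms, bid_moving_average) are hypothetical, and quote records are assumed to be dictionaries carrying symbol, bid and ingestTime (epoch milliseconds). Because the window start is truncated to the minute and both bounds are inclusive, each run covers between 3 and 4 minutes of quotes.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(ts):
    """Convert a timezone-aware datetime to whole epoch milliseconds."""
    return (ts - EPOCH) // timedelta(milliseconds=1)


def window_bounds_ms(now, lookback_minutes=3):
    """Mirror the WHERE clause of the query.

    start = (now - lookback) truncated down to the minute
    end   = now
    """
    start = (now - timedelta(minutes=lookback_minutes)).replace(
        second=0, microsecond=0
    )
    return to_epoch_ms(start), to_epoch_ms(now)


def bid_moving_average(quotes, now, lookback_minutes=3):
    """Group in-window quotes by symbol and compute min, avg and max bid.

    `quotes` is an iterable of dicts with the assumed keys
    'symbol', 'bid' and 'ingestTime' (epoch milliseconds).
    """
    start_ms, end_ms = window_bounds_ms(now, lookback_minutes)
    bids = defaultdict(list)
    for quote in quotes:
        # Inclusive on both ends, like the >= and <= in the query.
        if start_ms <= quote["ingestTime"] <= end_ms:
            bids[quote["symbol"]].append(quote["bid"])
    # Sorting by symbol corresponds to ORDER BY 1.
    return [
        {
            "symbol": symbol,
            "avg_bid_min": min(values),
            "avg_bid_avg": sum(values) / len(values),
            "avg_bid_max": max(values),
        }
        for symbol, values in sorted(bids.items())
    ]
```

Calling bid_moving_average(quotes, datetime.now(timezone.utc)) once per minute would approximate the configured runtime policy, with each call producing one row per symbol seen in the window.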
