Monitoring Pipeline Data Transformation

Understanding the SPEKTRA Edge monitoring pipeline data transformation.

What makes Monitoring TimeSerie special is that all data is numerical, so we should be able to query various stats like percentiles, means, counts, and rates. Monitoring allows merging data points using the aggregation param:

  • Within a single TimeSerie object, by specifying alignmentPeriod and perSeriesAligner.
  • Across multiple TimeSerie objects, by specifying crossSeriesReducer with groupBy fields.
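To make the second mode concrete, here is a minimal sketch of a cross-series reduction. The Series type, the ReduceSum function, and the choice of a sum reducer are hypothetical and exist only for illustration; they are not the Monitoring API.

```go
package main

import "fmt"

// Series is a hypothetical, simplified stand-in for a TimeSerie: labels plus
// one already-aligned value per alignment period.
type Series struct {
	Labels map[string]string
	Values []float64
}

// ReduceSum merges all series that share the same value of the groupBy label
// by summing their values position by position.
func ReduceSum(series []Series, groupBy string) map[string][]float64 {
	out := make(map[string][]float64)
	for _, s := range series {
		group := s.Labels[groupBy]
		acc := out[group]
		for i, v := range s.Values {
			if i >= len(acc) {
				acc = append(acc, 0)
			}
			acc[i] += v
		}
		out[group] = acc
	}
	return out
}

func main() {
	series := []Series{
		{Labels: map[string]string{"region": "eu"}, Values: []float64{1, 2}},
		{Labels: map[string]string{"region": "eu"}, Values: []float64{3, 4}},
		{Labels: map[string]string{"region": "us"}, Values: []float64{5, 6}},
	}
	merged := ReduceSum(series, "region")
	fmt.Println(merged["eu"], merged["us"])
}
```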

Merging TimeSerie in either way is a costly operation. For this reason, Monitoring merges in the background and then saves the merged values. At the time of this writing, the following applies:

  • Raw TimeSerie points as submitted by clients are not saved at all in the time series storage. There is a setting in MetricDescriptor which causes data to be saved, but practically nobody uses it.
  • Raw TimeSerie points are accumulated and Monitoring computes merged points for all supported alignment periods, ranging from one minute to one day.
  • Monitoring saves aligned TimeSerie points in the background; each column family holds the data for a different alignment period (AP).
  • When Monitoring merges points within a TimeSerie, it forms Distribution values. This is because mean value, standard deviation, count, min, max, and so on can all be derived from a distribution. It makes no sense to save separately for each ALIGN_* type when we can store ALIGN_SUMMARY only. This is why, when we query for data, we switch to a different aligner type. There are some exceptions, which we describe later in this document.
  • Distribution supports several types: Explicit, Dynamic, Linear, and Exponential. The type used depends on the original type indicated by MetricDescriptor.
  • As of now, Monitoring does not merge TimeSeries across their keys (pre-aggregation). Unlike merging within a TimeSerie, where we know all alignment periods, this is a bit more difficult. Tasks for pre-aggregating data are not finished yet, but they are on the roadmap. This is why, if you study the file store_querying.go for TimeSeries, you will see that we do it entirely in RAM (we pass a reducing mask, then use the reducer module from monitoring/ts_reducer/v4).
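The reason a single stored summary is enough can be sketched as follows. The Summary type below is a hypothetical, heavily simplified stand-in for a Distribution value: it keeps only a few moments, whereas a real distribution also carries buckets so that percentiles can be answered.

```go
package main

import (
	"fmt"
	"math"
)

// Summary is a hypothetical stand-in for a Distribution value.
type Summary struct {
	Count int64
	Sum   float64
	SumSq float64 // sum of squared values, needed for the standard deviation
	Min   float64
	Max   float64
}

// Add folds one raw point value into the summary.
func (s *Summary) Add(v float64) {
	if s.Count == 0 || v < s.Min {
		s.Min = v
	}
	if s.Count == 0 || v > s.Max {
		s.Max = v
	}
	s.Count++
	s.Sum += v
	s.SumSq += v * v
}

// Merge combines two summaries without access to the raw points.
func Merge(a, b Summary) Summary {
	if a.Count == 0 {
		return b
	}
	if b.Count == 0 {
		return a
	}
	return Summary{
		Count: a.Count + b.Count,
		Sum:   a.Sum + b.Sum,
		SumSq: a.SumSq + b.SumSq,
		Min:   math.Min(a.Min, b.Min),
		Max:   math.Max(a.Max, b.Max),
	}
}

// Mean is derived from the stored moments.
func (s Summary) Mean() float64 {
	if s.Count == 0 {
		return 0
	}
	return s.Sum / float64(s.Count)
}

// StdDev is the population standard deviation derived from the stored moments.
func (s Summary) StdDev() float64 {
	if s.Count == 0 {
		return 0
	}
	m := s.Mean()
	variance := s.SumSq/float64(s.Count) - m*m
	if variance < 0 { // guard against rounding noise
		variance = 0
	}
	return math.Sqrt(variance)
}

func main() {
	var a, b Summary
	for _, v := range []float64{1, 2, 3} {
		a.Add(v)
	}
	for _, v := range []float64{4, 5} {
		b.Add(v)
	}
	m := Merge(a, b)
	fmt.Println(m.Count, m.Mean(), m.Min, m.Max, m.StdDev())
}
```

Because Merge needs only the two summaries, the same value can serve mean, count, min, and max queries without a separate save per aligner.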

When raw points are sent by the client, the following happens:

  • The server “saves” the TimeSerie in the storage, but the storage quietly ignores all the raw points: it still allocates a uid.Key based on all the metric/resource types and labels forming the uid.SKey, but it does not save the points.
  • The server publishes the TimeSeries to a special TimeSerie stream queue. Currently, we support Kafka or PubSub (GCP only). This queue is the “raw metrics” one.
  • A special monitoring streaming pipeline reads raw TimeSeries from the raw metrics queue and makes per-series accumulations. This is a 24/7 background job. It produces accumulated values and sends them back to the stream queue (on a different topic, server rollups).
  • Monitoring has a special module, the rollup connector, that reads accumulated points from the server rollups queue and writes them to the TimeSeries storage. This module currently resides in the Monitoring DbController.

Now, regarding code pointers:

  • See the file monitoring/server/v4/time_serie/time_serie_service.go. After we “save” the time series, we publish it with the function publishRawTimeSeries.
  • See the file monitoring/ts_streaming/v4/publisher.go; it is what the server uses when publishing raw time series. For the API server config, see monitoring/config/apiserver.proto and the option raw_metrics_publisher.
  • The streaming job is completely separate and is even written in a different language, Java. Currently, we use Flink or Google Dataflow (Apache Beam).
  • See the file monitoring/cmd/monitoringdbcontroller/main.go. Apart from the standard DbController modules (syncing and constraint controllers), we have extra components. See the runV4Connectors function, which is currently leading. There, we create a rollup subscription (variable rollupSubscription), which is where the Monitoring streaming job writes to. Then we create realTimeConnector, which continuously reads from the stream and writes to the time series storage. The real-time connector is the crucial component here; see the file monitoring/rollup_connector/connectors/v2/realtime_connector.go.
  • Subscriber interface: monitoring/ts_streaming/v4/subscriber.go.

Here is the simplified pipeline flow:

Apart from the ordered steps, there is a monitoring controller, which generates phantom TimeSeries points (once per minute). These are also a kind of raw metric, so the monitoring streaming job combines the two data sources when generating the final time series. You can see the code here: monitoring/controller/v4/pts_publisher/pts_publisher.go. It generates phantom metrics once per minute (OnGeneratePhantomMetrics), and these are published on separate goroutines (function Run).

Depending on whether the SPEKTRA Edge cluster runs on GCP or somewhere else, we use different technologies and the architecture is even a bit different:

  • On GCP, we use PubSub for all queues; otherwise, we use Kafka.
  • In PubSub, we still don’t have message ordering; therefore, messages can be delivered to the streaming job or the rollup connector out of their original order. Kafka orders messages per partition, but we utilize this feature only for the raw metrics queue: we use the TimeSerie Key to compute the partition where data will be sent. This way, we guarantee that data for a single Key is always ordered. This guarantee is reinforced by the fact that the Monitoring API Server requires all data to arrive ordered by timestamp; this is part of the TimeSeries spec!
  • For PubSub, we have two topics for raw metrics: one topic is for regular raw time series, and the other is for the phantom ones used by the monitoring controller. For Kafka, we write all metrics to one Kafka topic (simpler design).
  • On GCP, we use Google Dataflow, and the job is written in the Apache Beam framework. Code location in the SPEKTRA Edge repository: src/main/java/com/ntt/monitoring/beam/pipelines/MonitoringRealtimePipeliene.java.
  • Outside GCP, we use Apache Flink. Code location in the SPEKTRA Edge repository: monitoring/streaming_jobs/flink-realtime/src/main/java/com/monitoring/pipelines/RealtimePipeline.java.
  • When the streaming job publishes aligned TimeSeries to the rollups topic (Kafka or PubSub), we accumulate data in batches and send each batch to a randomly chosen partition.
  • In PubSub, messages are acknowledged individually; in Kafka, we acknowledge a message only when all previous messages are acknowledged. This is because in Kafka we have data ordering.
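Two of the Kafka-specific points above can be sketched in a few lines: choosing the partition from the TimeSerie Key, and acknowledging only up to the first unprocessed message. The function and type names, as well as the FNV-1a hash, are assumptions made for this sketch; the actual publisher may compute the partition differently.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a TimeSerie key to a partition index, so that all points
// of one key land in the same partition and keep their order.
// ASSUMPTION: FNV-1a is used here only as an example of a stable hash.
// numPartitions must be greater than zero.
func partitionFor(key []byte, numPartitions uint32) uint32 {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return h.Sum32() % numPartitions
}

// ackTracker models ordered acknowledgement: an offset counts as committed
// only once every earlier offset has been processed as well.
type ackTracker struct {
	next int64          // lowest offset that is not yet committed
	done map[int64]bool // processed offsets at or above next
}

func newAckTracker() *ackTracker {
	return &ackTracker{done: make(map[int64]bool)}
}

// markDone records a processed offset and returns the first offset that is
// still uncommitted.
func (a *ackTracker) markDone(offset int64) int64 {
	if offset >= a.next {
		a.done[offset] = true
	}
	for a.done[a.next] {
		delete(a.done, a.next)
		a.next++
	}
	return a.next
}

func main() {
	key := []byte("hypothetical-timeserie-key")
	fmt.Println(partitionFor(key, 12) == partitionFor(key, 12)) // true

	a := newAckTracker()
	fmt.Println(a.markDone(1)) // 0: offset 0 is still pending
	fmt.Println(a.markDone(0)) // 2: offsets 0 and 1 are now committed
}
```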

Both Kafka and PubSub queues work on the at-least-once principle: we guarantee that data will be sent at least once, and if there is some failure, delivery will be retried. If the failure happened when confirming a successful delivery, then the message will in fact be delivered twice. The streaming job has no issues here: if it detects the same TimeSerie with the same Key and the same timestamp, it discards the duplicate. The rollup connector just saves a snapshot of data to the TimeSeries storage; if it writes twice, it simply overwrites one value with the same value. The minimal requirement for the whole of Monitoring is the AT LEAST ONCE principle. This is why RollupConnector only acknowledges messages after successful processing! Otherwise, we would risk losing some data.
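The two ways of tolerating redelivery described above can be sketched like this. All names are hypothetical, and the in-memory maps merely stand in for the streaming job state and the TimeSeries storage.

```go
package main

import "fmt"

// pointID identifies one data point: TimeSerie key plus timestamp.
type pointID struct {
	key string
	ts  int64 // unix seconds
}

// deduper models the streaming job side: a repeated (key, timestamp) pair
// is recognized and discarded.
type deduper struct {
	seen map[pointID]struct{}
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[pointID]struct{})}
}

// firstDelivery reports whether this (key, timestamp) pair is seen for the
// first time.
func (d *deduper) firstDelivery(key string, ts int64) bool {
	id := pointID{key, ts}
	if _, dup := d.seen[id]; dup {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

// snapshotStore models the rollup connector side: writes are idempotent,
// because a second write for the same point overwrites the first one.
type snapshotStore struct {
	values map[pointID]float64
}

func newSnapshotStore() *snapshotStore {
	return &snapshotStore{values: make(map[pointID]float64)}
}

func (s *snapshotStore) write(key string, ts int64, v float64) {
	s.values[pointID{key, ts}] = v
}

func main() {
	d := newDeduper()
	fmt.Println(d.firstDelivery("cpu/host-1", 720)) // true
	fmt.Println(d.firstDelivery("cpu/host-1", 720)) // false: redelivery

	s := newSnapshotStore()
	s.write("cpu/host-1", 720, 33)
	s.write("cpu/host-1", 720, 33) // redelivered message, same result
	fmt.Println(len(s.values))     // 1
}
```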

Monitoring jobs (Flink or Dataflow) also operate on the at-least-once principle. They have deduplication mechanisms built in, so we don’t need to worry about the raw metrics publishers’ side. They are stateful jobs: both Dataflow and Flink use persistent storage to accumulate all state (per each TimeSerie Key). They “ack” messages from the raw metrics topics only after processing them (when they incorporate data points into their state). This happens from the “end” of the job (around the rollups topic) to the front (raw metrics topics): when the streaming job successfully writes data to the rollups topic, it sends a signal about progress backward, which propagates through all the stages. The streaming job “acks” the raw metrics topic last! State data is kept during this whole process. In case of a crash, the job will reprocess all data from the last recovery point; we are guaranteed not to lose data thanks to synchronized syncing. We may get duplicated points in the rollup connector though, but as said, we are prepared for that.

In the streaming job, we first calculate points with an alignment period of 1 minute:

  • We accumulate points for the specified Key (phantom and real points).
  • If we got phantom points only, we use the phantom data.
  • If we got real points, we discard the phantom points if present.
  • We try to output the point with 1 minute AP soon after the minute passes, if possible. For example, when we hit 12:06:00, we want to output the point with timestamp 12:06:00 as soon as possible. However, we need to wait a bit longer, because there is a chance that we will receive, for example, a data point with timestamp 12:05:59 at 12:06:07.
  • Because of possible late arrivals, we send the output for HH:MM:00 in the range HH:MM:20-HH:MM:40.
  • If we get points AFTER we output the aligned data point, the streaming job sends a new, corrected value!
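A minimal sketch of the first steps above: deciding which aligned timestamp a raw point belongs to, and choosing between phantom and real points. The names are hypothetical, and the handling of a point that falls exactly on a minute boundary is an assumption, since the text does not specify it.

```go
package main

import (
	"fmt"
	"time"
)

// alignEnd returns the aligned timestamp (end of the alignment period) for a
// raw point timestamp. Both arguments are in seconds; ts must be non-negative.
// ASSUMPTION: a point exactly on the boundary belongs to the period that ends
// at that boundary.
func alignEnd(ts, apSeconds int64) int64 {
	if rem := ts % apSeconds; rem != 0 {
		return ts - rem + apSeconds
	}
	return ts
}

// rawPoint is a hypothetical raw point as seen by the streaming job.
type rawPoint struct {
	value   float64
	phantom bool
}

// selectPoints keeps only real points when at least one is present, and
// falls back to the phantom points otherwise.
func selectPoints(points []rawPoint) []rawPoint {
	var realPoints []rawPoint
	for _, p := range points {
		if !p.phantom {
			realPoints = append(realPoints, p)
		}
	}
	if len(realPoints) > 0 {
		return realPoints
	}
	return points
}

func main() {
	late := time.Date(2024, 1, 1, 12, 5, 59, 0, time.UTC)
	aligned := time.Unix(alignEnd(late.Unix(), 60), 0).UTC()
	fmt.Println(aligned.Format("15:04:05")) // 12:06:00

	mixed := []rawPoint{{value: 0, phantom: true}, {value: 33, phantom: false}}
	fmt.Println(len(selectPoints(mixed))) // 1: the phantom point is discarded
}
```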

Points for longer alignment periods are accumulated from the shorter ones:

  • Using one minute accumulated points, we can generate three minute and five minute points.
  • Using five minute accumulated points, we can generate a 15 minute point.
  • Using 15 minute accumulated points, we can generate a 30 minute point.
  • Using 30 minute accumulated points, we can generate a one hour point.
  • Using one hour accumulated points, we can generate a three hour point.
  • Using three hour accumulated points, we can generate a six hour point.
  • Using six hour accumulated points, we can generate a 12 hour point.
  • Using 12 hour accumulated points, we can generate a one day point.
  • In case of correction (late arrivals), we will trigger a re-computation of all “downstream” points.
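The chain above forms a small tree, which makes it easy to see what a correction touches. The sketch below encodes the listed alignment periods (in minutes) and computes every “downstream” period for a corrected one; the names builtFrom and downstream are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// builtFrom maps each alignment period (in minutes) to the shorter period it
// is generated from, following the list above.
var builtFrom = map[int]int{
	3:    1,
	5:    1,
	15:   5,
	30:   15,
	60:   30,
	180:  60,
	360:  180,
	720:  360,
	1440: 720,
}

// downstream returns, in ascending order, every alignment period that has to
// be recomputed when a point with the given period is corrected.
func downstream(ap int) []int {
	var out []int
	for target, source := range builtFrom {
		if source == ap {
			out = append(out, target)
			out = append(out, downstream(target)...)
		}
	}
	sort.Ints(out)
	return out
}

func main() {
	fmt.Println(downstream(1))  // a corrected 1 minute point affects all longer periods
	fmt.Println(downstream(3))  // nothing is built from 3 minute points
	fmt.Println(downstream(60)) // [180 360 720 1440]
}
```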

The streaming job usually publishes Distribution points because those are the most generic and other types can be derived from them. Basically, we have the following mappings:

  • When MetricDescriptor has {valueType: INT64/DOUBLE, metricKind: GAUGE/DELTA}, the output points use DISTRIBUTION output values, and the distribution type is Dynamic, with compression = 100.0, PerSeriesAligner: ALIGN_SUMMARY.
  • When MetricDescriptor has {valueType: DISTRIBUTION, metricKind: GAUGE/DELTA}, the output points use DISTRIBUTION output values, and the distribution type is the same as described in MetricDescriptor, field distribution_bucket_options, PerSeriesAligner: ALIGN_SUMMARY.
  • When MetricDescriptor has {valueType: INT64/DOUBLE, metricKind: CUMULATIVE}, we actually produce three output points per each key and timestamp:
    • Distribution of Dynamic type, with compression = 100.0, PerSeriesAligner: ALIGN_SUMMARY.
    • Double type, PerSeriesAligner: ALIGN_RATE.
    • Integer/Double type (depending on the original valueType), PerSeriesAligner: ALIGN_DELTA.
  • MetricDescriptor cannot specify CUMULATIVE kind with DISTRIBUTION value type.
  • We do not support BOOL value types (but we will; it is similar to INT64). We do not and will not support MONEY or STRING. Use double/int64 for money; string types fit the logging service better.
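The mapping rules above can be summarized as a small lookup function. This is only a sketch: the types, constants, and the outputsFor function are hypothetical and do not mirror the streaming job code (which is written in Java).

```go
package main

import "fmt"

type ValueType string
type MetricKind string

const (
	Int64        ValueType = "INT64"
	Double       ValueType = "DOUBLE"
	Distribution ValueType = "DISTRIBUTION"

	Gauge      MetricKind = "GAUGE"
	Delta      MetricKind = "DELTA"
	Cumulative MetricKind = "CUMULATIVE"
)

// Output describes one point produced per key and timestamp.
type Output struct {
	ValueType ValueType
	Aligner   string
	DistType  string // empty for non-distribution outputs
}

// outputsFor lists the outputs for a given MetricDescriptor combination.
func outputsFor(vt ValueType, kind MetricKind) ([]Output, error) {
	numeric := vt == Int64 || vt == Double
	gaugeOrDelta := kind == Gauge || kind == Delta
	switch {
	case numeric && gaugeOrDelta:
		return []Output{{Distribution, "ALIGN_SUMMARY", "Dynamic"}}, nil
	case vt == Distribution && gaugeOrDelta:
		return []Output{{Distribution, "ALIGN_SUMMARY", "as in distribution_bucket_options"}}, nil
	case numeric && kind == Cumulative:
		return []Output{
			{Distribution, "ALIGN_SUMMARY", "Dynamic"},
			{Double, "ALIGN_RATE", ""},
			{vt, "ALIGN_DELTA", ""}, // keeps the original value type
		}, nil
	case vt == Distribution && kind == Cumulative:
		return nil, fmt.Errorf("CUMULATIVE kind cannot be used with DISTRIBUTION value type")
	default:
		return nil, fmt.Errorf("unsupported combination: %s/%s", vt, kind)
	}
}

func main() {
	outs, err := outputsFor(Int64, Cumulative)
	fmt.Println(len(outs), err)

	_, err = outputsFor(Distribution, Cumulative)
	fmt.Println(err)
}
```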

All raw input points are mapped to distributions, with an exception for CUMULATIVE metrics, where point values keep increasing, like 10, 14, 16, 17, 19, 22, etc. We still form them into a distribution of Dynamic type (meaning it will hold values from 10 to 22); however, CUMULATIVE metrics are mostly used with the ALIGN_DELTA or ALIGN_RATE types. Unfortunately, those two kinds of data points cannot be derived from the Distribution value. Example (integer points):

  • For 12:00:00, we got a data point with a value of 30.
  • For 12:01:00, we got a data point with a value of 33.
  • For 12:02:00, we got a data point with a value of 36.
  • For 12:03:00, we got a data point with a value of 40.

The Distribution of Dynamic type with perSeriesAligner ALIGN_SUMMARY for 12:01:00 (AP = 1 minute) will contain the value 33 (mean = 33, count = 1, and so on). The Integer with perSeriesAligner ALIGN_DELTA (AP = 1 minute) for 12:01:00 will contain the value 3, because 33 minus 30 from the previous point is 3. The Double with perSeriesAligner ALIGN_RATE (AP = 1 minute) will contain 3.0 / 60.0 = 0.05 (per second).

For AP = three minutes and timestamp 12:03:00, the streaming job will produce:

  • Distribution (Dynamic, count = 3, mean = (33+36+40) / 3.0, etc.), perSeriesAligner: ALIGN_SUMMARY. Note that it does not contain the value from the previous point (30).
  • Int64 with value 10 (because 40 - 30 is 10), perSeriesAligner: ALIGN_DELTA.
  • Double with value 10.0 / 180.0 (delta divided by time range), perSeriesAligner: ALIGN_RATE.
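The arithmetic of this example can be reproduced with a few lines of code. The point type and helper functions are hypothetical; timestamps are expressed as seconds since 12:00:00.

```go
package main

import "fmt"

// point is one raw CUMULATIVE sample.
type point struct {
	offsetSec int64 // seconds since 12:00:00
	value     int64
}

// deltaAndRate computes the ALIGN_DELTA and ALIGN_RATE values for a period.
// prev is the last point BEFORE the period, last is the last point inside it.
func deltaAndRate(prev, last point) (int64, float64) {
	delta := last.value - prev.value
	rate := float64(delta) / float64(last.offsetSec-prev.offsetSec)
	return delta, rate
}

// mean is what the summary of the points inside the period would report.
// It assumes a non-empty slice.
func mean(points []point) float64 {
	var sum int64
	for _, p := range points {
		sum += p.value
	}
	return float64(sum) / float64(len(points))
}

func main() {
	pts := []point{{0, 30}, {60, 33}, {120, 36}, {180, 40}}

	// AP = 1 minute, timestamp 12:01:00.
	d1, r1 := deltaAndRate(pts[0], pts[1])
	fmt.Println(d1, r1)

	// AP = 3 minutes, timestamp 12:03:00. The summary covers 33, 36, 40 only,
	// while delta and rate need the previous point (30).
	d3, r3 := deltaAndRate(pts[0], pts[3])
	fmt.Println(d3, r3, mean(pts[1:]))
}
```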

No matter how hard you try, you won’t be able to get RATE/DELTA from a Distribution, because we need the value from the previous point. This is why we have special processing for CUMULATIVE metric kinds and produce extra values! This is also why, in the file monitoring/ts_store/v4/store_querying.go, we have code like:

case common.Aggregation_ALIGN_RATE, common.Aggregation_ALIGN_DELTA:
    if orGroup.wrappedMd.md.MetricKind == rmd.MetricDescriptor_DELTA {
        storeAligner = common.Aggregation_ALIGN_SUMMARY
    }

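Those two align types are valid for the CUMULATIVE and DELTA metric kinds, but we switch to SUMMARY only for the DELTA kind. For CUMULATIVE, the streaming job stores dedicated ALIGN_RATE and ALIGN_DELTA points, so they are read as requested.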
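The same decision can be written as a self-contained sketch. The types and the storeAlignerFor function are hypothetical simplifications, not the actual store code. The default branch assumes that every other aligner is served from the stored ALIGN_SUMMARY data, and validation of aligner/kind combinations is left out.

```go
package main

import "fmt"

type Aligner string
type MetricKind string

const (
	AlignSummary Aligner = "ALIGN_SUMMARY"
	AlignRate    Aligner = "ALIGN_RATE"
	AlignDelta   Aligner = "ALIGN_DELTA"
	AlignMean    Aligner = "ALIGN_MEAN"

	KindDelta      MetricKind = "DELTA"
	KindCumulative MetricKind = "CUMULATIVE"
)

// storeAlignerFor returns the aligner under which data must be read from the
// storage in order to answer a query for the requested aligner.
func storeAlignerFor(kind MetricKind, requested Aligner) Aligner {
	switch requested {
	case AlignRate, AlignDelta:
		if kind == KindDelta {
			// DELTA metrics have no dedicated rate/delta points stored.
			return AlignSummary
		}
		// CUMULATIVE metrics have dedicated points for these aligners.
		return requested
	default:
		// ASSUMPTION: all remaining aligners are derived from the summary.
		return AlignSummary
	}
}

func main() {
	fmt.Println(storeAlignerFor(KindCumulative, AlignRate)) // ALIGN_RATE
	fmt.Println(storeAlignerFor(KindDelta, AlignRate))      // ALIGN_SUMMARY
	fmt.Println(storeAlignerFor(KindDelta, AlignMean))      // ALIGN_SUMMARY
}
```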