Monitoring Pipeline Data Transformation
Monitoring TimeSerie's specialty is that all data is numerical, and we should be able to query various stats such as percentiles, means, counts, and rates. Monitoring allows merging of data points using the aggregation param:
- Within a single TimeSerie object, by specifying alignmentPeriod and perSeriesAligner.
- Across multiple TimeSerie objects, by specifying crossSeriesReducer with groupBy fields.
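For intuition, here is a minimal sketch of what such an aggregation specification carries. The struct below is illustrative plain Go, not the actual generated proto types; the field names simply mirror the concepts above.

```go
package main

import (
	"fmt"
	"time"
)

// Aggregation mirrors, in simplified form, the aggregation param described
// above. The real generated proto types differ; this is only for intuition.
type Aggregation struct {
	AlignmentPeriod    time.Duration // within-series merge window, e.g. 1 minute
	PerSeriesAligner   string        // e.g. "ALIGN_SUMMARY", "ALIGN_RATE"
	CrossSeriesReducer string        // e.g. "REDUCE_SUM"; empty means no cross-series merge
	GroupByFields      []string      // label keys kept when reducing across series
}

func main() {
	// Merge points within each TimeSerie into 1-minute summaries, then
	// sum across series grouped by a region label.
	agg := Aggregation{
		AlignmentPeriod:    time.Minute,
		PerSeriesAligner:   "ALIGN_SUMMARY",
		CrossSeriesReducer: "REDUCE_SUM",
		GroupByFields:      []string{"resource.labels.region"},
	}
	fmt.Printf("%+v\n", agg)
}
```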
Merging TimeSerie data either way is a costly operation. For this reason, Monitoring merges in the background and saves the merged values. As of this writing, the following applies:
- Raw TimeSerie points as submitted by clients are not saved at all in the time series storage. There is a setting in MetricDescriptor that causes raw data to be saved, but in practice nobody uses it.
- Raw TimeSerie points are accumulated and Monitoring computes merged points for all supported alignment periods, ranging from one minute to one day.
- Monitoring saves aligned TimeSerie points in the background; each alignment period (AP) gets its data in a different column family.
- When Monitoring merges TimeSerie points within a TimeSerie, it forms Distribution values. This is because values like mean, standard deviation, count, min, max, etc., can all be derived from a distribution. It makes no sense to have separate saves for the various ALIGN_* types; we can store ALIGN_SUMMARY instead. This is why, when we query for data, we switch to a different type. There is an exception, which we describe later in this document. A minimal sketch of deriving these stats from a distribution follows this list.
- Distribution supports several types: Explicit, Dynamic, Linear, and Exponential. The type used depends on the original type indicated by MetricDescriptor.
- As of now, Monitoring does not merge TimeSeries across their keys (pre-aggregation). Unlike merging within a TimeSerie, where we know all alignment periods, this is a bit more difficult; the tasks for pre-aggregating data are not finished yet, but they are on the roadmap. This is why, if you study the file store_querying.go for TimeSeries, you will see we do it entirely in RAM (we pass a reducing mask, then we use the reducer module from monitoring/ts_reducer/v4).
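To illustrate why storing a single ALIGN_SUMMARY distribution per aligned point is enough, here is a minimal sketch of deriving several ALIGN_* statistics from one distribution value. The types are illustrative, not the real reducer code; percentiles would additionally require the bucket counts.

```go
package main

import (
	"fmt"
	"math"
)

// DistSummary is an illustrative stand-in for the Distribution value stored
// per aligned point; the real type also carries bucket options and counts.
type DistSummary struct {
	Count                 int64
	Mean                  float64
	SumOfSquaredDeviation float64
	Min, Max              float64
}

// Derived aligners: one stored ALIGN_SUMMARY point can answer all of these.
func alignMean(d DistSummary) float64 { return d.Mean }
func alignCount(d DistSummary) int64  { return d.Count }
func alignMin(d DistSummary) float64  { return d.Min }
func alignMax(d DistSummary) float64  { return d.Max }
func alignStddev(d DistSummary) float64 {
	if d.Count < 2 {
		return 0
	}
	return math.Sqrt(d.SumOfSquaredDeviation / float64(d.Count))
}

func main() {
	d := DistSummary{Count: 3, Mean: 36.33, SumOfSquaredDeviation: 24.67, Min: 33, Max: 40}
	fmt.Println(alignMean(d), alignCount(d), alignMin(d), alignMax(d), alignStddev(d))
}
```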
When raw points are sent by the client, the following happens:
- The server “saves” the TimeSerie in the storage, but the storage quietly ignores all the raw points. It still allocates a uid.Key based on all the metric/resource types and labels forming uid.SKey, but does not save points.
- The server publishes TimeSeries to a special TimeSerie stream queue; currently, we support Kafka or PubSub (GCP only). This queue is the “raw metrics” one.
- We have a special monitoring streaming pipeline that reads raw TimeSeries from the raw metrics queue and makes per-series accumulations. This is a 24/7 background job. It produces accumulated values and sends them back to the stream queue (on a different topic, server rollups).
- Monitoring has a special module, rollup connector, that reads accumulated points from the server rollups queue and writes to TimeSeries storage. This module currently resides in Monitoring DbController.
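The essential control flow of the rollup connector can be sketched as below. The types and method names are illustrative stand-ins, not the actual realtime_connector.go code; the key point (also noted later) is that a message is acknowledged only after the write to storage succeeds.

```go
package main

import (
	"context"
	"log"
)

// AlignedPoint and the interfaces below are illustrative stand-ins for the
// real rollup connector types; only the control flow matters here.
type AlignedPoint struct {
	Key       string
	Timestamp int64
	Payload   []byte
}

type Subscription interface {
	Receive(ctx context.Context) (msg AlignedPoint, ack func(), err error)
}

type TimeSeriesStore interface {
	SaveAligned(ctx context.Context, p AlignedPoint) error
}

// runConnector mirrors the essential loop: read an aligned point from the
// rollups queue, write it to time series storage, and acknowledge only after
// the write succeeded (at-least-once; duplicates just overwrite same values).
func runConnector(ctx context.Context, sub Subscription, store TimeSeriesStore) error {
	for {
		msg, ack, err := sub.Receive(ctx)
		if err != nil {
			return err
		}
		if err := store.SaveAligned(ctx, msg); err != nil {
			// No ack: the queue will redeliver, so the point is not lost.
			log.Printf("save failed for key %s: %v", msg.Key, err)
			continue
		}
		ack()
	}
}

func main() { /* wiring of a real subscription and store is out of scope for this sketch */ }
```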
Now, regarding code pointers:
- See file monitoring/server/v4/time_serie/time_serie_service.go. After we “save” the time series, we publish it with the function publishRawTimeSeries.
- See file monitoring/ts_streaming/v4/publisher.go; it is what the server uses when publishing raw time series. In the config for the API server (monitoring/config/apiserver.proto), you can see the option raw_metrics_publisher.
- The streaming job is completely separate; it is even written in a different language, Java. Currently, we use Flink or Google Dataflow (Apache Beam).
- See the monitoring/cmd/monitoringdbcontroller/main.go file. Apart from the standard DbController modules (syncing and constraint controllers), we have extra components. See the runV4Connectors function, which is currently the leading one. There, we create a rollup subscription (variable rollupSubscription), which is where the Monitoring streaming job writes to! Then we create realTimeConnector, which continuously reads from the stream and writes to time series storage. The real-time connector is the crucial component here, file monitoring/rollup_connector/connectors/v2/realtime_connector.go.
- Subscriber interface: monitoring/ts_streaming/v4/subscriber.go.
Here is the simplified pipeline flow:

Apart from the ordered steps above, there is a monitoring controller, which
generates Phantom TimeSeries points (once per minute). This is also
a kind of raw metric, so the monitoring streaming job combines those
two data sources when generating the final time series. You can see
the code here: monitoring/controller/v4/pts_publisher/pts_publisher.go. It generates phantom metrics once per minute (OnGeneratePhantomMetrics), which are then published on separate goroutines (function Run).
Depending on whether the SPEKTRA Edge cluster runs on GCP or somewhere else, we use different technologies and the architecture is even a bit different:
- On GCP, we use PubSub for all queues, otherwise, we use Kafka.
- In PubSub, we still don’t have message ordering, so messages can be delivered to the streaming job or rollup connector out of their original order. Kafka orders messages per partition, but we utilize this feature only for the raw metrics queue: we use the TimeSerie Key to compute the partition where data will be sent (see the sketch after this list). This way we guarantee that data for a single Key is always ordered. This guarantee is reinforced by the fact that the Monitoring API Server requires all data to arrive timestamp-ordered; this is part of the TimeSeries spec!
- For PubSub, we have two topics for raw metrics: one for regular raw time series, the other for phantom time series used by the monitoring controller. For Kafka, we write all metrics to one Kafka topic (simpler design).
- On GCP, we use Google Dataflow, and the job is written in the Apache Beam framework. Code location in the SPEKTRA Edge repository: src/main/java/com/ntt/monitoring/beam/pipelines/MonitoringRealtimePipeliene.java.
- On non-GCP, we use Apache Flink, code location in the SPEKTRA Edge repo: monitoring/streaming_jobs/flink-realtime/src/main/java/com/monitoring/pipelines/RealtimePipeline.java.
- When the streaming job publishes aligned TimeSeries to the rollups topic (Kafka or PubSub), we accumulate data in batches and send each batch to a random partition.
- In PubSub, messages are acknowledged individually; in Kafka, we acknowledge a message only when all previous messages have been acknowledged. This is because in Kafka we rely on data ordering.
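A minimal sketch of the partition-by-key idea mentioned above. The hash function and key format here are assumptions for illustration; the real publisher may compute partitions differently.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey sketches how a raw point's partition could be derived from
// its TimeSerie key so that all points of one series land on one partition
// and therefore stay ordered end to end.
func partitionForKey(tsKey string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(tsKey))
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// Every point of this series goes to the same partition, preserving
	// its timestamp order.
	fmt.Println(partitionForKey("metric.type=.../cpu_usage|node=node-1", 12))
}
```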
Both Kafka and PubSub queues work on the at-least-once principle: we guarantee data will be delivered at least once, and if there is some failure, delivery will be retried. If a failure happens while confirming successful delivery, the message will in fact be delivered twice. The streaming job has no issues here: if it detects the same TimeSerie with the same Key and the same timestamp, it discards the duplicate. The rollup connector just saves a snapshot of the data to TimeSeries storage; if it writes twice, it simply overwrites one value with the same value. The minimal requirement for the whole monitoring pipeline is the AT LEAST ONCE principle. This is why the RollupConnector acknowledges messages only after successful processing! Otherwise, we would risk losing some data.
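A minimal sketch of the deduplication idea, assuming points are identified by (Key, timestamp). The real streaming jobs keep this state in Flink/Dataflow managed state, not in an in-memory map.

```go
package main

import "fmt"

// pointID identifies an aligned data point; a duplicate delivery carries the
// same key and timestamp, so it can be detected and dropped.
type pointID struct {
	Key       string
	Timestamp int64 // unix seconds of the aligned point
}

// deduper sketches the idea: remember which (key, timestamp) pairs were
// already processed and discard repeats.
type deduper struct {
	seen map[pointID]bool
}

func newDeduper() *deduper { return &deduper{seen: map[pointID]bool{}} }

// shouldProcess reports whether the point is new; duplicates return false.
func (d *deduper) shouldProcess(id pointID) bool {
	if d.seen[id] {
		return false
	}
	d.seen[id] = true
	return true
}

func main() {
	d := newDeduper()
	p := pointID{Key: "cpu_usage|node-1", Timestamp: 1700000000}
	fmt.Println(d.shouldProcess(p)) // true: first delivery
	fmt.Println(d.shouldProcess(p)) // false: redelivered after an ack failure
}
```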
Monitoring jobs (Flink or Dataflow) also operate on the at-least-once principle. They have deduplication mechanisms encoded, so we don’t need to worry about the raw metrics publishers’ side. They are stateful jobs: both Dataflow and Flink use persistent storage to accumulate all state (per TimeSerie Key). They “ack” messages from the raw metrics topics only after processing them (when they incorporate data points into their state). This happens from the “end” of the job (around the rollups topic) to the front (raw metrics topics): when the streaming job successfully writes data to the rollups topic, it sends a signal about progress backward, which propagates through all the stages. The streaming job “acks” the raw metrics topic last! During this whole process, state data is periodically snapshotted to persistent storage. In case of a crash, the job reprocesses all data from the last recovery point; we are guaranteed not to lose data thanks to the synchronized checkpointing. We may get duplicated points in the rollup connector, but as said, we are prepared for that.
In the streaming job, we first calculate points with an alignment period of 1 minute:
- We accumulate points for the specified Key (phantom and real points).
- If we got phantom points only, we use the phantom data.
- If we got real points, we discard the phantom points if present.
- We try to output the 1-minute AP point soon after the minute passes, if possible. For example, if we hit 12:06:00, we will output the point with timestamp 12:06:00 as soon as possible. However, we need to wait a bit longer: there is a chance we will receive, say, a data point with timestamp 12:05:59 at 12:06:07.
- Because of possible late arrivals, we send the output for HH:MM:00 in the range HH:MM:20-HH:MM:40.
- If we get points AFTER we output the aligned data point, the streaming job sends a new, corrected value!
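A minimal sketch of the phantom-versus-real selection within one 1-minute window. The types and structure are illustrative, not the actual streaming job code.

```go
package main

import (
	"fmt"
	"time"
)

// rawPoint is an illustrative raw metric point; Phantom marks points produced
// by the monitoring controller rather than a real client.
type rawPoint struct {
	Timestamp time.Time
	Value     float64
	Phantom   bool
}

// minuteBucket accumulates points for one TimeSerie key and one 1-minute
// alignment window, mirroring the rules above: real points win over phantom.
type minuteBucket struct {
	real, phantom []rawPoint
}

func (b *minuteBucket) add(p rawPoint) {
	if p.Phantom {
		b.phantom = append(b.phantom, p)
		return
	}
	b.real = append(b.real, p)
}

// points returns what the aligned output should be built from: real points if
// any arrived, otherwise the phantom ones.
func (b *minuteBucket) points() []rawPoint {
	if len(b.real) > 0 {
		return b.real
	}
	return b.phantom
}

func main() {
	var b minuteBucket
	t := time.Date(2024, 1, 1, 12, 5, 59, 0, time.UTC)
	b.add(rawPoint{Timestamp: t, Value: 33, Phantom: true})
	b.add(rawPoint{Timestamp: t, Value: 33}) // real point arriving late, at 12:06:07
	fmt.Println(len(b.points()))             // 1: the phantom point is discarded
}
```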
Coarser data points are then accumulated from finer ones:
- Using one-minute accumulated points, we can compute three-minute and five-minute points
- Using five-minute accumulated points, we can generate 15-minute points
- Using 15-minute accumulated points, we can generate 30-minute points
- Using 30-minute accumulated points, we can generate one-hour points
- Using one-hour accumulated points, we can generate three-hour points
- Using three-hour accumulated points, we can generate six-hour points
- Using six-hour accumulated points, we can generate 12-hour points
- Using 12-hour accumulated points, we can generate one-day points
- In case of correction (late arrivals), we will trigger a re-computation of all “downstream” points.
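A minimal sketch of how a coarser point could be built from finer summary points, assuming the summary carries count, mean, sum of squared deviations, min, and max; the real jobs operate on full Distribution values with buckets.

```go
package main

import (
	"fmt"
	"math"
)

// distSummary is an illustrative per-point summary; SumSqDev is the sum of
// squared deviations from the mean.
type distSummary struct {
	Count    int64
	Mean     float64
	SumSqDev float64
	Min, Max float64
}

// merge combines two finer-grained summaries into one coarser one, e.g. three
// 1-minute points into a 3-minute point. Counts add, means are weighted, and
// the squared deviations are combined with the parallel-variance correction.
func merge(a, b distSummary) distSummary {
	if a.Count == 0 {
		return b
	}
	if b.Count == 0 {
		return a
	}
	n := a.Count + b.Count
	delta := b.Mean - a.Mean
	mean := a.Mean + delta*float64(b.Count)/float64(n)
	sumSqDev := a.SumSqDev + b.SumSqDev +
		delta*delta*float64(a.Count)*float64(b.Count)/float64(n)
	return distSummary{
		Count:    n,
		Mean:     mean,
		SumSqDev: sumSqDev,
		Min:      math.Min(a.Min, b.Min),
		Max:      math.Max(a.Max, b.Max),
	}
}

func main() {
	// Three 1-minute points (33, 36, 40) rolled up into one 3-minute point.
	p1 := distSummary{Count: 1, Mean: 33, Min: 33, Max: 33}
	p2 := distSummary{Count: 1, Mean: 36, Min: 36, Max: 36}
	p3 := distSummary{Count: 1, Mean: 40, Min: 40, Max: 40}
	fmt.Printf("%+v\n", merge(merge(p1, p2), p3))
}
```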
The streaming job usually publishes Distribution points, because those are the most generic and other types can be derived from them. Basically, we have the following mappings:
- When MetricDescriptor has {valueType: INT64/DOUBLE, metricKind: GAUGE/DELTA}, then output points use DISTRIBUTION values, the distribution type is Dynamic with compression = 100.0, and the PerSeriesAligner is ALIGN_SUMMARY.
- When MetricDescriptor has {valueType: DISTRIBUTION, metricKind: GAUGE/DELTA}, then output points use Distribution values, the distribution type is the same as described in MetricDescriptor (field distribution_bucket_options), and the PerSeriesAligner is ALIGN_SUMMARY.
- When MetricDescriptor has {valueType: INT64/DOUBLE, metricKind: CUMULATIVE}, then we actually produce three output points per key and timestamp:
  - Distribution of Dynamic type, with compression = 100.0, PerSeriesAligner ALIGN_SUMMARY.
  - Double type, PerSeriesAligner ALIGN_RATE.
  - Integer/Double type (depending on the original valueType), PerSeriesAligner ALIGN_DELTA.
- MetricDescriptor cannot specify CUMULATIVE kind with DISTRIBUTION value type.
- We do not support BOOL value types yet (but we will; it is similar to INT64). We do not and will not support MONEY or STRING. Use double/int64 for money; string types are a better fit for the logging service.
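The mapping above can be summarized in code. This is an illustrative sketch with string stand-ins for the real enums, not the streaming job's actual implementation.

```go
package main

import "fmt"

// output describes one aligned point the streaming job would emit for a given
// metric; the names are illustrative, not the generated proto enums.
type output struct {
	ValueType string // "DISTRIBUTION", "DOUBLE", "INT64"
	Aligner   string // "ALIGN_SUMMARY", "ALIGN_RATE", "ALIGN_DELTA"
}

// outputsFor encodes the mapping rules listed above.
func outputsFor(valueType, metricKind string) []output {
	switch {
	case metricKind == "GAUGE" || metricKind == "DELTA":
		// Both scalar and distribution inputs become one summary distribution.
		return []output{{ValueType: "DISTRIBUTION", Aligner: "ALIGN_SUMMARY"}}
	case metricKind == "CUMULATIVE" && (valueType == "INT64" || valueType == "DOUBLE"):
		// Cumulative scalars need extra points, since DELTA/RATE cannot be
		// recovered from the distribution alone.
		return []output{
			{ValueType: "DISTRIBUTION", Aligner: "ALIGN_SUMMARY"},
			{ValueType: "DOUBLE", Aligner: "ALIGN_RATE"},
			{ValueType: valueType, Aligner: "ALIGN_DELTA"},
		}
	default:
		return nil // e.g. CUMULATIVE + DISTRIBUTION is not allowed
	}
}

func main() {
	fmt.Println(outputsFor("INT64", "CUMULATIVE"))
	fmt.Println(outputsFor("DOUBLE", "GAUGE"))
}
```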
All raw input points are mapped to distributions, with the exception of CUMULATIVE metrics, where point values keep increasing, like 10, 14, 16, 17, 19, 22, etc. We still form them into a distribution of Dynamic type (meaning it will contain values from 10 to 22); however, CUMULATIVE metrics are mostly used with ALIGN_DELTA or ALIGN_RATE types. Unfortunately, those two data points cannot be derived from the Distribution value. Example (integer points):
- For 12:00:00, we got a data point with a value of 30
- For 12:01:00, we got a data point with a value of 33
- For 12:02:00, we got a data point with a value of 36
- For 12:03:00, we got a data point with a value of 40
The Distribution of Dynamic type with perSeriesAligner ALIGN_SUMMARY for 12:01:00 (AP = 1 minute) will contain the value 33 (mean = 33, count = 1, and so on). The Integer with perSeriesAligner ALIGN_DELTA (AP = 1 minute) for 12:01:00 will contain the value 3, because 33 - 30 from the previous point is 3. The Double with perSeriesAligner ALIGN_RATE (AP = 1 minute) will contain 3.0 / 60.0 = 0.05 (per second).
For AP = three minutes, timestamp 12:03:00, the streaming job will produce:
- Distribution (Dynamic, count = 3, mean = (33+36+40) / 3.0, etc.), perSeriesAligner: ALIGN_SUMMARY. Note it does not contain the value from the previous point (30).
- Int64 with value 10 (because 40 - 30 is 10), perSeriesAligner: ALIGN_DELTA.
- Double with value 10.0 / 180.0 (delta divided by the time range), perSeriesAligner: ALIGN_RATE.
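The arithmetic from the two examples above, as a small sketch (the types are illustrative only):

```go
package main

import (
	"fmt"
	"time"
)

// cumulativePoint is one raw point of a CUMULATIVE metric.
type cumulativePoint struct {
	T time.Time
	V float64
}

// deltaAndRate reproduces the arithmetic from the example: the aligned DELTA
// is the difference against the last point before the window, and the RATE is
// that delta divided by the alignment period in seconds.
func deltaAndRate(prev, last cumulativePoint, ap time.Duration) (delta, rate float64) {
	delta = last.V - prev.V
	rate = delta / ap.Seconds()
	return delta, rate
}

func main() {
	base := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	p0 := cumulativePoint{T: base, V: 30}
	p1 := cumulativePoint{T: base.Add(1 * time.Minute), V: 33}
	p3 := cumulativePoint{T: base.Add(3 * time.Minute), V: 40}

	// AP = 1 minute, timestamp 12:01:00: delta = 3, rate = 0.05 per second.
	fmt.Println(deltaAndRate(p0, p1, time.Minute))
	// AP = 3 minutes, timestamp 12:03:00: delta = 10, rate = 10/180.
	fmt.Println(deltaAndRate(p0, p3, 3*time.Minute))
}
```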
No matter how hard you try, you won’t be able to get RATE/DELTA from a Distribution, because we need the value from the previous point. This is why we have special processing for CUMULATIVE metric kinds, and we produce extra values! This is also why, in the file monitoring/ts_store/v4/store_querying.go, we have code like:
```go
case common.Aggregation_ALIGN_RATE, common.Aggregation_ALIGN_DELTA:
	if orGroup.wrappedMd.md.MetricKind == rmd.MetricDescriptor_DELTA {
		storeAligner = common.Aggregation_ALIGN_SUMMARY
	}
```
Those two align types are valid for both CUMULATIVE and DELTA metrics, but we switch to SUMMARY only for the DELTA kind.