With the data layout of the wide-column data store explained in the
previous wide-column store page, let’s talk about
the monitoring pipeline aspect of the SPEKTRA Edge monitoring system.
Unlike in the Logging or Audit services, the use of WideColumn is not the only
trait specific to the Monitoring TimeSerie resource.
When a client submits a TimeSerie object, the point values must match the type
declared in the metric descriptor. For example, if we have something like:
```yaml
- name: ...
  type: ...
  displayName: ...
  metricKind: GAUGE
  valueType: INT64
  # Other fields...
```
Then, a given TimeSerie will have points from a single writer (let’s assume
it sends one point every 30 seconds): for example, int64Value points at
12:06:00, 12:06:30, 12:07:00, and so on.
However, unlike logs, querying will not return the same data points;
in fact, it is usually not possible at all, unless we enable raw storage
(unaligned). A QueryTimeSeries request typically requires an aggregation
field to be provided, with alignmentPeriod ranging from one minute to one day,
and perSeriesAligner equal to some supported value, like ALIGN_SUMMARY,
ALIGN_MEAN, etc. For example, if we query the Monitoring service (with cuttle)
using a one-minute alignmentPeriod and ALIGN_SUMMARY, then:
All points across one-minute intervals are merged into distributions.
The point at 12:07:00 contains all data points from 12:06:00.001 until
12:07:00.000.
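To make the merge concrete, here is a minimal Go sketch (not the actual
storage code; names and types are illustrative) of how raw points falling
into one-minute windows collapse into ALIGN_SUMMARY-style statistics:

```go
package main

import (
	"fmt"
	"time"
)

// summary captures the statistics that an ALIGN_SUMMARY distribution point
// carries for one alignment window (simplified: no bucket counts here).
type summary struct {
	count    int
	mean     float64
	min, max float64
}

// alignOneMinute buckets raw points into one-minute windows: a point with
// timestamp t belongs to the window that ends at the next full minute
// (12:06:00.001..12:07:00.000 -> 12:07:00).
func alignOneMinute(points map[time.Time]float64) map[time.Time]*summary {
	out := map[time.Time]*summary{}
	for ts, v := range points {
		window := ts.Truncate(time.Minute)
		if !window.Equal(ts) {
			window = window.Add(time.Minute)
		}
		s, ok := out[window]
		if !ok {
			s = &summary{min: v, max: v}
			out[window] = s
		}
		s.mean = (s.mean*float64(s.count) + v) / float64(s.count+1)
		s.count++
		if v < s.min {
			s.min = v
		}
		if v > s.max {
			s.max = v
		}
	}
	return out
}

func main() {
	base := time.Date(2024, 1, 1, 12, 6, 0, 0, time.UTC)
	raw := map[time.Time]float64{
		base.Add(10 * time.Second): 40,
		base.Add(30 * time.Second): 44,
		base.Add(50 * time.Second): 48,
	}
	for window, s := range alignOneMinute(raw) {
		fmt.Printf("%s count=%d mean=%.1f min=%.1f max=%.1f\n",
			window.Format("15:04:05"), s.count, s.mean, s.min, s.max)
	}
}
```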
The Distribution type is much more descriptive than the other types.
For example, if we queried with ALIGN_MEAN instead, we would get
doubleValue instead of distributionValue, with mean values only.
We can specify larger APs: three minutes, five minutes, 15 minutes,
…, one day. Each larger value merges more data points into each
output point.
If you check the file monitoring/ts_store/v4/store_writing.go, you should
note that when we query, we change the per-series aligner from the one in
the request to a different type in storage (function createWCQuery).
To summarize, the data that we query and the data that the client
submits are not the same; this document describes what is going on
in the Monitoring service.
1 - Monitoring Pipeline Data Transformation
Understanding the SPEKTRA Edge monitoring pipeline data transformation.
Monitoring TimeSerie’s specialty is that all data is numerical, and
we should be able to query various stats like percentiles, means,
counts, and rates. Monitoring allows merging of data points using
the aggregation param (see the sketch after this list):
Within a single TimeSerie object, by specifying alignmentPeriod
and perSeriesAligner.
Across multiple TimeSerie objects, by specifying crossSeriesReducer
with groupBy fields.
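A simplified sketch of the two aggregation modes (the struct below is
hypothetical; the real aggregation message lives in the generated
Monitoring API types, and the label used for groupBy is only an example):

```go
package main

import "fmt"

// aggregation mirrors, in simplified form, the fields discussed above.
type aggregation struct {
	AlignmentPeriod    string   // e.g. "60s" .. "86400s"
	PerSeriesAligner   string   // e.g. "ALIGN_SUMMARY", "ALIGN_MEAN", "ALIGN_RATE"
	CrossSeriesReducer string   // e.g. "REDUCE_SUM"; empty when not reducing
	GroupByFields      []string // label keys kept after cross-series reduction
}

func main() {
	// Merging within a single TimeSerie only:
	within := aggregation{AlignmentPeriod: "60s", PerSeriesAligner: "ALIGN_SUMMARY"}

	// Merging across TimeSeries: align each series first, then reduce,
	// keeping one output series per distinct value of the group-by labels.
	across := aggregation{
		AlignmentPeriod:    "300s",
		PerSeriesAligner:   "ALIGN_MEAN",
		CrossSeriesReducer: "REDUCE_SUM",
		GroupByFields:      []string{"resource.labels.region"},
	}
	fmt.Printf("%+v\n%+v\n", within, across)
}
```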
Merging TimeSeries in either way is a costly operation; for this reason,
Monitoring merges in the background, then saves the merged values.
As of this writing, the following applies:
Raw TimeSerie points as submitted by clients are not saved
at all in the time series storage. There is a setting in
MetricDescriptor which causes raw data to be saved, but
practically nobody uses it.
Raw TimeSerie points are accumulated and Monitoring computes
merged points for all supported alignment periods, ranging
from one minute to one day.
Monitoring saves aligned TimeSerie points in the background;
each column family gets different data for a different AP.
When Monitoring merges TimeSerie points within a TimeSerie, it
forms Distribution values. This is because things like the mean,
standard deviation, count, min, max, etc., can all be derived from
a distribution. It makes no sense to have separate saves for the various
ALIGN_* types; we can store ALIGN_SUMMARY only. This is why, when we
query for data, we switch to a different aligner type. There is an
exception though, which we describe later in this document.
Distribution supports several bucketing types: Explicit, Dynamic,
Linear, and Exponential. The type used depends on the original
type indicated by the MetricDescriptor.
As of now, Monitoring does not merge TimeSeries across their keys
(pre-aggregation). Unlike merging within a TimeSerie, where we know
all alignment periods, this is a bit more difficult; the tasks for
pre-aggregating data are not finished yet, but they are on the roadmap.
This is why, if you study the file store_querying.go for TimeSeries,
you will see we do this reduction entirely in RAM (we pass a reducing
mask, then use the reducer module from monitoring/ts_reducer/v4),
as sketched below.
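The sketch below illustrates the idea of this in-RAM cross-series
reduction; it is not the ts_reducer implementation, just a conceptual
example with a SUM reducer and made-up label names:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// series is a single aligned TimeSerie: a label set plus one value per
// aligned timestamp (values already produced by the per-series aligner).
type series struct {
	labels map[string]string
	points map[string]float64 // timestamp -> value
}

// reduceSum merges many series into one output series per group-by key,
// summing values that share a timestamp.
func reduceSum(in []series, groupBy []string) map[string]map[string]float64 {
	out := map[string]map[string]float64{}
	for _, s := range in {
		// Build the group key from the retained labels only.
		parts := make([]string, 0, len(groupBy))
		for _, l := range groupBy {
			parts = append(parts, l+"="+s.labels[l])
		}
		sort.Strings(parts)
		key := strings.Join(parts, ",")
		if out[key] == nil {
			out[key] = map[string]float64{}
		}
		for ts, v := range s.points {
			out[key][ts] += v
		}
	}
	return out
}

func main() {
	in := []series{
		{labels: map[string]string{"region": "us", "pod": "a"}, points: map[string]float64{"12:00": 1, "12:01": 2}},
		{labels: map[string]string{"region": "us", "pod": "b"}, points: map[string]float64{"12:00": 3}},
		{labels: map[string]string{"region": "eu", "pod": "c"}, points: map[string]float64{"12:00": 5}},
	}
	fmt.Println(reduceSum(in, []string{"region"}))
}
```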
When raw points are sent by the client, the following happens:
The server “saves” the TimeSerie in the storage, but the storage quietly
ignores all the raw points: it still allocates a uid.Key based
on all the metric/resource types and labels forming the uid.SKey, but
does not save the points.
The server publishes TimeSeries to a special TimeSerie stream queue;
currently, we support Kafka or PubSub (GCP only). This queue is
the “raw metrics” one.
We have a special monitoring streaming pipeline that reads raw
TimeSeries from the raw metrics queue and performs per-series accumulation.
This is a 24/7 background job. It produces accumulated values and sends
them back to the stream queue (on a different topic: server rollups).
Monitoring has a special module, the rollup connector, that reads
accumulated points from the server rollups queue and writes them to
TimeSeries storage. This module currently resides in the Monitoring
DbController.
Now, regarding code pointers:
See file monitoring/server/v4/time_serie/time_serie_service.go.
After we “save” the time series, we publish it with the function
publishRawTimeSeries.
See the file monitoring/ts_streaming/v4/publisher.go; it is what
the server uses when publishing raw time series. In the config
for the API server (monitoring/config/apiserver.proto), you can see
the raw_metrics_publisher option.
The streaming job is completely separate; it is even written in
a different language, Java. Currently, we use Flink or Google
Dataflow (Apache Beam).
See the monitoring/cmd/monitoringdbcontroller/main.go file.
Apart from the standard DbController modules (syncing and constraint
controllers), we have extra components. See the runV4Connectors
function, which is currently the leading one. There, we create a rollup
subscription (variable rollupSubscription), which is where
the Monitoring streaming job writes to! Then we create the
realTimeConnector, which continuously reads from the stream
and writes to time series storage. The real-time connector is
the crucial component here, file
monitoring/rollup_connector/connectors/v2/realtime_connector.go.
Apart from the ordered steps above, there is the monitoring controller,
which generates phantom TimeSeries points (once per minute). These are also
a kind of raw metric, so the monitoring streaming job combines the
two data sources when generating the final time series. You can see
the code in monitoring/controller/v4/pts_publisher/pts_publisher.go.
It generates once-per-minute phantom metrics (OnGeneratePhantomMetrics),
which are published on separate goroutines (function Run).
Depending on whether the SPEKTRA Edge cluster runs on GCP or somewhere else,
we use different technologies and the architecture is even a bit
different:
On GCP, we use PubSub for all queues; otherwise, we use Kafka.
In PubSub, we still don’t have message ordering, therefore
messages can be delivered to the streaming job or rollup
connector out of the original order. Kafka orders messages
per partition, but we utilize this feature only for the raw metrics
queue: we use the TimeSerie Key to compute the partition where
data will be sent (see the sketch after this list). This way we
guarantee that data for a single Key is always ordered. This
guarantee is reinforced by the fact that the Monitoring API Server
requires all data to arrive timestamp-ordered; this is part of the
TimeSeries spec!
For PubSub, we have two topics for raw metrics: one topic is
for regular raw time series, the other is for the phantom metrics
used by the monitoring controller. For Kafka, we write all metrics
to one Kafka topic (a simpler design).
In GCP, we use Google Dataflow, and the job is written in
the Apache Beam framework. Code location in the SPEKTRA Edge repository:
src/main/java/com/ntt/monitoring/beam/pipelines/MonitoringRealtimePipeliene.java.
In non-GCP, we use Apache Flink, code location in SPEKTRA Edge repo:
monitoring/streaming_jobs/flink-realtime/src/main/java/com/monitoring/pipelines/RealtimePipeline.java.
When the streaming job publishes aligned TimeSeries to the rollups
topic (Kafka or PubSub), we accumulate data in batches
and send each batch to a random partition.
In PubSub, messages are acknowledged individually; in Kafka, we
acknowledge a message only when all previous messages have been
acknowledged. This is because in Kafka we have data ordering.
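As mentioned for Kafka, per-Key ordering comes from keyed partitioning of
the raw metrics topic. A minimal sketch of that idea (the hash function and
partition count are illustrative, not the actual producer configuration):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor sketches keyed publishing to the raw metrics Kafka topic:
// every point of a given TimeSerie Key lands on the same partition, so
// Kafka's per-partition ordering gives per-Key ordering.
func partitionFor(timeSerieKey []byte, numPartitions int) int {
	h := fnv.New32a()
	h.Write(timeSerieKey)
	return int(h.Sum32()) % numPartitions
}

func main() {
	keys := [][]byte{[]byte("key-A"), []byte("key-B"), []byte("key-A")}
	for _, k := range keys {
		// The two "key-A" points always map to the same partition.
		fmt.Printf("%s -> partition %d\n", k, partitionFor(k, 12))
	}
}
```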
Both Kafka and PubSub queues work on the at-least-once principle: we
guarantee data will be delivered at least once, but if there is some failure,
delivery will be retried. If there was a failure when confirming a successful
delivery, the message will in fact be delivered twice. The streaming
job has no issues here: if it detects the same TimeSerie with the same Key
and the same timestamp, it discards the duplicate. The rollup
connector just saves a snapshot of data to TimeSeries storage; if it
writes twice, it simply overwrites one value with the same value.
The minimal requirement for the whole monitoring pipeline is the AT LEAST
ONCE principle. This is why the RollupConnector acknowledges messages only
after successful processing! Otherwise, we would risk losing some data.
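A tiny sketch of the duplicate handling described above (the real streaming
jobs keep this state in their own state backends; the structure below only
illustrates the accept-or-discard decision):

```go
package main

import "fmt"

// dedup sketches how at-least-once delivery is tolerated downstream: a point
// identified by (TimeSerie Key, end timestamp) that was already processed is
// simply dropped; writing the same aligned snapshot twice is also harmless,
// because it overwrites a value with the same value.
type dedup struct {
	seen map[string]map[int64]bool // key -> set of already-processed timestamps
}

func newDedup() *dedup { return &dedup{seen: map[string]map[int64]bool{}} }

// accept reports whether the point should be processed (true) or discarded
// as a duplicate redelivery (false).
func (d *dedup) accept(key string, ts int64) bool {
	if d.seen[key] == nil {
		d.seen[key] = map[int64]bool{}
	}
	if d.seen[key][ts] {
		return false
	}
	d.seen[key][ts] = true
	return true
}

func main() {
	d := newDedup()
	fmt.Println(d.accept("key-A", 1700000000)) // true: first delivery
	fmt.Println(d.accept("key-A", 1700000000)) // false: redelivered duplicate
}
```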
Monitoring jobs (Flink or Dataflow) also operate on the at-least-once
principle. They have deduplication mechanisms built in, so we don’t
need to worry on the raw metrics publishers’ side.
They are stateful jobs: both Dataflow and Flink use persistent
storage to accumulate all state (per TimeSerie Key). They are
“acking” messages from the raw metrics topics only after processing them
(when they incorporate the data points into their state). This
happens from the “end” of the job (around the rollups topic) to
the front (raw metrics topics): when the streaming job successfully
writes data to the rollups topic, it sends a progress signal
backward, which propagates through all the stages. The streaming
job “acks” the raw metrics topic last! During this whole process,
state data is checkpointed as well. In case of a crash, the job
reprocesses all data from the last recovery point; we are guaranteed
not to lose data thanks to the synchronized state syncing. We may get
duplicated points in the rollup connector though, but as said, we are
prepared for that.
In the streaming job, we first calculate points with
an alignment period of one minute:
We accumulate points for the specified Key (phantom and real points).
If we got phantom points only, we use the phantom data.
If we got real points, we discard any phantom points that are present.
We try to output the point with a one-minute AP soon after the minute
passes, if possible. For example, when we hit 12:06:00, we will output
the point with timestamp 12:06:00 as soon as possible. However,
we need to wait a bit longer; there is a chance we will receive,
say, a data point with timestamp 12:05:59 at 12:06:07.
Because of possible late arrivals, the output for HH:MM:00 is sent
in the range HH:MM:20-HH:MM:40.
If we get points AFTER we output the aligned data point, the streaming
job sends a new, corrected value!
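A minimal sketch of this emit-then-correct behaviour, assuming an in-memory
aligner and using only a running sum as a stand-in for the full accumulated
state (the grace period value is illustrative, taken from the
HH:MM:20-HH:MM:40 range above):

```go
package main

import (
	"fmt"
	"time"
)

// aligner1m sketches the 1-minute output behaviour: a window HH:MM:00 is
// emitted shortly after its grace period elapses, and a late point that
// arrives after emission triggers a corrected re-emission.
type aligner1m struct {
	sums    map[time.Time]float64 // window end -> accumulated sum
	emitted map[time.Time]bool
}

func (a *aligner1m) add(ts time.Time, v float64) {
	window := ts.Truncate(time.Minute).Add(time.Minute)
	a.sums[window] += v
	if a.emitted[window] {
		fmt.Printf("correction for %s: new sum %.0f\n", window.Format("15:04:05"), a.sums[window])
	}
}

// tick is called periodically; windows whose grace period elapsed are emitted.
func (a *aligner1m) tick(now time.Time, grace time.Duration) {
	for window, sum := range a.sums {
		if !a.emitted[window] && now.Sub(window) >= grace {
			a.emitted[window] = true
			fmt.Printf("emit %s: sum %.0f\n", window.Format("15:04:05"), sum)
		}
	}
}

func main() {
	a := &aligner1m{sums: map[time.Time]float64{}, emitted: map[time.Time]bool{}}
	t := time.Date(2024, 1, 1, 12, 5, 0, 0, time.UTC)
	a.add(t.Add(30*time.Second), 10)              // 12:05:30 -> window 12:06:00
	a.tick(t.Add(90*time.Second), 25*time.Second) // 12:06:30: emit 12:06:00
	a.add(t.Add(59*time.Second), 5)               // late 12:05:59 arrives after emission
}
```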
Larger alignment periods are accumulated further:
Using accumulated one-minute points, we can compute three-minute
and five-minute points.
Using accumulated five-minute points, we can compute the 15-minute point.
Using accumulated 15-minute points, we can compute the 30-minute point.
Using accumulated 30-minute points, we can compute the one-hour point.
Using accumulated one-hour points, we can compute the three-hour point.
Using accumulated three-hour points, we can compute the six-hour point.
Using accumulated six-hour points, we can compute the 12-hour point.
Using accumulated 12-hour points, we can compute the one-day point.
In the case of a correction (late arrival), we trigger a re-computation
of all “downstream” points, as sketched below.
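A small sketch of the accumulation cascade and of which alignment periods
a correction invalidates (the map mirrors the list above; it is illustrative,
not the job’s actual configuration format):

```go
package main

import "fmt"

// derivedFrom sketches the cascade described above: each larger alignment
// period is computed from already-accumulated points of a smaller one,
// rather than from the raw stream again.
var derivedFrom = map[string]string{
	"3m": "1m", "5m": "1m",
	"15m": "5m", "30m": "15m",
	"1h": "30m", "3h": "1h", "6h": "3h", "12h": "6h", "1d": "12h",
}

// invalidate walks the cascade downstream from a corrected alignment period,
// listing every AP whose point must be recomputed after a late arrival.
func invalidate(ap string) []string {
	var out []string
	for child, parent := range derivedFrom {
		if parent == ap {
			out = append(out, child)
			out = append(out, invalidate(child)...)
		}
	}
	return out
}

func main() {
	fmt.Println("correcting a 1m point invalidates:", invalidate("1m"))
}
```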
The streaming job usually publishes Distribution points, because those are
the most generic and other types can be derived from them. Basically, we
have the following mappings (sketched in code after the list):
When the MetricDescriptor has {valueType: INT64/DOUBLE, metricKind: GAUGE/DELTA},
the output points use DISTRIBUTION values, the distribution type is
Dynamic with compression = 100.0, and the perSeriesAligner is ALIGN_SUMMARY.
When the MetricDescriptor has {valueType: DISTRIBUTION, metricKind: GAUGE/DELTA},
the output points use Distribution values, the distribution type is the same
as described in the MetricDescriptor field distribution_bucket_options, and
the perSeriesAligner is ALIGN_SUMMARY.
When the MetricDescriptor has {valueType: INT64/DOUBLE, metricKind: CUMULATIVE},
we actually produce three output points per key and timestamp:
Distribution of Dynamic type, with compression = 100.0,
perSeriesAligner ALIGN_SUMMARY.
Double type, perSeriesAligner ALIGN_RATE.
Integer/Double type (depending on the original valueType),
perSeriesAligner ALIGN_DELTA.
MetricDescriptor cannot specify CUMULATIVE kind with DISTRIBUTION
value type.
We do not support the BOOL value type yet (but we will; it is similar
to INT64). We do not and will not support MONEY or STRING:
use double/int64 for money, and string types are a better match
for the Logging service.
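The mapping above can be summarized with a small sketch (strings stand in
for the real enum types; this is illustrative, not the streaming job’s code):

```go
package main

import "fmt"

// outputsFor sketches which aligned point types the streaming job emits for
// a given MetricDescriptor value type and kind, per the list above.
func outputsFor(valueType, metricKind string) []string {
	switch {
	case valueType == "DISTRIBUTION" && metricKind != "CUMULATIVE":
		// Bucket options are taken from the MetricDescriptor itself.
		return []string{"DISTRIBUTION (descriptor buckets) / ALIGN_SUMMARY"}
	case metricKind == "GAUGE" || metricKind == "DELTA":
		return []string{"DISTRIBUTION (Dynamic, compression=100.0) / ALIGN_SUMMARY"}
	case metricKind == "CUMULATIVE":
		return []string{
			"DISTRIBUTION (Dynamic, compression=100.0) / ALIGN_SUMMARY",
			"DOUBLE / ALIGN_RATE",
			valueType + " / ALIGN_DELTA",
		}
	}
	return nil
}

func main() {
	fmt.Println(outputsFor("INT64", "GAUGE"))
	fmt.Println(outputsFor("INT64", "CUMULATIVE"))
}
```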
All raw input points are mapped to distributions, with an exception
for CUMULATIVE metrics, where point values keep increasing, like 10,
14, 16, 17, 19, 22, etc. We still form them into a distribution
of Dynamic type (the distribution will contain values from 10 to 22);
however, CUMULATIVE metrics are mostly used with the ALIGN_DELTA or
ALIGN_RATE aligners. Unfortunately, those two point types cannot be
derived from the Distribution value. Example (integer points):
For 12:00:00, we got a data point with a value of 30.
For 12:01:00, we got a data point with a value of 33.
For 12:02:00, we got a data point with a value of 36.
For 12:03:00, we got a data point with a value of 40.
The Distribution of Dynamic type with perSeriesAligner ALIGN_SUMMARY
for 12:01:00 (AP = 1 minute) will contain the value 33 (mean = 33, count = 1,
and so on). The integer with perSeriesAligner ALIGN_DELTA (AP = 1 minute)
for 12:01:00 will contain the value 3, because 33 - 30 from the previous
point is 3. The double with perSeriesAligner ALIGN_RATE (AP = 1 minute)
will contain 3.0 / 60.0 = 0.05 (per second).
For AP = three minutes, timestamp 12:03:00, the streaming job will
produce:
Distribution (Dynamic, count = 3, mean = (33+36+40) / 3.0, etc.),
perSeriesAligner: ALIGN_SUMMARY. Note that it does not contain the value
from the previous point (30).
Int64 with value 10 (because 40 - 30 is 10), perSeriesAligner:
ALIGN_DELTA.
Double with value 10.0 / 180.0 ≈ 0.056 (the delta divided by the time
range in seconds), perSeriesAligner: ALIGN_RATE.
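The arithmetic of this example in a short runnable form (a sketch only;
the streaming job tracks the previous point in its per-Key state):

```go
package main

import "fmt"

// deltaAndRate reproduces the arithmetic of the example above for a
// CUMULATIVE metric: the delta over a window is the last value inside the
// window minus the last value before it, and the rate is that delta divided
// by the window length in seconds.
func deltaAndRate(prev, last int64, windowSeconds float64) (int64, float64) {
	delta := last - prev
	return delta, float64(delta) / windowSeconds
}

func main() {
	// AP = 1 minute, window ending 12:01:00: previous point 30, last point 33.
	d, r := deltaAndRate(30, 33, 60)
	fmt.Println(d, r) // 3 0.05

	// AP = 3 minutes, window ending 12:03:00: previous point 30, last point 40.
	d, r = deltaAndRate(30, 40, 180)
	fmt.Printf("%d %.4f\n", d, r) // 10 0.0556
}
```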
No matter how hard you try, you won’t be able to get RATE/DELTA from
the Distribution, because we need the value from the previous point. This is
why we have special processing for the CUMULATIVE metric kind and
produce extra values! This is also why, in the file
monitoring/ts_store/v4/store_querying.go, we have code along these lines:
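(The snippet below is not the literal source; it is a simplified sketch of
the effect, with illustrative names:)

```go
package main

import "fmt"

// storageAligner is NOT the literal createWCQuery code; it only sketches the
// behaviour: ALIGN_DELTA / ALIGN_RATE requests are answered from stored
// ALIGN_SUMMARY distributions for the DELTA metric kind, while CUMULATIVE
// metrics have dedicated DELTA/RATE points in storage, so the aligner stays.
func storageAligner(requestedAligner, metricKind string) string {
	switch requestedAligner {
	case "ALIGN_DELTA", "ALIGN_RATE":
		if metricKind == "DELTA" {
			return "ALIGN_SUMMARY"
		}
		return requestedAligner
	default:
		// Other aligners (MEAN, percentiles, ...) are derived from the
		// stored ALIGN_SUMMARY distributions.
		return "ALIGN_SUMMARY"
	}
}

func main() {
	fmt.Println(storageAligner("ALIGN_RATE", "DELTA"))      // ALIGN_SUMMARY
	fmt.Println(storageAligner("ALIGN_RATE", "CUMULATIVE")) // ALIGN_RATE
}
```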
Those two align types (ALIGN_DELTA and ALIGN_RATE) are valid for the
CUMULATIVE and DELTA metric kinds, but we switch to ALIGN_SUMMARY only
for the DELTA kind.
2 - Monitoring Pipeline Streaming Jobs
Understanding the SPEKTRA Edge monitoring pipeline streaming jobs.
Flink and Dataflow
Let’s talk about the streaming jobs; one is implemented with Apache
Flink and the other with Google Dataflow.
They are significantly different in structure. In Dataflow, we have
multiple stages, with each AP having a different one. We also need to
handle messages arriving out of order, so we keep at least the data for
the last hour. We also add random delays for points with an AP larger
than one minute. This increases the risk of corrections (we may
output a five-minute rollup with only four minutes of data points,
because the random delay for the five-minute AP was lower than the
one-minute one). We also needed to spread outputs to smooth out CPU
usage; this prevents the autoscaler from going crazy. In Flink, we have
an entirely different internal architecture: we know Kafka sends ordered
data only, we have support for more data types (like maps), and the
autoscaler can average CPU usage, so we don’t need the various random
delays across keys. This allows for a much more efficient Flink job:
We know we can output data points as soon as we get a raw point from
the next window (for example, if we get a data point with timestamp
12:00:45, we know we can output points for timestamp 12:00:00 with
APs of one minute, three minutes, five minutes, 15 minutes, 30 minutes,
one hour, three hours, six hours, and 12 hours). Data is ordered, so
we won’t get past data.
Fewer corrections and smaller latencies for output points.
We keep a much smaller state, because per Key we remember only the last
data point.
In the future, I plan to drop Dataflow entirely and modify the PubSub
setup (use ordered PubSub Lite, add ordering to regular PubSub, or move to
Kafka). This should make the streaming jobs much more efficient.
Scalability
Monitoring API Servers and Monitoring Controllers are horizontally
scalable (and vertically too, which is recommended), so when new pods
are deployed, the generation of phantom metrics or the submission of
real-time raw metrics can be divided among them. PubSub and Kafka can
work with multiple publishers for the same topic.
Both Dataflow and Flink scale horizontally (automatically).
Flink can also scale diagonally, if we add bigger nodes
and larger parallelism per job.
The Monitoring DbController can scale vertically (and horizontally
automatically): when a new pod is added, rollup TimeSeries are split
across all subscribers.
Kafka can scale horizontally up to the partition count.
Job Recovery
In case of some disaster in the streaming job, data loss for
the last days, or corruption resulting from a bug, we have
additional components for recovery. How it works: all raw data
(phantom and real raw metrics) does not only go to the streaming job.
It is also copied to a special recovery storage with data retention
configured for at least the last full seven days. If there is a need
to “replay” historical streaming data, we can run an additional
Flink/Dataflow BATCH job (not streaming) that gets data from the
recovery storage (instead of the raw metrics topic) and writes to a
separate rollups recovery topic.
See the recovery storage module in
monitoring/ts_recovery_store/v4/provider/provider.go. This module
provides a recovery store that also subscribes to the raw metrics topics
and writes to recovery storage. We support two backends now, Azure Blob
Storage and Google Cloud Storage (depending on the platform, Azure or GCP).
We will need to add S3 storage for the AWS or on-premise option.
The recovery storage component is a subscriber + writer. It is right now
part of the monitoring controller (see main.go of the monitoring controller;
it creates and runs the recovery store writer as a separate goroutine). It
may move to the db-controller though (I consider this may be a better place).
It works 24/7, writing raw TimeSeries to external storage with some
compression.
We have a separate rollup topic for recovered TimeSeries too; see
main.go of the monitoring db controller again. We create
an additional recovery connector apart from the real-time connector!
The implementation is in monitoring/connectors/v2/recovery_connector.go.
Normally, this separate rollup recovery topic is silent, no messages
are published, but the rollup connector is always listening anyway
(it is assumed to be cheap enough).
The recovery Dataflow/Flink jobs, however, are not normally running;
we need to schedule them manually, for example with a script in the
edgelq repo, scripts/run-monitoring-gcp-recovery-pipeline.sh. We need
to specify the time range from which data will be loaded; the recovery
job is a batch type. It will run, compute all the data, and submit it to
the separate recovery rollup topic. Once the job finishes, it is
deleted; this is the nature of the recovery jobs here. As of now,
we have an Apache Beam implementation only for these jobs: file
src/main/java/com/ntt/monitoring/beam/pipelines/MonitoringRecoveryPipeline.java.
In this recovery pipeline, remember to set the correct SHARDS_COUNT
and BLOB_DURATION. Those should be synchronized with the settings
shown by:

```
cuttle monitoring list recovery-store-sharding-infos
```

This also shows which periods are possible to recover. The value of
BLOB_DURATION must be taken from spec.tsBlobPeriod
(PT1H is 3600 seconds), and the value of SHARDS_COUNT must be equal to
spec.shardsCount.
This script in SPEKTRA Edge is exclusively for GCP and will eventually
become obsolete. For Flink, we have instructions in the edgelq-deployment
repository, file README-Azure-Devops.md; find the Monitoring recovery job part.