Monitoring Pipeline Streaming Jobs
Flink and Dataflow
Let’s talk about the streaming jobs: one is implemented on Apache Flink and the other on Google Dataflow.
They differ significantly in structure. In Dataflow, we have multiple stages, one per alignment period (AP). We have to handle messages arriving out of order, so we keep at least the data for the last hour. We also add random delays for points with APs larger than one minute; these delays spread the outputs over time and smooth CPU usage, which keeps the autoscaler from overreacting, but they increase the risk of corrections (we may output a five-minute rollup containing only four minutes of data points just because the random delay chosen for the five-minute AP was shorter than one minute).

In Flink, the internal architecture is entirely different: we know Kafka delivers data in order, we support more data types (like maps), and the autoscaler can average CPU usage, so we don’t need varying random delays across keys. This allows for a much more efficient Flink job:
- We know we can output data points as soon as we receive a raw point from the next window (for example, when a data point with timestamp 12:00:45 arrives, we can output points for timestamp 12:00:00 for the one-minute, three-minute, five-minute, 15-minute, 30-minute, one-hour, three-hour, six-hour, and 12-hour APs). The data is ordered, so no older data will arrive later; see the sketch after this list.
- Fewer corrections, and lower latencies for output points.
- We keep much smaller state, because per key we remember only the last data point.
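As a minimal sketch of the rule in the first bullet, assuming rollup output timestamps label the end of each window (the function and variable names below are invented for illustration and are not the actual Flink operators):

```go
package main

import (
	"fmt"
	"time"
)

// Alignment periods used by the rollup job, as listed above.
var alignmentPeriods = []time.Duration{
	time.Minute, 3 * time.Minute, 5 * time.Minute, 15 * time.Minute,
	30 * time.Minute, time.Hour, 3 * time.Hour, 6 * time.Hour, 12 * time.Hour,
}

// latestClosableWindowEnd returns, per alignment period, the end timestamp of
// the newest rollup window that is guaranteed complete once a raw point with
// timestamp ts has been read. Because the Kafka input is ordered, nothing
// older than ts can arrive later, so every window ending at or before ts can
// be flushed and only the last point per key has to be kept in state.
func latestClosableWindowEnd(ts time.Time) map[time.Duration]time.Time {
	out := make(map[time.Duration]time.Time, len(alignmentPeriods))
	for _, ap := range alignmentPeriods {
		out[ap] = ts.Truncate(ap) // e.g. 12:00:45 -> 12:00:00 for every AP above
	}
	return out
}

func main() {
	ts := time.Date(2024, 1, 1, 12, 0, 45, 0, time.UTC)
	for ap, end := range latestClosableWindowEnd(ts) {
		fmt.Printf("AP %-8s -> emit rollup for window ending at %s\n", ap, end.Format("15:04:05"))
	}
}
```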
In the future, I plan to drop Dataflow entirely and change the PubSub setup (use ordered PubSub Lite, add ordering to regular PubSub, or move to Kafka). This should make the streaming jobs much more efficient.
Scalability
Monitoring API Servers and Monitoring Controllers are horizontally scalable (vertical scaling is recommended too): when new pods are deployed, the generation of phantom metrics and the submission of real-time raw metrics can be divided between them. PubSub and Kafka both support multiple publishers for the same topic.
Both Dataflow and Flink are horizontally scalable (automatically). Flink can also scale diagonally, if we add bigger nodes and increase parallelism per job.
The Monitoring Db Controller can scale vertically, and scales horizontally automatically: when a new pod is added, rollup TimeSeries are split across all subscribers.
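For illustration only, here is a tiny sketch of the idea of dividing rollup TimeSeries between db-controller pods. In reality the split is handled by the PubSub/Kafka subscription mechanics; the hash-mod assignment below is purely an assumption made for this sketch:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ownerPod picks which of podCount db-controller pods would handle a given
// TimeSeries key. Illustrative only: the real split is done by the messaging
// layer when rollup TimeSeries are distributed across subscribers.
func ownerPod(tsKey string, podCount int) int {
	h := fnv.New32a()
	h.Write([]byte(tsKey))
	return int(h.Sum32() % uint32(podCount))
}

func main() {
	keys := []string{"cpu/usage:device-1", "cpu/usage:device-2", "memory/used:device-1"}
	for _, k := range keys {
		fmt.Printf("%-22s -> pod %d of 3\n", k, ownerPod(k, 3))
	}
}
```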
Kafka can scale horizontally up to the partition count.
Job Recovery
In case of a disaster in the streaming job, data loss over the last days, or corruption caused by a bug, we have additional components for recovery. How it works: all raw data (phantom and real raw metrics) does not only go to the streaming job; it is also copied to special recovery storage with data retention configured for at least the last full seven days. If there is a need to “replay” historical streaming data, we can run an additional Flink/Dataflow BATCH job (not streaming) that reads data from recovery storage (instead of the raw metrics topic) and writes to a separate rollups recovery topic.
See the recovery storage module in monitoring/ts_recovery_store/v4/provider/provider.go. This module provides a recovery store that also subscribes to the raw metrics topics and writes to recovery storage. We support two backends now, Azure Blob Storage and Google Cloud Storage (depending on whether the deployment backend is Azure or GCP). We will need to add S3 storage for the AWS and on-premise options.
The recovery store is a subscriber + writer. Right now it is part of the monitoring controller (see the monitoring controller’s main.go, which creates and runs the recovery store writer as a separate goroutine). It may move to the db-controller though (I consider that to be a better place). It works 24/7, writing raw TimeSeries to external storage with some compression.
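The wiring above can be pictured roughly like this; it is only a sketch with invented names, not the real interfaces from provider.go or the controller’s main.go:

```go
package main

import (
	"context"
	"fmt"
)

// RawPoint is a placeholder for one raw TimeSeries point read from the raw
// metrics topics (phantom or real-time).
type RawPoint struct {
	Key   string
	Bytes []byte
}

// RecoveryStoreWriter stands in for the recovery store provider: it consumes
// raw metrics and appends them, compressed, to blob storage (Azure Blob
// Storage or GCS, depending on the deployment backend).
type RecoveryStoreWriter interface {
	Run(ctx context.Context, raw <-chan RawPoint) error
}

// countingWriter only counts points; the real provider writes compressed blobs.
type countingWriter struct{}

func (countingWriter) Run(ctx context.Context, raw <-chan RawPoint) error {
	n := 0
	for range raw {
		n++
	}
	fmt.Println("archived", n, "raw points")
	return nil
}

func main() {
	ctx := context.Background()
	raw := make(chan RawPoint, 2)
	raw <- RawPoint{Key: "cpu/usage:device-1"}
	raw <- RawPoint{Key: "cpu/usage:device-2"}
	close(raw)

	// Like the monitoring controller, start the writer in its own goroutine
	// while the rest of the process keeps doing its regular work.
	done := make(chan error, 1)
	var w RecoveryStoreWriter = countingWriter{}
	go func() { done <- w.Run(ctx, raw) }()
	if err := <-done; err != nil {
		fmt.Println("recovery store writer stopped:", err)
	}
}
```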
We also have a separate rollup topic for recovered TimeSeries; see the monitoring db-controller’s main.go again. We create an additional recovery connector apart from the real-time connector! The implementation is in monitoring/connectors/v2/recovery_connector.go.
Normally this separate rollup recovery topic is silent and no messages are published, but the rollup connector is always listening anyway (it is assumed to be cheap enough).
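To make the dual-connector setup concrete, here is a rough sketch; the topic names and types are made up for this example, and the real code lives in the db-controller’s main.go and monitoring/connectors/v2/recovery_connector.go:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// RollupConnector is a stand-in for a connector that consumes aggregated
// rollups from one topic and persists them into the TimeSeries database.
type RollupConnector struct{ Topic string }

// Run would normally be a subscription loop; here it only reports the topic.
func (c *RollupConnector) Run(ctx context.Context) {
	fmt.Println("listening on", c.Topic)
}

func main() {
	ctx := context.Background()
	connectors := []*RollupConnector{
		{Topic: "rollups"},          // real-time rollups, always busy
		{Topic: "rollups-recovery"}, // recovery rollups, normally silent but cheap to watch
	}
	var wg sync.WaitGroup
	for _, c := range connectors {
		wg.Add(1)
		go func(c *RollupConnector) {
			defer wg.Done()
			c.Run(ctx)
		}(c)
	}
	wg.Wait()
}
```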
The recovery Dataflow/Flink jobs, however, are not normally running; we need to schedule them manually, for example with the script in the edgelq repo, scripts/run-monitoring-gcp-recovery-pipeline.sh. We need to specify the time range from which data will be loaded, since the recovery job is a batch job. It runs, computes all the data, and submits it to the separate recovery rollup topic. Once the job finishes, it is deleted; that is the nature of recovery jobs here. As of now, we have only an Apache Beam implementation for these jobs: file src/main/java/com/ntt/monitoring/beam/pipelines/MonitoringRecoveryPipeline.java.
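The real recovery job is the Apache Beam pipeline referenced above; the Go sketch below only illustrates its batch shape under assumed names: a bounded time range is walked blob by blob, rollups are recomputed, results go to the recovery rollup topic, and then the job simply ends.

```go
package main

import (
	"fmt"
	"time"
)

// replayRange walks recovery storage over an explicit, bounded time range,
// which is what makes this a batch job rather than a streaming one. Blob
// listing, aggregation, and publishing are elided; the names are invented.
func replayRange(start, end time.Time, blobDuration time.Duration) {
	for t := start; t.Before(end); t = t.Add(blobDuration) {
		// 1. Read the raw TimeSeries blob(s) covering [t, t+blobDuration).
		// 2. Recompute rollups for every alignment period.
		// 3. Publish the results to the recovery rollup topic.
		fmt.Printf("replayed window %s .. %s\n",
			t.Format(time.RFC3339), t.Add(blobDuration).Format(time.RFC3339))
	}
	// Once the range is exhausted the job is done and gets deleted.
}

func main() {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	replayRange(start, start.Add(6*time.Hour), time.Hour) // BLOB_DURATION = PT1H
}
```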
In this recovery pipeline, remember to set the correct SHARDS_COUNT and BLOB_DURATION. Those should be synchronized with the settings shown by:
cuttle monitoring list recovery-store-sharding-infos
This command also shows which periods can be recovered. The value of BLOB_DURATION must be taken from spec.tsBlobPeriod (PT1H is 3600 seconds), and SHARDS_COUNT must be equal to spec.shardsCount.
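Here is a small hedged helper showing the BLOB_DURATION conversion mentioned above. It handles only simple PT…H/M/S periods such as the PT1H example, and it is not part of the actual pipeline:

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// blobPeriodSeconds converts a simple ISO-8601 period such as "PT1H" (the
// format of spec.tsBlobPeriod) into seconds, the unit BLOB_DURATION expects.
// Only plain PT…H/M/S forms are handled here.
func blobPeriodSeconds(isoPeriod string) (int64, error) {
	s := strings.ToLower(strings.TrimPrefix(isoPeriod, "PT"))
	d, err := time.ParseDuration(s)
	if err != nil {
		return 0, err
	}
	return int64(d.Seconds()), nil
}

func main() {
	secs, err := blobPeriodSeconds("PT1H")
	if err != nil {
		panic(err)
	}
	fmt.Println("BLOB_DURATION =", secs) // 3600; SHARDS_COUNT must equal spec.shardsCount
}
```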
This script in SPEKTRA Edge is exclusively for GCP and will eventually become obsolete. For Flink, we have instructions in the edgelq-deployment repository, file README-Azure-Devops.md; look for the Monitoring recovery job part.