Platform Developer Guide
- 1: SPEKTRA Edge IAM Service Design
- 1.1: SPEKTRA Edge IAM Principals
- 1.2: SPEKTRA Edge IAM Authentication
- 1.3: SPEKTRA Edge IAM Authorization
- 1.4: SPEKTRA Edge IAM Cache Invalidation
- 1.5: SPEKTRA Edge Multi-Service Authorization
- 1.6: SPEKTRA Edge E-mail Sender
- 1.7: SPEKTRA Edge Multi-Service Environment Safety
- 1.8: SPEKTRA Edge IAM Principal Tracking
- 1.9: SPEKTRA Edge Principal Service Access
- 2: SPEKTRA Edge Limits Service Design
- 2.1: Service Limit Initialization
- 2.2: Project and Organization Limit Initialization
- 2.3: AcceptedPlan Update Process
- 2.4: Resource Limit Assignment
- 2.5: Resource Limit Tracking
- 2.6: Project and Organization Deletion Process
- 3: SPEKTRA Edge Wide-Column Store Usage
- 3.1: Wide-Column Data Models
- 3.2: SPEKTRA Edge Wide-Column Versions
- 3.3: SPEKTRA Edge Wide-Column Usage Example
- 3.4: SPEKTRA Edge Wide-Column Annotations
- 3.5: SPEKTRA Edge Monitoring Time Series
- 3.6: SPEKTRA Edge ActivityLogs and ResourceChangeLogs
- 3.7: SPEKTRA Edge Logging Store
- 4: SPEKTRA Edge Monitoring Pipeline Design
1 - SPEKTRA Edge IAM Service Design
The iam.edgelq.com service plays one of the central roles in the SPEKTRA Edge platform: it is in charge of authentication and authorization, and it enables multi-tenant, multi-service environments. By default, Goten does not come with an authentication or authorization feature; IAM fills this hole and allows services to work with each other without implicit trust.
IAM concepts, actors, Role management, and tenant management should already be known from the user guide; customizations and basic usage of Authenticator, Authorizer, and Authorization middleware from the developer guide. This document dives deeper into what happens in the unique components provided by the IAM service. It is assumed the reader understands the general, standard structure of the IAM codebase at this point, located in the iam directory of the SPEKTRA Edge repository.
We will focus here not so much on the IAM service itself but largely on what IAM provides. Authenticator and Authorizer are modules provided by IAM but linked in during each server compilation. Therefore, every API server of any backend service has them in its runtime.
1.1 - SPEKTRA Edge IAM Principals
In IAM, we identify two types of principals that can be uniquely identified and authenticated:
- Users
- ServiceAccounts
They have very different authentication methods. First, IAM does not manage users that much: a third party is responsible for actual user management. When a user sends a request to a service, it provides an authorization token with some claims. API servers must use the jwks endpoint, which provides a JSON Web Key Set, to verify the signature of the access token. Verification ensures we can trust the claims stored in the token. The claims carry details like the User's unique identifier, which we can use to extract the User resource from IAM.
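To make this concrete, below is a minimal, illustrative sketch of verifying such an access token against a jwks endpoint, using the lestrrat-go/jwx library and a placeholder jwks URL. This is not the actual Authenticator code, which is structured differently.

```go
package authsketch

import (
	"context"
	"fmt"

	"github.com/lestrrat-go/jwx/v2/jwk"
	"github.com/lestrrat-go/jwx/v2/jwt"
)

// verifyAccessToken fetches the JSON Web Key Set, verifies the token
// signature, and validates the expected audience. Illustrative only.
func verifyAccessToken(ctx context.Context, rawToken string) (jwt.Token, error) {
	// Placeholder jwks URL; in practice it comes from the Authenticator config.
	keySet, err := jwk.Fetch(ctx, "https://example-idp/.well-known/jwks.json")
	if err != nil {
		return nil, fmt.Errorf("fetching jwks: %w", err)
	}
	// Signature verification is what lets us trust the claims inside the token.
	tok, err := jwt.Parse(
		[]byte(rawToken),
		jwt.WithKeySet(keySet),
		jwt.WithValidate(true),
		jwt.WithAudience("https://apis.edgelq.com"), // expected SPEKTRA Edge audience
	)
	if err != nil {
		return nil, fmt.Errorf("invalid access token: %w", err)
	}
	// The subject claim carries the unique user identifier used to look up
	// the User resource in IAM.
	return tok, nil
}
```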
As of now, we use the third-party Auth0 service for users. It creates and signs the access tokens that SPEKTRA Edge receives, and it exposes a jwks endpoint from which we get public keys for verification. Token signing, rotation, and user list management are all handled by Auth0, although IAM has several methods that connect to Auth0 for management purposes.
When a user joins the system, IAM is not notified first. The request goes to Auth0, where the data is created. The record in IAM is created later, when the user starts interacting with SPEKTRA Edge. The User resource may get created in IAM during the first authentication; it may also be saved/updated when IAM handles RefreshUserFromIdToken.
On the other side, ServiceAccounts are typically managed by SPEKTRA Edge, or by any other entity that creates ServiceAccounts in the IAM service. How it is done:
- A ServiceAccount is created by some client. Without a ServiceAccountKey, though, it is not usable.
- A ServiceAccountKey is created by a client. Regarding the public-private key pair, there are two options:
  - The client generates both the private and public key. It sends CreateServiceAccountKey with the public key only, so IAM never sees the private key. The client is fully responsible for securing it. This is the recommended procedure.
  - It is possible to ask IAM to create the public-private key pair during ServiceAccountKey creation. In this case, IAM saves only the public key in the database; the private key is returned in the response. The client is still fully responsible for securing it. This method only allows skipping the generation step.

In summary, ServiceAccounts are the responsibility of the clients, who need to secure private keys. Those private keys are later used to create access tokens. During authentication, the backend service will grab the public key from IAM.
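For the recommended option, a client-side sketch might look like the following. The key generation uses only the Go standard library; the CreateServiceAccountKey call itself is omitted because its exact request shape is not reproduced here.

```go
package sasketch

import (
	"crypto/rand"
	"crypto/rsa"
	"crypto/x509"
	"encoding/pem"
)

// newServiceAccountKeyPEMs generates an RSA key pair locally. Only the public
// PEM is meant to be sent to IAM in CreateServiceAccountKey; the private PEM
// never leaves the client, which must store it securely.
func newServiceAccountKeyPEMs() (publicPEM, privatePEM []byte, err error) {
	priv, err := rsa.GenerateKey(rand.Reader, 4096)
	if err != nil {
		return nil, nil, err
	}
	privatePEM = pem.EncodeToMemory(&pem.Block{
		Type:  "RSA PRIVATE KEY",
		Bytes: x509.MarshalPKCS1PrivateKey(priv),
	})
	pubBytes, err := x509.MarshalPKIXPublicKey(&priv.PublicKey)
	if err != nil {
		return nil, nil, err
	}
	publicPEM = pem.EncodeToMemory(&pem.Block{Type: "PUBLIC KEY", Bytes: pubBytes})
	return publicPEM, privatePEM, nil
}
```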
1.2 - SPEKTRA Edge IAM Authentication
The module for authentication is in the SPEKTRA Edge repository, file iam/auth/authenticator.go. It also provides an authentication function (grpc_auth.AuthFunc) that is passed to the gRPC server. If you look at any main.go of an API server runtime, you should find code like:
```go
grpcserver.NewGrpcServer(
	authenticator.AuthFunc(),
	commonCfg.GetGrpcServer(),
	log,
)
```
This function is used by gRPC interceptors, so authentication is done outside any server middleware. As a result, the context object associated with the request will contain the AuthToken object, as defined in iam/auth/types/auth_token.go, before any server code processes the call.
Authenticator Tasks
Authenticator uses HTTP headers to find out who is making a call. The primary header in use is authorization. It should contain a Bearer <AccessToken> value, and we want the token part. For now, ignore x-goten-original-auth and additional access tokens, which are described in the distributed authorization process section. Usually, we expect just a single token in authorization, based on which authentication happens.
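As an illustration only (not the actual Authenticator code), extracting the bearer token from incoming gRPC metadata could look like this:

```go
package authsketch

import (
	"context"
	"strings"

	"google.golang.org/grpc/metadata"
)

// bearerFromContext pulls the access token out of the "authorization" header.
// It expects a single "Bearer <AccessToken>" value.
func bearerFromContext(ctx context.Context) (string, bool) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return "", false
	}
	vals := md.Get("authorization")
	if len(vals) != 1 {
		return "", false
	}
	const prefix = "Bearer "
	if !strings.HasPrefix(vals[0], prefix) {
		return "", false
	}
	return strings.TrimPrefix(vals[0], prefix), true
}
```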
Authenticator delegates access token identification to a module called AuthInfoProvider. It returns the Principal object, as defined in iam/auth/types/principal.go. Under the hood, it can be a ServiceAccount, a User, or Anonymous.
We will dive into it in the AuthInfoProvider for authentication section below, but for now let's touch on another important topic regarding authentication.
When AuthInfoProvider returns principal data, Authenticator also needs to validate that all claims are as expected, see addKeySpecificClaims. Of these claims, the most important part is the audience. We need to protect against cases where someone obtains an access token and tries to pose as a given user in some other service. For this reason, look at the expected audience for a User:
claims.Audience = []string{a.cfg.AccessTokenAudience}
This AccessTokenAudience is equal to the audience specific to SPEKTRA Edge, for example https://apis.edgelq.com, configured for some of our services.
If this expected audience does not match what is actually in the claims, it may be because someone got hold of this token and is using it in a different service. It's like us: we have access tokens from users, so if we knew where else a user has an account, we could try to log in somewhere else. To prevent such issues, we check the audience. However, note that the audience is one global value for the whole SPEKTRA Edge platform. So, one service on SPEKTRA Edge can connect to another service in SPEKTRA Edge and use this access token, successfully posing as a user. As long as services on SPEKTRA Edge can trust each other, it is not an issue. For untrusted third-party services, it may be a problem: if a user sends a request to them, they potentially can take the token and use it in other SPEKTRA Edge services. In the future, we may provide additional claims, but it means that the user will need to ask the jwks provider for an access token for a specific SPEKTRA Edge service, and perhaps for each of them, or for groups of them.
In the API Server config, see the Authenticator settings, field accessTokenAudience.
The situation is a bit easier with ServiceAccounts: when they send a request to a service, the audience contains the endpoint of the specific service they call. Therefore, if they send requests to devices, devices won't be able to reuse those tokens to send requests to, say, applications. The problem may be API keys, which are global for the whole of SPEKTRA Edge, but it is the user's choice to use this method, which was requested as a less "complicated" option. It should be fine as long as the API key is handled strictly anyway.
In the API Server config, see the Authenticator settings, field serviceAccountIdTokenAudiencePrefixes. This is a list of prefixes with which the audience can start.
AuthInfoProvider for Authentication
AuthInfoProvider is a common module for both the authenticator and the authorizer. You can see it in the SPEKTRA Edge repo, file iam/auth/auth_info_provider.go. For the authenticator, only one method counts: GetPrincipal.
Inside GetPrincipal of AuthInfoProvider we still don't get the full principal. The reason is that getting a principal is tricky: if AuthInfoProvider is running on the IAM Server, it may use the local database. If it is part of a different server, it will need to ask the IAM Server for principal data. Since it can't fully get a principal on its own, it does what it can:
- First, we check the ID of the key from the authorization token.
- If the ID is equal to the ServiceAccountKey ID the server instance itself uses, it means the service is calling itself. Perhaps it is a controller trying to connect to the server instance. If this is the case, we just return "us". This is a helpful trick when a service is bootstrapping for the first time: the API server may not be listening on the port yet, or the database may have missing records.
- Mostly, however, AuthInfoProvider is one giant cache object, and this includes storage of principals. Caching principals locally, with a long-lived cache, significantly lowers the pressure on IAM and reduces latencies.
AuthInfoProvider uses the PrincipalProvider interface to get actual instances. There are two providers:
- LocalPrincipalProvider in iam/auth/internal/local_principal_provider.go
- RemotePrincipalProvider in iam/auth/internal/remote_principal_provider.go
The local provider must be used only by IAM Servers; all others must use the remote option. Let's start with the remote one. If you check GetPrincipal of RemotePrincipalProvider, you can see that it just connects to the IAM service and uses the GetPrincipal method defined in the API skeleton file. For the ServiceAccount type, however, it first needs to fetch a Project resource to figure out in which regions the ServiceAccountKey is available.
It is worth mentioning that services are not supposed to trust each other; this also means IAM does not necessarily trust services requesting access to user information, even if they have a token. Perhaps access to the authorization token should be enough for IAM to return user information, but GetPrincipalRequest also requires information about which service is asking. IAM validates whether this service is allowed to see the given principal.
You should jump into a different part of the code and see the GetPrincipal implementation in file iam/server/v1/authorization/authorization_service.go. The file name may be a bit misleading, but this service has actions used for both authentication and authorization; it may be worth moving it to a different API group and deprecating the current action declaration. But that is a side note, generally.
Implementation of GetPrincipal will stay, so you should see what happens under the hood.
In the IAM Server, GetPrincipal uses the PrincipalProvider that it gets from AuthInfoProvider! Therefore, AuthInfoProvider on a server other than IAM will try to use the cache; in case of a miss, it will ask the remote PrincipalProvider. RemotePrincipalProvider will send GetPrincipalRequest to IAM, which then checks LocalPrincipalProvider, so we will land in LocalPrincipalProvider anyway.
Before jumping into LocalPrincipalProvider, see the rest of the GetPrincipal server implementation. Inside, we check the User or ServiceAccount data and iterate over the metadata.services.allowed_services slice. If it contains the service provided in GetPrincipalRequest, this service is allowed to see the given principal, so we can just return it safely. This field is automatically updated when a User/ServiceAccount gets access to a new service (or has access revoked). We work on this principle: if a User/ServiceAccount is a participant in a service, they must be able to see each other.
Now, you can jump into the GetPrincipal code for LocalPrincipalProvider. It has separate paths for users and service accounts, but generally they are similar. We get the User or ServiceAccount from the local database if possible (not all regions may have the ServiceAccount). If we need to make any "Save" (users!), it has to be in the primary region, because this is where users are supposed to be saved.
Distributed Authorization Process
Imagine User X asked devices.edgelq.com to assign a ServiceAccount to some Device. It sends a request to the Devices service, with an authorization header containing User X's access token. Devices will successfully authenticate and authorize the user. However, ServiceAccount belongs to IAM, therefore devices.edgelq.com will need to ask iam.edgelq.com to provide the ServiceAccount. When devices.edgelq.com sends a request to iam.edgelq.com, the authorization header will not have the access token of the user. It will have the access token of the ServiceAccount used by devices.edgelq.com. This is always true: the authorization token must contain the token of the entity sending the current request. However, devices.edgelq.com may pass the original access token in the x-goten-original-auth header. It is an array of tokens. In theory, authorization may also carry many tokens, but that does not work on all Ingresses.
In EnvRegistry we have Dial* methods with and without the FCtx suffix. Those with the suffix copy all HTTP headers with the x- prefix. They also copy authorization into x-goten-original-auth; if the latter is already present, the token is appended. The current authorization is cleared and space for a new one is added.
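Conceptually, the FCtx behavior amounts to something like the sketch below; names and details are simplified and the real EnvRegistry implementation differs.

```go
package fwdsketch

import (
	"context"
	"strings"

	"google.golang.org/grpc/metadata"
)

// forwardingContext copies x-* headers from the incoming context into the
// outgoing one, moves "authorization" into "x-goten-original-auth" (appending
// if tokens are already there), and leaves room for the caller's own token.
func forwardingContext(ctx context.Context) context.Context {
	in, _ := metadata.FromIncomingContext(ctx)
	out := metadata.MD{}
	for key, vals := range in {
		if strings.HasPrefix(key, "x-") {
			out[key] = append(out[key], vals...)
		}
	}
	// Original caller tokens are preserved for distributed authorization.
	out["x-goten-original-auth"] = append(
		out["x-goten-original-auth"], in.Get("authorization")...,
	)
	// The new "authorization" header will carry the ServiceAccount token of
	// the service making the next call (set later by its credentials layer).
	return metadata.NewOutgoingContext(ctx, out)
}
```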
It is up to the service to decide whether it wants to forward HTTP headers or not. Some work is still needed in EnvRegistry: the caller should be able to customize which headers are passed from the current context to the next and how, but for current needs it is sufficient.
Authorization in this context has issues with the audience claim though: when we forward authorization tokens to an entirely different service, the audience may not be the one that service expects.
By default, we use just Dial without FCtx. We have two known cases where the FCtx variant is used:
- MultiRegion routing, when requests need to be forwarded to other regions or split across many regions.
- When the Constraint Store sends EstablishReferences to another service. This is because saved resources hold references to other services. The problem here is that we assume the Service may not be allowed to establish references (lack of attach checks). The user may have attach permissions though, so we send two authorization tokens.
Of the two cases above, authorization and audience validation work well for the first one, because we forward within the same service. EstablishReferences is a more difficult topic: we will probably need to ensure that the Service always has attach permissions, without relying on the user. We will, however, need to refactor attach permissions so there is just one per resource type. With this, we also need to fix conditions so they can apply to attach checks; right now they simply don't work.
1.3 - SPEKTRA Edge IAM Authorization
Authorization happens in its dedicated server middleware, see any generated one, like https://github.com/cloudwan/edgelq/blob/main/devices/server/v1/device/device_service.pb.middleware.authorization.go.
Since the Authorization middleware is assumed to run after multi-region routing, we can assume that the IAM Service in the local region holds all resources required to execute authorization locally, specifically RoleBindings, Roles, and Conditions.
Note that IAM itself does not execute any authorization; the middleware is generated for each service. We have an Authorizer module that is compiled into all API servers of all services. What IAM provides is a list of Roles, RoleBindings, and Conditions. Other services are allowed to get them, but evaluation happens on the relevant server side.
The authorizer rarely needs to ask IAM for any data; if possible, it is I/O-less. It relies on a RAM cache to store IAM resources internally. Therefore, checks are typically evaluated fast. More problematic are resource field conditions: if we have them, we need to get the current resources from the database. For attach permissions, we may need to fetch them from other services.
Authorization middleware is generated per each method, but the pattern is always the same:
- We create a BulkPermissionCheck object, where we collect all permissions we want to check for this action. It is defined in the iam/auth/types/bulk_permission_check.go file.
- The Authorizer module, defined in the iam/auth/authorizer.go file, checks whether the passed BulkPermissionCheck is all good and the authenticated user is authorized for the requested permissions. Some checks may be optional, like read checks for specific fields.
When we collect permissions for BulkPermissionCheck, we add:
- The main permission for a method. The resource name (or parent) is taken from the request object, as indicated by requestPaths in the specification, or customized via the proto file.
- If we have a writing request (like Create or Update) and we are setting references to other resources, we need to add attach permission checks. Resource names are taken from the referenced objects, not the referencing resource the user tries to write to.
- Optional read/set permissions if some resource fields are restricted. For authorization object strings we pass either collection names or specific resources.

Every permission must be accompanied by some resource or collection name (parent). Refer to the IAM user specification. In this document, we map the specification to code and explain the details (a rough sketch of the collection pattern follows).
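As an illustration of this pattern only, here is a sketch of what a generated middleware conceptually gathers for an imaginary UpdateDevice call. All identifiers, permission names, and signatures below are invented for readability and are not copied from the generated code.

```go
package mwsketch

import "context"

// PermissionCheck and BulkPermissionCheck are simplified stand-ins for the
// real iam/auth/types objects.
type PermissionCheck struct {
	Permission string
	Object     string // resource or collection (parent) name
}

type BulkPermissionCheck struct{ Checks []PermissionCheck }

func (b *BulkPermissionCheck) Add(permission, object string) {
	b.Checks = append(b.Checks, PermissionCheck{Permission: permission, Object: object})
}

// Authorizer is a stand-in for the real iam/auth Authorizer module.
type Authorizer interface {
	CheckBulk(ctx context.Context, checks *BulkPermissionCheck) error
}

// collectUpdateDeviceChecks shows how checks are gathered: the main permission
// uses the Device name taken from the request, and an attach check uses the
// referenced ServiceAccount name (the referenced object, not the Device).
func collectUpdateDeviceChecks(deviceName, serviceAccountRef string) *BulkPermissionCheck {
	checks := &BulkPermissionCheck{}
	checks.Add("services/devices.edgelq.com/permissions/devices.update", deviceName)
	if serviceAccountRef != "" {
		checks.Add("services/iam.edgelq.com/permissions/serviceAccounts.attach", serviceAccountRef)
	}
	return checks
}

// authorizeUpdateDevice would run in the middleware before the core server.
func authorizeUpdateDevice(ctx context.Context, az Authorizer, deviceName, saRef string) error {
	return az.CheckBulk(ctx, collectUpdateDeviceChecks(deviceName, saRef))
}
```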
Within the Authorizer module, defined in iam/auth/authorizer.go, we split all checks by the main IAM scopes it recognizes: Service, Organization, Project, or System. Next, we delegate permission checks to AuthInfoProvider. It generates a list of PermissionGrantResult instances relevant to all PermissionCheck instances. The relationship between these two types is many-to-many. A single Grant (assigned via RoleBinding) can hold multiple permissions, and a user may have many RoleBindings, each with different Grants: more than one Grant may be giving access to the same permission.
If AuthInfoProvider notices that some PermissionCheck has an unconditional PermissionGrantResult, it skips the rest. However, if there are conditions attached, some may fail while others succeed. This is why we need multiple PermissionGrantResult instances per single PermissionCheck: if at least one is successful, the PermissionCheck passes. It works like an OR operator. Within a single PermissionGrantResult, all conditions must evaluate positively (an AND).
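The OR/AND semantics can be summarized by this small illustrative loop (hypothetical, simplified types; not the actual Authorizer code):

```go
package authzsketch

// Condition, GrantResult, and PermissionCheck are simplified stand-ins for the
// real types; only the evaluation semantics matter here.
type Condition interface {
	Evaluate(check *PermissionCheck) bool
}

type GrantResult struct {
	Conditions []Condition
}

type PermissionCheck struct {
	Permission string
	Object     string
	Results    []GrantResult
}

// checkPasses returns true if at least one matched grant authorizes the check.
// Grants act as an OR; conditions inside a single grant act as an AND.
func checkPasses(check *PermissionCheck) bool {
	for _, grant := range check.Results {
		if len(grant.Conditions) == 0 {
			return true // unconditional grant, nothing more to evaluate
		}
		allOK := true
		for _, cond := range grant.Conditions {
			if !cond.Evaluate(check) {
				allOK = false
				break
			}
		}
		if allOK {
			return true
		}
	}
	return false
}
```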
Therefore, once AuthInfoProvider matches PermissionGrantResult instances with PermissionCheck ones, we must evaluate conditions (if any). One popular condition type we use is ResourceFieldCondition. To evaluate this kind, we fetch resources from the local database, other services, and other regions. To facilitate this check as much as possible, the authorizer iterates through all possible conditions and collects all resources it needs to fetch. It fetches in bulk, connecting to other services if necessary (attach permission cases). For this reason, we put a reference field in the PermissionCheck object: it will contain resolved resources, so all conditions have easy access to them in case they need them. If the service receives a PermissionDenied error when checking other services, PermissionDenied is forwarded to the user with information that the service cannot see the resources itself. It may indicate an issue with a missing metadata.services.allowed_services field.
On their own, conditions are simple: they execute fast, without any I/O work. We just check requests/resolved resources and verify whether the specified conditions apply, according to the IAM Role Grant conditions.
AuthInfoProvider for Authorizer
AuthInfoProvider gets only a set of checks grouped by IAM scope (a project, an organization, a service, or the system if none of the former). As per the IAM specification, the service scope inherits all RoleBindings from the project that owns the service. If we need to validate permissions in the project scope, we must also accept RoleBindings from the parent organization (if set) and the full ancestry path. RoleBindings in system scope are valid in all scopes. Moreover, even the principal may have multiple member IDs (the native one with the email, then the domain, then allAuthenticatedUsers, allUsers). This creates lots of potential RoleBindings to check. Furthermore, we should be aware that the Authorizer is part of all API servers! As SPEKTRA Edge provides a framework for building 3rd party services, they can't trust each other. Therefore, AuthInfoProvider of whatever service it runs on can only ask for RoleBindings that the service is allowed to see (according to metadata.services.allowed_services).
The IAM Controller copies organization-level RoleBindings to child sub-organizations and projects, but we don't copy (at least yet) RoleBindings from a service project to the service. We also don't copy system-level RoleBindings to all existing projects and organizations. It should typically stay that way, because system-level role bindings are rather internal and should not leak to organization/project admins. The module for copying RoleBindings is in file iam/controller/v1/iam_scope/org_rbs_copier.go. It also handles changes in the parent organization field.
During authorization, AuthInfoProvider must list and fetch all RoleBindings per each memberId/IAM scope combination. It must also fetch only role bindings relevant to the current service. We first try to get them from the local cache; in case of a miss, we ask IAM. This is why in CheckPermissions we grab all possible RoleBindings. We filter out RoleBindings by subScope or role ID later on. We try to strip all unnecessary fields, to ensure AuthInfoProvider can hold (RAM-based cache!) as much data as possible. Additionally, we try to use integer identifiers for roles and permission names.
To hold the RoleBindings per member ID, we may need around two KiB of data on average; if we also cache the principal, say four KiB. Using one MiB we could hold data for 256 principals, and 256 MiB can then hold about 65K principals. Let's divide by two for a safety margin. As a result, we can expect 256 MiB to hold data for tens of thousands of active users. This is why AuthInfoProvider caches all RoleBindings a principal can have in each scope. We extract data from IAM only when the cache expires, for new principals, or when the server starts up for the first time. This is why GetAssignments (a method of the RoleBindings store) looks the way it does.
When we have all RoleBindings for the relevant members and the relevant IAM scope, we can iterate each PermissionCheck (object + permission) against all assignments. If many assignments match a given PermissionCheck, the PermissionCheck will have multiple Results (variable).
RoleBindings (converted to RoleAssignment for slimmer RAM usage) are matched with permissions if:
- they have owned_objects which match the object name in the PermissionCheck.
- if the above fails, we check if the Role pointed to by the RoleBinding has any Grants containing the permissions specified in the PermissionCheck.
- if there are any Grants, we need to check if the subScope matches (when it is specified). PermissionCheck contains the IAM scope and sub-scope forming a full object name. This allows granularity on specific resources.
- if we find a Grant matching the PermissionCheck, we store it in Results; note a Grant can carry conditions, but we haven't evaluated them yet.
Thanks to the cache, I/O work by AuthInfoProvider is practically non-existent; it can typically provide the list of assigned permissions with their conditions quickly.
ConditionChecker for Authorizer
Each PermissionCheck can have multiple results, which can contribute to allowed permissions. If a result item has no conditions, we can assume the permission is granted. If it has some, then all of them must be evaluated successfully, so we iterate over them in the Authorizer code.
ConditionChecker is implemented in file iam/auth/internal/condition_checker.go. We have three condition types:
- checking by resource field, function checkByResourceField
- checking by request field, function checkByRequestField
- checking by CEL condition, function checkByCELCondition (will be retired though)
Resource conditions are the most popular, and for good reason: they are simple and can handle at least full CRUD, and often custom functions too. For example, suppose we want to grant certain users access to devices only if the field path satisfies metadata.annotations.key = value (a code sketch of this check follows the list below):
- CreateDeviceRequest will be forbidden if this field path with the given value is not specified in the resource body.
- UpdateDeviceRequest will be forbidden if we are trying to update this field path to a different value or if the current resource stored in the database does not match.
- DeleteDeviceRequest checks whether the Device in the database matches.
- Get/BatchGetDevice(s): the Device(s) are extracted from the database and the condition is checked.
- WatchDevice is also checked when the stream starts: we grab the resource from the database and evaluate it.
- ListDevices and WatchDevices have a Filter field, so we don't need to grab anything from the DB.
- If there are custom methods, we can still get resources from the DB and check whether the field path is fine.
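A minimal sketch of such a check, with field access simplified to a plain annotations map (the real implementation works on generated field path objects):

```go
package condsketch

// deviceMatchesAnnotation sketches the metadata.annotations.key = value check
// against a resource taken from the request body or fetched from the database.
func deviceMatchesAnnotation(annotations map[string]string, key, wantValue string) bool {
	got, ok := annotations[key]
	return ok && got == wantValue
}

// For UpdateDevice, both states matter: the update is rejected if either the
// currently stored resource or the resource after the update stops satisfying
// the condition.
func updateAllowed(currentAnnotations, updatedAnnotations map[string]string, key, wantValue string) bool {
	return deviceMatchesAnnotation(currentAnnotations, key, wantValue) &&
		deviceMatchesAnnotation(updatedAnnotations, key, wantValue)
}
```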
We also support attach permissions with resource field conditions; if necessary, we fetch resources from other services. Fetching is done before condition evaluation.
A smaller weakness is the need for extra database reads. The object may be stored in Redis though, giving perhaps a faster answer, but it still goes through the network stack. Perhaps another RAM-based cache could be used, but invalidation may be a problem if we want to include List queries. For resource updates, we need to invalidate both the previous and the new state, and a Firestore watch shows us only the new state. MongoDB may be more beneficial in this case, especially considering that it has active watches for all collections. It may work especially well for collections that are not frequently updated.
Checks by request are simpler and typically aimed at custom methods.
Checks by CEL condition are used less and less in v1, but may still have some special use cases if a yaml (protobuf) declaration is not enough. They use conditions with bodies specified in the iam.edgelq.com/Condition resource. ConditionChecker uses AuthInfoProvider to grab Conditions from IAM.
1.4 - SPEKTRA Edge IAM Cache Invalidation
AuthInfoProvider relies on RAM cache for low latency processing. The problem is with invalidation. To achieve a long-living cache, we need real-time invalidation straight from the database.
This is why each “store” module in AuthInfoProvider has one or more goroutines using real-time watch. When some object is updated, we may need to update/invalidate the cache. In case of prolonged broken access to IAM, it will invalidate the whole cache and retry.
Invalidation of principals is done using the WatchPrincipals method. This allows IAM to ensure that only selected (allowed) principals are seen by a service.
1.5 - SPEKTRA Edge Multi-Service Authorization
The main authorization happens when the user sends a request to a service; the authorization sits at the front. However, sometimes a service executing a request needs to send further requests to other services. One common example is the EstablishReferences call in the Schema Mixin service. It is assumed that services don't trust each other, and it shows here too. Even if, say, the devices service allows UpdateDevice, IAM still needs to check on its own whether UpdateDevice can update the reference in field spec.service_account (a field in the Device resource, pointing at a ServiceAccount from IAM). We are using the fact that cross-region and cross-service reference establishment requires a call to EstablishReferences.
We even have special authorization for that: see file mixins/schema/server/v1/resource_shadow/resource_shadow_service_custom_auth.go. In this file, we check the referenced resources and try to see whether access is allowed for the service making the call, or for the user who originally made the request. In the future, we may opt out of the original user and require that the service has access to the referenced resources. It should typically be the case: the ServiceAccount pointed to by a Device should be owned by devices (metadata.services.owning_service). The same goes for logging or monitoring buckets. We may need proper attach permission checks for resources first, and support for resource field conditions!
Other than that, subsequent service-to-service calls are treated separately, and a service verifies a service.
1.6 - SPEKTRA Edge E-mail Sender
Another 3rd party service we use apart from Auth0 is Sendgrid. You should see its config in iam/config/apiserver.proto. It is a second service used for emails; Auth0 itself is used for emails too, like account verification. After all, Users are stored in the Auth0 service; IAM just gets copies. However, invitations (ProjectInvitations and OrganizationInvitations) are sent using Sendgrid. See the iam/invitationpublisher directory.
1.7 - SPEKTRA Edge Multi-Service Environment Safety
IAM needs to ensure safety not only between tenants (Organizations, Projects) but also between Services. For this reason, RoleBindings are also scoped per Service. There are, however, problems with services that we need to solve:
- Organizations and Projects can enable services they use, and if they do, they should be able to use these Services. IAM must ensure that the organization/project admin cannot enable services if they don’t have permission to. IAM must ensure the service does not have access to organizations/projects which don’t enable a particular service.
- If the Organization or Project enables Service, then the Service should be able to access the Project/Organization.
- Ideally, the Service should be able to freely access all resources a Project or Organization has, as long as those resources are defined by that service. For example, service devices.edgelq.com must always be able to access any instance of devices.edgelq.com/Device. However, other services should have limited access to devices.edgelq.com/Device collection. It’s called limited access to Projects/Organizations: Project/org admins should be able to regulate which resources are accessible.
- Some Services may be “private”.
- If services are using/importing each other, they need some limited access to each other.
Private services are protected by attach permissions: project/org admins can't just enable any Service, since doing so requires updating the list of Service references after all.
These things are addressed in IAM fixtures, see iam/fixtures/v1/iam_roles.yaml.
First, see this role: services/iam.edgelq.com/roles/service-user. This gives access to Service data and the ability to attach it. If a project admin is granted this role in a service scope, they can enable that service.
Then, the next thing: a Service should be able to access all relevant resources that projects or organizations have, without specifying exact instances. This is why we have the IAM role services/iam.edgelq.com/roles/base-edgelq-service, which grants access to all resources across orgs/projects, as long as certain conditions are met. Note that we don't give any create permissions; it would be wrong, because the Service could start creating resources with the proper metadata.services field without checking whether the project/org even uses the service. It is not an issue for non-creating permissions. To allow services to create project/organization-scoped resources, we have the services/iam.edgelq.com/roles/service-to-project-access and services/iam.edgelq.com/roles/service-to-org-access roles. RoleBindings for these roles are created dynamically by the IAM controller when the Project/Organization enables some service. This code is located in iam/controller/v1/iam_scope.
We also need to regulate service-to-service access. By default, this is not allowed. However, if one service imports or uses another, we enable their access to each other. Roles for these scenarios are in iam/fixtures/v1/per_service_roles.yaml. Roles with IDs importing-service-access and imported-service-access are granted to the importing and imported service respectively, but note it is not symmetrical. It does not need to be. For example, if one service imports another, then EstablishReferences is only needed in one direction. Roles with ID service-to-service-std-access are used for minimal standard access.
All those RoleBindings regulating access between services, and between services and projects/organizations, are called "Service RoleBindings". They are dynamically created by the IAM Controller when a service is created/updated, or when an organization/project enables some service. The module responsible for these RoleBindings is in file iam/controller/v1/iam_scope/service_rbs_syncer.go:
- makeDesiredRbsForSvc computes desired Service RoleBindings per each Service.
- makeDesiredRbsForOrg computes desired Service RoleBindings per each Organization.
- makeDesiredRbsForProject computes desired Service RoleBindings per each Project.
Note the convention for mixin services: each service has its own copy of them, like services/<service>/permissions/resourceShadows.listMetaOwnees. This is because all services have their own schema mixins. Note that RoleBindings for those "per service roles" are located in the root scope, see function makeDesiredRbsForSvc in file iam/controller/v1/iam_scope/service_rbs_syncer.go. The reason is that ResourceShadow is a "root" resource (the name pattern is resourceShadows/{resourceShadow}, not something like services/{service}/resourceShadows/{resourceShadow}). Perhaps it could have been like that, but it is some continuity from v1alpha2, and CLI commands would become less intuitive. To enable per-service access, permissions are therefore per-service. If we create services/<service>/permissions/resourceShadows.listMetaOwnees per service and create a root-scope RoleBinding containing this permission, in effect it will be granted for specific services only, not for all.
1.8 - SPEKTRA Edge IAM Principal Tracking
ServiceAccounts are project-scoped resources, but in theory, they can be granted roles in other projects and organizations too. Users are, in IAM terms, global resources, not necessarily bound to any organizational entity. They can however join any project or organization.
Members (Users or ServiceAccounts) are associated with projects/organizations via RoleBinding resources. Organizational role bindings are copied to downstream child projects/organizations by the IAM Controller (iam/controller/v1/iam_scope/org_rbs_copier.go).
If you visit iam/server/v1/role_binding/role_binding_service.go, you should note that, for each written/deleted RoleBinding, we manage a MemberAssignment resource. See iam/proto/v1/member_assignment.proto for more details; its role is described there.
Generally, though, one instance of MemberAssignment is created per each scope/member combination. This internal resource facilitates tracking of members in organizational entities.
Members can see a list of their projects/organizations via ListMyProjects/ListMyOrganizations calls. To make such calls possible, we needed the MemberAssignment helper collection; we also copy many project/organization fields directly to MemberAssignment instances. Therefore, project/organization filter/orderBy/fieldMask/cursor objects can mostly be translated to MemberAssignment ones. To make it work, MemberAssignment is a regional, but globally synced resource (its copies are spread through all IAM regions, period). Regional status ensures that each region is responsible for tracking members in its local organizations/projects individually. IamDbController syncs all created copies across all regions, so each region knows the full list of projects/organizations in which a given member participates.
In case project/organization fields change (like the title), the IAM Controller is responsible for propagating the change to all MemberAssignment instances. The implementation is in file iam/controller/v1/mem_assignments_meta_updater.go.
1.9 - SPEKTRA Edge Principal Service Access
RoleBinding does not only bind a User/ServiceAccount to a project/organization. It also binds a member to a service. For example, a devices manager Role in project X would bind a given member not only to project X but also to the devices.edgelq.com service (and applications & secrets, since those are related). Each Role has a populated field metadata.services, which points to the services where the Role is relevant. RoleBinding also has metadata.services populated, and it contains the combined services from the Role and the parent object (organization, project, or service).
When RoleBinding is created, IAM internally creates a MemberAssignment instance per each unique combination of member/scope, and this MemberAssignment will have a “scope” field pointing to the project or organization. However, there is something more to it. IAM will also create additional MemberAssignment objects where the “scope” field points to a Service! Those Service-level MemberAssignment instances are used to track in which services the given Member (User or ServiceAccount) is participating.
The IAM Controller has a dedicated module (iam/controller/v1/iam_scope/service_users_updater.go), which ensures that the field metadata.services is in sync (for Users, ServiceAccounts, and ServiceAccountKeys). It does this by watching MemberAssignment changes and making a "summary" of services in use. If it notices that some user/service account has inaccurate data, it issues an update request to IAM. Each region of IAM is responsible for watching local members, but they will access all MemberAssignment instances since those are synced globally.
Making sure the field metadata.services of all Users/ServiceAccounts is synced has double functionality:
- It ensures that a given member can access Service-related data.
- It ensures that the given Service can access member data (via GetPrincipal).
If you check the file iam/fixtures/v1/iam_role_bindings.yaml, you should notice the special RoleBinding roleBindings/services-participant. It is a root-level RoleBinding given to all authenticated members, granting the services/iam.edgelq.com/roles/selected-service-user role. This role is a multi-service one. If you see its contents in iam/fixtures/v1/iam_roles.yaml, you should see it gives a holder many read/attach permissions in a specified list of services. In the RoleBinding yaml definition, note that this list of services comes from the principal's metadata object! This is how principals get automatic access to a Service.
The role services/iam.edgelq.com/roles/selected-service-user is similar to services/iam.edgelq.com/roles/selected-user. The latter should be used on the service level to provide access to that single service, to someone who has no other access there. The former has an internal purpose: it gives access to many services at once and will only be assigned to members who already have some access to the specified services. It just ensures they can access service meta-information.
2 - SPEKTRA Edge Limits Service Design
The Limits service is one of those services with a fairly special design. The service itself should already be known from the user guide, and in some part from the developer guide. Knowledge about plans is assumed. Limits is also one of the standard services, and its code structure in the SPEKTRA Edge repository (the limits directory) should be familiar.
What needs explanation is how Limits ensures that "values" don't get corrupted, lost, over-allocated, etc. First, resources are allocated in each service, but the resource limits.edgelq.com/Limit belongs to the Limits service. Therefore, we can't easily guarantee counter integrity if a resource is created in one service and counted elsewhere. Next, we know that limit values are passed from a service to organizations, then to potential child organizations, and eventually to projects. From the MultiRegion design, we know that each organization and project may point to a main region where its resources are kept. Therefore, we know that organizations/{organization}/acceptedPlans/{acceptedPlan} is in the organization's region and projects/{project}/planAssignments/{planAssignment} is in the project's region, which may be different. This document describes how Limits works in this case.
We will also show code pointers for where things can be found.
During this guide, you will find out why parallel creations/deletions are not, in fact, parallel!
2.1 - Service Limit Initialization
When a Service boots up, it creates limits.edgelq.com/Plan instances. The Limits controller, defined in limits/controller/v1, has a LimitsAssigner processor, defined in limits/controller/v1/limits_assigner/limits_assigner.go. It is created per each possible assigner, therefore per Service and per Organization. LimitsAssigner is typically responsible for creating AcceptedPlan instances for child entities but, for Services, it makes an exception: it creates an AcceptedPlan for the Service itself! See file limits/controller/v1/limits_assigner/default_plan_acceptor.go; the function calculateSnapshot computes plans for child entities and for the Service itself. This is how things boot up; the Service can assign any values it likes to itself.
2.2 - Project and Organization Limit Initialization
If a project has a parent organization, then this parent organization is the assigner for the project. If the project is root-level, then its enabled services are the assigners; each service can assign an individual plan for the project. The same goes for organizations. When a project/organization is created, the Limits Controller puts the newly created entity in the "assigner" box (or boxes, for root-level entities). Then it creates an instance (or instances) of AcceptedPlan. The implementation, again, is in limits/controller/v1/limits_assigner/default_plan_acceptor.go. It is worth mentioning now, however, that DefaultPlanAcceptor uses the LimitPools of the assigner to see whether it will be able to create an AcceptedPlan resource. If not, it will instead annotate the Project/Organization it failed to create a plan for. This is why in limits_assigner.go you can see a Syncer not only for AcceptedPlan but also for Project and Organization.
2.3 - AcceptedPlan Update Process
The implementation can be found, naturally, in the server: limits/server/v1/accepted_plan/accepted_plan_service.go. We of course pass the "actual" creation to the core server, but this is just a small step; the logic executed before any CUD operation is much more significant here.
When the server processes the AcceptedPlan resource (Create or Update), we are guaranteed to be in the Limits service region where the assigner resides. Because LimitPools are children of a Service or Organization, we can guarantee that they reside on the same regional database as the AcceptedPlan. Thanks to this, we can verify, within a SNAPSHOT transaction, that the caller does not attempt to create/update any AcceptedPlan that would exceed the limit pools of the assigner! This is the primary guarantee here: the assigner will not be able to exceed the allocated values in its LimitPools. We only need to check cases where the AcceptedPlan increases reservations on the assigner's LimitPools; when we decrease (some updates, deletions), we don't need to do that.
However, decreasing accepted plans (some updates and deletions) carries some risk: doing so could decrease assignee limit values below current usage. To prevent this, in the function validateAssigneeLimitsAndGetLimitPoolUpdates in the server implementation, we check the assignee limit values. This will work in 99.99% of cases, unless some new resources are allocated while we are confirming that we can decrease limits. Therefore, we don't have guarantees here.
As a result, when we create/update an AcceptedPlan, we only increase the LimitPool reservation values of the assigner. When we would decrease LimitPool values, we just don't do it yet.
Decreasing values is done by the Limits controller; we have a task for this in limits/controller/v1/limits_assigner/limit_pool_state_syncer.go. It takes into account all child Limit and LimitPool instances (for assignees), which are synchronized with PlanAssignment instances. It then sends UpdateLimitPool requests once it confirms that the decreased values of the AcceptedPlan action (update or delete) took effect. Reservation is immediate; release is asynchronous and delayed.
Some cheating is potentially possible, however: if the org admin sends UpdateLimitPool trying to minimize the "Reserved" field, they could attempt to create a new accepted plan quickly enough before the controller fixes the values again. Securing this may be a bit more tricky, but such an update would leave the LimitPool with a Reserved value way above the configured size, which will be detectable, along with ActivityLogs, and if not, ResourceChangeLogs. It is unlikely it will be tried this way. A potential way to secure this would be to disable AcceptedPlan updates if the Reserved value of the LimitPool decreased recently, with some timeout like 30 seconds. Optionally, we can just put some custom code in the API Server for UpdateLimitPool and validate that only a straight service admin updates them (check the principal from the context). This is not covered by the IAM Authorization code-gen middleware, but custom code can simply do it.
2.4 - Resource Limit Assignment
It is assumed that organization admins can see and manage AcceptedPlan instances, but their tenants can only see them. Furthermore, parent and child organizations, other organizations, and final projects are separate IAM scopes. Child entities may also reside in different primary regions than their parent organization (or service). For these reasons, we have the resource type PlanAssignment, which is even read-only, see its proto definition. This allows admins to see the plan assigned to them, but without any modifications, even if they are owners of their scope. Because PlanAssignment is located in the region pointed to by the project/organization, we can guarantee synchronization with LimitPool/Limit resources!
When an AcceptedPlan is made, the Limits Controller is responsible for creating the PlanAssignment asynchronously, which may be in a different region than the source AcceptedPlan. The code for it is in limits/controller/v1/limits_assigner/assigned_plans_copier.go. It creates an instance of PlanAssignment and sends a request to the API Server. The server implementation is, naturally, in file limits/server/v1/plan_assignment/plan_assignment_service.go. Note that the controller sets output-only fields, but that is fine; when the server creates an instance, it will have these fields too. This only ensures that, if there is any mismatch in the controller, it will be forced to make another update.
When processing writes to PlanAssignment, the API Server grabs the AcceptedPlan from the database; we require the child organization or project to be in a subset of the regions available in the parent. Therefore, we know at least a synced read-only copy of the AcceptedPlan will be in the database. This is where we grab the desired configuration from.
PlanAssignment is synchronized with Limit and LimitPool instances, all of these belong to the same assignee, so we know our database owns these resources. Therefore, we can provide some guarantees based on SNAPSHOT: Configured limit values in Limit/LimitPool resources are guaranteed to match those in PlanAssignment, users don’t get any chance to make any mistake, and the system is not going to be out of sync here.
Note that we are only changing the configured limit, we have also so-called active limits. This is maintained by the controller. There is some chance configured limit is being set below current usage, if this happens, the active limit will stay on a higher value, as large as usage. This will affect the source limit pool reserved value, it will stay elevated! It is assumed however that PlanAssignment and configured limits must stay in sync with AcceptedPlan values, no matter if we are currently allocating/deallocating resources on the final API Server side.
Note that the limits controller tracks the active size and reserved value for LimitPool instances. Limits are on the next level.
2.5 - Resource Limit Tracking
We need to provide a guarantee that the usage tracker stays in sync with the actual resource counter. The best way to do that is to count during local transactions. However, the resource Limit belongs to the Limits service, not the actual service. This is why we have the Limits Mixin Service in the SPEKTRA Edge repository, mixins/limits.
It injects one resource type: LocalLimitTracker. Note it is a regional
resource, but not a child of a Project. This means that no project admin
will be able to see this resource ever, or any parent organization. This
resource type is hidden, only service admins will be able to see it. This
prevents any chance of final user mismanagement as well. Because this
resource type is mixed along with final service resources, we can achieve
SNAPSHOT transactions between actual resources and trackers. We can even
prevent bugs that could result in the usage tracker having invalid values.
When we create/update the LocalLimitTracker resource, we can extract the true counter from the local database, see file mixins/limits/server/v1/local_limit_tracker/local_limit_tracker_service.go.
To check how LocalLimitTracker usage is tracked during transactions, check two files:
- mixins/limits/resource_allocator/v1/resource_allocator.go
- common/store_plugins/resource_allocator.go

This is how the store plugin tracks creations/deletions: at the end of the transaction, it tries to push extra updates to LocalLimitTracker instances for all resource types where the number of instances changed. This guarantees complete synchronization with the database. Note, however, that this does not create LocalLimitTrackers yet.
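Conceptually, the plugin behaves like the sketch below; the interfaces and names are invented for illustration, see the two files above for the real code.

```go
package allocsketch

// LocalLimitTracker and Transaction below are simplified stand-ins for the
// real mixin and store types.
type LocalLimitTracker struct {
	ResourceType string
	Usage        int64
}

type Transaction interface {
	GetLocalLimitTracker(resourceType string) (*LocalLimitTracker, error)
	SaveLocalLimitTracker(t *LocalLimitTracker) error
}

// resourceAllocatorPlugin counts creations and deletions per resource type
// within a transaction and, on commit, folds the deltas into the matching
// LocalLimitTracker records inside the same SNAPSHOT transaction.
type resourceAllocatorPlugin struct {
	deltas map[string]int // resource type -> net change in this transaction
}

func newResourceAllocatorPlugin() *resourceAllocatorPlugin {
	return &resourceAllocatorPlugin{deltas: map[string]int{}}
}

func (p *resourceAllocatorPlugin) OnCreate(resourceType string) { p.deltas[resourceType]++ }
func (p *resourceAllocatorPlugin) OnDelete(resourceType string) { p.deltas[resourceType]-- }

// OnCommit runs at the end of the transaction, before it is finalized.
// Any failure aborts the whole transaction, keeping usage in sync.
func (p *resourceAllocatorPlugin) OnCommit(tx Transaction) error {
	for resourceType, delta := range p.deltas {
		if delta == 0 {
			continue
		}
		tracker, err := tx.GetLocalLimitTracker(resourceType)
		if err != nil {
			return err
		}
		tracker.Usage += int64(delta)
		if err := tx.SaveLocalLimitTracker(tracker); err != nil {
			return err
		}
	}
	return nil
}
```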
This is why the Limits Mixin comes not only with an API Server (so LocalLimitTrackers can be accessed) but also with a controller, see the mixins/limits/controller/v1 directory. Inside the Limits processor we have a LocalLimitTrackersManager instance, which:
- Creates/Updates/Deletes instances of LocalLimitTracker for every Limit instance in the Limits service.
- Synchronizes Limit instances in the Limits service using LocalLimitTrackers from its region. This means there is no actual point in meddling with Limit fields: the controller will fix them anyway, and they don't participate in actual usage checking anyway.
- Also maintains PhantomTimeSeries, so we have special store usage metrics showing how resource counters changed historically.

Note that the Limits processor in this controller has built-in multi-region features: the primary region for a project creates/deletes LocalLimitTrackers, but the final regions maintain Limit instances and PhantomTimeSeries.
2.6 - Project and Organization Deletion Process
When a Project/Organization is deleted, we need to ensure that the limit values return to the assigner. This is why AcceptedPlan instances have assignee reference fields with the ASYNC_CASCADE_DELETE option. When assignees are deleted, plans follow. This will delete PlanAssignments but, as was said, LimitPools are not given back reserved values yet. Instead, db-controllers should be deleting all child resources of the assignee, like a Project. This will decrease Limit usage values until we hit 0.
To prevent deletion of Limit/LimitPool instances before they reach zero values, we utilize the metadata.lifecycle.block_deletion field, as below:
- limits/server/v1/limit/limit_service.go: take a look at the update function, UpdateMetadataDeletionBlockFlag.
- limits/server/v1/limit/limit_pool_service.go: take a look at the update function, UpdateMetadataDeletionBlockFlag.
This way LimitPool and Limit resources disappear only last. We achieve some order of deletions, so it is not chaotic. The controller for the assignee will confirm the reserved value of LimitPool is decreased only after whole resource collections are truly deleted.
3 - SPEKTRA Edge Wide-Column Store Usage
The wide-column store provides an alternative data storage to the document storage from Goten. It is focused on wide-column data stores, and currently we support Google Bigtable and ScyllaDB as backend engines. From the user's perspective, it is worth mentioning the properties and features of this particular database interface (a simplified interface sketch follows the list):
- this is a non-relational db with no joins or transactions. It has a simple interface: Query and Save (which can overwrite). There is no "Get" equivalent, and it does not support streaming reads.
- it is semi-schemaless: it mostly stores data in a binary, non-queryable format. However, it allows the definition of column families and a set of filterable field keys.
- data is stored in timestamp order. When saving, it is required to provide a timestamp per each column value. A query always requires a time range.
- data is automatically GC-ed after the TTL defined per column family.
- it is optimized for large data sets and throughput.
- possible query operators: EQUAL, IN, NOT EQUAL, NOT IN.
- a single filter can contain many conditions connected with the AND operator.
- a query itself may have many filters, effectively supporting the OR operator.
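To give a feel for the shape of this interface, here is a simplified sketch; the real interface in common/widecolumn/store/store.go has more methods and different signatures.

```go
package wcsketch

import (
	"context"
	"time"
)

// KeyedValues and Filter are described in the data-model section; empty stubs
// keep this sketch self-contained.
type KeyedValues struct{}
type Filter struct{}

// Query always carries a time range; several filters act as an OR, while
// conditions inside one filter are joined with AND.
type Query struct {
	StartTime, EndTime time.Time
	Filters            []*Filter
}

// Store sketches the wide-column interface: no Get, no transactions, no
// streaming reads - only time-range queries and (over)writing saves.
type Store interface {
	Query(ctx context.Context, q *Query) ([]*KeyedValues, error)
	Write(ctx context.Context, values []*KeyedValues) error
}
```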
This store is fully implemented in the SPEKTRA Edge repository. The store interface and implementation can be found in common/widecolumn/store/store.go. The package for this file also contains the primary user-visible objects.
We have a couple of use cases (resources!) within the SPEKTRA Edge repository where we use this storage:
- in the Monitoring service, we use it for TimeSeries
- in the Audit service, we use it for ActivityLogs and ResourceChangeLogs
- in the Logging service, we use it for Logs
For each of these cases, we are wrapping wide-column store into the proper interface.
In the service specification for these resources, we are disabling metadata and standard CRUD.
3.1 - Wide-Column Data Models
User Data Model
The input/output of the wide-column store is KeyedValues from the store package, file common/widecolumn/store/keyed_values.go. It can be thought of as a resource. It is a combination of:
- A single Key object (uid package). It is used to uniquely identify resources.
- An array of column values (dataentry package, object ColumnValue). Each column value has:
  - Column family name (string)
  - Timestamp
  - Optional binary key
  - Data (binary data)
KeyedValues can hold multiple column values with timestamps ranging from hours, days, weeks, months, and potentially years. Therefore, the Write method only saves a range of values for a single key (normally it appends!). The query just grabs partial data only for a given range.
Column family and optional binary keys are identifiers of column values for the same timestamp (so they don’t overwrite each other).
Note that this makes “a resource called KeyedValues” a time-series-like resource. In this context, Key represents some header and column values (with timestamps) contain a series of values across an always-increasing time range.
Imagine there is a temperature sensor with some serial number. It sends the temperature every minute. In that case, we can define the following Key: serial_number=1234, product=temp_sensor and Column Family: temperatures. Then each temperature can be represented as a double value and associated with a specific timestamp. The double can be encoded as binary data. We don't need an optional binary key in this case.
Keys
A key is a unique identifier of a column value series. Key definitions and manipulation are provided by the uid package. A key is a set of key-value pairs (KV type). A KV without the Value part is just a "KId" object, a key identifier. It may be better thought of as a key field identifier, because "the real" Key is an array of these objects.
Each KV pair has an equivalent of a UID object: UID allows to mapping of Keys and KeyValue pairs between string and binary representation UIDs are represented as paths in up-to depth three tree each node may be allocated using parent counter and atomic operations hierarchy:
- Root UID len 1; (always 1), str:
_
- Key UID len 2; (1, kid), example str:
_#serial_number
- Value UID len 3; (1, kid, vid), example str:
_#serial_number#1234
It is worth mentioning that a UID with depth 1 is the same as the empty root value, a UID with length 2 is just a KId, and the full length is equivalent to a full KV.
It is important to say that UID is more focused on how data is internally stored in the underlying backend (Bigtable or ScyllaDB).
Going back to the temperature sensor example, we can notice our Key object has two KV pairs:
serial_number=1234
product=temp_sensor
But if you look at the structure of KV, you may notice that it is a pair of integer values. This is because storing integers is more efficient than storing strings, especially if a given key value repeats many times across time and across different keys. Therefore, each type in the uid package has a “String” equivalent:
- KV has SKV
- KId has SKId
- UID has StrUID
Note that the main store interface, apart from Query/Write, also provides functions for:
- Allocating string key values.
- Resolving strings to integers back and forth.
The KeyedValues structure provides Key and SKey as fields. They are meant to be equivalent. However, it is worth noting:
- When executing the Query method, all KeyedValues will only have a Key value set. It is assumed that SKey may not always be needed. Besides, it is more efficient to resolve all skeys once the query is completed in bulk.
- When executing the Write method, the store will check if the Key is defined. If not, it will default to SKey and allocate/resolve at runtime. However, it is recommended to use Key whenever possible for performance reasons.
Storage Data Model
Data is stored in a slightly different format than it is presented in KeyedValues. A KeyedValues object is transformed into a set of DataEntry objects from the dataentry package. A DataEntry is a combination of:
- a Row object (see dataentry/row.go)
- an array of ColumnValue objects (see dataentry/column_value.go)
It may look like Row is equivalent to Key from KeyedValues, but it is not. There is a transformation going on:
- The Key from KeyedValues is split into two keys, a promoted key and a tail key. This split is defined by the TableIndex object from the uid package. As you may have figured out, this is to help query data quickly and efficiently: when a filter defines a set of keys, the store will try to pick the index whose promoted key set matches the filter most closely. We are indexing!
- Column value timestamps are transformed into RowSequence objects (see the dataentry/row.go file). Column values that have the same sequence are grouped together; for each unique sequence, a new Row is created, containing the promoted key, tail key, and sequence number. It then gets assigned the column values sharing that sequence number.
- Note that when writing, KeyedValues are wrapped into IndexedKeyedValues; each TableIndex will create its own DataEntry object, and those indices are full replicas!
Example: Imagine we have the following KeyedValue (single):
- Key: serial_number=1234, product=temp_sensor
- Values:
  - temperatures: 123.4, 2020-01-01T00:00:00Z
  - temperatures: 124.4, 2020-01-01T00:01:00Z

Then it will be transformed into the following DataEntry objects, provided that serial_number is used as an index:
- DataEntry 1:
  - Row:
    - Promoted Key: serial_number=1234
    - Tail key: product=temp_sensor
    - Sequence: fromTimestamp(2020-01-01T00:00:00Z)
  - Values:
    - temperatures: 123.4, 2020-01-01T00:00:00Z
- DataEntry 2:
  - Row:
    - Promoted Key: serial_number=1234
    - Tail key: product=temp_sensor
    - Sequence: fromTimestamp(2020-01-01T00:01:00Z)
  - Values:
    - temperatures: 124.4, 2020-01-01T00:01:00Z
When data is saved to the underlying DB, repeated fields from “Values”, like the timestamp, may be dropped, as we already have them in the Row object. Understanding promoted/tail keys is important for writing good indices! In this example, we assumed a single promoted index, but if we had more, we would have more replicas.
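Below is a minimal, self-contained sketch of the resulting row layout, using plain Go stand-ins for the real uid/dataentry types (the names are assumptions made for illustration only):

package example

import (
	"fmt"
	"time"
)

// Illustrative stand-ins for the real dataentry types.
type sketchRow struct {
	PromotedKey string // key fields covered by the chosen TableIndex
	TailKey     string // remaining key fields
	Sequence    int64  // derived from the column value timestamp
}

func splitExample() {
	ts1 := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	ts2 := time.Date(2020, 1, 1, 0, 1, 0, 0, time.UTC)
	// One KeyedValues with two column values yields two rows, because the
	// two timestamps map to two different sequences. With a second
	// TableIndex, each row would be written again under that index, since
	// indices are full replicas.
	rows := []sketchRow{
		{PromotedKey: "serial_number=1234", TailKey: "product=temp_sensor", Sequence: ts1.Unix()},
		{PromotedKey: "serial_number=1234", TailKey: "product=temp_sensor", Sequence: ts2.Unix()},
	}
	for _, r := range rows {
		fmt.Printf("%+v\n", r)
	}
}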
3.2 - SPEKTRA Edge Wide-Column Versions
We currently have two supported storage versions: v2 and v3. Version v2 is old and shows a lot of influence from the Monitoring service, which originally had the widecolumn implementation internally for its private use. The Audit and Logging services were added later, but v2 still specifies some fields relevant to monitoring only. It was also written with Bigtable in mind, where you can query not only by row range but also by column value range. This is, however, not efficient with ScyllaDB and likely other CQL-based DBs. Moreover, it is not needed: a query from monitoring only needs data from a specific column key, and Audit/Logging always query by the whole range only. There is no plan to utilize this feature, so it is better to have it removed. On top of that, sequence numbers in v3 have nanosecond precision, while in v2 only seconds. This may allow us to drop timestamps from column values completely, and it invalidates the reason why we have many column values in Audit/Logging anyway.
Summary:
- v3 has Sequence in nanosecond precision, v2 in seconds.
- Row in v2 contains: promoted key, tail key, sequence number, empty Aggregation key, empty start timestamp, and alignment period. v3 has promoted key, tail key, and sequence number only.
- Column Value in v2 has extra elements removed in v3: Alignment period, start time.
- Query in v2 allows specifying a column key range; v3 does not. We use this feature right now in Monitoring (the Aligner is used to specify the column from which to pick the correct value type). However, we never specify a multi-value range; the column range start is always equal to its end. Therefore, for v3, to keep the same functionality, we need to move the Aligner into the Key of KeyedValues.
TODO:
- Monitoring, Audit, and Logging write to and query v2. We need to implement v3 support for them.
- All three services will need to double-write for some time. Queries should be executed on v3 when possible, with v2 as a fallback, and writes should go to both v2 and v3. This allows a smooth transition from v2 to v3. We should use the config file to indicate whether v2 should still be written, and from which data point v3 is valid. Specific stores (like TimeSeriesStore) should internally hold two references, to the widecolumn StoreV2 and StoreV3.
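A rough sketch of what such a transitional store wrapper could look like; all type and field names here (dualStore, versionedStore, v3ValidFrom, and so on) are hypothetical and only illustrate the double-write / fallback idea described above, not existing code.

package example

import (
	"context"
	"time"
)

// Hypothetical placeholder types for illustration only.
type record struct{}
type query struct{ StartTime, EndTime time.Time }
type result struct{}

type versionedStore interface {
	Write(ctx context.Context, data []record) error
	Query(ctx context.Context, q query) (result, error)
}

type dualStore struct {
	v2          versionedStore
	v3          versionedStore
	v3ValidFrom time.Time // from config: earliest data point guaranteed to be in v3
}

func (s *dualStore) Write(ctx context.Context, data []record) error {
	// Double-write while both versions are live.
	if err := s.v2.Write(ctx, data); err != nil {
		return err
	}
	return s.v3.Write(ctx, data)
}

func (s *dualStore) Query(ctx context.Context, q query) (result, error) {
	// Prefer v3 when the requested range is fully covered by v3 data,
	// otherwise fall back to v2.
	if !q.StartTime.Before(s.v3ValidFrom) {
		return s.v3.Query(ctx, q)
	}
	return s.v2.Query(ctx, q)
}

The idea is that the cut-over is driven purely by configuration, so the v2 fallback can be dropped once v3 has accumulated enough history.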
3.3 - SPEKTRA Edge Wide-Column Usage Example
package example
import (
"context"
"encoding/binary"
"fmt"
"math"
"time"
wcde "github.com/cloudwan/edgelq/common/widecolumn/dataentry"
wcstore "github.com/cloudwan/edgelq/common/widecolumn/store"
wcuid "github.com/cloudwan/edgelq/common/widecolumn/uid"
)
func RunDemo(ctx context.Context, store wcstore.WCStoreV3) error {
// Create a reusable fields descriptor. Define two key fields,
// serial_number and product, each with: no flags (0), no default value,
// required, and negative filters not allowed.
fieldsDesc := wcuid.NewKeyDescriptor().
AddSKId("serial_number", 0, "", true, false).
AddSKId("product", 0, "", true, false)
// Allocate string - int mappings for later. Now keep fieldsDesc "forever".
// This part can be completed at the startup.
strUIDs := fieldsDesc.GetUnresolvedStrUIDs()
uids, err := store.ResolveOrAllocateStrUIDs(ctx, strUIDs)
if err != nil {
return err
}
fieldsDesc.ResolveSKVsInBulk(strUIDs, uids)
// Create a specific key - err is returned if, for example, a key is
// repeated or a required key is not provided. Undefined keys are stored
// as unclassified - but they can be classified later by providing more
// field descriptors.
describedSKey, err := fieldsDesc.NewDescribedSKeyBuilder().
Add("serial_number", "1234").
Add("product", "temp_sensor").Build()
if err != nil {
return err
}
// Prepare the indices by which data must be split. We want to index
// by serial_number only.
indices := []*wcuid.TableIndex{wcuid.NewTableIndex([]wcuid.UID{
wcuid.KIdUID(
fieldsDesc.GetFieldDescriptorBySKey("serial_number").GetKey(),
0,
),
})}
// Prepare data to write. We could have allocated the skey into a key,
// but we don't need to.
indexedValues := make([]wcstore.IndexedKeyedValues[*wcde.ColumnValueV3], 0)
indexedValues = append(indexedValues, wcstore.IndexedKeyedValues[*wcde.ColumnValueV3]{
Indices: indices,
KeyedValues: wcstore.KeyedValues[*wcde.ColumnValueV3]{
SKey: describedSKey.FormatRawKey(),
Values: []*wcde.ColumnValueV3{
{
Family: "temperatures",
Time: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC),
Data: marshalDouble(123.4),
},
{
Family: "temperatures",
Time: time.Date(2020, 1, 1, 0, 1, 0, 0, time.UTC),
Data: marshalDouble(124.4),
},
},
},
})
// Save the data
err = store.Write(ctx, indexedValues)
if err != nil {
return err
}
// Prepare the query - we will get only one of the two points saved.
// We filter by a specific serial number, and all keys should be
// automatically validated (true flag).
filter := wcstore.NewFilterFromSKVs([]wcuid.SKV{
wcuid.NewSKIV("serial_number", "1234"),
}, true)
query := &wcstore.QueryV3{
StartTime: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC),
EndTime: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC),
OrGroups: []*wcstore.QueryV3OrGroup{
{
Indices: indices,
Filter: filter,
ColumnFamily: "temperatures",
},
},
}
tempsByKey := make(map[wcuid.Key][]float64, 0)
uniqueKeys := make([]*wcuid.Key, 0)
err = store.Query(
ctx,
query,
func(
ctx context.Context,
seq wcde.RowSequenceV3,
entries []wcstore.KeyedValues[*wcde.ColumnValueV3],
isDone bool,
) (bool, error) {
for _, entry := range entries {
temps, ok := tempsByKey[*entry.Key]
if !ok {
temps = make([]float64, 0)
uniqueKeys = append(uniqueKeys, entry.Key)
}
for _, v := range entry.Values {
temps = append(temps, unmarshalDouble(v.Data))
}
tempsByKey[*entry.Key] = temps
}
// Return true to continue, false to stop
return true, nil
},
)
if err != nil {
return err
}
skeys, err := store.ResolveKeys(ctx, uniqueKeys)
if err != nil {
return err
}
for i, key := range uniqueKeys {
temps := tempsByKey[*key]
descSKey, _ := fieldsDesc.ValidateSKey(skeys[i], false)
serialNumber := descSKey.GetSVid("serial_number")
fmt.Printf(
"Serial number %s has %d temperatures: %v\n",
serialNumber,
len(temps),
temps,
)
}
return nil
}
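// The example above calls marshalDouble and unmarshalDouble without
// defining them. A minimal, assumed implementation is shown below
// (big-endian IEEE-754 encoding; the encoding actually used by the
// services may differ).
func marshalDouble(v float64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, math.Float64bits(v))
return buf
}
func unmarshalDouble(data []byte) float64 {
return math.Float64frombits(binary.BigEndian.Uint64(data))
}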
It is important to learn how to use indices efficiently: to successfully query data, a filter must define all fields that are defined in at least one index. It is like looking for the proper data partition. Tail keys are filtered in memory (if there are filter conditions for them), but the data is still being pulled.
Note also an important quirk of widecolumn storage: we need to specify indices both when writing and when querying. Widecolumn can detect which index is most optimal, but it does not remember indices; there are no functions for saving or getting them. It is the responsibility of the user to provide indices when saving, and a list of available indices when querying!
This is generally “fine”, however: all our implementations use descriptors, which we need to get from the local database when validating user input anyway. Since we already fetch them, there is no point in extra DB lookups just for reading indices; descriptors are used when writing, querying, and validating.
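That index-selection rule can be expressed as a small predicate; the function and parameter names below are illustrative only, not part of the widecolumn package:

// usableIndex reports whether a query filter can be served by a given index:
// every promoted field of the index must be constrained by the filter.
// Conditions on tail-key fields are still applied, but only in memory,
// after the indexed rows have been pulled.
func usableIndex(indexFields []string, filteredFields map[string]bool) bool {
	for _, f := range indexFields {
		if !filteredFields[f] {
			return false
		}
	}
	return true
}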
3.4 - SPEKTRA Edge Wide-Column Annotations
Those four resource types in SPEKTRA Edge (TimeSerie, ActivityLog, ResourceChangeLog, Log) are still declared as Goten resources, but have several opt-outs that make them unique. See in SPEKTRA Edge repo:
- Audit API-Skeleton:
audit/proto/api-skeleton-v1.yaml
, resources:ActivityLog
,ResourceChangeLog
- Logging API-Skeleton:
logging/proto/api-skeleton-v1.yaml
, resources:Log
- Monitoring API-Skeleton:
monitoring/proto/api-skeleton-v4.yaml
, resources:TimeSerie
See opt-out settings: They don’t have a metadata field, standard CRUD is
disabled, and pagination and resource change are also not present. For
TimeSerie, we are also disabling the name field (more for historical reasons,
to match the Google Stackdriver API in version v3). We also disabled
“basicActions” for all (by specifying *
), but this option is not
necessary, since disabling standard CRUD ensures that no basic actions
(like Create, Get, BatchGet…) are generated.
As a result, Goten will not generate anything for these resources in the
access
or store
packages, nor server
will have any basics related
to them. Some standard types in the resources
package will also be
missing.
They will still get Descriptor instances, and they will still satisfy
the gotenresource.Resource
interface (as defined in the
runtime/resource/resource.go
file in the Goten repo), but for example
EnsureMetadata
will return nil value. Function SupportsMetadata
in the gotenresource.Descriptor
interface will return false.
This is how we take control of these resources away from Goten, so we can use a different storage type.
3.5 - SPEKTRA Edge Monitoring Time Series
You can see the proper TimeSeries storage interface in the file
monitoring/ts_store/v4/store.go
, TimeSeriesStore
. We will study
here QueryTimeSeries
and SaveTimeSeries
, which are most important
for our case.
TimeSeries are the most special of all the widecolumn types: when a user queries, the returned time series can be completely different from what was submitted. When group-by is in use (which is almost always the case!), time series are merged, dynamically forming new series that were never submitted as such. This should already be known from the User guide, though.
As per user documentation, it should be clear that we use
MonitoredResourceDescriptor and MetricDescriptor instances to create
final identifiers of TimeSeries (keys), and a list of promoted indices
for faster queries. Before we go into explaining saving/deleting, it is
worth checking the initTsData
function in the
monitoring/ts_store/v4/store.go
file. This is where we initialize:
- tsKeyDesc, which contains the common part of the descriptor for all time series, regardless of labels in metric or resource descriptors
- Common index prefix, consisting of
project
andmetric.type
fields from TimeSerie.
Additional notes:
- TimeSeries storage has no concept of regional replication, each region contains its data only.
- This document describes only TimeSeries storage, but not the full monitoring design, which is described separately.
Overall, object TimeSerie
is mapped into KeyedValues
from WideColumn,
but note this is a bit higher level object. Each mapping requires some
level of design. This is done in the following way:
- TimeSerie fields project, region, metric, and resource are mapped to various string key-values, together forming uid.SKey. This SKey is then mapped to some uid.Key, which is a short integer representation. Note that uid.Key is the identifier of KeyedValues.
- TimeSerie field key is the binary-encoded uid.Key, which holds “compressed” project, region, metric and resource fields.
- TimeSerie field points, which is a repeated array, is converted into widecolumn dataentry.ColumnValue objects, one by one.
A single Point in a TimeSerie is mapped to dataentry.ColumnValue like this:
- The TypedValue, which holds the actual value, is mapped to binary data (bytes). We will need to marshal it.
- The timestamp of the point naturally maps to the time in dataentry.ColumnValue. Note that the AlignmentPeriod, apart from the 0 value, is divisible by 60 seconds. This means that the timestamp will be some multiple of the AP value.
- Family in dataentry.ColumnValue will contain the AlignmentPeriod (in string format). It means that values for each AP are stored in a separate column family (table)!
- dataentry.ColumnValue also has an optional key; in Monitoring, we store there the perSeriesAligner value from the Point Aggregation. For example, this allows us to save ALIGN_RATE (some double) and ALIGN_SUMMARY (some distribution) with the same key and same timestamp, in the same column family, but next to each other.
In monitoring, we do not save raw points (alignment period = 0 seconds) in the database. We store only aligned values.
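As a rough, illustrative sketch (plain Go with assumed field names, not the actual monitoring code), an aligned point laid out as a column value looks like this:

// Illustrative only: how one aligned monitoring point is laid out as a
// column value, following the mapping described above.
type pointColumnValue struct {
	Family string    // alignment period as a string, e.g. "300s" - one family per AP
	Key    []byte    // optional column key: encodes the perSeriesAligner (e.g. ALIGN_SUMMARY)
	Time   time.Time // point end time, a multiple of the alignment period
	Data   []byte    // marshaled TypedValue, typically a Distribution
}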
Saving
Writing time series is implemented fully in the monitoring/ts_store/v4/store_writing.go file.
There is a possibility, when we save time series, that the TimeSerie object already contains a key field value, in which case we don't need to resolve it ourselves. However, we still need to verify it is correct! The client may choose to submit TimeSeries with binary keys to make the final request a bit smaller (they can drop the project, region, metric, and resource fields).
Moreover, from the TimeSerie, we can and must get the metric and resource descriptors, from which we can finally compute indices. This way, we can wrap KeyedValues into IndexedKeyedValues, as required by widecolumn storage. Note that those two descriptor resources describe which fields can appear in a TimeSerie object in general; they regulate metric.labels and resource.labels. If we mapped MonitoredResourceDescriptor/MetricDescriptor to widecolumn store types, they would map to uid.KeyFieldsDescriptor!
In the implementation, we wrap each TimeSerie into CreateTssOperationResult, a type defined in the monitoring/ts_store/v4/store.go file. This will contain the params of KeyedValues, along with the associated descriptor resources. Then, the metric and resource descriptor resources are wrapped together with uid.KeyFieldsDescriptor types, to which they de facto map.
When we save time series, we map TimeSerie
into CreateTssOperationResult
already in initiateCreateOperations
. Inside this function, we validate
basic properties of the TimeSerie object, project, metric, and resource
type fields. We use the field descriptor tsKeyDesc
, which was initialized
in the store constructor. At this initial stage, we don’t know the exact
metric and resource descriptor types, so we just validate basic properties
only! If the binary key is provided, we initialize a descKey instance, otherwise a descSKey. The former is better for performance, but not always possible. Note that at this stage we have the described keys and have validated basic properties, but the descriptors still have work to do.
In the next stage, we grab descriptor references; see getWrappedDescriptors. It does not make any resolutions yet: the same descriptors may be used across many TimeSerie objects, so we don't want to do more resolutions than necessary. With the Goten resource descriptors wrapped in uid.KeyFieldsDescriptor, we resolve them in the resolveAndPopulateDescriptors function, where we finally get field descriptors in the required uid format. This allows us to execute the final, proper validation and to compute the indices for widecolumn.
Proper validation is done in functions defaultAndValidateHeaders
and
defaultAndValidatePoints
. In the second function, we are also
generating final column values used by the widecolumn storage interface!
However, note some “traps” in defaultAndValidatePoints
:
if ap == 0 && !md.GetStorageConfig().GetStoreRawPoints() {
continue
}
Raw, unaligned points that clients send have an AP equal to 0 seconds; we skip saving them in the database, and the second condition pretty much always evaluates to true! This is explained further in the monitoring design doc.
With data validated, and with output columns populated, we can now ensure
that the output raw key in CreateTssOperationResult
is present. If
the binary key was not submitted when saving TimeSerie (field key
), then
we will need to use the resolver to allocate the string-to-integer pair. The mapping is of course saved in widecolumn storage. See the
ensureOutputKeysAreAllocated
function.
Next, with column values and raw keys, we wrap KeyedValues into indexed ones, IndexedKeyedValues. This is what we finally pass to widecolumn storage. Inside, the keyed values are duplicated per index and saved in the underlying storage.
Querying
When we query time series we need:
- Convert time series query params into WideColumn query object (mapping).
- Create a batch processor, that maps KeyedValues from Widecolumn storage into TimeSerie objects.
This implementation is fully in monitoring/ts_store/v4/store_querying.go
.
The widecolumn store, unlike the regular Goten document one, supports OR groups, but this is more like executing multiple queries at once. Each query group represents a filter with a set of AND conditions, and each can be executed on different indices. We need to deal with this specific WideColumn interface trait, where we must specify indices both when saving and when querying.
When executing a query, we gather all input parameters and convert them
into a tssQuery
object, with tssQueryGroup
as one OR group. This is
not the format required by widecolumn but by some intermediary. See
function createTssQuery
.
We support two types of queries for TimeSeries:
- With filter specifying one or more binary keys (
WHERE key = "..."
ORWHERE key IN [...]
). Each key forms one “OR” group, with just a long list of AND conditions. - With filter specifying a set of metric/resource conditions
(
WHERE metric.type = "..." AND resource.type = "..." ...
). However, we also support IN conditions for those types. Resource types may also be omitted optionally (but defaults are assumed then). For each combination of metric + resource type, we create one OR group.
One group query must specify exactly one metric and one resource type, because each combined pair defines its own set of promoted indices; we MUST NOT combine them! This is reflected in createTssQuery
.
For each OR group, we grab descriptors, with which we can finally verify whether the filter conditions are defined correctly and whether the group-by specifies existing fields, and we can compute the indices we know we can query.
Then, we map the query to widecolumn one. From notable elements:
- We are passing a reducing mask for fields, it will make output
uid.Key
have some fields “reduced”! - We most of the time ignore perSeriesAligner passed by the user,
and switch to
ALIGN_SUMMARY
. It is because when we save, we use almost exclusivelyALIGN_SUMMARY
. Other types almost always can be derived from summary, so there is no point in maintaining all.
Finally, we execute the query with our batch processor. Thanks to the reducing mask, the field Key of each KeyedValues already has a reduced list of key values; the reduced-away keys are in the rest key set. This is how we implement groupBy in monitoring. Each entry in the resultEntries map field of queryResultProcessor will represent a final TimeSerie object. However, this is a very CPU- and RAM-intensive task, because widecolumn storage still returns values as we saved them, all individual instances! If, for a single timestamp, we have thousands of entries sharing the same reduced key, then we will merge thousands of points to produce one point, and we repeat this for each timestamp.
At least widecolumn guarantees that, when querying, results are returned only with increasing seq values. If we query with AP = 300s, we will get points for, let's say, noon, then 12:05, and so on. When we see the sequence jump, we know we can add the final merged point to the growing list.
Still, this is a serious scaling issue if we try to merge large collections into a single reduced key.
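Below is a minimal, self-contained sketch of this in-memory group-by merge, relying on the increasing-sequence guarantee described above. All types and names are illustrative; the real implementation is the reducer module in monitoring/ts_reducer/v4 and operates on Distribution values rather than plain floats.

package example

type sample struct {
	Seq   int64  // widecolumn row sequence (increases monotonically in query results)
	Key   string // already-reduced key (after the reducing mask was applied)
	Value float64
}

// mergeGroupBy accumulates all samples sharing the same (reduced key, seq)
// pair; once the sequence jumps, it emits one merged point per key and
// frees the accumulated state, mirroring the behavior described above.
func mergeGroupBy(samples []sample, emit func(key string, seq int64, mean float64)) {
	type acc struct {
		sum   float64
		count int
	}
	var curSeq int64 = -1
	accs := map[string]*acc{}
	flush := func() {
		for k, a := range accs {
			emit(k, curSeq, a.sum/float64(a.count))
		}
		accs = map[string]*acc{}
	}
	for _, s := range samples {
		if s.Seq != curSeq {
			if curSeq >= 0 {
				flush()
			}
			curSeq = s.Seq
		}
		a := accs[s.Key]
		if a == nil {
			a = &acc{}
			accs[s.Key] = a
		}
		a.sum += s.Value
		a.count++
	}
	if curSeq >= 0 && len(accs) > 0 {
		flush()
	}
}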
Since the perSeriesAligner we passed to widecolumn differs from what the user requested, when we convert a ColumnValue into a TimeSerie point (function buildPointFromWCColumnValue) we need to extract the proper value from the summary object.
Once the TimeSeries are obtained, we resolve all integer keys into strings in bulk, and we can form the output time series.
3.6 - SPEKTRA Edge ActivityLogs and ResourceChangeLogs
The audit store for logs is implemented in the
audit/logs_store/v1/store.go
file. This interface also uses
the WideColumn store under the hood. We map each ActivityLog and ResourceChangeLog to KeyedValues.
Activity Logs
For ActivityLog, mapping is the following:
- A lot of fields are used to create uid.Key/uid.SKey: scope, request_id, authentication, request_metadata, request_routing, authorization, service, method, resource.name, resource.difference.fields, category, labels.
- The Name field contains the scope and the binary key uid.Key. However, when logs are saved, this is often not present in the request (the value is allocated on the server side).
- Each event in the events array is mapped to a single dataentry.ColumnValue. Then, we have two additional special column values: the fields resource.difference.before and resource.difference.after.
An ActivityLog typically has three events: the client message, the server message, and the exit code. For streaming calls it may have many more events and get pretty long.
This is how ActivityLog Event maps to ColumnValue
in WideColumn:
- The whole event is marshaled into binary data and passed to ColumnValue.
- Family field is always equal to one static value,
ALStoreColumnType
. It should be noted that all ActivityLogs use one single ActivityLog column family! - We extract the time value from ActivityLog Event and use it as a Time in ColumnValue. Note that in V2 format WideColumn only uses second precision for times!
- An additional column key will contain the event type (client msg, server msg, …) and the nanosecond part of the timestamp. This is important if the ActivityLog contains streaming messages and we have more than one event of a single type within a second! This is how the column key protects against overwrites (see the sketch below).
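A rough sketch of how such a column key could be composed; the encoding below is an assumption for illustration only, and the real layout is defined in the audit logs store:

package example

import (
	"encoding/binary"
	"time"
)

// activityEventColumnKey composes a column key from the event type and the
// nanosecond part of the event timestamp, so that several events of the
// same type within one second do not overwrite each other (times in the
// v2 storage format have only second precision).
func activityEventColumnKey(eventType byte, eventTime time.Time) []byte {
	key := make([]byte, 5)
	key[0] = eventType
	binary.BigEndian.PutUint32(key[1:], uint32(eventTime.Nanosecond()))
	return key
}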
Pre&Post object diffs from ActivityLogs will be mapped to ColumnValue:
- The whole object is marshaled into binary data and passed to ColumnValue.
- Column family is just equal to the const value of
ALStoreColumnType
. - Timestamp is equal to the first event from ActivityLog
- The column key contains the type (before OR after field values) with the nanoseconds from the timestamp. The nanosecond part is not strictly necessary here; it is just to keep a format similar to that of the events.
Resource Change Logs
ResourceChangeLogs have the following mapping to KeyedValues:
- A lot of fields are used to create uid.Key/uid.SKey: scope, request_id, authentication, service, resource.type, resource.name, resource.pre.labels, resource.post.labels.
- The Name field contains the scope and the binary key uid.Key. However, when logs are saved, this is often not present in the request (the value is allocated on the server side).
- ResourceChangeLogs are a bit unique: we marshal each whole log into binary data, and these form the ColumnValue objects.

Each ResourceChangeLog typically has two ColumnValues because we save it twice: first before the transaction concludes (so we have a chance to protest before allowing the commit), then after the transaction concludes.
In summary, ColumnValue is formed this way:
- Binary data contains the whole log marshaled
- Column family is set to the constant variable value of
StoreColumnType
. - Time is extracted from request time (first client message received).
- Column key is also used, we have one value
StoreColumnPreCommitType
when thetransaction.state
field of ResourceChangeLog is equal toPRE_COMMITTED
, otherwise, it isStoreColumnFinalizedType
.
If you check the NewStore
constructor in the
audit/logs_store/v1/store.go
file, you will notice that, unlike in
monitoring store, we have quite big uid.KeyFieldsDescriptor
instances
for Resource change and Activity logs, and a ready set of indices, not
just a common prefix.
If you analyzed monitoring time series storage querying and writing, then checking the same for Audit logs will be generally simpler. They follow the same principles with some differences:
- In monitoring we had two descriptors per TimeSeries, in Audit we have one descriptor for activity and another for resource change logs.
- Specific for resource change logs: We call
SaveResourceChangeLogsCommitState
and use internally againSaveResourceChangeLogs
, which is used for saving logs PRE COMMIT state. - For both Activity and ResourceChange logs we don’t require descriptors, usually labels are empty sets anyway, we have already large sets of promoted indices and labels, and this is useful for bootstrap processing when descriptors may not be there yet.
- When we query resource change logs, we don’t need to resolve any
integer keys, because whole logs were saved, see
onBatch
function. We only need to handle separate entries for the commit state. - When we query logs, we will get all logs up to second precision. It means, that even if we have a super large amount of logs in a single second, we cannot split them, continuation tokens (next page tokens) must be using second precision, as required by V2 storage format.
- Because logs are sorted by timestamp, but with second precision, we need to re-sort again anyway.
Things could be improved with the v3 SPEKTRA Edge wide-column version.
3.7 - SPEKTRA Edge Logging Store
The logging store for logs is implemented in the
logging/logs_store/v1/store.go
file. This interface also uses
the WideColumn store under the hood. We map all Log instances to KeyedValues. Unlike in Audit, these logs can truly be very long: for one uid.Key mapping we can have logs spanning days, months, and years.
From the Log instance, we extract fields scope
, region
, service
,
version
, log_descriptor
, and labels
, which are converted to
uid.Key
.
Regarding ColumnValue, the whole Log body is marshaled into a binary array, and the rest of the columns are extracted in the following way:
- Column family is just one for all logs
- Time is taken from the log timestamp, but we use second precision only.
- To prevent log overrides, in the column key we store the nanosecond part of the log timestamp.
In this case, we would also benefit from the v3 version where timestamps have nanosecond precision.
Saving and querying logs works like in the other storages described here, and the source code should be more or less readable.
4 - SPEKTRA Edge Monitoring Pipeline Design
With the data layout of the wide-column data store explained in the previous wide-column store page, let’s talk about the monitoring pipeline aspect of the SPEKTRA Edge monitoring system.
Unlike in Logging or Audit services, usage of WideColumn is not the only specific trait of Monitoring TimeSerie resource.
When a client submits a TimeSerie object, the point values match those declared in the metric descriptor. For example, if we have something like:
- name: ...
type: ...
displayName: ...
metricKind: GAUGE
valueType: INT64
# Other fields...
Then, a given TimeSerie will have points from a single writer (let's assume it sends one point every 30 seconds):
points:
- interval:
endTime: 12:04:24
value:
int64Value:
123
- interval:
endTime: 12:04:54
value:
int64Value:
98
- interval:
endTime: 12:05:24
value:
int64Value:
121
- interval:
endTime: 12:05:54
value:
int64Value:
103
- interval:
endTime: 12:06:24
value:
int64Value:
105
- interval:
endTime: 12:06:54
value:
int64Value:
106
However, unlike logs, querying will not return the same data points; in fact, it is likely not possible at all unless we enable raw (unaligned) storage. A QueryTimeSeries request typically requires the aggregation field to be provided, with alignmentPeriod ranging from one minute to one day and perSeriesAligner equal to some supported value, like ALIGN_SUMMARY, ALIGN_MEAN, etc. For example, if we query the monitoring service with cuttle like:
cuttle monitoring query time-serie \
--project '...' \
--filter '...' \
--interval '...' \
--aggregation '{"alignmentPeriod":"60s","perSeriesAligner":"ALIGN_SUMMARY"}' \
-o json | jq .
Then, for these points, we should expect output like:
points:
- interval:
endTime: 12:05:00
value:
distributionValue:
count: 2
mean: 110.5
sumOfSquaredDeviation: 312.5
range:
min: 98
max: 123
bucketOptions:
dynamicBuckets:
compression: 100.0
means: [98, 123]
bucketCounts: [1, 1]
- interval:
endTime: 12:06:00
value:
distributionValue:
count: 2
mean: 112
sumOfSquaredDeviation: 162
range:
min: 103
max: 121
bucketOptions:
dynamicBuckets:
compression: 100.0
means: [103, 121]
bucketCounts: [1, 1]
- interval:
endTime: 12:07:00
value:
distributionValue:
count: 2
mean: 105.5
sumOfSquaredDeviation: 0.5
range:
min: 105
max: 106
bucketOptions:
dynamicBuckets:
compression: 100.0
means: [105, 106]
bucketCounts: [1, 1]
Note that:
- All points across one-minute intervals are merged into distributions.
- Point at 12:07:00 contains all data points from 12:06:00.001 till 12:07:00.000.
- Distribution type is much more descriptive than other types.
For example, if we queried from
ALIGN_MEAN
, then we would get doubleValue instead of distributionValue, with mean values only. - We can specify more APs: three minutes, five minutes, 15 minutes, …, 1 day. Each larger value is going to have more data points merged.
If you check file monitoring/ts_store/v4/store_writing.go
, you should
note that:
-
Each AlignmentPeriod has its own Column Family:
ap := dp.GetAggregation().GetAlignmentPeriod().AsDuration() cf := tss.columnFamiliesByAp[ap]
-
We typically don’t store raw data points (AP = 0):
if ap == 0 && !md.GetStorageConfig().GetStoreRawPoints() { continue }
-
Now, when we query (
monitoring/ts_store/v4/store_querying.go
), we query for specific column family:colFamily := tss.columnFamiliesByAp[ q.aggregation.GetAlignmentPeriod().AsDuration(), ] if colFamily == "" { return nil, status.Errorf( codes.Unimplemented, "unsupported alignment period %s", q.aggregation.GetAlignmentPeriod(), ) }
- When we query, we change the per-series aligner from the one in the query to a different type in storage (function createWCQuery).
To summarize, the data we query and the data the client submits are not the same, and this document describes what is going on in the Monitoring service.
4.1 - Monitoring Pipeline Data Transformation
Monitoring TimeSerie’s specialty is that all data is numerical, and
we should be able to query various stats like percentiles, means,
counts, and rates. Monitoring allows merging data points using the aggregation param:
- Within a single TimeSerie object, by specifying alignmentPeriod and perSeriesAligner.
- Across multiple TimeSerie objects, by specifying crossSeriesReducer with groupBy fields
Merging TimeSeries in either way is a costly operation; for this reason, Monitoring merges in the background and then saves the merged values. As of the moment of this writing, the following applies:
- Raw TimeSerie points as submitted by clients are not being saved at all in the time series storage. There is a setting in MetricDescriptor which causes data to be saved, but nobody is practically using it.
- Raw TimeSerie points are accumulated and Monitoring computes merged points for all supported alignment periods, ranging from one minute to one day.
- Monitoring saves aligned TimeSerie points in the background, each column family gets different data for different APs.
- When Monitoring merges TimeSerie points within TimeSerie, it
forms Distribution values. This is because things like mean value,
standard deviation, count, min, max, etc., all can be derived from
distribution. It makes no sense to have separate saves per various
ALIGN_*
types, we can storeALIGN_SUMMARY
. This is why when we query for data, we switch to a different type. There is some exception though, but we will describe it later in this document. - Distribution has many various supported types: Explicit, Dynamic, Linear, and Exponential. The type used depends on the original type indicated by MetricDescriptor.
- As of now, monitoring does not merge TimeSeries across their keys (pre-aggregation). Unlike merging within a TimeSerie, where we know all alignment periods, this is a bit more difficult; the tasks for pre-aggregating data are still not finished, but they are on the roadmap. This is why, if you study the file store_querying.go for TimeSeries, you will see we do it entirely in RAM (we pass a reducing mask, then we use the reducer module from monitoring/ts_reducer/v4).
When raw points are sent by the client, the following happens:
- Server “saves” TimeSerie in the storage, but storage will quietly
ignore all the raw points - it will still allocate
uid.Key
based on all the metric/resource types and labels forminguid.SKey
, but will not save points. - Server will publish TimeSeries to a special TimeSerie stream queue, currently, we support Kafka or PubSub (GCP only). This queue is the “raw metrics” one.
- We have a special monitoring streaming pipeline, that reads raw TimeSeries from the raw metrics queue and makes per-series accumulations. This is a 24/7 background job. It produces accumulated values and sends them back to the stream queue (on a different topic, server rollups).
- Monitoring has a special module, rollup connector, that reads accumulated points from the server rollups queue and writes to TimeSeries storage. This module currently resides in Monitoring DbController.
Now, regarding code pointers:
- See file
monitoring/server/v4/time_serie/time_serie_service.go
. After we “save” the time series, we are publishing it with functionpublishRawTimeSeries
. - See file
monitoring/ts_streaming/v4/publisher.go
, it is what the server uses when publishing raw time series. In the config for the API server, you can see:monitoring/config/apiserver.proto
, and optionraw_metrics_publisher
. - The streaming job is completely separate, it is even written in a different language, Java. Currently, we use Flink or Google Dataflow (Apache beam).
- See the
monitoring/cmd/monitoringdbcontroller/main.go
file. Apart from the standard DbController modules (syncing and constraint controllers), we have extra components. See the runV4Connectors function, which is currently the leading one. There, we create a rollup subscription (variable rollupSubscription), which is where the Monitoring streaming job writes to! Then we create a realTimeConnector, which continuously reads from the stream and writes to time series storage. The real-time connector is the crucial component here, file monitoring/rollup_connector/connectors/v2/realtime_connector.go.
monitoring/ts_streaming/v4/subscriber.go
.
Here is the simplified pipeline flow:

Apart from ordered steps, there is a monitoring controller, which
generates Phantom TimeSeries points (once per minute). This is also
a kind of raw metric, so the monitoring streaming job combines those
two data sources when generating the final time series. You can see
the code here: monitoring/controller/v4/pts_publisher/pts_publisher.go
.
It generates once-per-minute phantom metrics (OnGeneratePhantomMetrics
),
which on separate goroutines are published (function Run
).
Depending on whether the SPEKTRA Edge cluster runs on GCP or somewhere else, we use different technologies and the architecture is even a bit different:
- On GCP, we use PubSub for all queues, otherwise, we use Kafka.
- In PubSub, we still don't have message ordering; therefore, messages can be delivered to the streaming job or rollup connector out of their original order. Kafka orders messages per partition, but we utilize this feature only for the raw metrics queue: we use the TimeSerie Key to compute the partition to which data will be sent. This way we guarantee that data for a single Key is always ordered. This guarantee is reinforced by the fact that the Monitoring API Server requires all data to arrive timestamp-ordered; this is part of the TimeSeries spec!
- For PubSub, we have two topics for raw metrics: one topic is for regular raw time series, and the other is for phantom metrics used by the monitoring controller. For Kafka, we write all metrics to one Kafka topic (a simpler design).
- In GCP, we use Google Dataflow, and the job is written in
the Apache beam framework. Code location in SPEKTRA Edge repository:
src/main/java/com/ntt/monitoring/beam/pipelines/MonitoringRealtimePipeliene.java
. - In non-GCP, we use Apache Flink, code location in SPEKTRA Edge repo:
monitoring/streaming_jobs/flink-realtime/src/main/java/com/monitoring/pipelines/RealtimePipeline.java
- When the streaming job publishes aligned TimeSeries to the rollups topic (Kafka or PubSub), we accumulate data in batches and send each batch to a random partition.
- In PubSub, messages are acknowledged individually; in Kafka, we acknowledge a message only when all previous messages have been acknowledged. This is because in Kafka we rely on data ordering.
Both Kafka and PubSub queues work on an at-least-once principle: we guarantee data will be sent at least once, but if there is some failure, delivery is retried. If a failure happens while confirming successful delivery, the message will in fact be delivered twice. The streaming job has no issues here: if it detects the same TimeSerie with the same Key and the same timestamp, it discards the duplicate. The rollup connector just saves a snapshot of the data to TimeSeries storage; if it writes twice, it simply overwrites one value with the same value. The minimal requirement for the whole of monitoring is the AT LEAST ONCE principle. This is why the RollupConnector only acknowledges messages after successful processing! Otherwise, we would risk losing some data.
Monitoring jobs (Flink or Dataflow) also operate on the at-least-once principle. They have deduplication mechanisms built in, so we don't need to worry on the raw metrics publishers' side. They are stateful jobs: both Dataflow and Flink use persistent storage to accumulate all state (per each TimeSerie Key). They “ack” messages from the raw metrics topics only after processing them (when they incorporate the data points into their state). This happens from the “end” of the job (around the rollups topic) to the front (the raw metrics topics): when the streaming job successfully writes data to the rollups topic, it sends a signal about progress backward, which propagates through all the stages. The streaming job “acks” the raw metrics topic last! State data is persisted throughout this whole process. In case of a crash, the job will reprocess all data from the last recovery point; we are guaranteed not to lose data thanks to this synchronized checkpointing. We may get duplicated points in the rollup connector, though, but as said, we are prepared for that.
In the streaming job, we are calculating the first points with an alignment period of 1 minute:
- We accumulate points for specified Key (phantom and real points)
- If we got phantom points only, we will use phantom data.
- If we got real, we will discard phantom points if present.
- We try to output the point with a 1-minute AP soon after the minute passes, if possible. For example, if we hit 12:06:00, we will output the point with timestamp 12:06:00 as soon as possible. However, we need to wait a bit longer: there is a chance we will receive, say, a data point with timestamp 12:05:59 at 12:06:07.
- Because of possible late arrivals, we send the output for HH:MM:00 in the range HH:MM:20-HH:MM:40.
- If we get points AFTER we output the aligned data point, the streaming job sends a new corrected value!
Further data points are accumulated further:
- Using one-minute accumulated points, we can generate three-minute and five-minute points
- Using five minute accumulated points, we can generate 15 minute point
- Using 15 minute accumulated points, we can generate 30 minute point
- Using 30 minute accumulated points, we can generate one hour point
- Using one hour of accumulated points, we can generate a three hour point
- Using three hour accumulated points, we can generate six hour point
- Using six hour accumulated points, we can generate 12 hour point
- Using 12 hour accumulated points, we can generate one day point
- In case of correction (late arrivals), we will trigger a re-computation of all “downstream” points.
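That derivation cascade can be written down as a simple lookup table (an illustrative constant, not actual job code):

// Which already-aligned AP each larger AP is derived from in the streaming
// job, per the list above (illustrative only).
var rollupSource = map[time.Duration]time.Duration{
	3 * time.Minute:  1 * time.Minute,
	5 * time.Minute:  1 * time.Minute,
	15 * time.Minute: 5 * time.Minute,
	30 * time.Minute: 15 * time.Minute,
	1 * time.Hour:    30 * time.Minute,
	3 * time.Hour:    1 * time.Hour,
	6 * time.Hour:    3 * time.Hour,
	12 * time.Hour:   6 * time.Hour,
	24 * time.Hour:   12 * time.Hour,
}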
The streaming job usually publishes Distribution points, because those are the most generic and other types can be derived from them. Basically, we have the following mappings:
- When MetricDescriptor has
{valueType: INT64/DOUBLE, metricKind: GAUGE/DELTA}
, then output points useDISTRIBUTION
output values and distribution type isDynamic
, with compression = 100.0, PerSeriesAligner,ALIGN_SUMMARY
. - When MetricDescriptor has
{valueType: DISTRIBUTION, metricKind: GAUGE/DELTA}
, then output points useDistribution
output values, and distribution type is the same as described in MetricDescriptor, fielddistribution_bucket_options
, PerSeriesAligner,ALIGN_SUMMARY
. - When MetricDescriptor has
{valueType: INT64/DOUBLE, metricKind: CUMULATIVE}
, then we produce actually three output points per each key and timestamp:- Distribution Dynamic type, with compression = 100.0,
PerSeriesAligner,
ALIGN_SUMMARY
. - Double type, PerSeriesAligner,
ALIGN_RATE
- Distribution Dynamic type, with compression = 100.0,
PerSeriesAligner,
- Integer/Double type (depending on original valueType),
PerSeriesAligner,
ALIGN_DELTA
- MetricDescriptor cannot specify CUMULATIVE kind with DISTRIBUTION value type.
- We do not support BOOL value types (but we will; it is similar to INT64). We do not and will not support MONEY or STRING. Use double/int64 for money; string types are a better match for the logging service.
All raw input points are mapped to distributions, with the exception of CUMULATIVE metrics, where point values keep increasing, like 10, 14, 16, 17, 19, 22, etc. We still form them into a distribution of dynamic type (the means will have values from 10 to 22); however, CUMULATIVE metrics are mostly used with the ALIGN_DELTA or ALIGN_RATE types. Unfortunately, those two value types cannot be derived from the Distribution value. Example (integer points):
- For 12:00:00, we got data point with a value of 30
- For 12:01:00, we got data point with a value of 33
- For 12:02:00, we got data point with a value of 36
- For 12:03:00, we got data point with a value of 40
A Distribution of Dynamic type with perSeriesAligner ALIGN_SUMMARY for 12:01:00 (AP = 1 minute) will contain the value 33 (mean = 33, count = 1, and so on). An Integer with perSeriesAligner ALIGN_DELTA (AP = 1 minute) for 12:01:00 will contain the value 3, because 33 - 30 from the previous point is 3. A Double with perSeriesAligner ALIGN_RATE (AP = 1 minute) will contain 3.0 / 60.0 = 0.05 (per second).
For AP = three minutes, timestamp 12:03:00, we will produce in streaming job:
- Distribution (Dynamic, count = 3, mean = (33+36+40) / 3.0, etc.),
perSeriesAligner:
ALIGN_SUMMARY
. Note it does not contain a value from the previous point (30). - Int64 with value 10 (because 40 - 30 is 10), perSeriesAligner:
ALIGN_DELTA
- Double with value 10.0 / 180.0 (delta divided by time range),
perSeriesAligner:
ALIGN_RATE
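The arithmetic above can be reproduced with a small, self-contained snippet (plain Go, illustrative only):

package example

import "fmt"

// cumulativeExample reproduces the CUMULATIVE example above for
// AP = 3 minutes at 12:03:00.
func cumulativeExample() {
	prev := 30.0                    // last point before the window (12:00:00)
	window := []float64{33, 36, 40} // points at 12:01, 12:02, 12:03
	apSeconds := 180.0

	sum := 0.0
	for _, v := range window {
		sum += v
	}
	mean := sum / float64(len(window))    // ALIGN_SUMMARY distribution mean
	delta := window[len(window)-1] - prev // ALIGN_DELTA: 40 - 30 = 10
	rate := delta / apSeconds             // ALIGN_RATE: 10 / 180

	fmt.Println(mean, delta, rate)
}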
No matter how much you try, you won’t be able to get RATE/DELTA from
Distribution, because we need value from the previous point. This is
why we have special processing for CUMULATIVE metric kinds, and we
produce extra values! This is also why, in file
monitoring/ts_store/v4/store_querying.go
we have code like:
case common.Aggregation_ALIGN_RATE, common.Aggregation_ALIGN_DELTA:
if orGroup.wrappedMd.md.MetricKind == rmd.MetricDescriptor_DELTA {
storeAligner = common.Aggregation_ALIGN_SUMMARY
}
Those two align types are valid for CUMULATIVE and DELTA, but we switch to SUMMARY only for DELTA type.
4.2 - Monitoring Pipeline Streaming Jobs
Flink and Dataflow
Let's talk about the streaming jobs: one is implemented with Apache Flink and the other with Google Dataflow.
They are significantly different in structure. In Dataflow, we have multiple stages, with each AP having a different one. We also need to handle messages arriving out of order, so we keep at least the data for the last hour. We also add random delays for points with an AP larger than one minute. This increases the risk of corrections (we may output a five-minute rollup with only four minutes of data points, because the random delay for five minutes happened to be lower than the one for one minute). We also needed to spread outputs to smooth out CPU usage; this prevents the autoscaler from going crazy. In Flink, we have an entirely different internal architecture: we know Kafka sends ordered data only, we have support for more data types, like maps, and the autoscaler can average CPU usage, so we don't need various random delays across keys. This allows for a much more efficient Flink job:
- We know we can output data points when we get raw points from the next window (for example, if we get data point with timestamp 12:00:45, we know we can output points for timestamp 12:00:00 and APs one minute, three minutes, five minutes, 15 minutes, 30 minutes, one hour, three hours, six hours, 12 hours). Data is ordered, so we won’t get past data.
- Fewer corrections and smaller latencies for output points.
- We keep a much smaller state because per Key we remember the last data point only.
In the future, I plan to drop Dataflow entirely, and modify PubSub (use ordered PubSub Lite, add order to regular PubSub, or move to Kafka). This should make streaming jobs much more efficient.
Scalability
Monitoring API Servers and Monitoring Controllers are horizontally scalable (and vertically too, which is recommended), so as new pods are deployed, the generation of phantom metrics or the submission of real-time raw metrics can be divided among them. PubSub and Kafka can work with multiple publishers for the same topic.
Both Dataflow and Flink are horizontally scalable (automatically). Flink can be scaled diagonally too, if we add bigger nodes and larger parallelism per job.
The Monitoring DbController can scale vertically, and horizontally it scales automatically. When a new pod is added, the rollup TimeSeries are split across all subscribers.
Kafka can horizontally scale up to partition count.
Job Recovery
In case of some disaster in the streaming job, data loss for the last days, or corruption resulting from a bug, we have additional components for recovery. How it works: all raw data (phantom and real raw metrics) does not go only to the streaming job; it is also copied to a special recovery storage with data retention configured for at least the last full seven days. If there is a need to “replay” historical streaming data, we can run an additional Flink/Dataflow BATCH job (not streaming) that reads data from the recovery storage (instead of the raw metrics topic) and writes to a separate rollups recovery topic.
See the recovery storage module in monitoring/ts_recovery_store/v4/provider/provider.go. This module provides a recovery store that also subscribes to the raw metrics topics and writes to the recovery storage. We support two backends now, Azure Blob Storage and Google Cloud Storage (depending on the platform, Azure or GCP). We will need to add S3 storage for the AWS or on-premise option.
The recovery storage is a subscriber plus writer. It is right now part of the monitoring controller (see main.go of the monitoring controller; it creates and runs the recovery store writer as a separate goroutine). It may move to the db-controller though (I consider that a possibly better place). It works 24/7, writing raw TimeSeries to external storage with some compression.
We have a separate rollup topic for recovered TimeSeries too, see
main.go
of the monitoring db controller again. We are creating
an additional recovery connector apart from the real-time connector!
Implementation is in monitoring/connectors/v2/recovery_connector.go
.
Normally, this separate rollup recovery topic is silent (no messages are published), but the rollup connector is always listening anyway (this is assumed to be cheap enough).
The recovery Dataflow/Flink jobs, however, are not normally running; we need to schedule them manually, for example with a script in the edgelq repo, scripts/run-monitoring-gcp-recovery-pipeline.sh. We need to specify the time range from which data will be loaded; the recovery job is a batch type. It will run, compute all the data, and submit it to the separate recovery rollup topic. Once the job finishes, it is deleted; this is the nature of the recovery jobs here. As of now, we have an Apache Beam implementation only for these jobs: file src/main/java/com/ntt/monitoring/beam/pipelines/MonitoringRecoveryPipeline.java.
In this recovery pipeline, remember to set the correct SHARDS_COUNT
and BLOB_DURATION
. Those should be synchronized with settings in:
cuttle monitoring list recovery-store-sharding-infos
This also shows what periods are possible to recover. Value
BLOB_DURATION
must be taken from spec.tsBlobPeriod
(PT1H is 3600 seconds), and value SHARDS_COUNT
must be equal to
spec.shardsCount
.
This script in SPEKTRA Edge is exclusively for GCP and will eventually become obsolete. For Flink, we have instructions in the edgelq-deployment repository: see the file README-Azure-Devops.md and find the Monitoring recovery job part.