1 - SPEKTRA Edge IAM Service Design

Understanding the SPEKTRA Edge IAM service design.

Service iam.edgelq.com plays one of the central roles in the SPEKTRA Edge platform: it is in charge of authentication and authorization, and it enables multi-tenant and multi-service environments. By default, Goten does not come with authentication and authorization features; IAM fills this hole and allows services to work with each other without mutual trust.

IAM concepts, actors, role management, and tenant management should already be known from the user guide, and customizations plus basic usage of the Authenticator, Authorizer, and Authorization middleware from the developer guide. This document dives into the details of what happens in the unique components provided by the IAM service. It is assumed the reader understands the general, standard structure of the IAM codebase at this point, located in the iam directory of the SPEKTRA Edge repository.

We will focus here not so much on the IAM service itself, but to a large extent on what IAM provides. Authenticator and Authorizer are modules provided by IAM but are linked in during each server compilation. Therefore, the API server of every backend service has them in its runtime.

1.1 - SPEKTRA Edge IAM Principals

Understanding the SPEKTRA Edge IAM principals.

In IAM, we identify two types of principals that can be uniquely identified and authenticated:

  • Users
  • ServiceAccounts

They have very different authentication methods. First, IAM does not manage Users that much; a third party is responsible for the actual management of Users. When a user sends a request to a service, it provides an authorization token with some claims. API servers must use the jwks endpoint, which provides a JSON web key set, to verify the signature of the access token. Verification ensures we can trust the claims stored in the token. Inside the claims we have details like the User's unique identifier, which we can use to extract the User resource from IAM.

As of now, we use the Auth0 third-party service for users. It creates and signs the access tokens that SPEKTRA Edge receives, and it gives us a jwks endpoint from which we get public keys for verification. Token signing, rotation, and user list management are all handled by Auth0, although IAM has several methods that connect to Auth0 for management purposes.

When a user joins the system, IAM is not the first to be notified. The request goes to Auth0, where the data is created. The record in IAM is created later, when the user starts interacting with SPEKTRA Edge. The User resource may get created in IAM during the first authentication. It may also be saved/updated when IAM receives RefreshUserFromIdToken.

On the other hand, ServiceAccounts are typically managed by SPEKTRA Edge, or by any other entity that creates ServiceAccounts in the IAM service. How it is done:

  • ServiceAccount is created by some client. Without a ServiceAccountKey, though, it is not usable.

  • ServiceAccountKey is created by a client. Regarding the public-private key pair, there are two options:

    • The client generates both the private and public key. It sends CreateServiceAccountKey with the public key only, so IAM never sees the private key. The client is fully responsible for securing it. This is the recommended procedure.

    • It is possible to ask IAM to create a public-private key pair during ServiceAccountKey creation. In this case, IAM saves only the public key in the database; the private key is returned in the response. The client is still fully responsible for securing it. This method merely allows skipping the generation step.

In summary, ServiceAccounts are the responsibility of the clients, who need to secure private keys. Those private keys are then later used to create access tokens. During authentication, the backend service will grab the public key from IAM.
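As an illustration of the recommended flow, here is a minimal sketch that generates a key pair locally and PEM-encodes the public key. Only that public key would be placed in a CreateServiceAccountKey request; the request construction itself is omitted, since it depends on the generated client code.

package main

import (
	"crypto/rand"
	"crypto/rsa"
	"crypto/x509"
	"encoding/pem"
	"fmt"
)

func main() {
	// Generate the key pair locally; the private key never leaves the client.
	priv, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		panic(err)
	}

	// PEM-encode the public key; this is the only part that would be sent
	// to IAM in the CreateServiceAccountKey request.
	pubDer, err := x509.MarshalPKIXPublicKey(&priv.PublicKey)
	if err != nil {
		panic(err)
	}
	pubPem := pem.EncodeToMemory(&pem.Block{Type: "PUBLIC KEY", Bytes: pubDer})
	fmt.Println(string(pubPem))

	// The private key stays with the client (e.g. written to a secure file)
	// and is later used to sign access tokens. It is never sent to IAM.
	privPem := pem.EncodeToMemory(&pem.Block{
		Type:  "RSA PRIVATE KEY",
		Bytes: x509.MarshalPKCS1PrivateKey(priv),
	})
	_ = privPem // store securely
}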

1.2 - SPEKTRA Edge IAM Authentication

Understanding the SPEKTRA Edge IAM authentication.

The module for authentication is in the SPEKTRA Edge repository, file iam/auth/authenticator.go. It provides an authentication function (grpc_auth.AuthFunc) that is passed to the gRPC server. If you look at the main.go of any API server runtime, you should find code like:

grpcserver.NewGrpcServer(
  authenticator.AuthFunc(),
  commonCfg.GetGrpcServer(),
  log,
)

This function is used by grpc interceptors, so authentication is done outside any server middleware. As a result, the context object associated with the request will contain the AuthToken object, as defined in iam/auth/types/auth_token.go, before any server processes the call.

Authenticator Tasks

Authenticator uses HTTP headers to find out who is making a call. The primary header in use is authorization. It should contain a Bearer <AccessToken> value; we are interested in the token part.

For now, ignore x-goten-original-auth and additional access tokens, which will be described in distributed authorization process section.

Usually, we expect just a single token in authorization, based on which authentication happens.
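As a conceptual sketch (not the actual authenticator code), extracting the bearer token from incoming gRPC metadata might look like this; the bearerToken helper is an assumption for illustration.

package main

import (
	"context"
	"fmt"
	"strings"

	"google.golang.org/grpc/metadata"
)

// bearerToken extracts the access token from the "authorization" header of
// an incoming gRPC context. This mirrors what the Authenticator needs before
// it can hand the token over to AuthInfoProvider.
func bearerToken(ctx context.Context) (string, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return "", fmt.Errorf("no metadata in context")
	}
	vals := md.Get("authorization")
	if len(vals) == 0 {
		return "", fmt.Errorf("missing authorization header")
	}
	// Expect "Bearer <AccessToken>"; we only want the token part.
	token := strings.TrimPrefix(vals[0], "Bearer ")
	if token == vals[0] {
		return "", fmt.Errorf("authorization header is not a bearer token")
	}
	return strings.TrimSpace(token), nil
}

func main() {
	ctx := metadata.NewIncomingContext(context.Background(),
		metadata.Pairs("authorization", "Bearer example-token"))
	tok, err := bearerToken(ctx)
	fmt.Println(tok, err)
}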

Authenticator delegates access token identification to a module called AuthInfoProvider. It returns the Principal object, as defined in iam/auth/types/principal.go. Under the hood, it can be ServiceAccount, User, or Anonymous.

We will dive into it in the AuthInfoProvider for authentication section below, but for now let’s touch on another important topic regarding authentication.

When AuthInfoProvider returns principal data, the Authenticator also needs to validate that all claims are as expected, see addKeySpecificClaims. Of these claims, the most important is the audience. We need to protect against cases where someone obtains an access token and tries to pose as a given user in some other service. For this reason, look at the expected audience for a User:

claims.Audience = []string{a.cfg.AccessTokenAudience}

This AccessTokenAudience is equal to the audience specific to SPEKTRA Edge, for example https://apis.edgelq.com, as configured for some of our services. If this expected audience does not match what is actually in the claims, it may be because someone obtained this token and is using it with a different service. We are in a similar position ourselves: we hold access tokens from users, so if we knew where else a user has an account, we could try to log in there. To prevent such issues, we check the audience. However, note that the audience is one global value for the whole SPEKTRA Edge platform. So, one service on SPEKTRA Edge can connect to another service on SPEKTRA Edge and use this access token, successfully posing as the user. As long as services on SPEKTRA Edge can trust each other, it is not an issue. For untrusted third-party services, it may be a problem: if a user sends a request to them, they could potentially take the token and use it against other SPEKTRA Edge services. In the future, we may require additional claims, but that means the user will need to ask the jwks provider for an access token for a specific SPEKTRA Edge service, perhaps for each of them, or for groups of them.

In API Server config, see Authenticator settings, field accessTokenAudience.

The situation is a bit easier with ServiceAccounts: when they send a request to a service, the audience contains the endpoint of the specific service they call. Therefore, if they send requests to devices, devices won’t be able to reuse the token to send requests to, let’s say, applications. API keys may be a problem, since they are global for the whole of SPEKTRA Edge, but it is the user’s choice to use this method, which was insisted upon as the less “complicated” option. It should be fine as long as the API key is used strictly anyway.

In API Server config, see Authenticator settings, field serviceAccountIdTokenAudiencePrefixes. This is a list of prefixes with which the audience can start.
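A hedged sketch of these audience checks is below; the helper functions are illustrative, only the config field names (accessTokenAudience, serviceAccountIdTokenAudiencePrefixes) come from the text above.

package main

import (
	"fmt"
	"strings"
)

// checkUserAudience verifies a user's token audience against the single
// configured accessTokenAudience (e.g. "https://apis.edgelq.com").
func checkUserAudience(claimed []string, accessTokenAudience string) bool {
	for _, aud := range claimed {
		if aud == accessTokenAudience {
			return true
		}
	}
	return false
}

// checkServiceAccountAudience verifies a ServiceAccount token audience, which
// should start with one of the configured endpoint prefixes
// (serviceAccountIdTokenAudiencePrefixes), so a token issued for one service
// endpoint cannot be replayed against another service.
func checkServiceAccountAudience(claimed []string, prefixes []string) bool {
	for _, aud := range claimed {
		for _, p := range prefixes {
			if strings.HasPrefix(aud, p) {
				return true
			}
		}
	}
	return false
}

func main() {
	fmt.Println(checkUserAudience(
		[]string{"https://apis.edgelq.com"}, "https://apis.edgelq.com"))
	fmt.Println(checkServiceAccountAudience(
		[]string{"https://devices.example.com"}, []string{"https://devices."}))
}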

AuthInfoProvider for Authentication

AuthInfoProvider is a common module for both the Authenticator and the Authorizer. You can see it in the SPEKTRA Edge repo, file iam/auth/auth_info_provider.go. For the Authenticator, only one method counts: GetPrincipal.

Inside GetPrincipal of AuthInfoProvider we still don’t get the full principal. The reason is that getting the principal is tricky: if AuthInfoProvider is running on the IAM Server, then it may use the local database. If it is part of a different server, then it will need to ask the IAM Server for the principal data. Since it can’t fully get the principal, it does what it can:

  • First, we check the ID of the key from the authorization token.
  • If the ID is equal to the ServiceAccountKey ID the Server instance itself uses, it means the server is requesting itself. Perhaps it is a controller trying to connect to the Server instance. If this is the case, we just return “us”. This is a helpful trick when a service is bootstrapping for the first time: the API Server may not be listening on the port yet, or the database may have missing records.
  • Mostly, however, AuthInfoProvider is one giant cache object, and this includes storage of principals. Caching principals locally, with a long-lived cache, significantly lowers the pressure on IAM and reduces latencies. A sketch of this cache-then-remote pattern is shown below.
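To illustrate that cache-then-remote pattern, here is a minimal sketch. The type and method names (Principal, PrincipalProvider, cachingProvider) are simplified placeholders, not the actual AuthInfoProvider API.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Principal is a simplified stand-in for the iam/auth/types Principal.
type Principal struct {
	Name string
}

// PrincipalProvider is the lookup abstraction: local (IAM server, own DB) or
// remote (other servers calling IAM GetPrincipal).
type PrincipalProvider interface {
	GetPrincipal(ctx context.Context, keyID string) (*Principal, error)
}

// cachingProvider approximates the cache-first behavior: check the long-lived
// in-memory cache, fall back to the underlying provider on a miss, remember
// the result.
type cachingProvider struct {
	mu       sync.RWMutex
	cache    map[string]*Principal
	fallback PrincipalProvider
}

func (c *cachingProvider) GetPrincipal(ctx context.Context, keyID string) (*Principal, error) {
	c.mu.RLock()
	p, ok := c.cache[keyID]
	c.mu.RUnlock()
	if ok {
		return p, nil
	}
	p, err := c.fallback.GetPrincipal(ctx, keyID)
	if err != nil {
		return nil, err
	}
	c.mu.Lock()
	c.cache[keyID] = p
	c.mu.Unlock()
	return p, nil
}

// stubProvider stands in for the remote provider asking IAM.
type stubProvider struct{}

func (stubProvider) GetPrincipal(ctx context.Context, keyID string) (*Principal, error) {
	return &Principal{Name: "serviceAccounts/example"}, nil
}

func main() {
	cp := &cachingProvider{cache: map[string]*Principal{}, fallback: stubProvider{}}
	p, _ := cp.GetPrincipal(context.Background(), "key-1")
	fmt.Println(p.Name)
}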

AuthInfoProvider uses the PrincipalProvider interface to get actual instances. There are two providers:

  • LocalPrincipalProvider in iam/auth/internal/local_principal_provider.go
  • RemotePrincipalProvider in iam/auth/internal/remote_principal_provider.go

Local providers must be used only by IAM Servers; others must use the remote option. Let’s start with the remote. If you check GetPrincipal of RemotePrincipalProvider, you can see that it just connects to the IAM service and uses the GetPrincipal method, which is defined in the API skeleton file. For the ServiceAccount type, however, it first needs to fetch the project resource, to figure out in which regions the ServiceAccountKey is available.

It is worth mentioning that services are not supposed to trust each other; this also means IAM does not necessarily trust services requesting access to user information, even if they have a token. Perhaps access to the authorization token should be enough for IAM to return user information, but in GetPrincipalRequest we also require information about which service is asking. IAM will validate whether this service is allowed to see the given principal.

You should jump into a different part of the code and see the GetPrincipal implementation in file iam/server/v1/authorization/authorization_service.go. The file name may be a bit misleading, but this service has actions used for both authentication and authorization; it may be worth moving it to a different API group and deprecating the current action declaration. But that is a side note for me to correct, generally.

Implementation of GetPrincipal will stay, so you should see what happens under the hood.

In IAM Server, GetPrincipal uses PrincipalProvider that it gets from AuthInfoProvider! Therefore, AuthInfoProvider on a different server than IAM will try to use cache - in case of a miss, it will ask the remote PrincipalProvider. RemotePrincipalProvider will send GetPrincipalRequest to IAM, which then checks LocalPrincipalProvider, so we will land in LocalPrincipalProvider anyway.

Before jumping into LocalPrincipalProvider, see the rest of the GetPrincipal server implementation. Inside, we check the User or ServiceAccount data and iterate over the metadata.services.allowed_services slice. If it contains the service provided in GetPrincipalRequest, it means that this service is allowed to see the given principal, so we can just return it safely. This field is automatically updated when a User/ServiceAccount gets access to a new service (or has access revoked). We work on this principle: if a User/ServiceAccount is a participant in a service, they must be able to see each other.

Now, you can jump into the GetPrincipal code for LocalPrincipalProvider. It has separate paths for users and service accounts, but generally they are similar. We get the User or ServiceAccount from the local database if possible (not all regions may have the ServiceAccount). If we need to make any “Save” (users!), it has to happen in the primary region, because that is where users are supposed to be saved.

Distributed Authorization Process

Imagine User X asked devices.edgelq.com to assign a ServiceAccount to some Device. It sends a request to the Devices service, with an authorization token containing User X’s access token. Devices will successfully authenticate and authorize the user. However, ServiceAccount belongs to IAM, therefore devices.edgelq.com will need to ask iam.edgelq.com to provide the ServiceAccount. When devices.edgelq.com sends a request to iam.edgelq.com, the authorization header will not carry the access token of the user; it will carry the access token of the ServiceAccount used by devices.edgelq.com. This is always true: the authorization token must contain the token of the entity sending the current request. However, devices.edgelq.com may pass along the original access token in the x-goten-original-auth header, which is an array of tokens. In theory the authorization header may also carry many tokens, but that does not work on all Ingresses.

In EnvRegistry we have Dial* methods with and without the FCtx suffix. Those with the suffix copy all HTTP headers with the x- prefix. They also copy authorization into x-goten-original-auth; if the latter is already present, the token is appended. The current authorization is then cleared, making space for the new one.
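A conceptual sketch of what the FCtx variants do with headers is shown below; it is not the actual EnvRegistry code, and the forwardHeaders helper and its argument are illustrative assumptions.

package main

import (
	"context"
	"fmt"
	"strings"

	"google.golang.org/grpc/metadata"
)

// forwardHeaders sketches the FCtx behavior: copy x-* headers from the
// incoming context, move the current authorization token(s) into
// x-goten-original-auth (appending if already present), and set the
// authorization header to the forwarding service's own token.
func forwardHeaders(ctx context.Context, ownToken string) context.Context {
	in, _ := metadata.FromIncomingContext(ctx)
	out := metadata.MD{}
	for k, v := range in {
		if strings.HasPrefix(k, "x-") {
			out[k] = append(out[k], v...)
		}
	}
	// Preserve the original caller's token(s) for distributed authorization.
	out["x-goten-original-auth"] = append(out["x-goten-original-auth"], in["authorization"]...)
	// The outgoing request authenticates as "us", the calling service.
	out["authorization"] = []string{"Bearer " + ownToken}
	return metadata.NewOutgoingContext(ctx, out)
}

func main() {
	ctx := metadata.NewIncomingContext(context.Background(),
		metadata.Pairs("authorization", "Bearer user-token", "x-trace-id", "abc"))
	outCtx := forwardHeaders(ctx, "service-token")
	md, _ := metadata.FromOutgoingContext(outCtx)
	fmt.Println(md)
}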

It is up to the service to decide whether it wants to forward HTTP headers or not. There is some work needed from EnvRegistry though: the caller should be able to customize what headers are passed from the current context to the next, and how, but for current needs it is sufficient.

Authorization in this context has issues with the audience claim though: when we forward authorization tokens to an entirely different service, the audience may not be the one that service expects.

By default, we use just Dial without FCtx. We have two known cases where it is used:

  • MultiRegion routing

    This is when requests need to be routed to other regions or split across many regions.

  • When Constraint Store sends EstablishReferences to another service

    This is because we have references in saved resources to other services. The problem here is that we assume Service may not be allowed to establish references (lack of attach checks). The user may have attach permissions though, so we send two authorization tokens.

Of the two cases above, authorization and audience validation work well for the first one, because we forward within the same service. EstablishReferences is a more difficult topic; we will probably need to ensure that the Service always has attach permissions, without relying on the user. We will, however, need to refactor attach permissions so there is just one per resource type. With this, we need to fix conditions so they can apply to attach checks. Right now they simply don’t work.

1.3 - SPEKTRA Edge IAM Authorization

Understanding the SPEKTRA Edge IAM authorization.

Authorization happens in its dedicated server middleware, see any generated one, like https://github.com/cloudwan/edgelq/blob/main/devices/server/v1/device/device_service.pb.middleware.authorization.go.

As the Authorization middleware is assumed to run after multi-region routing, we can assume that the IAM Service in the local region holds all resources required to execute authorization locally, specifically: RoleBindings, Roles, Conditions.

Note that IAM itself does not execute any authorization; the middleware is generated for each service. We have an Authorizer module that is compiled into all API servers of all services. What IAM provides is a list of Roles, RoleBindings, and Conditions. Other services are allowed to get them, but evaluation happens on the respective server side.

The Authorizer rarely needs to ask IAM for any data; when possible, it is I/O-less. It relies on a RAM cache to store IAM resources internally. Therefore, checks are typically evaluated fast. More problematic are resource field conditions. If we have them, we will need to get the current resources from the database. For attach permissions, we may need to fetch them from other services.

Authorization middleware is generated per each method, but the pattern is always the same:

  • We create BulkPermissionCheck object, where we collect all permissions we want to check for this action. It is defined in the iam/auth/types/bulk_permission_check.go file.
  • Authorizer module, defined in the iam/auth/authorizer.go file, checks whether the passed BulkPermissionCheck is satisfied and the authenticated user is authorized for the requested permissions. Some checks may be optional, like read checks for specific fields.

When we collect permissions for BulkPermissionCheck, we add:

  • Main permission for a method. Resource name (or parent) is taken from the request object, as indicated by requestPaths in the specification, or customized via proto file.
  • If we have a writing request (like Create or Update), and we are setting references to other resources, we need to add attach permission checks. The resource names are taken from the referenced objects, not from the referencing resource the user tries to write to.
  • Optional read/set permissions if some resource fields are restricted. As the authorization object strings we pass either collection names or specific resource names.

Every permission must be accompanied by some resource or collection name (parent). Refer to the IAM user specification. In this document, we map specifications to code and explain details.

Within the Authorizer module, defined in iam/auth/authorizer.go, we split all checks by the main IAM scopes it recognizes: Service, Organization, Project, or System. Next, we delegate permission checks to AuthInfoProvider. It generates a list of PermissionGrantResult instances relevant to all PermissionCheck instances. The relationship between these two types is many-to-many. A single Grant (assigned via RoleBinding) can hold multiple permissions, and a user may have many RoleBindings, each with different Grants: more than one Grant may be giving access to the same permission.

If AuthInfoProvider notices that some PermissionCheck has an unconditional PermissionGrantResult, it skips the rest. However, if there are conditions attached, there is a possibility that some will fail while others succeed. This is why we need multiple PermissionGrantResult instances per single PermissionCheck: if at least one is successful, then the PermissionCheck passes. It works like an OR operator. Conditions within a single PermissionGrantResult must all be evaluated positively.

Therefore, once AuthInfoProvider matches PermissionGrantResult instances with PermissionCheck ones, we must evaluate conditions (if any). One popular condition type we use is ResourceFieldCondition. To evaluate this kind, we fetch resources from the local database, other services, and other regions. To facilitate this check as much as possible, the Authorizer iterates through all possible conditions and collects all resources it needs to fetch. It fetches in bulk, connecting to other services if necessary (attach permission cases). For this reason, we put a reference field on the PermissionCheck object; it will contain the resolved resources, so all conditions have easy access to them if needed. If the service receives a PermissionDenied error when checking other services, then PermissionDenied is forwarded to the user with information that the service cannot see the resources itself. It may indicate an issue with a missing metadata.services.allowed_services field.

On their own, conditions are simple and execute fast, without any I/O work. We just check requests/resolved resources and verify whether the specified conditions apply, according to IAM Role Grant conditions.

AuthInfoProvider for Authorizer

AuthInfoProvider gets only a set of checks grouped by IAM scope (a project, an organization, a service, or the system if none of the former). As per the IAM specification, the service scope inherits all RoleBindings from the project that owns the service. If we need to validate permissions in the project scope, we must also accept RoleBindings from the parent organization (if set), and the full ancestry path. RoleBindings in the system scope are valid in all scopes. Moreover, even the principal may have multiple member IDs (the native one with email, then domain, then allAuthenticatedUsers, allUsers). This creates lots of potential RoleBindings to check. Furthermore, we should be aware that the Authorizer is part of all API servers! As SPEKTRA Edge provides a framework for building third-party services, they can’t trust each other. Therefore, the AuthInfoProvider of whatever service it runs on can only ask for RoleBindings that it is allowed to see (according to metadata.services.allowed_services).

The IAM Controller copies organization-level RoleBindings to child sub-organizations and projects, but we don’t copy (at least yet) RoleBindings from a service project to a service. We also don’t copy system-level RoleBindings to all existing projects and organizations. It should typically stay that way, because system-level role bindings are rather internal and should not leak to organization/project admins. The module for copying RoleBindings is in file iam/controller/v1/iam_scope/org_rbs_copier.go. It also handles changes to the parent organization field.

During authorization, AuthInfoProvider must list and fetch all RoleBindings per each memberId/IAM scope combination. It must also fetch only role bindings relevant to the current service. We first try the local cache; in case of a miss, we ask IAM. This is why in CheckPermissions we grab all possible RoleBindings and filter them out by subScope or role ID later on. We try to strip all unnecessary fields, to ensure AuthInfoProvider can hold (in its RAM-based cache!) as much data as possible. Additionally, we try to use integer identifiers for roles and permission names.

To hold the RoleBindings for one member ID, we may need around two KiB of data on average; if we cache the principal too, let’s say four. With one MiB we could then hold data for 256 principals, and 256 MiB can hold about 65K principals. Divide by two for a safety margin, and we can still expect 256 MiB to hold tens of thousands of active users. This is why AuthInfoProvider caches all RoleBindings a principal can have in each scope. We extract data from IAM only when the cache expires, for new principals, or when the server starts up for the first time. This is why GetAssignments (a method of the RoleBindings store) looks the way it does.

When we have all RoleBindings for relevant members and relevant IAM scope, then we can iterate PermissionCheck (object + permission) against all assignments. If many assignments match the given PermissionCheck, then PermissionCheck will have multiple Results (variable).

RoleBindings (converted to RoleAssignment for slimmer RAM usage) are matched with permissions if:

  • they have owned_objects which match the object name in the PermissionCheck.
  • if the above fails, we check whether the Role pointed to by the RoleBinding has any Grants containing the permissions specified in the PermissionCheck.
  • if there are any Grants, we need to check whether the subScope matches (if it is specified). PermissionCheck contains the IAM scope and sub-scope forming a full object name. This allows us to have granularity on specific resources.
  • if we find a Grant matching the PermissionCheck, we store it in Results; note a Grant can carry conditions, but we haven’t evaluated them yet.

Thanks to the cache, I/O work by AuthInfoProvider is practically non-existent; typically it can quickly provide a list of assigned permissions with a list of conditions.

ConditionChecker for Authorizer

Each PermissionCheck can have multiple results, which can contribute to allowed permissions. If a result item has no conditions, then we can assume permissions are granted. If it has conditions, then all of them must evaluate successfully, so we iterate over them in the Authorizer code.
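The OR-of-ANDs rule can be sketched as follows; Condition, GrantResult, and checkPasses are simplified stand-ins, not the real Authorizer types.

package main

import "fmt"

// Condition is a simplified stand-in for an IAM grant condition.
type Condition interface {
	Evaluate() bool
}

// GrantResult corresponds loosely to a PermissionGrantResult: one matching
// Grant with the conditions (possibly none) attached to it.
type GrantResult struct {
	Conditions []Condition
}

// checkPasses implements the rule described above: a PermissionCheck passes
// if at least one result has all of its conditions evaluating positively
// (no conditions means the grant is unconditional).
func checkPasses(results []GrantResult) bool {
	for _, r := range results {
		ok := true
		for _, c := range r.Conditions {
			if !c.Evaluate() {
				ok = false
				break
			}
		}
		if ok {
			return true
		}
	}
	return false
}

type boolCond bool

func (b boolCond) Evaluate() bool { return bool(b) }

func main() {
	// One conditional grant fails, another unconditional one succeeds: pass.
	fmt.Println(checkPasses([]GrantResult{
		{Conditions: []Condition{boolCond(false)}},
		{},
	}))
}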

ConditionChecker is implemented in file iam/auth/internal/condition_checker.go. We have three condition types:

  1. checking by resource field, function checkByResourceField
  2. checking by request field, function checkByRequestField
  3. checking by CEL condition, function checkByCELCondition (will be retired though).

Resource conditions are the most popular, and for good reason: they are simple and can handle at least full CRUD, and often custom functions too. For example, suppose we want to grant certain users access to devices only if the field path satisfies metadata.annotations.key = value (a sketch of such a check follows the list below):

  • CreateDeviceRequest will be forbidden if this field path with the given value is not specified in the resource body.
  • UpdateDeviceRequest will be forbidden if we are trying to update this field path to a different value or if the current resource stored in the database does not match.
  • DeleteDeviceRequest checks whether the Device in the database matches.
  • Get/BatchGetDevice(s): the resources are extracted from the database and the condition is checked.
  • WatchDevice is also checked when the stream starts; we grab resources from the database and evaluate them.
  • ListDevices and WatchDevices have a Filter field, so we don’t need to grab anything from the DB.
  • If there are custom methods, we can still get resources from the DB and check whether the field path is fine.
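A minimal sketch of such a resource field check, with hypothetical Device and fieldConditionOK types, might look like this:

package main

import "fmt"

// Device is a minimal stand-in for a resource with metadata annotations.
type Device struct {
	Annotations map[string]string
}

// fieldConditionOK sketches a ResourceFieldCondition for the field path
// metadata.annotations.<key>: the resource satisfies the condition only when
// the annotation is present with the required value.
func fieldConditionOK(d *Device, key, requiredValue string) bool {
	if d == nil {
		return false
	}
	return d.Annotations[key] == requiredValue
}

func main() {
	// For a Create, the condition is evaluated against the resource body in
	// the request; for Update/Delete/Get, against the current database state
	// (and for Update, against the new state as well).
	current := &Device{Annotations: map[string]string{"team": "edge"}}
	fmt.Println(fieldConditionOK(current, "team", "edge"))  // allowed
	fmt.Println(fieldConditionOK(current, "team", "other")) // denied
}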

We also support attach permissions with resource field conditions; if necessary, we fetch resources from other services. Fetching is done before condition evaluation.

A smaller weakness is the need for extra checks in the database. The object may be stored in Redis though, giving perhaps a faster answer, but it still goes through the network stack. Perhaps another RAM-based cache could be used for storage, but invalidation may be a problem if we want to include List queries. For resource updates, we need to invalidate both the previous and the new state, and the Firestore watch shows us only the new state. Mongo may be more beneficial in this case, especially if we consider the fact that it has active watches for all collections (!!!). It may work especially for collections that are not frequently updated.

Checks by request are simpler and typically aimed at custom methods.

Checks by CEL condition are used less and less in v1, but may still have some special use cases if a yaml (protobuf) declaration is not enough. They use conditions with bodies specified in the iam.edgelq.com/Condition resource. ConditionChecker uses AuthInfoProvider to grab Conditions from IAM.

1.4 - SPEKTRA Edge IAM Cache Invalidation

Understanding the SPEKTRA Edge IAM cache invalidation.

AuthInfoProvider relies on a RAM cache for low-latency processing. The problem is invalidation. To achieve a long-living cache, we need real-time invalidation straight from the database.

This is why each “store” module in AuthInfoProvider has one or more goroutines using a real-time watch. When some object is updated, we may need to update/invalidate the cache. In case of prolonged broken access to IAM, it will invalidate the whole cache and retry.
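A rough sketch of this watch-driven invalidation pattern, with placeholder types rather than the real store modules, could look like this:

package main

import (
	"fmt"
	"sync"
)

// ChangeEvent is a placeholder for a real-time watch event (e.g. a
// RoleBinding or principal update coming from IAM).
type ChangeEvent struct {
	Key string
}

// watchingCache sketches the pattern: a long-lived RAM cache plus a goroutine
// that applies watch events, dropping entries as soon as their source changes.
type watchingCache struct {
	mu    sync.Mutex
	items map[string]string
}

func (c *watchingCache) invalidate(key string) {
	c.mu.Lock()
	delete(c.items, key)
	c.mu.Unlock()
}

func (c *watchingCache) invalidateAll() {
	c.mu.Lock()
	c.items = map[string]string{}
	c.mu.Unlock()
}

func (c *watchingCache) run(events <-chan ChangeEvent, watchBroken <-chan struct{}) {
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				return
			}
			c.invalidate(ev.Key)
		case <-watchBroken:
			// Prolonged loss of the watch: drop everything and resync.
			c.invalidateAll()
		}
	}
}

func main() {
	c := &watchingCache{items: map[string]string{"rb-1": "cached"}}
	events := make(chan ChangeEvent, 1)
	events <- ChangeEvent{Key: "rb-1"}
	close(events)
	c.run(events, make(chan struct{}))
	fmt.Println(len(c.items)) // 0: entry was invalidated
}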

Invalidation of principals is done using the WatchPrincipals method. This allows IAM to ensure that only selected (allowed) principals are seen by a service.

1.5 - SPEKTRA Edge Multi-Service Authorization

Understanding the SPEKTRA Edge multi-service authorization.

The main authorization happens when the user sends a request to a service; the authorization sits at the front. However, sometimes a service executing a request needs to send further requests to other services. One common example is the EstablishReferences call in the Schema Mixin service. It is assumed that services don’t trust each other, and it shows here too. Even if, let’s say, the devices service allows UpdateDevice, IAM still needs to check on its own whether UpdateDevice can update the reference in field spec.service_account (a field in the Device resource, pointing at a ServiceAccount from IAM). We use the fact that cross-region and cross-service reference establishment requires a call to EstablishReferences.

We even have special authorization for that: see file mixins/schema/server/v1/resource_shadow/resource_shadow_service_custom_auth.go. In this file, we check referenced resources and try to see whether this is allowed for the service making the calls, or for the user who originally made the request. In the future, we may opt out of checking the original user and require that the service itself has access to the referenced resources.

That should typically be the case: the ServiceAccount pointed to by a Device should be owned by devices (metadata.services.owning_service). The same goes for logging or monitoring buckets. We may need proper attach permission checks for resources first, and support for resource field conditions!

Other than that, subsequent service-to-service calls are treated separately, and a service verifies a service.

1.6 - SPEKTRA Edge E-mail Sender

Understanding the SPEKTRA Edge e-mail sender system.

Another third-party service we use, apart from Auth0, is Sendgrid. You should see its config in iam/config/apiserver.proto. It is a second service for emails; Auth0 itself is used for emails too, like account verification. After all, Users are stored in the Auth0 service, IAM just gets copies.

However, invitations (ProjectInvitations and OrganizationInvitations) are sent using Sendgrid. See iam/invitationpublisher directory.

1.7 - SPEKTRA Edge Multi-Service Environment Safety

Understanding the SPEKTRA Edge multi-service environment safety.

IAM needs to ensure safety not only between tenants (Organizations, Projects) but also between Services. For this reason, RoleBindings are also scoped per Service. There are, however, problems with services we need to solve:

  • Organizations and Projects can enable services they use, and if they do, they should be able to use these Services. IAM must ensure that the organization/project admin cannot enable services if they don’t have permission to. IAM must ensure the service does not have access to organizations/projects which don’t enable a particular service.
  • If the Organization or Project enables Service, then the Service should be able to access the Project/Organization.
  • Ideally, the Service should be able to freely access all resources a Project or Organization has, as long as those resources are defined by that service. For example, service devices.edgelq.com must always be able to access any instance of devices.edgelq.com/Device. However, other services should have limited access to devices.edgelq.com/Device collection. It’s called limited access to Projects/Organizations: Project/org admins should be able to regulate which resources are accessible.
  • Some Services may be “private”.
  • If services are using/importing each other, they need some limited access to each other.

Private services are protected by attach permissions, so project/org admins can’t just enable any Service; enabling one requires updating the list of Service references after all.

Those things are fixed in IAM fixtures, see iam/fixtures/v1/iam_roles.yaml.

First, see this role: services/iam.edgelq.com/roles/service-user. This gives access to Service data and the ability to attach it. If the project admin is granted this role in a service scope, they can enable that service.

Then, the next thing: a Service should be able to access all relevant resources projects or organizations have, without specifying the exact instance. This is why we have the IAM role services/iam.edgelq.com/roles/base-edgelq-service, which grants access to all resources across orgs/projects, as long as certain conditions are met. Note that we don’t give any create permissions; it would be wrong, because the Service could start creating resources with the proper metadata.services field without checking whether the project/org even uses the service. This is not an issue for non-creating permissions. To allow services to create project/organization-scoped resources, we have the services/iam.edgelq.com/roles/service-to-project-access and services/iam.edgelq.com/roles/service-to-org-access roles. RoleBindings for these roles are created dynamically by the IAM controller when a Project/Organization enables some service. This code is located in iam/controller/v1/iam_scope.

We also need to regulate service-to-service access. By default, this is not allowed. However, if one service imports or uses another, we enable their access to each other. Roles for these scenarios are in iam/fixtures/v1/per_service_roles.yaml. Roles with IDs importing-service-access and imported-service-access are granted to the importing and imported services; note it is not symmetrical, and it does not need to be. For example, if one service imports another, then EstablishReferences is only needed in one direction. Roles with ID service-to-service-std-access are used for minimal standard access.

All those RoleBindings regulating access between services, and between services and projects/organizations, are called “Service RoleBindings”. They are dynamically created by the IAM Controller when a service is created/updated, or when an organization/project enables some service. The module responsible for these RoleBindings is in file iam/controller/v1/iam_scope/service_rbs_syncer.go:

  • makeDesiredRbsForSvc computes desired Service RoleBindings per each Service.
  • makeDesiredRbsForOrg computes desired Service RoleBindings per each Organization.
  • makeDesiredRbsForProject computes desired Service RoleBindings per each Project.

Note the convention for mixin services: each service has its own copy of them, like services/<service>/permissions/resourceShadows.listMetaOwnees. This is because all services have their schema mixins. Note that RoleBindings for those “per service roles” are located in the root scope, see function makeDesiredRbsForSvc in file iam/controller/v1/iam_scope/service_rbs_syncer.go. The reason is that ResourceShadow is a “root” resource (its name pattern is resourceShadows/{resourceShadow}, not something like services/{service}/resourceShadows/{resourceShadow}). Perhaps it could have been the latter, but this is continuity from v1alpha2, and CLI commands would also become less intuitive. To enable per-service access, then, permissions are per service. If we create services/<service>/permissions/resourceShadows.listMetaOwnees per service and create a root-scope RoleBinding containing this permission, in effect it will be granted for specific services only, not for all.

1.8 - SPEKTRA Edge IAM Principal Tracking

Understanding the SPEKTRA Edge principal tracking.

ServiceAccounts are project-scoped resources, but in theory, they can be granted roles in other projects and organizations too. Users are, in IAM terms, global resources, not necessarily bound to any organizational entity. They can however join any project or organization.

Members (Users or ServiceAccounts) are associated with projects/organizations via RoleBinding resources. Organizational role bindings are copied to downstream child projects/organizations by the IAM Controller (iam/controller/v1/iam_scope/org_rbs_copier.go).

If you visit iam/server/v1/role_binding/role_binding_service.go, you should note that, for each written/deleted RoleBinding, we manage a MemberAssignment resource. See iam/proto/v1/member_assignment.proto for more details; it describes its role.

Generally, though, one instance of MemberAssignment is created per each scope/member combination. This internal resource facilitates tracking of members in organizational entities.

Members can see a list of their projects/organizations via ListMyProjects/ListMyOrganization calls. To make such calls possible, we needed the MemberAssignment helper collection; we also copy many project/organization fields directly into MemberAssignment instances. Therefore, projects/organizations filter/orderBy/fieldMask/cursor objects can mostly be translated to MemberAssignment ones. To make this work, MemberAssignment is a regional but globally synced resource (its copies are spread through all IAM regions). The regional status ensures that each region is responsible for tracking members in local organizations/projects individually. IamDbController syncs all created copies across all regions, so each region knows the full list of projects/organizations where the given member participates.

In case project/organization fields change (like title), the IAM Controller is responsible for propagating change to all MemberAssignment instances. Implementation is in file iam/controller/v1/mem_assignments_meta_updater.go.

1.9 - SPEKTRA Edge Principal Service Access

Understanding the SPEKTRA Edge principal service access.

RoleBinding is not only binding User/ServiceAccount with a project/organization. It also binds a member with a service. For example, a devices manager Role in project X would bind a given member not only with project X but also with the devices.edgelq.com service (and applications & secrets, since those are related). Each Role has a populated field metadata.services, which points to services where Role is relevant. RoleBinding also has metadata.services populated, and it contains combined services from a Role and a parent object (organization, project, or service).

When RoleBinding is created, IAM internally creates a MemberAssignment instance per each unique combination of member/scope, and this MemberAssignment will have a “scope” field pointing to the project or organization. However, there is something more to it. IAM will also create additional MemberAssignment objects where the “scope” field points to a Service! Those Service-level MemberAssignment instances are used to track in which services the given Member (User or ServiceAccount) is participating.

The IAM Controller has a dedicated module (iam/controller/v1/iam_scope/service_users_updater.go), which ensures that the field metadata.services is in sync (for Users, ServiceAccounts, and ServiceAccountKeys). It does this by watching MemberAssignment changes and making a “summary” of the services in use. If it notices that some user/service account has inaccurate data, it issues an update request to IAM. Each region of IAM is responsible for watching local members, but they all have access to all MemberAssignment instances, since those are synced globally.

Making sure the field metadata.services of all Users/ServiceAccounts is synced has double functionality:

  • It ensures that a given member can access Service-related data.
  • It ensures that the given Service can access member data (via GetPrincipal).

If you check the file iam/fixtures/v1/iam_role_bindings.yaml, you should notice the special RoleBinding roleBindings/services-participant. It is a root-level RoleBinding given to all authenticated members, granting the services/iam.edgelq.com/roles/selected-service-user role. This role is a multi-service one. If you look at its contents in iam/fixtures/v1/iam_roles.yaml, you should see it gives many read/attach permissions to the holder in a specified list of services. In the RoleBinding yaml definition, note that this list of services comes from the principal metadata object! This is how principals get automatic access to a Service.

Role services/iam.edgelq.com/roles/selected-service-user is similar to services/iam.edgelq.com/roles/selected-user. The latter one should be used on the service level to provide access to that single service, to someone who has no other access there. The former has an internal purpose, gives access to many services at once, and will only be assigned to members who already have some access to specified services. It just ensures they can access service meta-information.

2 - SPEKTRA Edge Limits Service Design

Understanding the SPEKTRA Edge resource limit design.

The Limits service is one of those services with a fairly special design. The service itself should be known from the user guide already, and in part from the developer guide. Knowledge about plans is assumed. Limits is also one of the standard services, and its code structure in the SPEKTRA Edge repository (the limits directory) should be familiar.

What needs explanation is how Limits ensures that “values” don’t get corrupted, lost, over-allocated, etc. First, resources are allocated in each service, but the resource limits.edgelq.com/Limit belongs to the Limits service. Therefore, we can’t easily guarantee counter integrity if a resource is created in one service and counted elsewhere. Next, we know that limit values are passed from a service to organizations, then to potential child organizations, and eventually to projects. From the MultiRegion design, we know that each organization and project may point to a main region where its resources are kept. Therefore, we know that organizations/{organization}/acceptedPlans/{acceptedPlan} is in the organization’s region and projects/{project}/planAssignments/{planAssignment} is in the project’s region, which may be different. This document describes how Limits works in this case.

We will also show code pointers indicating where things can be found.

During this guide, you will find out why parallel creations/deletions are not parallel!

2.1 - Service Limit Initialization

Understanding how the SPEKTRA Edge service limit initialized.

When a Service boots up, it creates limits.edgelq.com/Plan instances. The Limits controller, defined in limits/controller/v1, has a LimitsAssigner processor, defined in limits/controller/v1/limits_assigner/limits_assigner.go. It is created per each possible assigner; therefore, it is created per Service and per Organization. LimitsAssigner is typically responsible for creating AcceptedPlan instances for child entities but, for Services, it makes an exception: it creates an AcceptedPlan for the Service itself! See file limits/controller/v1/limits_assigner/default_plan_acceptor.go; the function calculateSnapshot computes plans for child entities, and for the Service itself. This bootstraps things: the Service can assign any values it likes to itself.

2.2 - Project and Organization Limit Initialization

Understanding how the project and organization limit initialized.

If a project has a parent organization, then this parent organization is the assigner for the project. If the project is root-level, then its enabled services are the assigners; each service can assign an individual plan for the project. The same holds for organizations. When a project/organization is created, the Limits Controller puts the newly created entity in the “assigner” box (or boxes, for root-level entities). Then it creates instance(s) of AcceptedPlan. The implementation, again, is in limits/controller/v1/limits_assigner/default_plan_acceptor.go. It is worth mentioning now, however, that DefaultPlanAcceptor uses the assigner’s LimitPools to see whether it will be able to create an AcceptedPlan resource. If not, it will instead annotate the Project/Organization for which it failed to create a plan. This is why in limits_assigner.go you can see a Syncer not only for AcceptedPlan but also for Project and Organization.

2.3 - AcceptedPlan Update Process

Understanding the AcceptedPlan update process.

The implementation can naturally be found in the server: limits/server/v1/accepted_plan/accepted_plan_service.go. We pass the “actual” creation to the core server of course, but this is just a small step; the logic executed before any CUD operation is much more significant here.

When the server processes an AcceptedPlan resource (Create or Update), we are guaranteed to be in the Limits service region where the assigner resides. Because LimitPools are children of a Service or Organization, we can guarantee that they reside in the same regional database as the AcceptedPlan. Thanks to this, we can verify, within a SNAPSHOT transaction, that the caller does not attempt to create/update any AcceptedPlan that would exceed the limit pools of the assigner! This is the primary guarantee here: the assigner will not be able to exceed the allocated values in its LimitPools. We only need to check cases where the AcceptedPlan increases reservations on the assigner’s LimitPools; when we decrease (some updates, deletions), we don’t need to.
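The arithmetic of this check can be sketched as follows; the LimitPool fields and the canReserve helper are simplified assumptions, not the actual server code.

package main

import "fmt"

// LimitPool is a simplified view of an assigner's pool for one resource type.
type LimitPool struct {
	ConfiguredSize int64 // total value the assigner owns
	Reserved       int64 // value already promised to assignees via AcceptedPlans
}

// canReserve sketches the check done inside the SNAPSHOT transaction when an
// AcceptedPlan is created or updated: delta is the increase this plan would
// add to the pool's reservation. Decreases (delta <= 0) are always accepted
// here; releasing the reservation is done asynchronously by the controller.
func canReserve(pool LimitPool, delta int64) bool {
	if delta <= 0 {
		return true
	}
	return pool.Reserved+delta <= pool.ConfiguredSize
}

func main() {
	pool := LimitPool{ConfiguredSize: 100, Reserved: 80}
	fmt.Println(canReserve(pool, 15))  // true: 95 <= 100
	fmt.Println(canReserve(pool, 30))  // false: would exceed the pool
	fmt.Println(canReserve(pool, -10)) // true: decreases are deferred to the controller
}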

However, there is some risk with decreasing accepted plans (some updates and deletions): doing so could decrease the assignee’s limit values below current usage. To prevent this, in the function validateAssigneeLimitsAndGetLimitPoolUpdates in the server implementation, we check the assignee limit values. This works in 99.99% of cases, unless some new resources are allocated while we confirm that we can decrease limits. Therefore, we don’t have guarantees here.

As a result, when we create/update an AcceptedPlan, we only increase the assigner’s LimitPool reservation values. When we would decrease LimitPool values, we simply don’t do it yet.

Decreasing values is done by the Limits controller; we have a task for this in limits/controller/v1/limits_assigner/limit_pool_state_syncer.go. It takes into account all child Limit and LimitPool instances (for assignees), which are synchronized with PlanAssignment instances. It then sends UpdateLimitPool requests when it confirms that the decreased values of the AcceptedPlan action (update or delete) took effect. Reservation is immediate; release is asynchronous and delayed.

Some cheating is potentially possible, however: an org admin could send UpdateLimitPool trying to minimize the “Reserved” field, and then attempt to create a new accepted plan quickly enough, before the controller fixes the values again. Securing this may be a bit more tricky, but such an update would leave the LimitPool with a Reserved value way above the configured size, which is detectable, along with ActivityLogs and, if not, ResourceChangeLogs. It is unlikely it will be attempted this way. A potential way to secure this would be to disable AcceptedPlan updates if the Reserved value of the LimitPool decreased recently, with some timeout like 30 seconds. Optionally, we could just put some custom code in the API Server for UpdateLimitPool and validate that only a service admin updates them directly (checking the principal from the context). This is not covered by the IAM Authorization code-gen middleware, but custom code can simply do it.

2.4 - Resource Limit Assignment

Understanding the resource limit assignment.

It is assumed that organization admins can see and manage AcceptedPlan instances, but their tenants can only see them. Furthermore, parent and child organizations, and other organizations/final projects, are separate IAM scopes. Child entities may also reside in different primary regions than their parent organization (or service). For these reasons, we have the resource type PlanAssignment, which is read-only, see its proto definition. This allows admins to see the plan assigned to them, but without any modifications, even if they are owners of their scope. Because PlanAssignment is located in the region pointed to by the project/organization, we can guarantee synchronization with LimitPool/Limit resources!

When an AcceptedPlan is made, the Limits Controller is responsible for creating the PlanAssignment asynchronously, which may be in a different region than the source AcceptedPlan. The code for it is in limits/controller/v1/limits_assigner/assigned_plans_copier.go. It creates an instance of PlanAssignment and sends a request to the API Server. The server implementation is, naturally, in file limits/server/v1/plan_assignment/plan_assignment_service.go. Note that the controller sets output-only fields, but that is fine: when the server creates an instance, it will have these fields too. This only ensures that, if there is any mismatch on the controller side, it will be forced to make another update.

When processing writes to PlanAssignment, the API Server grabs the AcceptedPlan from the database; we require the child organization or project to be in a subset of the regions available in the parent. Therefore, we know that at least a synced read-only copy of the AcceptedPlan will be in the database. This is where we grab the desired configuration from.

PlanAssignment is synchronized with Limit and LimitPool instances; all of these belong to the same assignee, so we know our database owns these resources. Therefore, we can provide some guarantees based on SNAPSHOT: configured limit values in Limit/LimitPool resources are guaranteed to match those in PlanAssignment, users don’t get any chance to make a mistake, and the system is not going to be out of sync here.

Note that we are only changing the configured limit; we also have so-called active limits, which are maintained by the controller. There is some chance the configured limit is set below current usage; if this happens, the active limit will stay at a higher value, as large as the usage. This will affect the source limit pool reserved value: it will stay elevated! It is assumed, however, that PlanAssignment and configured limits must stay in sync with AcceptedPlan values, no matter whether we are currently allocating/deallocating resources on the final API Server side.

Note that the limits controller tracks the active size and reserved value for LimitPool instances. Limits are on the next level.

2.5 - Resource Limit Tracking

Understanding how the resource limit is tracked.

We need to provide a guarantee that the usage tracker stays in sync with the actual resource counter. The best way to do that is to count during local transactions. However, the resource Limit belongs to the Limits service, not the actual service. This is why we have the Limits Mixin Service in the SPEKTRA Edge repository, mixins/limits.

It injects one resource type: LocalLimitTracker. Note it is a regional resource, but not a child of a Project. This means that no project admin, nor any parent organization, will ever be able to see this resource. This resource type is hidden; only service admins will be able to see it. This also prevents any chance of mismanagement by the final user. Because this resource type is mixed in along with the final service resources, we can achieve SNAPSHOT transactions between actual resources and trackers. We can even prevent bugs that could result in the usage tracker having invalid values. When we create/update the LocalLimitTracker resource, we can extract the true counter from the local database, see file mixins/limits/server/v1/local_limit_tracker/local_limit_tracker_service.go.

To check how LocalLimitTracker usage is tracked during transactions, check two files:

  1. mixins/limits/resource_allocator/v1/resource_allocator.go
  2. common/store_plugins/resource_allocator.go

This is how the store plugin tracks creations/deletions: at the end of the transaction, it tries to push extra updates to LocalLimitTracker instances for all resource types whose number of instances changed. This guarantees complete synchronization with the database. But note this does not create LocalLimitTrackers yet.
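A simplified sketch of the idea behind this plugin follows; the types and method names are assumptions for illustration, not the real store plugin API.

package main

import "fmt"

// usageTrackerPlugin sketches the idea: count creations and deletions per
// resource type during a transaction, and at commit time emit one
// LocalLimitTracker update per resource type whose count changed, inside the
// same transaction.
type usageTrackerPlugin struct {
	deltas map[string]int64 // resource type -> net change in this transaction
}

func (p *usageTrackerPlugin) OnCreate(resourceType string) { p.deltas[resourceType]++ }
func (p *usageTrackerPlugin) OnDelete(resourceType string) { p.deltas[resourceType]-- }

// OnCommit returns the tracker updates to push along with the transaction.
func (p *usageTrackerPlugin) OnCommit() map[string]int64 {
	updates := map[string]int64{}
	for rt, d := range p.deltas {
		if d != 0 {
			updates[rt] = d
		}
	}
	return updates
}

func main() {
	p := &usageTrackerPlugin{deltas: map[string]int64{}}
	p.OnCreate("devices.edgelq.com/Device")
	p.OnCreate("devices.edgelq.com/Device")
	p.OnDelete("applications.edgelq.com/Pod")
	fmt.Println(p.OnCommit()) // map[applications.edgelq.com/Pod:-1 devices.edgelq.com/Device:2]
}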

This is why the Limits Mixin comes not only with an API Server (so LocalLimitTrackers can be accessed) but also with a controller, see the mixins/limits/controller/v1 directory. Inside the Limits processor we have:

  • a LocalLimitTrackersManager instance, which creates/updates/deletes instances of LocalLimitTracker for every Limit instance in the Limits service.
  • a module that synchronizes Limit instances in the Limits service using LocalLimitTrackers from its region. This means there is no actual point in meddling with Limit fields: the controller will fix them anyway, and they don’t participate in actual usage checking anyway.
  • a module that maintains PhantomTimeSeries, so we have special store usage metrics showing how resource counters changed historically.

Note that the Limits processor in this controller has built-in multi-region features: the primary region for a project creates/deletes LocalLimitTrackers, but the final regions maintain Limit instances and PhantomTimeSeries.

2.6 - Project and Organization Deletion Process

Understand the project and organization deletion process.

When a Project/Organization is deleted, we need to ensure that limit values return to the assigner. This is why AcceptedPlan instances have assignee reference fields with the ASYNC_CASCADE_DELETE option. When assignees are deleted, plans follow. This deletes PlanAssignments, but, as was said, LimitPools do not get their reserved values back yet. Instead, db-controllers should be deleting all child resources of the assignee, like a Project. This decreases Limit usage values until we hit 0.

To prevent deletion of Limit/LimitPool instances before they reach zero values, we utilize metadata.lifecycle.block_deletion field, as below:

  • limits/server/v1/limit/limit_service.go

    Take a look at the update function, UpdateMetadataDeletionBlockFlag.

  • limits/server/v1/limit/limit_pool_service.go

    Take a look at the update function, UpdateMetadataDeletionBlockFlag.

This way, LimitPool and Limit resources disappear only last. We achieve some order of deletions, so it is not chaotic. The controller for the assignee will confirm the decreased reserved value of the LimitPool only after whole resource collections are truly deleted.

3 - SPEKTRA Edge Wide-Column Store Usage

Understanding the wide-column store usage in SPEKTRA Edge.

Wide-column store provides alternative data storage compared to the document storage from Goten. It is focused on wide-column data stores, and currently, we support Google Bigtable and ScyllaDB as the backend engine. However, from the user’s perspective, it is worth mentioning the properties and features of this particular database interface:

  • this is a non-relational DB with no joins or transactions. It has a simple interface: Query and Save (can overwrite). There is no “Get” equivalent, and it does not support streaming reads.
  • it is semi-schemaless: it mostly stores data in a binary, non-queryable format. However, it allows the definition of column families and a set of filterable field keys.
  • data is stored using timestamp order. When saving, it is required to provide a timestamp per each column value. Query always requires a time range.
  • data is automatically GC-ed after the defined TTL per column family.
  • optimized for large data sets and throughput.
  • possible query operators: EQUAL, IN, NOT EQUAL, NOT IN
  • a single filter can contain many conditions connected with the AND operator.
  • query itself may have many filters - effectively supporting OR operator.

This store is fully implemented in the SPEKTRA Edge repository. The store interface and implementation can be found in common/widecolumn/store/store.go. The package for this file also contains primary user-visible objects.

We have a couple of use cases (resources!) within the SPEKTRA Edge repository where we use this storage:

  • in the Monitoring service, we use it for TimeSeries
  • in the Audit service, we use it for ActivityLogs and ResourceChangeLogs
  • in the Logging service, we use it for Logs

For each of these cases, we are wrapping wide-column store into the proper interface.

In the service specification for these resources, we are disabling metadata and standard CRUD.

3.1 - Wide-Column Data Models

Understanding the wide-column user and storage data models.

User Data Model

The input/output of the widecolumn store is KeyedValues from the store package, file common/widecolumn/store/keyed_values.go. It can be thought of as a resource. It is composed of:

  • Single Key object (uid package).

    It is used to uniquely identify resources.

  • Array of Column values (data entry package, object ColumnValue).

    Each column value has:

    • Column Family name (string)
    • Timestamp
    • Optional binary key
    • Data (binary data)

KeyedValues can hold multiple column values with timestamps ranging from hours, days, weeks, months, and potentially years. Therefore, the Write method only saves a range of values for a single key (normally it appends!). The query just grabs partial data only for a given range.

Column family and optional binary keys are identifiers of column values for the same timestamp (so they don’t overwrite each other).

Note that this makes “a resource called KeyedValues” a time-series-like resource. In this context, Key represents some header and column values (with timestamps) contain a series of values across an always-increasing time range.

Imagine there is a temperature sensor with some serial number. It sends the temperature every minute. In that case, we can define the following Key: serial_number=1234, product=temp_sensor and Column Family: temperatures. Then each temp can be represented as a double value and associated with a specific timestamp. Double can be encoded as binary data. We don’t need an optional binary key in this case.
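To make the example concrete, here is a sketch with simplified stand-in types; the real types live in the common/widecolumn packages and differ in shape, so the field names below are illustrative assumptions only.

package main

import (
	"encoding/binary"
	"fmt"
	"math"
	"time"
)

// Simplified stand-ins for the widecolumn store types.
type SKV struct{ Key, Value string }

type ColumnValue struct {
	Family string
	Time   time.Time
	Data   []byte
}

type KeyedValues struct {
	SKey   []SKV
	Values []ColumnValue
}

// encodeTemp encodes a double temperature reading as binary data.
func encodeTemp(t float64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, math.Float64bits(t))
	return buf
}

func main() {
	// The temperature sensor from the example: the key identifies the series,
	// column values carry one reading per timestamp under family "temperatures".
	base := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	kv := KeyedValues{
		SKey: []SKV{{"serial_number", "1234"}, {"product", "temp_sensor"}},
		Values: []ColumnValue{
			{Family: "temperatures", Time: base, Data: encodeTemp(21.5)},
			{Family: "temperatures", Time: base.Add(time.Minute), Data: encodeTemp(21.7)},
		},
	}
	fmt.Printf("%d readings for key %v\n", len(kv.Values), kv.SKey)
}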

Keys

A key is a unique identifier of a column value series. Key definitions and manipulation are provided by the uid package. It is a set of key-value pairs (KV type). A KV without the Value part is just a “KId” object, a key identifier. It may be better thought of as a key field identifier, because “the real” Key is an array of these objects.

Each KV pair has an equivalent UID object. A UID allows mapping Keys and KV pairs between string and binary representations. UIDs are represented as paths in a tree of up to depth three; each node may be allocated using a parent counter and atomic operations. The hierarchy:

  1. Root UID len 1; (always 1), str: _
  2. Key UID len 2; (1, kid), example str: _#serial_number
  3. Value UID len 3; (1, kid, vid), example str: _#serial_number#1234

It is worth mentioning that a UID of depth 1 is the same as the empty root value, a UID of length 2 is just a KId, and a full-length UID is equivalent to a full KV.

It is important to say that UID is more focused on how data is internally stored in the underlying backend (bigtable or scylla).

Going back to the temperature sensor example, we can notice our Key object has two KV pairs:

  • serial_number=1234
  • product=temp_sensor

But if you look at the structure of KV, you may notice that it is a pair of integer values. This is because storing integers is more efficient than storing strings, especially when a given key value repeats many times across time and across different keys. Therefore, each type in the uid package has a “String” equivalent:

  • KV has SKV
  • KId has SKId
  • UID has StrUID

Note that the main store interface, apart from Query/Write, also provides functions for:

  • Allocating string key values.
  • Resolving strings to integers back and forth.

Structure KeyedValues provides Key and SKey as fields. They are meant to be equivalent. However, it is worth noting:

  • When executing the Query method, all KeyedValues will only have the Key value set. It is assumed that SKey may not always be needed. Besides, it is more efficient to resolve all skeys in bulk once the query is completed.
  • When executing the Write method, the store will check if the Key is defined. If not, it will default to SKey and allocate/resolve at runtime. However, it is recommended to use Key whenever possible for performance reasons.

Storage Data Model

Data is stored in a slightly different format than it is presented in KeyedValues. A KeyedValues object is transformed into a set of DataEntry objects from the dataentry package. DataEntry is a combination of:

  • Row object (see dataentry/row.go)
  • Array of ColumnValue objects (see dataentry/column_value.go)

It may look like Row is equivalent to Key from KeyedValues, but it is not. There is a transformation going on:

  • Key from KeyedValues is split into two keys, a promoted key and a tail key. This split is defined by the TableIndex object from the uid package. As you may have figured out, this helps query data quickly and efficiently: when the filter defines a set of keys, the store will try to pick the index whose promoted key set matches the filter most closely.

    We are indexing!

  • Column value timestamps are transformed into RowSequence objects (see dataentry/row.go file). Those column values that have the same sequence are grouped. Otherwise, for each unique sequence, a new Row is created, containing the promoted key, tail key, and sequence number. Then it gets assigned column values that have the same sequence number.

  • Note that when writing, DataEntry objects are created from IndexedKeyedValues: each TableIndex creates its own DataEntry objects, and those per-index copies are full replicas!

Example: Imagine we have the following KeyedValue (single):

  • Key: serial_number=1234, product=temp_sensor
  • Values:
    • temperatures: 123.4, 2020-01-01T00:00:00Z
    • temperatures: 124.4, 2020-01-01T00:01:00Z

Then it will be transformed into the following DataEntry objects, provided that serial_number is used as an index:

  • DataEntry 1:
    • Row:
      • Promoted Key: serial_number=1234
      • Tail key: product=temp_sensor
      • Sequence: fromTimestamp(2020-01-01T00:00:00Z)
    • Values:
      • temperatures: 123.4, 2020-01-01T00:00:00Z
  • DataEntry 2:
    • Row:
      • Promoted Key: serial_number=1234
      • Tail key: product=temp_sensor
      • Sequence: fromTimestamp(2020-01-01T00:01:00Z)
    • Values:
      • temperatures: 124.4, 2020-01-01T00:01:00Z

When data is saved to the underlying DB, repeated fields from “Values”, like the timestamp, may be dropped, as we already have them in the Row object. Understanding promoted/tail keys is important for writing good indices! In this example, we assumed a single promoted index, but if we had more, we would have more replicas.
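
Below is a minimal sketch of the promoted/tail split, using plain string maps instead of the real uid.Key and uid.TableIndex types. The splitKey helper is hypothetical; it only illustrates that fields named by the index form the promoted key, while everything else goes into the tail key:

package example

// splitKey splits a flat key (field -> value) into a promoted part, containing
// the fields named by the index, and a tail part with everything else. The
// real implementation works on integer uid.Key and uid.TableIndex types.
func splitKey(key map[string]string, indexFields []string) (promoted, tail map[string]string) {
  promoted = map[string]string{}
  tail = map[string]string{}
  indexed := map[string]bool{}
  for _, f := range indexFields {
    indexed[f] = true
  }
  for f, v := range key {
    if indexed[f] {
      promoted[f] = v
    } else {
      tail[f] = v
    }
  }
  return promoted, tail
}

For the temperature sensor example, splitKey with indexFields = ["serial_number"] yields promoted {serial_number: 1234} and tail {product: temp_sensor}, matching the DataEntry rows above.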

3.2 - SPEKTRA Edge Wide-Column Versions

Understanding the SPEKTRA Edge wide-column versions.

We have two supported storage versions right now: v2 and v3. Version v2 is old and carries a lot of influence from the Monitoring service, which originally had the widecolumn implementation internally for its private use. Audit and Logging services were added later, but v2 still specifies some fields relevant only to monitoring. It was also written with Bigtable in mind, where you can query not only by row range but also by column value range. This is, however, not efficient with ScyllaDB and likely other CQL-based DBs. Moreover, it is not needed: a query from monitoring only needs data for a specific column key, and Audit/Logging always query the whole range. There is no plan to utilize this feature, so it is better to have it removed. On top of that, sequence numbers in v3 have nanosecond precision, while in v2 they have only seconds. This may allow us to drop timestamps from column values completely, and it removes the reason we have many column values per row in Audit/Logging anyway.

Summary:

  • v3 has Sequence in nanosecond precision, v2 in seconds.
  • Row in v2 contains: promoted key, tail key, sequence number, empty Aggregation key, empty start timestamp, and alignment period. v3 has promoted key, tail key, and sequence number only.
  • Column Value in v2 has extra elements removed in v3: Alignment period, start time.
  • Query in v2 allows specifying a column key range; v3 does not. We are using this feature right now in Monitoring (Aligner is used to specify the column to pick the correct value type). However, we don’t specify a multi-value range; the column range start always equals its end. Therefore, for v3, to keep the same functionality, we need to move Aligner into the Key in KeyedValues.

TODO:

  • Monitoring, Audit, and Logging write to and query v2. We need to provide v3 implementations for these.
  • All three services will need to double-write for some time. Queries should be executed on V3 when possible, but v2 should be used as a fallback. Writes should be executed on both v2 and v3. This is to allow for a smooth transition from v2 to v3. We should use the config file to indicate when v2 should be written or from which data point v3 is valid. Specific stores (like TimeSeriesStore) should internally have two references to StoreV2 & StoreV3 of widecolumn.
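
A conceptual sketch of how such a dual store could be wired is shown below; the StoreV2/StoreV3 interfaces are placeholders standing in for the real widecolumn stores, and the cutover timestamp would come from the config file mentioned above:

package example

import (
  "context"
  "time"
)

// StoreV2 and StoreV3 are placeholders standing in for the two widecolumn
// store versions; the real interfaces take IndexedKeyedValues and query types.
type StoreV2 interface {
  Write(ctx context.Context, data any) error
  Query(ctx context.Context, start, end time.Time) ([]any, error)
}

type StoreV3 interface {
  Write(ctx context.Context, data any) error
  Query(ctx context.Context, start, end time.Time) ([]any, error)
}

// migratingStore double-writes to v2 and v3, and queries v3 only when the
// requested range is fully covered by v3 data, falling back to v2 otherwise.
type migratingStore struct {
  v2        StoreV2
  v3        StoreV3
  v3ValidAt time.Time // from config: first point in time at which v3 data is complete
}

func (s *migratingStore) Write(ctx context.Context, data any) error {
  if err := s.v2.Write(ctx, data); err != nil {
    return err
  }
  return s.v3.Write(ctx, data)
}

func (s *migratingStore) Query(ctx context.Context, start, end time.Time) ([]any, error) {
  if !start.Before(s.v3ValidAt) {
    return s.v3.Query(ctx, start, end)
  }
  return s.v2.Query(ctx, start, end)
}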

3.3 - SPEKTRA Edge Wide-Column Usage Example

Understanding the SPEKTRA Edge wide-column usage through example.

package example

import (
  "context"
  "fmt"
  "time"

  wcde "github.com/cloudwan/edgelq/common/widecolumn/dataentry"
  wcstore "github.com/cloudwan/edgelq/common/widecolumn/store"
  wcuid "github.com/cloudwan/edgelq/common/widecolumn/uid"
)

func RunDemo(ctx context.Context, store wcstore.WCStoreV3) error {
  // Create a reusable fields descriptor. Define two key fields:
  // serial_number (no flags - 0), no default value, required, no
  // negative filters allowed; product (no flags - 0), no default value,
  // required, no negative filters allowed.
  fieldsDesc := wcuid.NewKeyDescriptor().
    AddSKId("serial_number", 0, "", true, false).
    AddSKId("product", 0, "", true, false)

  // Allocate string - int mappings for later. Now keep fieldsDesc "forever".
  // This part can be completed at the startup.
  strUIDs := fieldsDesc.GetUnresolvedStrUIDs()
  uids, err := store.ResolveOrAllocateStrUIDs(ctx, strUIDs)
  if err != nil {
    return err
  }
  fieldsDesc.ResolveSKVsInBulk(strUIDs, uids)

  // Create a specific key - err is returned if, for example, a key is
  // repeated or a required key is not provided. Undefined keys are stored
  // as unclassified - but they can be classified later by providing more
  // field descriptors.
  describedSKey, err := fieldsDesc.NewDescribedSKeyBuilder().
    Add("serial_number", "1234").
    Add("product", "temp_sensor").Build()
  if err != nil {
    return err
  }

  // Prepare indices by which data must be split. We want to index
  // by serial_number only.
  indices := []*wcuid.TableIndex{wcuid.NewTableIndex([]wcuid.UID{
    wcuid.KIdUID(
      fieldsDesc.GetFieldDescriptorBySKey("serial_number").GetKey(),
      0,
    ),
  })}

  // Prepare data to write. We could have allocated the skey into a key, but
  // we don't need to.
  indexedValues := make([]wcstore.IndexedKeyedValues[*wcde.ColumnValueV3], 0)
  indexedValues = append(indexedValues, wcstore.IndexedKeyedValues[*wcde.ColumnValueV3]{
    Indices: indices,
    KeyedValues: wcstore.KeyedValues[*wcde.ColumnValueV3]{
      SKey: describedSKey.FormatRawKey(),
      Values: []*wcde.ColumnValueV3{
        {
          Family: "temperatures",
          Time:   time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC),
          Data:   marshalDouble(123.4),
        },
        {
          Family: "temperatures",
          Time:   time.Date(2020, 1, 1, 0, 1, 0, 0, time.UTC),
          Data:   marshalDouble(124.4),
        },
      },
    },
  })

  // Save the data
  err = store.Write(ctx, indexedValues)
  if err != nil {
    return err
  }

  // Prepare query - we will get only one of two points saved.
  // We filter by specific serial number and all keys should be automatically
  // validated (true flag)
  filter := wcstore.NewFilterFromSKVs([]wcuid.SKV{
    wcuid.NewSKIV("serial_number", "1234"),
  }, true)
  query := &wcstore.QueryV3{
    StartTime: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC),
    EndTime:   time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC),
    OrGroups:  []*wcstore.QueryV3OrGroup{
      {
        Indices:      indices,
        Filter:       filter,
        ColumnFamily: "temperatures",
      },
    },
  }

  tempsByKey := make(map[wcuid.Key][]float64, 0)
  uniqueKeys := make([]*wcuid.Key, 0)
  err = store.Query(
    ctx,
    query,
    func(
      ctx context.Context,
      seq wcde.RowSequenceV3,
      entries []wcstore.KeyedValues[*wcde.ColumnValueV3],
      isDone bool,
    ) (bool, error) {
      for _, entry := range entries {
        temps, ok := tempsByKey[*entry.Key]
        if !ok {
          temps = make([]float64, 0)
          uniqueKeys = append(uniqueKeys, entry.Key)
        }
        for _, v := range entry.Values {
          temps = append(temps, unmarshalDouble(v.Data))
        }
        tempsByKey[*entry.Key] = temps
      }
    
      // Return true to continue, false to stop
      return true, nil
    },
  )
  if err != nil {
    return err
  }
  
  skeys, err := store.ResolveKeys(ctx, uniqueKeys)
  if err != nil {
    return err
  }
  for i, key := range uniqueKeys {
    temps := tempsByKey[*key]
    descSKey, _ := fieldsDesc.ValidateSKey(skeys[i], false)
    serialNumber := descSKey.GetSVid("serial_number")
    fmt.Printf(
      "Serial number %s has %d temperatures: %v\n",
      serialNumber,
      len(temps),
      temps,
    )
  }
  return nil
}
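
The example above references marshalDouble and unmarshalDouble helpers that are not shown. A minimal sketch using big-endian IEEE-754 encoding could look like the following; the actual monitoring store may encode values differently:

package example

import (
  "encoding/binary"
  "math"
)

// marshalDouble encodes a float64 as 8 big-endian bytes.
func marshalDouble(v float64) []byte {
  buf := make([]byte, 8)
  binary.BigEndian.PutUint64(buf, math.Float64bits(v))
  return buf
}

// unmarshalDouble decodes 8 big-endian bytes back into a float64.
func unmarshalDouble(data []byte) float64 {
  return math.Float64frombits(binary.BigEndian.Uint64(data))
}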

It is important to learn how to use indices efficiently. Basically, to successfully query data, a filter must define all fields that are defined in at least one index. It is like looking for the proper data partition. Tail keys are filtered out in memory (if there are filter conditions for them), but their data is still being pulled.

Note also an important quirk of widecolumn storage: we need to specify indices both when writing and when querying. Widecolumn can detect which index is most optimal, but it does not remember them; there are no functions for saving or getting indices. It is the responsibility of the user to provide indices when saving, and a list of available indices when querying!

It is generally “fine”, however: all our implementations use descriptors, which we need to get from the local database when validating user input anyway! If we already fetch them from the database, it makes no sense to add extra DB lookups just for reading indices. The descriptors are used when writing, querying, and also validating.

3.4 - SPEKTRA Edge Wide-Column Annotations

Understanding the SPEKTRA Edge Wide-Column Annotations.

Those four resource types in SPEKTRA Edge (TimeSerie, ActivityLog, ResourceChangeLog, Log) are still declared as Goten resources, but have several opt-outs that make them unique. See in SPEKTRA Edge repo:

  • Audit API-Skeleton: audit/proto/api-skeleton-v1.yaml, resources: ActivityLog, ResourceChangeLog
  • Logging API-Skeleton: logging/proto/api-skeleton-v1.yaml, resources: Log
  • Monitoring API-Skeleton: monitoring/proto/api-skeleton-v4.yaml, resources: TimeSerie

See the opt-out settings: they don’t have a metadata field, standard CRUD is disabled, and pagination and resource change are also not present. For TimeSerie, we are also disabling the name field (more for historical reasons, to match the Google Stackdriver API in version v3). We also disabled “basicActions” for all of them (by specifying *), but this option is not strictly necessary, since disabling standard CRUD already ensures that no basic actions (like Create, Get, BatchGet…) are generated.

As a result, Goten will not generate anything for these resources in the access or store packages, nor will the server have any of the basics related to them. Some standard types in the resources package will also be missing.

They will still get Descriptor instances, and they will still satisfy the gotenresource.Resource interface (as defined in the runtime/resource/resource.go file in the Goten repo), but for example EnsureMetadata will return nil value. Function SupportsMetadata in the gotenresource.Descriptor interface will return false.

This is how we take control of these resources away from Goten, so we can use a different storage type.

3.5 - SPEKTRA Edge Monitoring Time Series

Understanding the SPEKTRA Edge monitoring service time series.

You can see the proper TimeSeries storage interface in the file monitoring/ts_store/v4/store.go, TimeSeriesStore. We will study here QueryTimeSeries and SaveTimeSeries, which are most important for our case.

TimeSeries are the most special among all widecolumn types: when a user makes queries, the returned time series can be completely different from the submitted ones. When group by is in use (which is almost always the case!), time series are merged, dynamically forming new ones that were never submitted as such. This should already be known from the User guide though.

As per the user documentation, it should be clear that we use MonitoredResourceDescriptor and MetricDescriptor instances to create the final identifiers of TimeSeries (keys), and a list of promoted indices for faster queries. Before we go into explaining saving/querying, it is worth checking the initTsData function in the monitoring/ts_store/v4/store.go file. This is where we initialize:

  • tsKeyDesc, which contains the common part of the descriptor for all time series, regardless of labels in metric or resource descriptors
  • Common index prefix, consisting of project and metric.type fields from TimeSerie.

Additional notes:

  • TimeSeries storage has no concept of regional replication, each region contains its data only.
  • This document describes only TimeSeries storage, but not the full monitoring design, which is described separately.

Overall, the TimeSerie object is mapped into KeyedValues from WideColumn, but note that it is a somewhat higher-level object, and each such mapping requires some level of design. This is done in the following way:

  • TimeSerie fields project, region, metric, and resource are mapped to various String key-values, forming together uid.SKey. This SKey is then mapped to some uid.Key, which is a short integer representation. Note that uid.Key is the identifier of KeyedValues.
  • TimeSerie field key is binary encoded uid.Key, which has “compressed” project, region, metric and resource fields.
  • TimeSerie field points, which is a repeated array, is converted into Widecolumn dataentry.ColumnValue, one by one.

Single Point in TimeSerie is mapped to dataentry.ColumnValue like this:

  • TypedValue, which holds an actual value, is mapped to binary data (bytes). We will need to marshal.
  • The timestamp of the point naturally maps to the time in dataentry.ColumnValue. Note that AlignmentPeriod is, apart from the 0 value, divisible by 60 seconds. This means that the timestamp will be some multiple of the AP value.
  • Family in dataentry.ColumnValue will contain the AlignmentPeriod (in string format). It means that values for each AP are stored in a separate column family (table)!
  • dataentry.ColumnValue also has an optional key; in Monitoring, we store there the perSeriesAligner value from Point Aggregation. For example, this allows us to save ALIGN_RATE (some double) and ALIGN_SUMMARY (some distribution) with the same key, the same timestamp, and in the same column family, next to each other.

In monitoring, we do not save raw points (alignment period = 0 seconds) in the database. We store only aligned values.
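
As a rough illustration of this mapping, here is a conceptual sketch with simplified stand-in types. The struct fields, the family string format, and the pointToColumnValue helper are assumptions made for the sketch; the real code lives in monitoring/ts_store/v4/store_writing.go:

package example

import (
  "fmt"
  "time"
)

// Simplified stand-ins for the real monitoring and widecolumn types.
type Point struct {
  EndTime          time.Time
  AlignmentPeriod  time.Duration
  PerSeriesAligner string // e.g. "ALIGN_SUMMARY"
  Value            []byte // already-marshaled TypedValue
}

type ColumnValue struct {
  Family string // one column family per alignment period
  Key    []byte // per-series aligner, keeps different aligners side by side
  Time   time.Time
  Data   []byte // marshaled TypedValue
}

// pointToColumnValue shows how a single aligned Point maps to a column value.
func pointToColumnValue(p Point) ColumnValue {
  return ColumnValue{
    Family: fmt.Sprintf("%ds", int(p.AlignmentPeriod.Seconds())), // AP as string
    Key:    []byte(p.PerSeriesAligner),
    Time:   p.EndTime,
    Data:   p.Value,
  }
}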

Saving

Writing time series is implemented fully in the monitoring/ts_store/v4/store_writing.go file.

It is possible, when we save time series, that the TimeSerie object already contains a key field value, in which case we don’t need to resolve it ourselves. However, we still need to verify that it is correct! The client may choose to submit TimeSeries with binary keys to make the final request a bit smaller (it can drop the project, region, metric, and resource fields).

Moreover, from TimeSerie we can and must get the metric and resource descriptors, from which we can finally compute indices. This way, we can wrap KeyedValues into IndexedKeyedValues, as required by widecolumn storage. Note that those two descriptor resources describe what fields can appear in a TimeSerie object in general; they regulate metric.labels and resource.labels. If we mapped MonitoredResourceDescriptor/MetricDescriptor to widecolumn store types, they would map to uid.KeyFieldsDescriptor!

In the implementation, each TimeSerie is wrapped into CreateTssOperationResult, a type defined in the monitoring/ts_store/v4/store.go file; see it. This will contain the params of KeyedValues, along with the associated descriptor resources. Then, the metric and resource descriptor resources are wrapped together with uid.KeyFieldsDescriptor types, to which they de facto map.

When we save time series, we map TimeSerie into CreateTssOperationResult already in initiateCreateOperations. Inside this function, we validate the basic properties of the TimeSerie object: project, metric, and resource type fields. We use the field descriptor tsKeyDesc, which was initialized in the store constructor. At this initial stage, we don’t know the exact metric and resource descriptor types, so we validate basic properties only! If the binary key is provided, we initialize a descKey instance, otherwise a descSKey. The former is better for performance, but not always possible. Note that at this stage we have described keys and validated base properties, but descriptors still have work to do.

In the next stage, we grab descriptor references; see getWrappedDescriptors. It does not make any resolutions yet: the same descriptors may be used across many TimeSerie objects, so we don’t want to do more resolutions than necessary. With the Goten resource descriptors wrapped in uid.KeyFieldsDescriptor, we resolve them in the resolveAndPopulateDescriptors function, where we finally get field descriptors in the required uid format. This allows us to execute the final, proper validation and compute indices for widecolumn.

Proper validation is done in functions defaultAndValidateHeaders and defaultAndValidatePoints. In the second function, we are also generating final column values used by the widecolumn storage interface! However, note some “traps” in defaultAndValidatePoints:

if ap == 0 && !md.GetStorageConfig().GetStoreRawPoints() {
    continue
}

Raw, unaligned points sent by clients, with AP equal to 0 seconds, are skipped when saving to the database; the second condition pretty much always evaluates to true! It will be explained more in the monitoring design doc.

With the data validated and the output columns populated, we can now ensure that the output raw key in CreateTssOperationResult is present. If the binary key was not submitted when saving the TimeSerie (field key), then we need to use the resolver to allocate a string-to-integer pair. The mapping is of course saved in widecolumn storage. See the ensureOutputKeysAreAllocated function.

Next, with column values and raw keys, we wrap KeyedValues into indexed ones, IndexedKeyedValues. This is what we finally pass to widecolumn storage. Inside, keyed values are duplicated per each index and saved in the underlying storage.

Querying

When we query time series, we need to:

  • Convert time series query params into a WideColumn query object (mapping).
  • Create a batch processor that maps KeyedValues from WideColumn storage into TimeSerie objects.

This implementation is fully in monitoring/ts_store/v4/store_querying.go.

The widecolumn store, unlike the regular Goten document one, supports OR groups, but it is more like executing multiple queries at once. Each query group represents a filter with a set of AND conditions, and can be executed on different indices. We need to deal with this specific WideColumn interface trait, where we must specify indices when saving and querying.

When executing a query, we gather all input parameters and convert them into a tssQuery object, with tssQueryGroup as one OR group. This is not yet the format required by widecolumn but an intermediary one. See the function createTssQuery.

We support two types of queries for TimeSeries:

  • With filter specifying one or more binary keys (WHERE key = "..." OR WHERE key IN [...]). Each key forms one “OR” group, with just a long list of AND conditions.
  • With filter specifying a set of metric/resource conditions (WHERE metric.type = "..." AND resource.type = "..." ...). However, we also support IN conditions for those types. Resource types may also be omitted optionally (but defaults are assumed then). For each combination of metric + resource type, we create one OR group.

One query group must specify exactly one metric and one resource type, because each combined pair defines its own set of promoted indices; we MUST NOT combine them! This is reflected in createTssQuery.
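
To illustrate, a filter like metric.type IN ["m1", "m2"] AND resource.type = "r1" expands into two OR groups, one per metric/resource type pair. A minimal sketch of that expansion (simplified string types, not the actual createTssQuery code):

package example

// orGroup pairs one metric type with one resource type; each pair has its own
// descriptors and therefore its own set of promoted indices.
type orGroup struct {
  MetricType   string
  ResourceType string
}

// expandOrGroups builds one OR group per metric/resource type combination.
func expandOrGroups(metricTypes, resourceTypes []string) []orGroup {
  groups := make([]orGroup, 0, len(metricTypes)*len(resourceTypes))
  for _, mt := range metricTypes {
    for _, rt := range resourceTypes {
      groups = append(groups, orGroup{MetricType: mt, ResourceType: rt})
    }
  }
  return groups
}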

For each OR group, we grab descriptors, with which we can finally verify whether the conditions in filters are defined correctly and whether the group by clause specifies existing fields, and we can compute the indices we know we can query.

Then, we map the query to the widecolumn one. Notable elements:

  • We pass a reducing mask for fields; it makes the output uid.Key have some fields “reduced”!
  • We most of the time ignore the perSeriesAligner passed by the user and switch to ALIGN_SUMMARY. This is because, when we save, we use almost exclusively ALIGN_SUMMARY. Other types can almost always be derived from the summary, so there is no point in maintaining all of them.

Finally, we execute the query with our batch processor. Thanks to the reducing mask, the Key field of each KeyedValues already has a reduced list of key values; the reduced keys are in the rest key set. This is how we implement groupBy in monitoring. Each entry in the resultEntries map field of queryResultProcessor will represent the final TimeSerie object. However, this is a very CPU- and RAM-intensive task, because widecolumn storage returns values exactly as we saved them, all individual instances! If, for a single timestamp, we have thousands of entries sharing the same reduced key, then we will merge thousands of points to produce one point, and we repeat this for each timestamp. At least widecolumn guarantees that, when querying, results are returned only with increasing seq value. If we query with AP = 300s, we will get points for, let’s say, noon, then 12:05, and so on. When we see the sequence jump, we know we can add the final point to the growing list.

Still, this is a serious scaling issue if we try to merge large collections into a single reduced key.

Since the perSeriesAligner we passed to widecolumn differs from what the user requested, when we convert a ColumnValue into a TimeSerie point (function buildPointFromWCColumnValue), we need to extract the proper value from the summary object.

Once the TimeSerie objects are obtained, we resolve all integer keys into strings in bulk, and we can form the output time series.

3.6 - SPEKTRA Edge ActivityLogs and ResourceChangeLogs

Understanding the SPEKTRA Edge activity and resource change logs.

The audit store for logs is implemented in the audit/logs_store/v1/store.go file. This interface also uses the WideColumn store under the hood. We are mapping to KeyedValues each ActivityLog and ResourceChangeLog.

Activity Logs

For ActivityLog, mapping is the following:

  • A lot of fields are used to create uid.Key/uid.SKey: scope, request_id, authentication, request_metadata, request_routing, authorization, service, method, resource.name, resource.difference.fields, category, labels.
  • Name field contains scope and binary key uid.Key. However, when logs are saved, this is often not present in the request (value is allocated on the server side).
  • Each event in the events array is mapped to a single dataentry.ColumnValue. Then, we have two additional special Column values: fields resource.difference.before and resource.difference.after.

ActivityLog typically has 3 events: client message, server message, and exit code. It may have many more events for streaming calls and can get pretty long.

This is how ActivityLog Event maps to ColumnValue in WideColumn:

  • The whole event is marshaled into binary data and passed to ColumnValue.
  • Family field is always equal to one static value, ALStoreColumnType. It should be noted that all ActivityLogs use one single ActivityLog column family!
  • We extract the time value from ActivityLog Event and use it as a Time in ColumnValue. Note that in V2 format WideColumn only uses second precision for times!
  • Additional column key will contain event type (client msg, server msg…), and nanoseconds part of the timestamp. This is important if ActivityLog contains streaming messages and we have more than one of a single type within a second! This is how the column key type protects against overwrites.

Pre&Post object diffs from ActivityLogs will be mapped to ColumnValue:

  • The whole object is marshaled into binary data and passed to ColumnValue.
  • Column family is just equal to the const value of ALStoreColumnType.
  • The timestamp is equal to that of the first event from the ActivityLog.
  • The column key contains the type (before OR after field values) with nanoseconds from the timestamp. The timestamp part is not necessary; it is just to keep a format similar to that of the events.

Resource Change Logs

ResourceChangeLogs have the following mapping to KeyedValues:

  • A lot of fields are used to create uid.Key/uid.SKey: scope, request_id, authentication, service, resource.type, resource.name, resource.pre.labels, resource.post.labels.
  • Name field contains scope and binary key uid.Key. However, when logs are saved, this is often not present in the request (value is allocated on the server side).
  • ResourceChangeLogs are a bit unique: we marshal them whole into binary data, which forms the ColumnValue types.

Each ResourceChangeLog typically has two ColumnValues because we are saving it twice: the first time before the transaction concludes (so we have a chance to protest before allowing the commit), then after the transaction concludes.

In summary, ColumnValue is formed this way:

  • Binary data contains the whole log marshaled
  • Column family is set to the constant variable value of StoreColumnType.
  • Time is extracted from request time (first client message received).
  • The column key is also used: we use the value StoreColumnPreCommitType when the transaction.state field of ResourceChangeLog is equal to PRE_COMMITTED; otherwise, it is StoreColumnFinalizedType.

If you check the NewStore constructor in the audit/logs_store/v1/store.go file, you will notice that, unlike in monitoring store, we have quite big uid.KeyFieldsDescriptor instances for Resource change and Activity logs, and a ready set of indices, not just a common prefix.

If you analyzed monitoring time series storage querying and writing, then checking the same for Audit logs will be generally simpler. They follow the same principles with some differences:

  • In monitoring we had two descriptors per TimeSeries, in Audit we have one descriptor for activity and another for resource change logs.
  • Specific to resource change logs: we call SaveResourceChangeLogsCommitState, which internally uses SaveResourceChangeLogs again; the latter is also used for saving logs in the PRE COMMIT state.
  • For both Activity and ResourceChange logs we don’t require descriptors: usually the label sets are empty anyway, we already have large sets of promoted indices and labels, and this is useful for bootstrap processing, when descriptors may not be there yet.
  • When we query resource change logs, we don’t need to resolve any integer keys, because the whole logs were saved; see the onBatch function. We only need to handle separate entries for the commit state.
  • When we query logs, we get all logs up to second precision. This means that even if we have a very large number of logs within a single second, we cannot split them; continuation tokens (next page tokens) must use second precision, as required by the V2 storage format.
  • Because logs are sorted by timestamp only with second precision, we need to re-sort them anyway.

Things could be improved with the v3 SPEKTRA Edge wide-column version.

3.7 - SPEKTRA Edge Logging Store

Understanding the SPEKTRA Edge logging store.

The logging store for logs is implemented in the logging/logs_store/v1/store.go file. This interface also uses the WideColumn store under the hood. We map all Log instances to KeyedValues. Unlike in Audit, these logs can truly be very long: for one uid.Key mapping, we can have logs spanning days, months, and years.

From the Log instance, we extract fields scope, region, service, version, log_descriptor, and labels, which are converted to uid.Key.

Regarding ColumnValue, the whole Log body is marshaled into a binary array, and the rest of the columns are extracted in the following way:

  • Column family is just one for all logs
  • Time is taken from the log timestamp, but we use second precision only.
  • To prevent log overrides, in the column key we store the nanosecond part of the log timestamp.
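
A small sketch of that timestamp split, using the standard library; the helper is hypothetical, and the actual column-key layout in the logging store may differ:

package example

import "time"

// splitLogTimestamp returns the second-precision time used as the column value
// Time, and the nanosecond remainder stored in the column key, so that two
// logs within the same second do not overwrite each other.
func splitLogTimestamp(ts time.Time) (second time.Time, nanos int) {
  return ts.Truncate(time.Second), ts.Nanosecond()
}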

In this case, we would also benefit from the v3 version where timestamps have nanosecond precision.

Saving and querying logs works like the other storages described here, and the source code should be more or less readable.

4 - SPEKTRA Edge Monitoring Pipeline Design

Understanding the SPEKTRA Edge monitoring pipeline design.

With the data layout of the wide-column data store explained in the previous wide-column store page, let’s talk about the monitoring pipeline aspect of the SPEKTRA Edge monitoring system.

Unlike in Logging or Audit services, usage of WideColumn is not the only specific trait of Monitoring TimeSerie resource.

When a client submits a TimeSerie object, the point values match those declared in the metric descriptor. For example, if we have something like:

- name: ...
  type: ...
  displayName: ...
  metricKind: GAUGE
  valueType: INT64
  # Other fields...

Then, a given TimeSerie will have points from a single writer (let’s assume it sends one point every 30 seconds):

points:
- interval:
    endTime: 12:04:24
  value:
    int64Value:
      123
- interval:
    endTime: 12:04:54
  value:
    int64Value:
      98
- interval:
    endTime: 12:05:24
  value:
    int64Value:
      121
- interval:
    endTime: 12:05:54
  value:
    int64Value:
      103
- interval:
    endTime: 12:06:24
  value:
    int64Value:
      105
- interval:
    endTime: 12:06:54
  value:
    int64Value:
      106

However, unlike logs, querying will not return the same data points; in fact, it is likely not possible at all, unless we enable raw (unaligned) storage. A QueryTimeSeries request typically requires that an aggregation field is provided, with alignmentPeriod ranging from one minute to one day, and perSeriesAligner equal to some supported value, like ALIGN_SUMMARY, ALIGN_MEAN, etc. For example, if we cuttle the monitoring service like:

cuttle monitoring query time-serie \
  --project '...' \
  --filter '...' \
  --interval '...' \
  --aggregation '{"alignmentPeriod":"60s","perSeriesAligner":"ALIGN_SUMMARY"}' \
  -o json | jq .

Then, for these points, we should expect output like:

points:
- interval:
    endTime: 12:05:00
  value:
    distributionValue:
      count: 2
      mean: 110.5
      sumOfSquaredDeviation: 312.5
      range:
        min: 98
        max: 123
      bucketOptions:
        dynamicBuckets:
          compression: 100.0
          means: [98, 123]
      bucketCounts: [1, 1]
- interval:
    endTime: 12:06:00
  value:
    distributionValue:
      count: 2
      mean: 112
      sumOfSquaredDeviation: 162
      range:
        min: 103
        max: 121
      bucketOptions:
        dynamicBuckets:
          compression: 100.0
          means: [103, 121]
      bucketCounts: [1, 1]
- interval:
    endTime: 12:07:00
  value:
    distributionValue:
      count: 2
      mean: 105.5
      sumOfSquaredDeviation: 0.5
      range:
        min: 105
        max: 106
      bucketOptions:
        dynamicBuckets:
          compression: 100.0
          means: [105, 106]
      bucketCounts: [1, 1]

Note that:

  • All points across one-minute intervals are merged into distributions.
  • Point at 12:07:00 contains all data points from 12:06:00.001 till 12:07:00.000.
  • Distribution type is much more descriptive than other types. For example, if we queried from ALIGN_MEAN, then we would get doubleValue instead of distributionValue, with mean values only.
  • We can specify more APs: three minutes, five minutes, 15 minutes, …, 1 day. Each larger value is going to have more data points merged.
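
As a sanity check on the 12:05:00 point above, the distribution statistics can be recomputed directly from the two raw values (123 and 98). A small helper illustrating the arithmetic, assuming sumOfSquaredDeviation is the sum of squared differences from the mean:

package example

// verifySummary recomputes count, mean, and sum of squared deviations for a
// one-minute bucket from its raw points. For the 12:05:00 point above,
// verifySummary([]float64{123, 98}) returns (2, 110.5, 312.5), matching the
// distributionValue in the query output.
func verifySummary(points []float64) (count int, mean, ssd float64) {
  count = len(points)
  for _, p := range points {
    mean += p
  }
  mean /= float64(count)
  for _, p := range points {
    d := p - mean
    ssd += d * d
  }
  return count, mean, ssd
}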

If you check file monitoring/ts_store/v4/store_writing.go, you should note that:

  • Each AlignmentPeriod has its own Column Family:

    ap := dp.GetAggregation().GetAlignmentPeriod().AsDuration()
    cf := tss.columnFamiliesByAp[ap]
    
  • We typically don’t store raw data points (AP = 0):

    if ap == 0 && !md.GetStorageConfig().GetStoreRawPoints() {
        continue
    }
    
  • Now, when we query (monitoring/ts_store/v4/store_querying.go), we query for specific column family:

    colFamily := tss.columnFamiliesByAp[
      q.aggregation.GetAlignmentPeriod().AsDuration(),
    ]
    if colFamily == "" {
        return nil, status.Errorf(
          codes.Unimplemented,
          "unsupported alignment period %s",
          q.aggregation.GetAlignmentPeriod(),
        )
    }
    
  • When we query, we change the per-series aligner from the one in the query to the type used in storage (function createWCQuery).

To summarize, the data that we query and the data that the client submits are not the same, and this document describes what is going on in the Monitoring service.

4.1 - Monitoring Pipeline Data Transformation

Understanding the SPEKTRA Edge monitoring pipeline data transformation.

Monitoring TimeSerie’s specialty is that all data is numerical, and we should be able to query various stats like percentiles, means, counts, and rates. Monitoring allows merging data points using the aggregation param:

  • Within a single TimeSerie object, by specifying alignmentPeriod and perSeriesAligner.
  • Across multiple TimeSerie objects, by specifying crossSeriesReducer with groupBy fields

Merging TimeSerie either way is a costly operation; for this reason, Monitoring merges in the background, then saves the merged values. As of this writing, the following applies:

  • Raw TimeSerie points as submitted by clients are not being saved at all in the time series storage. There is a setting in MetricDescriptor which causes data to be saved, but nobody is practically using it.
  • Raw TimeSerie points are accumulated and Monitoring computes merged points for all supported alignment periods, ranging from one minute to one day.
  • Monitoring saves aligned TimeSerie points in the background, each column family gets different data for different APs.
  • When Monitoring merges TimeSerie points within TimeSerie, it forms Distribution values. This is because things like mean value, standard deviation, count, min, max, etc., all can be derived from distribution. It makes no sense to have separate saves per various ALIGN_* types, we can store ALIGN_SUMMARY. This is why when we query for data, we switch to a different type. There is some exception though, but we will describe it later in this document.
  • Distribution has many various supported types: Explicit, Dynamic, Linear, and Exponential. The type used depends on the original type indicated by MetricDescriptor.
  • As of now, monitoring does not merge TimeSeries across their keys (pre-aggregation). Unlike merging within a TimeSerie, where we know all alignment periods, it is a bit more difficult; tasks for pre-aggregating data are still not finished, but they are on the roadmap. This is why, if you study the store_querying.go file for TimeSeries, you will see we do it entirely in RAM (we pass a reducing mask, then we use the reducer module from monitoring/ts_reducer/v4).

When raw points are sent by the client, the following happens:

  • The server “saves” the TimeSerie in the storage, but the storage quietly ignores all the raw points: it will still allocate a uid.Key based on all the metric/resource types and labels forming uid.SKey, but it will not save the points.
  • Server will publish TimeSeries to a special TimeSerie stream queue, currently, we support Kafka or PubSub (GCP only). This queue is the “raw metrics” one.
  • We have a special monitoring streaming pipeline, that reads raw TimeSeries from the raw metrics queue and makes per-series accumulations. This is a 24/7 background job. It produces accumulated values and sends them back to the stream queue (on a different topic, server rollups).
  • Monitoring has a special module, rollup connector, that reads accumulated points from the server rollups queue and writes to TimeSeries storage. This module currently resides in Monitoring DbController.

Now, regarding code pointers:

  • See file monitoring/server/v4/time_serie/time_serie_service.go. After we “save” the time series, we are publishing it with function publishRawTimeSeries.
  • See file monitoring/ts_streaming/v4/publisher.go, it is what the server uses when publishing raw time series. In the config for the API server, you can see: monitoring/config/apiserver.proto, and option raw_metrics_publisher.
  • The streaming job is completely separate, it is even written in a different language, Java. Currently, we use Flink or Google Dataflow (Apache beam).
  • See the monitoring/cmd/monitoringdbcontroller/main.go file. Apart from standard DbController modules (syncing and constraint controllers), we have extra components. See the runV4Connectors function, which is currently the leading one. There, we create a rollup subscription (variable rollupSubscription), which is where the Monitoring streaming job writes to! Then we create realTimeConnector, which continuously reads from the stream and writes to time series storage. The real-time connector is the crucial component here, file monitoring/rollup_connector/connectors/v2/realtime_connector.go.
  • Subscriber interface: monitoring/ts_streaming/v4/subscriber.go.

Here is the simplified pipeline flow:

Apart from ordered steps, there is a monitoring controller, which generates Phantom TimeSeries points (once per minute). This is also a kind of raw metric, so the monitoring streaming job combines those two data sources when generating the final time series. You can see the code here: monitoring/controller/v4/pts_publisher/pts_publisher.go. It generates once-per-minute phantom metrics (OnGeneratePhantomMetrics), which on separate goroutines are published (function Run).

Depending on whether the SPEKTRA Edge cluster runs on GCP or somewhere else, we use different technologies and the architecture is even a bit different:

  • On GCP, we use PubSub for all queues, otherwise, we use Kafka.
  • In PubSub, we still don’t have message ordering; therefore, messages can be delivered to the streaming job or rollup connector out of the original order. Kafka orders messages per partition, but we utilize this feature only for the raw metrics queue: we use the TimeSerie Key to compute the partition to which data will be sent. This way we guarantee that data for a single Key will always be ordered. This guarantee is reinforced by the fact that the Monitoring API Server requires all data to arrive timestamp-ordered; this is part of the TimeSeries spec!
  • For PubSub, we have two topics for raw metrics: one topic is for regular raw time series, the other is for phantom metrics published by the monitoring controller. For Kafka, we write all metrics to one Kafka topic (simpler design).
  • In GCP, we use Google Dataflow, and the job is written in the Apache beam framework. Code location in SPEKTRA Edge repository: src/main/java/com/ntt/monitoring/beam/pipelines/MonitoringRealtimePipeliene.java.
  • In non-GCP, we use Apache Flink, code location in SPEKTRA Edge repo: monitoring/streaming_jobs/flink-realtime/src/main/java/com/monitoring/pipelines/RealtimePipeline.java.
  • When the streaming job publishes aligned TimeSeries to the rollups topic (Kafka or PubSub), we accumulate data in batches and send them randomly to some partition.
  • In PubSub, messages are acknowledged individually, in Kafka we acknowledge when all previous messages are acknowledged. This is because in Kafka we have data ordering.

Both Kafka and PubSub queues work on the at-least-once principle: we guarantee data will be sent at least once, but if there is some failure, delivery will be retried. If there was a failure when confirming successful delivery, then the message will in fact be delivered twice. The streaming job has no issues here: if it detects the same TimeSerie with the same Key and the same timestamp, it will discard the duplicate. The rollup connector just saves a snapshot of data to TimeSeries storage; if it writes twice, it will simply overwrite one value with the same value. The minimal requirement for the whole monitoring pipeline is the AT LEAST ONCE principle. This is why RollupConnector only acknowledges messages after successful processing! Otherwise, we would risk losing some data.

Monitoring jobs (Flink or Dataflow) also operate on the at-least-once principle. They have deduplication mechanisms encoded, so we don’t need to worry about the raw metrics publishers’ side. They are stateful jobs: both Dataflow and Flink use persistent storage to accumulate all state (per each TimeSerie Key). They “ack” messages from the raw metrics topics only after processing them (when they incorporate data points into their state). This happens from the “end” of the job (around the rollups topic) to the front (raw metrics topics): when the streaming job successfully writes data to the rollups topic, it sends a signal about progress backward, which propagates through all the stages. The streaming job “acks” the raw metrics topic last, and state data is checkpointed during this whole process. In case of a crash, it will reprocess all data from the last recovery point; we are guaranteed not to lose data thanks to this synchronized checkpointing. We may get duplicated points in the rollup connector though, but as said, we are prepared for that.

In the streaming job, we are calculating the first points with an alignment period of 1 minute:

  • We accumulate points for specified Key (phantom and real points)
  • If we got phantom points only, we will use phantom data.
  • If we got real, we will discard phantom points if present.
  • We try to output a point with a 1-minute AP soon after the minute passes, if possible. For example, when we hit 12:06:00, we will output the point with timestamp 12:06:00 as soon as possible. However, we need to wait a bit longer: there is a chance we will receive, say, a data point with timestamp 12:05:59 at 12:06:07.
  • Because of possible late arrivals, we send the output for HH:MM:00 in the range HH:MM:20-HH:MM:40.
  • If we get points AFTER we output the aligned data point, the streaming job sends a new corrected value!

Further data points are accumulated further:

  • Using one minute accumulated points, we can generate three minute and five minute points
  • Using five minute accumulated points, we can generate 15 minute point
  • Using 15 minute accumulated points, we can generate 30 minute point
  • Using 30 minute accumulated points, we can generate one hour point
  • Using one hour of accumulated points, we can generate a three hour point
  • Using three hour accumulated points, we can generate six hour point
  • Using six hour accumulated points, we can generate 12 hour point
  • Using 12 hour accumulated points, we can generate one day point
  • In case of correction (late arrivals), we will trigger a re-computation of all “downstream” points.
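
A simplified sketch of how coarser alignment periods can be derived from already-accumulated finer ones is shown below; the summary type is a toy stand-in with count/sum/min/max only, while real points carry full distributions:

package example

// summary is a toy stand-in for an accumulated (ALIGN_SUMMARY) point.
type summary struct {
  Count    int
  Sum      float64
  Min, Max float64
}

// mergeSummaries combines finer-grained points (for example, three one-minute
// points) into one coarser point (a three-minute point).
func mergeSummaries(points []summary) summary {
  out := summary{}
  for i, p := range points {
    out.Count += p.Count
    out.Sum += p.Sum
    if i == 0 || p.Min < out.Min {
      out.Min = p.Min
    }
    if i == 0 || p.Max > out.Max {
      out.Max = p.Max
    }
  }
  return out
}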

The streaming job usually publishes Distribution points because those are the most generic and other types can be derived from them. Basically, we have the following mappings:

  • When MetricDescriptor has {valueType: INT64/DOUBLE, metricKind: GAUGE/DELTA}, then output points use DISTRIBUTION output values and distribution type is Dynamic, with compression = 100.0, PerSeriesAligner, ALIGN_SUMMARY.
  • When MetricDescriptor has {valueType: DISTRIBUTION, metricKind: GAUGE/DELTA}, then output points use Distribution output values, and distribution type is the same as described in MetricDescriptor, field distribution_bucket_options, PerSeriesAligner, ALIGN_SUMMARY.
  • When MetricDescriptor has {valueType: INT64/DOUBLE, metricKind: CUMULATIVE}, then we actually produce three output points per each key and timestamp:
    • Distribution Dynamic type, with compression = 100.0, PerSeriesAligner, ALIGN_SUMMARY.
    • Double type, PerSeriesAligner, ALIGN_RATE
    • Integer/Double type (depending on original valueType), PerSeriesAligner, ALIGN_DELTA
  • MetricDescriptor cannot specify CUMULATIVE kind with DISTRIBUTION value type.
  • We do not support BOOL value types (but we will, it is similar to INT64). We do not and will not support MONEY or STRING. Use double/int64 for money; string types fit the logging service better.

All raw input points are mapped to distributions, with the exception of CUMULATIVE metrics, where point values keep increasing, like 10, 14, 16, 17, 19, 22, etc. We still form them into a distribution of dynamic type (means will have values from 10 to 22); however, CUMULATIVE metrics are mostly used with ALIGN_DELTA or ALIGN_RATE types. Unfortunately, those two data points cannot be derived from the Distribution value. Example (integer points):

  • For 12:00:00, we got data point with a value of 30
  • For 12:01:00, we got data point with a value of 33
  • For 12:02:00, we got data point with a value of 36
  • For 12:03:00, we got data point with a value of 40

Distribution of Dynamic type, with perSeriesAligner ALIGN_SUMMARY for 12:01:00 (AP = 1 minute) will contain value 33 (mean = 33, count = 1 and so on). Integer with perSeriesAligner ALIGN_DELTA (AP = 1 minute) for 12:01:00 will contain value three because 33 - 30 from the previous point is 3. Double with perSeriesAligner ALIGN_RATE (AP = 1 minute) will contain 3.0 / 60.0 = 0.05 (seconds).

For AP = three minutes, timestamp 12:03:00, we will produce in streaming job:

  • Distribution (Dynamic, count = 3, mean = (33+36+40) / 3.0, etc.), perSeriesAligner: ALIGN_SUMMARY. Note it does not contain a value from the previous point (30).
  • Int64 with value 10 (because 40 - 30 is 10), perSeriesAligner: ALIGN_DELTA
  • Double with value 10.0 / 180.0 (delta divided by time range), perSeriesAligner: ALIGN_RATE
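
The DELTA and RATE values above follow directly from the previous cumulative value and the alignment period. A small helper reproducing the arithmetic (40 - 30 = 10, and 10 / 180 ≈ 0.056 per second for the three-minute point):

package example

import "time"

// cumulativeDeltaAndRate derives ALIGN_DELTA and ALIGN_RATE for a CUMULATIVE
// metric from the values at the start and end of the alignment period.
// For the example above, cumulativeDeltaAndRate(30, 40, 3*time.Minute)
// returns delta = 10 and rate = 10.0/180.0.
func cumulativeDeltaAndRate(prev, curr float64, ap time.Duration) (delta, rate float64) {
  delta = curr - prev
  rate = delta / ap.Seconds()
  return delta, rate
}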

No matter how much you try, you won’t be able to get RATE/DELTA from Distribution, because we need value from the previous point. This is why we have special processing for CUMULATIVE metric kinds, and we produce extra values! This is also why, in file monitoring/ts_store/v4/store_querying.go we have code like:

case common.Aggregation_ALIGN_RATE, common.Aggregation_ALIGN_DELTA:
    if orGroup.wrappedMd.md.MetricKind == rmd.MetricDescriptor_DELTA {
        storeAligner = common.Aggregation_ALIGN_SUMMARY
    }

Those two align types are valid for CUMULATIVE and DELTA, but we switch to SUMMARY only for DELTA type.

4.2 - Monitoring Pipeline Streaming Jobs

Understanding the SPEKTRA Edge monitoring pipeline streaming jobs.

Let’s talk about the streaming jobs: one is implemented with Apache Flink and the other with Google Dataflow.

They are significantly different in structure. In Dataflow, we have multiple stages, with each AP having a different one. We also need to handle messages arriving out of order, so we keep at least the data for the last hour. We also add random delays for points with an AP larger than one minute; this increases the risk of corrections (we may output a five-minute rollup with only four of the one-minute data points, because the random delay for the five-minute stage happened to be lower than for the one-minute stage). We also needed to spread outputs to smooth out CPU usage and prevent the autoscaler from going crazy. In Flink, we have an entirely different internal architecture: we know Kafka sends ordered data only, we have support for more data types (like maps), and the autoscaler can average CPU usage, so we don’t need various random delays across keys. This allows for a much more efficient Flink job:

  • We know we can output data points when we get raw points from the next window (for example, if we get a data point with timestamp 12:00:45, we know we can output points for timestamp 12:00:00 and APs of one minute, three minutes, five minutes, 15 minutes, 30 minutes, one hour, three hours, six hours, and 12 hours). Data is ordered, so we won’t get data from the past anymore.
  • Fewer corrections, and smaller latencies for output points.
  • We keep a much smaller state because per Key we remember the last data point only.

In the future, I plan to drop Dataflow entirely, and modify PubSub (use ordered PubSub Lite, add order to regular PubSub, or move to Kafka). This should make streaming jobs much more efficient.

Scalability

Monitoring API Servers and Monitoring Controllers are horizontally scalable (and vertically too, which is recommended), so as new pods are deployed, the generation of phantom metrics or the submission of real-time raw metrics can be divided among them. PubSub and Kafka can work with multiple publishers on the same topic.

Both Dataflow and Flink are horizontally scalable (automatically). Flink can be scaled diagonally too, if we add bigger nodes and larger parallelism per job.

The Monitoring Db Controller can scale vertically, and horizontally automatically. When a new pod is added, rollup TimeSeries are split across all subscribers.

Kafka can horizontally scale up to partition count.

Job Recovery

In case of some disaster in the streaming job, data loss for the last days, or corruption resulting from a bug, we have additional components for recovery. How it works: all raw data (phantom and real raw metrics) does not only go to the streaming job. It is also copied to special recovery storage, with data retention configured for at least the last full seven days. If there is a need to “replay” historical streaming data, we can run an additional Flink/Dataflow BATCH job (not streaming), that will get data from recovery storage (instead of the raw metrics topic) and write it to a separate rollups recovery topic.

See the recovery storage module in monitoring/ts_recovery_store/v4/provider/provider.go. This module provides a recovery store that also subscribes to raw metrics topics and writes to recovery storage. We support two backends now, Azure Blob Storage and Google Cloud Storage (depending on whether the deployment runs on Azure or GCP). We will need to add S3 storage for the AWS or on-premise options.

Recovery storage is a subscriber + writer. It is right now part of the monitoring controller (see main.go for the monitoring controller; it creates and runs the recovery store writer as a separate goroutine). It may move to the db-controller though (I consider that it may be a better place). It works 24/7, writing raw TimeSeries to external storage with some compression.

We have a separate rollup topic for recovered TimeSeries too; see main.go of the monitoring db controller again. We create an additional recovery connector apart from the real-time connector! The implementation is in monitoring/connectors/v2/recovery_connector.go.

Normally, this separate rollup recovery topic is silent and no messages are published, but the rollup connector is always listening anyway (it is assumed to be cheap enough).

The recovery Dataflow/Flink jobs, however, do not run continuously; we need to schedule them manually, for example with the script in the edgelq repo, scripts/run-monitoring-gcp-recovery-pipeline.sh. We need to specify the time range from which data will be loaded; the recovery job is a batch type. It will run, compute all the data, and submit it to the separate recovery rollup topic. Once the job finishes, it is deleted; this is the nature of recovery jobs here. As of now, we have an Apache Beam implementation only for these jobs: file src/main/java/com/ntt/monitoring/beam/pipelines/MonitoringRecoveryPipeline.java.

In this recovery pipeline, remember to set the correct SHARDS_COUNT and BLOB_DURATION. Those should be synchronized with settings in:

cuttle monitoring list recovery-store-sharding-infos

This also shows what periods are possible to recover. Value BLOB_DURATION must be taken from spec.tsBlobPeriod (PT1H is 3600 seconds), and value SHARDS_COUNT must be equal to spec.shardsCount.

This script in SPEKTRA Edge is exclusively for GCP and will eventually be obsolete. For Flink, we have instructions in the edgelq-deployment repository, file README-Azure-Devops.md; see the Monitoring recovery job part.