Developing your Business Logic in Controller

How to develop your business logic in controller.

The API server can execute very little actual work on its own: all read requests are limited in size (they fetch a single page), and write requests break down if you try to save or delete too many resources in a single transaction. Splitting the work across multiple synchronous transactions would also leave users wondering whether something is stuck. Some actions are simply intense: for example, when a user creates a Distribution resource in applications.edgelq.com that matches thousands of Edge devices, the system needs to create thousands of Pods. A single transaction is practically impossible here; pods must be created asynchronously for the service to operate correctly.

Since we use NoSQL databases, which lack cross-collection joins, we sometimes need to denormalize data and keep copies, so that all necessary data can be read from a single collection.

Service development therefore very often includes developing a business logic controller - a runtime designed to execute all the additional write tasks asynchronously.

We also need to acknowledge that:

  • Some write requests may fail, and some parts of the system may be unavailable. We need reasonable retries.

  • The system may be in constant motion

    Actions changing the desired state may arrive asynchronously; tasks may change dynamically even before they are completed.

  • For various reasons (a mistake, for example) users may delete objects that need to exist. We need to handle interruptions and correct such errors.

The business logic controller was designed to react in real time, handle failures, cancel or amend actions when necessary, and heal the system back to the desired state.

Desired state and observed state are the key concepts here. Controllers are primarily organized around Create/Update/Delete operations, trying to match the desired state with the observed one. The pattern is the following: the controller uses Watchers to learn the current system state (ideally watching only the subset it needs). The observed state of some resources is used to compute the desired state. The desired state is then compared with the relevant part of the observed state, and any mismatch is resolved with a Create/Update/Delete operation. This is not the only way a controller can operate, but it is the most common.
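To make the compare-and-act step concrete, below is a minimal, framework-free sketch in Go. The Resource type and the map-based state are assumptions made for illustration only; the syncers generated by Goten implement a more elaborate version of the same diffing idea.

package main

import "fmt"

// Resource is a hypothetical, simplified stand-in for a real resource.
type Resource struct {
    Name string
    Spec string
}

// reconcile compares the desired state with the observed state and returns
// the operations needed to converge them.
func reconcile(desired, observed map[string]Resource) (creates, updates, deletes []Resource) {
    for name, want := range desired {
        got, exists := observed[name]
        switch {
        case !exists:
            creates = append(creates, want)
        case got.Spec != want.Spec:
            updates = append(updates, want)
        }
    }
    for name, got := range observed {
        if _, stillWanted := desired[name]; !stillWanted {
            deletes = append(deletes, got)
        }
    }
    return creates, updates, deletes
}

func main() {
    desired := map[string]Resource{"a": {"a", "v2"}, "b": {"b", "v1"}}
    observed := map[string]Resource{"a": {"a", "v1"}, "c": {"c", "v1"}}
    c, u, d := reconcile(desired, observed)
    fmt.Println(len(c), len(u), len(d)) // prints: 1 1 1
}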

Since the exact tasks of the business logic controller are service-specific, SPEKTRA Edge/Goten provides a framework for building it. This is different compared to db-controller, where we have just ready modules to use.

Of course, there are some ready-made controller node managers, like the limits mixin, which must be included in each controller runtime if the limits feature is used. This document, however, explains how to create your own.

Some example tasks

Going back to Distributions, Devices, and Pods: the controller should create a Pod resource for every matching combination of Device and Distribution. If a Device is deleted, all its Pods must be deleted. If a Distribution is deleted, its Pods must similarly be deleted from all devices.

Observed states are Devices, Pods, and Distributions. The observed state of Distributions and Devices is used to compute the desired pod set. This is then compared with observed pods to create action points.

Another example: imagine we have collections of Orders and Products, where one Order points to one Product, but a Product can be pointed to by many Orders. Imagine that we want to display a view of orders where each item also shows short product info. Since we have a NoSQL database without joins, we need to copy the short info from the Product into the Order. We can do this when the Order is created or updated: get the Product resource and copy its info into the Order record. It may be debatable whether we want to update existing Orders when the Product is updated; for the sake of this example, suppose we need to support that case. We then observe both Products and Orders as the observed state. For each observed Order, we compute the desired one by checking the current product info. If there is any mismatch, we issue an Update to the server. Note that we have both an observed and a desired state of Orders here.
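A quick sketch of the desired-state computation for this hypothetical Orders/Products example may help; all type and field names below are illustrative assumptions, not part of any SPEKTRA Edge service.

package main

import "fmt"

type ProductInfo struct {
    Title string
    Price int
}

type Product struct {
    Name string
    Info ProductInfo
}

type Order struct {
    Name        string
    ProductName string
    ProductInfo ProductInfo // denormalized copy maintained by the controller
}

// desiredOrder recomputes the denormalized product info for an observed order.
// A syncer would then diff it against the observed order and issue an Update
// only when they mismatch.
func desiredOrder(observed Order, products map[string]Product) Order {
    want := observed
    if p, ok := products[observed.ProductName]; ok {
        want.ProductInfo = p.Info
    }
    return want
}

func main() {
    products := map[string]Product{
        "products/p1": {Name: "products/p1", Info: ProductInfo{Title: "Lamp", Price: 30}},
    }
    observed := Order{Name: "orders/o1", ProductName: "products/p1",
        ProductInfo: ProductInfo{Title: "Lamp", Price: 25}} // stale copy
    fmt.Println(desiredOrder(observed, products).ProductInfo != observed.ProductInfo) // true: Update needed
}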

Architecture Overview

When it comes to controllers, we define a thing called a "Processor". A processor is a module that accepts the observed state as its input and computes the desired state internally. Note that both the observed and the desired state can potentially span many resource types and collections. Still, a processor should concentrate on an isolated business logic task; for example, managing Pods based on Distributions and Devices is such a task. Inside a processor, the desired and observed states are fed into internal syncers that keep the system in a valid state. A processor has no output as such; it is a rather high-level and large object. Processors are, however, scoped.

Resource models in SPEKTRA Edge are concentrated around tenants: Services, Organizations, and Projects, with Projects being the most common. Each processor is scoped to one of these: a selected Service, Organization, or Project. We have as many processor instances as there are tenant resources in total. This is for safety, to ensure that tenants stay separated. It would not be good if, by mistake, we matched a Distribution with Devices from a different project - one tenant could then schedule pods in another one.

Therefore, remember that a Processor in the business logic controller is a unit scoped by a tenant (usually a Project) and focused on executing a single, developer-defined business logic task. This task may produce as many desired states (one per collection) as the developer deems necessary.

Above a Processor, we have a "Node". A Node contains:

  • A set of processors, one per tenant it sees.
  • A set of watchers, one per input (observed) collection.

A Node is responsible for:

  • Managing processors: for example, if a new project is created, it should create a new processor object; if the project is deleted, the processor must be deleted as well.
  • Running the set of watchers. By sharing watchers across processors, we ensure we do not open too many streams to the servers (many small projects are common).
  • Distributing observed state changes to the processors; each change set is split if necessary and delivered to the relevant processors.

A Node should be considered self-contained and is generally the highest-level object, although we also have "Node Managers", which manage a typically fixed set of nodes (usually one to four). We will come back to this in the Scaling considerations section.

Back to the Processor instance: each has a "heart", a single primary goroutine that runs all the internal computations and handles all events; keeping it single-threaded avoids multi-threading issues as much as possible. Those "events" include all observed state changes provided by the Node to the Processor. This "heart" is called the processor runner here, and its responsibilities include computing the desired state.

Modules-wise, each processor consists of (typically):

  • A set of input objects. They are queues, where the Node pushes observed state changes on the producer side. On the consumer side, the processor runner extracts those updates and pushes them into "stores".
  • A set of stores, one per observed collection. Stores are stateful and contain full snapshots of the observed collections. When the processor runner gets an update from an input object, it applies the change to the store. This is where we decide whether something was created, updated, or deleted.
  • A set of transformers. They observe one or more stores, which propagate changes to them in real time. Transformers contain the code responsible for computing the desired state based on the observed one.
  • A set of syncers. Each has two inputs: one is a store with the observed state, the other is the transformer producing the desired state. In some cases, though, it is possible to connect more than one transformer to the desired-state input of a syncer.

All of these components are run by the processor runner goroutine, with one exception: syncers internally have additional goroutines that execute the actual updates (Create, Update, and Delete operations). Those are IO operations, so it is necessary to delegate them away from the processor runner.

An important note: the processor runner MUST NOT execute any IO work; it should always be fast. If necessary, the framework allows running additional goroutines in the processor, which can execute longer operations (or those that can return errors).

One final thing to discuss about processors is initial synchronization. When a Node boots up, it has zero processors and the watchers for the observed state are empty. First, the watchers start observing the relevant input collections, as instructed. When they start, before getting real-time updates, they receive a current snapshot of the data. Only then do we start getting real-time updates that happened after the snapshot point in time. The Node is responsible for creating as many processors as there are tenants it observes. Events from different collections may be out of sync: sometimes we get tenants after the other collections, sometimes before, often both. It is also possible for a watcher to lose connectivity with a server. If the disconnection lasts long enough, it may opt to request a snapshot again after a successful reconnection. A full snapshot for each tenant is delivered to the corresponding processor. Therefore, when a Node delivers an "event" to a processor, it must include "Sync" or "LostSync" flags too. In the case of "Sync", the processor is responsible for generating its own diff against the previous snapshot using its internal store.

Note that each observed input of the processor gets its own "sync" event, and we cannot control the order here. The rules are as follows (a minimal sketch after this list illustrates the gating):

  • Sync/LostSync events must be propagated from inputs/stores to syncers.
  • Transformer must send a “Sync” signal when all of its inputs (or stores it uses) are in sync. If at least one gets a LostSync event, then it must propagate LostSync to the Syncer’s desired state.
  • Syncer’s desired state is in sync/non-sync depending on events from the transformer(s).
  • Syncer’s observed state is in sync/non-sync depending on the sync/lostSync event from the store it observes.
  • Syncer’s updater executes updates only when both desired and observed states are in sync. When they both gain a sync event, the syncer executes a fresh snapshot of Create/Update/Delete operations, all previous operations are discarded.
  • Syncer’s updater must stop actions when either observed or the desired state loses sync.
  • Transformers may postpone desired state calculation till all inputs achieve sync state (developer decides).
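A minimal sketch of that gating logic, assuming a hypothetical syncer that just tracks two boolean flags (the generated Goten syncers manage this state internally):

package main

import "fmt"

// syncGate mirrors the rule that the updater may act only when both the
// observed and the desired inputs of a syncer are in sync.
type syncGate struct {
    observedInSync bool
    desiredInSync  bool
}

func (g *syncGate) onObservedSync(inSync bool) { g.observedInSync = inSync }
func (g *syncGate) onDesiredSync(inSync bool) { g.desiredInSync = inSync }

// canApplyUpdates reports whether Create/Update/Delete operations may run;
// losing sync on either side pauses the updater.
func (g *syncGate) canApplyUpdates() bool {
    return g.observedInSync && g.desiredInSync
}

func main() {
    g := &syncGate{}
    g.onObservedSync(true)
    fmt.Println(g.canApplyUpdates()) // false: desired state not yet in sync
    g.onDesiredSync(true)
    fmt.Println(g.canApplyUpdates()) // true: a fresh snapshot of operations can run
    g.onObservedSync(false)          // LostSync on the observed side
    fmt.Println(g.canApplyUpdates()) // false: the updater stops its actions
}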

Prototyping controllers with proto annotations

In Goten, we first define the structure of the business logic controller (or at least what is possible) in protobuf files: we define the structure of Nodes, Processors, and their components.

A full reference can be found here: https://github.com/cloudwan/goten/blob/main/annotations/controller.proto. We will discuss some examples here to provide some more clarity.

By convention, in proto/$VERSION we create a controller subdirectory for the proto files. In regenerate.sh we add the relevant protoc compiler call, as in https://github.com/cloudwan/inventory-manager-example/blob/master/regenerate.sh (find --goten-controller_out).

When going through examples, we will explore some common patterns and techniques.

Inventory manager example - Processor and Node definitions.

Let's review some examples, starting with the Inventory Manager and its Processor definition: https://github.com/cloudwan/inventory-manager-example/blob/master/proto/v1/controller/agent_phantom_processor.proto

We can start from the top of the file (imports and top options):

  • See go_package annotation - this is the location where generated files will be put. Directory controller/$version/$processor_module is a convention we use and recommend for Processors.
  • Import of goten.proto and controller.proto from goten/annotations is required.
  • We need to import service packages’ main files for the versions we intend to use. For this example, we want to use monitoring from v4, and Inventory manager for v1. Relevant import files were added.
  • We also import “common components”, but we will return to it later.

In this file, we define a processor called "AgentPhantomProcessor". We MAY then optionally specify the types we want to use. This one (CommonInventoryManagerControllerTypes) is specified in the imported "common components" file mentioned above; let's skip explaining it for now.

The next important part is the definitions. In Goten, resource-type names are fully qualified, in the format $SERVICE_DOMAIN/$RESOURCE_NAME, and this is how we need to specify resources. Definitions can be used to alias long names into shorter ones. With the next example, we will also demonstrate another use case.

In AgentPhantomProcessor, we would like to generate a single PhantomTimeSerie resource for each ReaderAgent in existence. This is a very simple business logic task: make one additional resource per resource in another collection.

Since both ReaderAgent and PhantomTimeSerie are project-scoped resources, we want processors to operate per project. Therefore, we declare that "Project" is the scope object of the processor. Then we define two inputs: ReaderAgent and PhantomTimeSerie. In protobuf, an "input" translates code-wise into two components: an Input and a Store object (as described in the architecture overview).

We define a single transformer object, AgentPhantomTransformer. There, we declare that this transformer produces the desired collection of PhantomTimeSerie instances, where each instance is owned by some ReaderAgent. This simplifies cleanup: if a ReaderAgent is deleted, the transformer deletes its PhantomTimeSerie from the desired collection. The best transformer type in such a situation is owner_ownee, where each output resource belongs to a separate parent.

After the transformers, we define the syncers; we have one instance, PhantomTimeSerieSyncer. It takes PhantomTimeSerie from the input list as the observed input. The desired collection must then come from AgentPhantomTransformer.

This Processor definition shows what connects with what; we constructed the structure in a declarative way.

Now let's come back to types. As we said in the architecture overview, a Processor consists of input, store, transformer, and syncer objects. While transformers can be specified only in Processor definitions, the rest of those small elements can be delegated to type sets (here it is CommonInventoryManagerControllerTypes). This is optional; type_sets are not needed very often, and not here either. If they were not defined, the compiler would implicitly generate all necessary components in the same directory indicated by go_package, alongside the processor. If type_sets are defined, the compiler tries to find types there before deciding to generate its own.

Separate type_sets can be used, for example, to reduce unnecessary generated code, especially if we have multiple processors using similar underlying types. In the Inventory Manager, it was done for demonstration purposes only. Still, let's see this file: https://github.com/cloudwan/inventory-manager-example/blob/master/proto/v1/controller/common_components.proto.

We define here the input, store, and syncer components. Note that go_package is different from the one in the processor file, which means the generated components will reside in a different directory than the processor. The only benefit here is this separation; it is not strictly required.

Finally, note that in the processor we only indicated what the controller is doing and how things connect. The implementation is not here yet; it will be in Golang. For now, let's jump to the Node declaration, which can be found here: https://github.com/cloudwan/inventory-manager-example/blob/master/proto/v1/controller/inventory_manager_ctrl_node.proto

A Node is a component managing Processor instances and is responsible for dispatching real-time updates from all watchers to the processors, which in this example are scoped by an Inventory Manager Project (inventory-manager.edgelq.com/Project).

In this example file, we declare a Node called "InventoryManagerCtrl". The processors we want to attach form a one-element array containing AgentPhantomProcessor. We could potentially attach more processors, under one condition though: all must be scoped by exactly the same object. Since AgentPhantomProcessor is scoped by Project (inventory-manager.edgelq.com/Project), other processors would need the same.

The compiler parsing such a Node definition automatically detects the Scope and all Input resources. What we need to define is:

  • Sharding method: since the scope is a Project, the standard sharding for it is "byProjectId". For an organization, it would be "byOrgId"; for a service, "byServiceId". All three can optionally be replaced with "byIamScope". We will return to this when talking about scaling.
  • Dispatchment: when the Node gets the snapshot plus real-time updates from the watchers for all resources (Scope + Input), it also needs to know how resources should be grouped.
    • Param scope_grouping tells how the Project is identified. Normally, we want to identify a Project by its name; if you are unsure, just pass method: NAME for scope_grouping. As a result, the Node extracts the name field from a Project and uses it as the Processor identifier.
    • Param input_groupings is defined per detected input resource. In the processor, we defined monitoring.edgelq.com/PhantomTimeSerie and inventory-manager.edgelq.com/ReaderAgent (shortened to PhantomTimeSerie and ReaderAgent). Input groupings instruct the Node how each resource instance of a given type should be classified, meaning how to extract the ID of the corresponding processor instance. Resource ReaderAgent is a child of an inventory-manager.edgelq.com/Project instance according to the api-skeleton; therefore, we indicate that the grouping method is of the "NAME" type, and the Node can figure out the rest. Resource PhantomTimeSerie is a bit trickier, because its parent is not inventory-manager.edgelq.com/Project, but monitoring.edgelq.com/Project. The Node still needs a way to extract the name of the inventory-manager.edgelq.com/Project from a monitoring.edgelq.com/PhantomTimeSerie instance. Because this cannot be done declaratively (as of now, the compiler does not figure out things by string value the way the IAM Authorizer does), we must pass the CUSTOM method. It means that in Golang we provide our own function for getting the processor ID.

When deciding on the dispatchment annotation, remember that the Node has a customizable way of deriving the Processor identifier. We need to provide a way to map a <Resource Instance> into a <Processor Identifier>, and we need to do this for the Scope AND all Input resources. The NAME method passed to either the scope or an input resource means that the Node should just call the GetName() function on the resource instance to get the identifier. This works for same-service resources, but not for others like PhantomTimeSerie, whose GetName() would eventually point to the Project in the monitoring service.

Although the GetName() method on a ReaderAgent instance returns the name of the ReaderAgent rather than a Project, the Node is able to notice that the name of the ReaderAgent also contains the name of the project.
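To illustrate what a CUSTOM dispatchment function has to accomplish, here is a hedged, string-only sketch of extracting the project ID from a resource name; the real generated function (GetScopeIdentifierForPhantomTimeSerie) operates on typed resource and name objects, and the name pattern used below is only an assumption.

package main

import (
    "fmt"
    "strings"
)

// projectIDFromName walks a name of the form "projects/<id>/..." and returns
// the project ID, i.e. the identifier of the processor that should receive
// events for this resource.
func projectIDFromName(name string) (string, bool) {
    parts := strings.Split(name, "/")
    for i := 0; i+1 < len(parts); i += 2 {
        if parts[i] == "projects" {
            return parts[i+1], true
        }
    }
    return "", false
}

func main() {
    id, ok := projectIDFromName("projects/demo/regions/us-west2/phantomTimeSeries/abc")
    fmt.Println(id, ok) // demo true
    // The Node would route this event to the processor scoped to the
    // inventory-manager.edgelq.com/Project with ID "demo".
}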

Applications example - Processor and Node definitions.

We have a more interesting example of a controller in applications.edgelq.com: a controller processor responsible for Pod management; we say that "Pod management" is its business logic task. There are two things we want from this processor:

  • Create a Pod instance per each matched Device and Distribution.
  • Whenever the Device goes offline, we want to mark all its Pods as offline.

Business notes:

  • One Device can host multiple Pods, and a Distribution can create Pods across many devices. Still, a Device + Distribution pair can have at most one Pod instance.
  • A Pod can also be deployed manually, not via a Distribution. Not all pods are of the distribution type.
  • When a Device is online, it updates its pod statuses itself. But when it goes offline, the controller needs to do it. Note that this means the controller basically needs to track the pods of offline devices.
  • If the device is offline, the pod status should be "Unknown".
  • Pods, Distributions, and Devices are project-scoped resources; however, Device belongs to the devices.edgelq.com service, and the other resources to applications.edgelq.com. Still, the Project is our primary scope.

With this knowledge, we can draft the following Processor declaration: https://github.com/cloudwan/edgelq/blob/main/applications/proto/v1/controller/pods_processor.proto

Compared to the previous example, goten.controller.type_set is declared in the same file, but let's skip this part for now and talk about the processor first. There, we have the PodsProcessor type defined. As we can deduce from the business notes, the Scope resource should be "Project", and the inputs should be clear too. Then we have two transformers, one per business task. You should also note that we have two additional definitions of the applications.edgelq.com/Pod instance: one is DistributionPod, the other is UnknownStatePod. As mentioned in the business notes, not all pods belong to a distribution, and pods with unknown state are also only a subset of all pods. Those extra definitions can be used to differentiate between types and help write a proper controller.

Transformer DistributionController is of the already-known owner/ownee type, but in this case each Pod instance is owned by a unique combination of a Distribution and a Device. When either of the parents is deleted, all associated pods are automatically deleted.

The other transformer, UnknownStateTracker, is of a different kind: generic. This type of transformer takes some number of inputs and produces some number of outputs. In this particular case, we have Devices and Pods, where each Pod belongs to a specific Device. For each offline Device, we want to mark its Pods as being in the Unknown state. The generic type requires more code: developers need to handle all input events - additions, updates, and deletions. For each change in the input resources, a new snapshot of the output (or a DIFF to the snapshot) is required.

One alternative we could have used is a Decorator:

{
  name : "UnknownStateStatusDecorator"
  decorator : {
    resource : "Pod"
    additional_inputs : [ "Device" ]
  }
}

The decorator keeps the same resource on the output: in this case, when a Pod changes, the decorator function is called to decorate the Pod resource. There, we could get the Device record owning the Pod, check the Device status, and mark the Pod status accordingly. If the Device changes, it triggers a re-compute of all the Pods that belong to it (the decorator is called again). We did not use a decorator here, because the controller should only mark a Pod status as UNKNOWN when the Device is offline; when the Device is online, it manages its Pod statuses itself. This "shared" ownership means the decorator was not exactly suitable. Instead, we use the "generic" type and output only the pods that have UNKNOWN status. The controller needs to run UpdatePod only for the pods of offline devices. If a device gets back online, the controller should "forget" about those pods. What we mean is: UnknownStateTracker DELETES pods from its output collection when the device becomes online (which is not the same as actually deleting pods!). This is why the output of UnknownStateTracker is UnknownStatePod, not Pod: we want to show that the output contains pods with unknown status, not all pods. We will come back to this when commenting on the Go implementation.

We also re-check offline pods periodically, producing snapshots (per scope project) after each period. By default, the transformer would be triggered only when some Pod/Device changes (create, update, delete).

Now back to goten.controller.type_set: there, we defined only the Store component for the Pod resource, with one custom index, even though we have multiple resource definitions in the processor. As mentioned, this type set is an optional annotation and the compiler can generate the missing bits on its own. In this particular case, we wanted a custom index for pods; the field path spec.node points to the owning Device resource. This index just gives us some convenience in the code later on. Anyway, this is another use case for type sets: the ability to enhance the default types we would get from the code-generation compiler.

Node definition can be found here: https://github.com/cloudwan/edgelq/blob/main/applications/proto/v1/controller/applications_ctrl_node.proto

However, in this case, it is pretty much the same as in Inventory Manager.

Overview of generated code and implementing missing bits

The best way to discuss the controller code is again by example; we will look at the Inventory Manager and Applications.

Inventory manager

In the Inventory Manager, we want the following feature: a time series showing the history of online/offline changes per agent. Each agent runtime sends an online signal within an interval (1 minute), using a CreateTimeSeries call from monitoring. When an agent goes offline, it cannot send an "offline" signal itself - instead, we need to generate a PhantomTimeSerie object per agent, so it can produce data when the original metrics are missing. This is how we obtain the online/offline history: zeroes fill the offline periods, ones the online parts. This is the task we implemented for the Inventory Manager.

The controller code can be found here: https://github.com/cloudwan/inventory-manager-example/tree/master/controller/v1.

As with the rest of the packages, files with .pb. in their names are generated; the others are handwritten. The common directory there contains only generated types, as pointed out by the proto file for type_sets. More interesting is the agent phantom processor, to be found here: https://github.com/cloudwan/inventory-manager-example/tree/master/controller/v1/agent_phantom.

We should start examining examples from there.

The first file is https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/agent_phantom_processor.pb.go.

It contains the Processor object and all its methods. We can notice the following:

  • In the constructor NewAgentPhantomProcessor, we create all processor components as described by the protobuf definition of the processor. Connections are made automatically.
  • The constructor gets an instance of AgentPhantomProcessorCustomizer, which we will need to implement.
  • The processor has a "runner" object - this is the "heart" of the processor, handling all the events.
  • The processor has a set of getters for all components, including the runner and the scope object.
  • The processor has an AddExtraRunner function, where we can add extra procedures running on separate goroutines, doing extra tasks not predicted by the processor proto definition.
  • The interface AgentPhantomProcessorCustomizer has a default partial implementation.

In the customizer, we can:

  • Add PreInit and PostInit handlers

    PreInit is called on a processor whose internal components are not yet initialized. PostInit is called after the initial construction is completed (but before the processor starts running).

  • We have StoreConfig calls, which can be used to further customize the Store objects. You can check the code to see the options; one option is to provide an additional filter applied to the store, so we do not see all resources.

  • Functions ending with ConfigAndHandlers are for Syncer objects. We will have to implement them; this is where the final configuration and tuning of syncers happens.

  • Functions ending with ConfigAndImpl are used to customize transformers.

  • We can also hook a handler for when the Scope object itself changes (for example, some fields in the Project). Usually it is left empty, but there may still be use cases for it.

After reviewing the processor file, you should look at the processor customizer implementation. This is a handwritten file; here is the example for the Inventory Manager: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/agent_phantom_processor.go.

We can define the constructor however we want. Some implementation notes:

  • For PhantomTimeSerieStoreConfig, we make sure to filter out PhantomTimeSeries that are not of the specific metric type or do not have the specific meta owner type. This may often be redundant, because we can define a proper filter for the PhantomTimeSerie objects themselves (in a different file; we will come back to it).

  • In the function AgentPhantomTransformerConfigAndImpl, we need to return an implementation handler satisfying the specific interface required by the transformer. In the config, we usually provide a reconciliation mask. These masks prevent triggering the transformer function for uninteresting updates. In this example, we check the field paths online, activation.status, and location. It means that if any of those fields change in a ReaderAgent, we trigger the transformer to recompute the PhantomTimeSerie objects (for this agent only). The reconciliation mask helps reduce unnecessary work: if someone changed, say, the display name of the agent, no work would be triggered.

  • In the function PhantomTimeSerieSyncerConfigAndHandlers we customize the Syncer for PhantomTimeSerie objects. In the config part, we almost always need to provide an update mask: the fields maintained by the controller. We may also specify what to do in case duplicated resources are detected - by default we delete them, but it is fine to provide this value explicitly (AllowDuplicates is false). Apart from that, there is a quirk about PhantomTimeSerie instances:

    The fields resource and metric are non-updatable. Because of that, we need to disable updates with UpdatesDisabled. It is recommended to review all options in the code itself to see what else can be changed. The handlers for the syncer are a bit tricky here: we could have just returned NewDefaultPhantomTimeSerieSyncerHandlers, but we need some special handling, which is common for PhantomTimeSerie instances. We will come back to it later.

  • In the PostInit function we provide an extra goroutine, ConnectionTracker. It does work not predicted by the controller framework and needs some IO; for those reasons, it is highly recommended to delegate this work to a separate goroutine. This component also gets updates from the ReaderAgent store (create, update, delete).

Let's first discuss the phantomTimeSerieSyncerHandlers object. It extends the generated common.PhantomTimeSerieSyncerHandlers. Custom handlers are quite a powerful tool: we can customize even how objects are created/updated/deleted; by default, the syncer uses standard Create/Update/Delete methods, but it does not need to be this way. In this particular case, we want to customize identifier extraction from the PhantomTimeSerie resource instance. We created a key type for this, defined here: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/agent_phantom_key.go.

By default, the identifier of a resource is simply extracted from the name field. However, PhantomTimeSerie is special in this regard: this resource has a non-predictable name! CreatePhantomTimeSerie requests must not specify the name; it is assigned by the server during creation. This has nothing to do with the controller; it is part of the PhantomTimeSerie spec in monitoring. For this reason, we extract fields that we know will be unique: since we know that for a given ReaderAgent we generate only one "online" metric, we use the agent name extracted from metadata along with the metric type value. This customized syncer will then match desired and observed PhantomTimeSerie resources using those custom IDs.
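The idea behind such a key can be sketched as follows; the struct is a simplified stand-in (the real handlers read metadata.ownerReferences and metric.type from monitoring's PhantomTimeSerie type, and the field and metric values below are made up).

package main

import "fmt"

// phantomTimeSerie is a simplified stand-in; only identification-relevant
// fields are shown.
type phantomTimeSerie struct {
    Name       string // server-assigned, not predictable
    MetricType string
    OwnerAgent string // would come from metadata.ownerReferences
}

// matchKey builds an identifier that is stable for both the desired and the
// observed instance, because it ignores the unpredictable name field.
func matchKey(p phantomTimeSerie) string {
    return p.OwnerAgent + "/" + p.MetricType
}

func main() {
    desired := phantomTimeSerie{MetricType: "example.com/online", OwnerAgent: "readerAgents/a1"}
    observed := phantomTimeSerie{Name: "phantomTimeSeries/8f31", MetricType: "example.com/online", OwnerAgent: "readerAgents/a1"}
    fmt.Println(matchKey(desired) == matchKey(observed)) // true: the syncer pairs them up
}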

The connection tracker defined here: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/connection_tracker.go shows an example of a controller task not predicted by the controller framework. It runs on a separate goroutine; however, OnReaderAgentSet and OnReaderAgentDeleted are called by the processor runner, the main goroutine of the processor. This mandates some protection. Golang channels could perhaps have been used, but note that they have limited capacity: if they get full, the processing thread stalls. Maps with traditional locks are safer in this regard and are often used in SPEKTRA Edge; they solved some issues when there were sudden floods of updates. The benefit of maps is that they can merge multiple updates into one (overrides), while with channels we would need to process every individual element.
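A minimal sketch of that map-with-lock pattern (the type and method names are hypothetical, not taken from the example repository): the processor runner only writes to the map and never blocks, while a worker goroutine drains whole batches at its own pace, so multiple updates to the same agent collapse into one entry.

package main

import (
    "fmt"
    "sync"
)

// pendingAgents collects agent updates pushed by the processor runner.
type pendingAgents struct {
    mu      sync.Mutex
    pending map[string]bool // agent name -> online flag from the latest update
}

func newPendingAgents() *pendingAgents {
    return &pendingAgents{pending: map[string]bool{}}
}

// OnAgentSet is called from the processor runner goroutine; it must be fast
// and must not perform any IO.
func (p *pendingAgents) OnAgentSet(name string, online bool) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.pending[name] = online
}

// drain is called from the worker goroutine; it takes the whole batch and
// resets the map, so the runner never waits for the worker.
func (p *pendingAgents) drain() map[string]bool {
    p.mu.Lock()
    defer p.mu.Unlock()
    batch := p.pending
    p.pending = map[string]bool{}
    return batch
}

func main() {
    p := newPendingAgents()
    p.OnAgentSet("readerAgents/a1", true)
    p.OnAgentSet("readerAgents/a1", false) // overrides the previous update
    fmt.Println(len(p.drain()))            // prints: 1
}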

Going back to the implementation comments: as we said, we ensure monitoring has a time series per agent showing whether the agent was online or offline at a given point in time. However, to synchronize the "online" flag, we periodically ask monitoring for the time series of all agents, then flip the flags that mismatch the desired value.

Let's move on to the transformer files. The generated one can be found here: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/agent_phantom_transformer.pb.go.

Notes:

  • The interface we need to notice is AgentPhantomTransformerImpl; this one needs an implementation from us.
  • The config structure AgentPhantomTransformerConfig also needs to be provided by us.
  • The transformer code already handles all events related to the input resources, including deletions. This reduces the required interface of AgentPhantomTransformerImpl to a minimum: we just need to compute the desired resources for a given input.
  • Note that this config and impl are provided by your customizer implementation for the processor.

The file with the implementation for the transformer is here: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/agent_phantom_transformer.go.

Notes:

  • For a single ReaderAgent, we may potentially have N output resources (PhantomTimeSerie here).
  • When we create the DESIRED PhantomTimeSerie, note that we provide only the parent part of the name field: when we call NewNameBuilder, we do NOT call SetId. As part of the PhantomTimeSerie spec, we can only provide the parent name, never our own ID - it must be generated by the server. Note how this combines with the custom PhantomTimeSerie syncer handlers, where we extract the ID from metadata.ownerReferences and metric.type.
  • The PhantomTimeSerie is constructed with service ownership info and the ReaderAgent. This ensures that we, and not another service, own this resource. Metadata also ensures PhantomTimeSeries will be cleaned up (an additional cleanup guarantee, as a transformer of owner/ownee type provides the same functionality).

To summarize, when implementing a Processor, it is necessary to (at the minimum):

  • Provide all transformer implementations and define their configs.
  • Provide an implementation of the processor customizer; at the minimum, it needs to provide objects for syncers and transformers.

We need to provide missing implementations not just for processors, but for nodes too. You can typically find three code-generated files for nodes (in the InventoryManager example).

The main file to review is the one ending with pb.node.go. In the constructor, it creates all watcher instances for all scope and input resources across all processors. It manages a set of processors per project (in this case we have one processor type, but more could be attached). All updates from the watchers are distributed to the relevant processors. It is quite a big file; initially, you may just remember that this component watches all collections in real time and pushes updates to the Processors so they can react. However, at the top of the file, there are four types you need to see:

  • InventoryManagerCtrlFieldMasks

    Its generic name is <NodeName>FieldMasks.

  • InventoryManagerCtrlFilters

    Its generic name is <NodeName>Filters.

  • InventoryManagerCtrlCleaner

    Its generic name is <NodeName>Cleaner.

  • InventoryManagerCtrlNodeCustomizer

    Its generic name is <NodeName>NodeCustomizer.

Of these types, the most important for developers is NodeCustomizer. Developers should implement its functions:

  • Filters() needs to return filters for all input resources (from all processors) and scope resources. This is important: the controller should only know about the resources it needs to know!
  • FieldMasks() needs to return field masks for all input resources (from all processors) and scope resources. It is very beneficial to return only the fields the controller needs to know, especially considering that the controller keeps those objects in RAM! However, be careful to include all needed fields: those needed by dispatchment (typically the name), those needed by transformers and reconciliation masks, and all fields required by syncers (update masks!).
  • The function GetScopeIdentifierForPhantomTimeSerie (generically, GetScopeIdentifierFor<Resource>) was generated because, in protobuf, in the dispatchment annotation for PhantomTimeSerie, we declared that the identifier uses the CUSTOM method!
  • The function CustomizedCleaner should return a cleaner that handles orphaned resources in case the Scope resource (the Project here) is deleted but some child resources still exist. In 99.99% of cases, however, this functionality is not needed: when a Project is deleted, all child resources are cleaned up asynchronously by the db-controller.
  • The function AgentPhantomProcessorCustomizer must return a customizer for each Processor and scope object.

Developers need to implement a customizer, for the inventory manager we have the file: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/inventory_manager_ctrl_node.go.

Notes:

  • For GetScopeIdentifierForPhantomTimeSerie, we need to return the name object of the Inventory Manager Project. Extracting it from the name of a PhantomTimeSerie is easy, though. We may add some auto-detection in the future: if the name pattern matches across resources, the developer won't need to provide such simple functions.
  • In the FieldMasks call, the mask for ReaderAgent needs to cover all fields used in the processor - the reconciliation mask and the connection tracker. The name should always be included.
  • In the Filters call, we need to consider a couple of things:
    • We may have a multi-region environment, and each region has its own controllers. Typically, for regional resources (ReaderAgents or PhantomTimeSeries), we should get those belonging to our region. For Projects, we should get those that can be in our region, so we filter by enabled regions.
    • Resources from core SPEKTRA Edge services should be filtered by our service, ideally by owned ones. Otherwise, we would get PermissionDenied.
  • The customizer construction for the processor should be straightforward. Any extra params were provided upfront, passed to the node customizer.

To see a final top-level implementation bit for the business logic controller for InventoryManager, see https://github.com/cloudwan/inventory-manager-example/blob/master/cmd/inventorymanagercontroller/main.go.

Find the NewInventoryManagerCtrlNodeManager call - this is how we construct the node manager and pass our node customizer there. This concludes the example.

Applications controller implementation

Controller implementation for applications can be found here: https://github.com/cloudwan/edgelq/tree/main/applications/controller/v1.

It contains additional elements compared to the Inventory Manager, so let's go through it, skipping the parts it shares with the previous example.

Let's start with the processor.

As described in the protobuf part, we have essentially two transformers and two syncers, for two different sub-tasks of general pod processing.

Let's start with the transformer called DistributionController. A quick recap: this transformer produces Pods based on combined Device and Distribution resources; each matched Device + Distribution pair should produce a Pod, called DistributionPod in protobuf. Not all pods belong to a Distribution though - some may be deployed manually by clients.

You should start examining code from the processor customizer (link above).

In the DistributionControllerConfigAndImpl function of the customizer, we create a config that reacts to specific field path changes in Distributions and Devices. As of now, a Distribution is matched with a Device based solely on the metadata.labels field path in the Device, so this is what we check for Devices. For Distributions, we want to recompute pods if the selector or the pod template changes; other updates to Distributions should not trigger Pod re-computation! Also note that the implementation object can hold Store instances, so we can access the current state. This will be necessary.

In the transformer, https://github.com/cloudwan/edgelq/blob/main/applications/controller/v1/pods/distribution_controller.go, there is additional knowledge. Since this transformer is of the meta-ownee type but has two owners, we must implement two functions: one computing Pods for a Distribution across all matched Devices, the other computing Pods for a Device across Distributions. Note that each DESIRED generated pod does not have a clear static name value; GenerateResourceIdFromElements may have non-deterministic elements. We will need to reflect this when configuring the Syncer for the DistributionPod type.

This is how pods are generated. To continue analyzing the behavior of Distribution pods, go back to the processor customizer and find the function PodFactoryConfigAndHandlers. The config with an update mask only seems ordinary, but there is also an example of limits integration. First, we construct the default handlers; then, we attach the limits guard. Note that Pods are subject to limits! There is a possibility that Devices/Distributions still fit within the plan, but Pods would exceed it.

In such a situation, Syncer must:

  • Stop executing CreatePod if we did hit a limit.
  • UpdatePod should continue being executed as normal.
  • DeletePod should be executed as normal.

Once we have free limit again (as a result of a plan change or other pods being deleted), creation should resume. The limits guard is a component that must be used whenever the controller may create resources that are part of a limits plan! Note also that in the PostInit call, we must additionally configure the limits guard.

For the pod factory syncer, we also provided some other syncer customizations:

  • Identifiers of pods have a custom implementation, since the pod name may be non-deterministic.
  • We must be very careful about what we delete! Note that in the protobuf section for the PodFactory syncer, the desired state takes pods from the DistributionController transformer, but the observed state contains ALL pods! To prevent wrong deletions, we must provide an additional CanProceedWithDeletion handler.

Let's move on to the next transformer and syncer, handling unknown-state pods. As a recap, the controller must mark pods whose device went offline as having UNKNOWN status. The set of unknown pods is a collection on its own (UnknownStatePod). When a Device gets back online, we need to remove its pods from that collection. We also want to recompute the snapshot of unknown-state pods periodically - this is what we declared in protobuf.

Starting with the transformer: the UnknownStateTrackerConfigAndImpl function used to customize it is in the customizer implementation for PodsProcessor. Note that the config object now has a SnapshotTimeout variable. This timeout decides how often the desired collection is re-computed (in this case!). Recall that we declared this transformer as a periodic-snapshot generic type.

See the generated transformer file and the handwritten customization.

From the .pb. file, note that the minimum required implementation is CalculateSnapshot, which is called periodically as instructed.

However, if you examine it carefully, you can notice code like this:

onDeviceSetExec, ok := t.Implementation.(interface {
    OnDeviceSet(
      ctx context.Context,
      current, previous *device.Device,
    ) *UnknownStateTrackerDiffActions })
if ok {
    t.ProcessActions(
      ctx,
      onDeviceSetExec.OnDeviceSet(ctx, current, previous),
    )
}

Basically, all generic transformers allow additional custom interfaces in their implementations: generally, On<Resource>Set and On<Resource>Deleted calls for each input resource. Those allow us to update the desired collections much faster!

There is also an additional benefit of implementing those optional methods:

  • For generic transformers without periodic snapshots, this avoids the CalculateSnapshot call entirely. In regular generic transformers, if the implementation does not provide the On<Resource><SetOrDeleted> call, the snapshot is triggered with a delay specified by the SnapshotTimeout variable (different behavior than for the periodic-snapshot type!). To avoid extra CPU work, it is recommended to implement the optional methods.

For this particular transformer, in the file https://github.com/cloudwan/edgelq/blob/main/applications/controller/v1/pods/unknown_state_tracker.go, we implemented the basic snapshot computation, where we collect all pods with unknown statuses based on the last heartbeat from devices. However, we also implemented OnDeviceSet and OnDeviceDeleted. The Set variant is especially important: when a Device gets back online, we want to remove its unknown-state pods from the desired collection as soon as possible. If we waited for the timeout (more than 10 seconds), the Device might mark its pods online while our controller would keep marking them unknown until the timeout happens. This mechanism may be improved in the future; even now we risk two to three unnecessary extra updates.

Going back to the customizer (file pods_processor.go), finally see UnknownStateMaintainerConfigAndHandlers. We are again using a Syncer for pods, but it is a separate instance with a different update mask: we want to control only specific fields, related to the status. Note that, as with Distribution pods, the observed state contains ALL pods, but the desired state only contains those with unknown status. To avoid bad deletions, we disable deletions entirely - and creations too, as we don't need them.

We can now exit processor type and examine node customizer, which can be seen here: https://github.com/cloudwan/edgelq/blob/main/applications/controller/v1/applications_ctrl_node.go.

It is very similar to the customizer for the Inventory Manager, with some additional notes:

  • Note that we pass a limits observer instance for the limits guard integration. We will return to it shortly.
  • For the Filters() call, note that the Distribution resource is non-regional - in fact, its instances are copied to all regions where the project is present in a multi-region environment. In such situations, we should filter by the metadata.syncing.regions field. This returns all distributions of all projects enabled in our region, which is exactly what we need.

For the limits guard integration, also see a controller main.go file: https://github.com/cloudwan/edgelq/blob/main/applications/cmd/applicationscontroller/main.go.

Note that there we are constructing limits observer:

limitsObserver := v1ctrllg.NewLimitTrackersObserver(ctx, envRegistry)

We also need to run it:

g.Go(func() error {
    return limitsObserver.Run(gctx)
})

The limits observer instance should be global for the whole NodesManager and be declared beforehand, in main!

Scaling considerations

Note that processors, to work properly, need to have:

  • Scope object (Project typically)
  • All input collections (snapshot of each of them)
  • All desired collections

In the case of multiple processors, input collections may be shared, but that is not the point here. The point is that the controller needs sufficient RAM, at least for now. This may be improved in the future, for example with disk usage, but it won't change the fact that a controller node needs to handle all assigned scope objects and their collections.

First, to minimize the memory footprint, provide field masks for all collections - but be careful to include all necessary paths; there have been bugs because of missing values! Then we need some horizontal scaling. For this, we use sharding.

Sharding divides resources into groups. Note that in both examples we used byProjectId, declared explicitly in the protobuf files for the Inventory Manager and Applications controllers. Project ID sharding means that each Node instance gets a share of the projects, not all of them. If we have only one Node, it contains data for all projects; if we have multiple Nodes, the projects are spread across them. Sharding by project also guarantees that resources belonging to one project always land in the same shard - this is why it is called byProjectId. For each resource, we take its name, extract the project ID part, and hash it. The hash is some large integer value, like an int64. We also need to know how big the ring is: 16, 256, 4096... For each ring size, we take the hash modulo the ring size and get the shard number. For example, byProjectId hash mod 16 gives us the byProjectIdMod16 shard key. Those values are saved in metadata.shards of each resource; this is done by sharding store plugins on the server side. Note that the field metadata.shards is a map<string, int64>. See https://github.com/cloudwan/goten/blob/main/types/meta.proto.
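The computation is roughly as follows; the sketch is illustrative only (the real sharding store plugins live in the Goten/SPEKTRA Edge server code and may use a different hash function).

package main

import (
    "fmt"
    "hash/fnv"
    "strings"
)

// byProjectIdMod16 extracts the project ID from a resource name, hashes it,
// and reduces it modulo the ring size - every resource of the same project
// therefore lands in the same shard.
func byProjectIdMod16(resourceName string) int64 {
    const ringSize = 16
    projectID := resourceName
    parts := strings.Split(resourceName, "/")
    for i := 0; i+1 < len(parts); i += 2 {
        if parts[i] == "projects" {
            projectID = parts[i+1]
            break
        }
    }
    h := fnv.New64a()
    h.Write([]byte(projectID))
    return int64(h.Sum64() % ringSize)
}

func main() {
    // Both resources belong to project "demo", so they get the same shard key,
    // which is what keeps a whole project on a single controller node.
    fmt.Println(byProjectIdMod16("projects/demo/distributions/web"))
    fmt.Println(byProjectIdMod16("projects/demo/pods/web-on-device1"))
}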

The ring size we use everywhere is 16 for now, meaning we could potentially divide the work across up to 16 controller nodes.

When the first node starts, it is assigned all 16 shards, values 0 to 15. If a second node starts, it gets some random starting point, say shards 10-15, while the first node keeps 0-9. When a third node starts, it grabs another random range, like 0-3, leaving the other nodes with 4-9 and 10-15. This can continue until we are blocked by the ring size and scaling is no longer effective.

Note that when a new node starts, it can take load off at most two existing nodes, not all of them. For this reason, we have a thing called Node Managers in all controllers, in all the examples - node managers are what we build first in the main.go files. Node managers start with one to four virtual node instances, most commonly two. This way, when a new runtime starts, we have a good chance of taking pressure off more instances.

Node managers are responsible for communicating with each other and assigning shards to their nodes. As of now, we use a Redis instance for this purpose. If you examined the generated files for nodes, you saw that each Node has a method for updating its shard range. Shard ranges add extra filter conditions to the filters passed from the node customizer instance.

With the Kubernetes Horizontal Pod Autoscaler we solve some scaling issues by splitting projects across more instances. This gives us some room to breathe, but two issues remain:

  • A super large project could potentially outgrow the controller.
  • Super large shards (lots of projects assigned to the same value) can be too massive.

For the first issue, we can leverage a multi-region environment, as we already did in the examples: we get resources mostly from our own region, so large projects are further split across regions. Still, we may get hit by a large project-region.

For the second issue, we could switch to a larger ring size, like 256. However, it means we would have lots of controller instances, like 20 or more. Controllers also induce their own overhead, meaning we would waste plenty of resources just on the large number of instances.

Presented techniques still provide us with some flexibility and horizontal scaling. To scale further, we can:

  • Introduce improvements in the framework, so it can compress data, use disk, or even “forget” data and retrieve it on demand.
  • Use diagonal scaling: use horizontal autoscaling first (like in Kubernetes); then, if the number of instances hits some alert threshold (like 4 pods), increase the assigned memory in the YAML declaration and redeploy.

Diagonal autoscaling, with automation in one axis, may be the most efficient approach, even though it requires a small reaction from the operator: handling the alert and increasing the values in the YAML. Note, however, that this simple action could also be automated.