Developing your Business Logic Controller
The API server can execute only a limited amount of work per request: read requests are limited in size and can fetch a single page, and write requests break down if you try to save or delete too many resources in a single transaction. Splitting the work into multiple transactions would also make users wonder whether something is stuck. Some actions are simply heavy: for example, when a user creates a Distribution resource in applications.edgelq.com that matches thousands of Edge devices, the system needs to create thousands of Pod resources. Doing this in a single transaction on the API server side is practically impossible; pods must be created asynchronously for the service to operate correctly.
Since we use NoSQL databases, which have no cross-collection joins, we sometimes need to denormalize data and maintain copies, so that all necessary data can be read from a single collection.
Service development therefore very often includes developing a business logic controller: a runtime designed to execute all additional write tasks asynchronously.
We also need to acknowledge that:
- Some write requests may fail, and some parts of the system may be unavailable. We need reasonable retries.
- The system may be in constant motion: actions changing the desired state may arrive asynchronously, and tasks may change dynamically even before they are completed.
- For various reasons (a mistake, perhaps) users may delete objects that need to exist. We need to handle interruptions and correct errors.
The business logic controller is designed to react in real time: it handles failures, cancels or amends actions when necessary, and heals the system toward the desired state.
Desired state and observed state are the key concepts here. Controllers are optimized primarily for the Create/Update/Delete operations that bring the observed state in line with the desired one. The pattern is the following: the controller uses watchers to learn the current system state (ideally watching only the subset it needs). The observed state of some resources is used to compute the desired state. The desired state is then compared with the relevant part of the observed state, and any mismatch is resolved with a Create/Update/Delete operation. This is not the only way a controller can operate, but it is the most common one.
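To make the pattern concrete, here is a minimal, framework-agnostic sketch of that reconciliation step; the types and the reconcile function are illustrative only and are not Goten APIs.

package controller

// Resource is an illustrative stand-in for any resource instance.
type Resource struct {
	Name string
	Spec string
}

// reconcile compares the desired collection against the observed one and
// returns the Create/Update/Delete operations needed to converge.
func reconcile(observed, desired map[string]Resource) (creates, updates, deletes []Resource) {
	for name, want := range desired {
		got, ok := observed[name]
		switch {
		case !ok:
			creates = append(creates, want) // missing in observed -> Create
		case got.Spec != want.Spec:
			updates = append(updates, want) // drifted -> Update
		}
	}
	for name, got := range observed {
		if _, ok := desired[name]; !ok {
			deletes = append(deletes, got) // not desired anymore -> Delete
		}
	}
	return creates, updates, deletes
}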
Since the exact tasks of a business logic controller are service-specific, SPEKTRA Edge/Goten provides a framework for building one. This is different from the db-controller, where ready-made modules are simply used.
There are some ready-made controller node managers, like the limits mixin, which must be included in each controller runtime if the limits feature is used. This document, however, explains how to create your own.
Some example tasks
Going back to Distributions, Devices, and Pods: the controller should create a Pod resource for any matching combination of Device and Distribution. If a Device is deleted, all its pods must be deleted; if a Distribution is deleted, its pods must similarly be deleted from all devices.
The observed state consists of Devices, Pods, and Distributions. The observed state of Distributions and Devices is used to compute the desired pod set, which is then compared with the observed pods to derive the required actions.
Another example: imagine we have collections of Orders and Products, where one order points to one product, but a product can be pointed to by many orders. Suppose we want to display a list of orders, where each item also shows short product info. Since we have a NoSQL store without joins, we need to copy the short info from the product into the order. We can do this when the Order is created or updated: get the Product resource and copy its info into the Order record. It may be questionable whether we want to update existing orders when the product is updated; for the sake of this example, suppose we need to support this case. We then observe Products and Orders as the observed state. For each observed order, we compute the desired one by checking the current product info. If there is any mismatch, we issue an Update to the server. Note that we have both an observed and a desired state of orders here.
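As a hedged illustration of this Orders/Products case (the types and helper below are made up for the example and are not part of any real service), the desired-state computation boils down to copying the product's short info into the order:

package controller

// ProductInfo is the short info that gets denormalized into orders.
type ProductInfo struct {
	Title string
	Price int64
}

type Product struct {
	Name string
	Info ProductInfo
}

type Order struct {
	Name        string
	ProductName string
	ProductInfo ProductInfo // denormalized copy maintained by the controller
}

// desiredOrder computes the desired state of an observed order: the same
// order, but with the short product info copied from the current product.
// A mismatch with the observed order results in an Update call to the server.
func desiredOrder(observed Order, products map[string]Product) (Order, bool) {
	product, ok := products[observed.ProductName]
	if !ok {
		// The referenced product is not in the observed store (yet); skip.
		return Order{}, false
	}
	want := observed
	want.ProductInfo = product.Info
	return want, true
}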
Architecture Overview
For controllers, we define a concept called a “Processor”. A processor is a module that accepts the observed state as input and computes the desired state internally. Note that the observed and desired states can each consist of many resource types and collections, but a processor should still concentrate on one isolated business logic task; the management of Pods based on Distributions and Devices is such a task. Inside a processor, the desired and observed states are fed into internal syncers that keep the system in a valid state. A processor has no output; it is a rather high-level and large object. Processors are, however, scoped.
Resource models in SPEKTRA Edge are centered around tenants: Services, Organizations, and Projects (usually the last one). Each processor is scoped to a selected Service, Organization, or Project, and we have as many processor instances as there are tenant resources in total. This is for safety reasons, to keep tenants separated: it would not be good if, by mistake, we matched a Distribution with Devices from different projects, as one tenant could then schedule pods in another one.
Therefore, remember that a Processor in a business logic controller is a unit scoped by a tenant (usually a Project) and focused on executing a single, developer-defined business logic task. This task may produce as many desired states (one per collection) as the developer deems necessary.
Above a Processor, we have a “Node”. A Node contains:
- A set of processors, one per tenant it sees.
- A set of watchers, one per input (observed) collection.
A Node is responsible for:
- Managing processors: for example, if a new project is created, it should create a new processor object; if the project is deleted, the processor must be deleted as well.
- Running the set of watchers. By sharing watchers across processors, we ensure that we do not open too many streams to the servers (many small projects are a common case here).
- Distributing observed state changes to the processors; each change set is split if necessary and delivered to the relevant processors.
The Node should be considered self-contained and generally the highest-level object, although there are also “Node Managers”, which manage a fixed set of nodes, typically one to four of them. We will come back to this in the Scaling considerations section.
Going back to a Processor instance: each has a “heart”, a single primary goroutine that runs all internal computations and events; there is only one, to avoid multi-threading issues as much as possible. Those “events” include all observed state changes provided by the Node to the Processor. This “heart” is called the processor runner, and its responsibilities include computing the desired state.
Module-wise, each processor typically consists of:
- A set of input objects. They are queues: the Node pushes observed state changes on the producer side, and on the consumer side the processor runner extracts those updates and pushes them into “stores”.
- A set of stores, one per observed collection. Stores are stateful and contain full snapshots of the observed collections. When the processor runner gets an update from an input object, it applies the change to the store; this is where we decide whether something was created, updated, or deleted.
- A set of transformers. They observe one or more stores, which propagate changes to them in real time. Transformers contain the code responsible for computing the desired state based on the observed one.
- A set of syncers. Each has two inputs: one is a store with the observed state, the other is a transformer producing the desired state. In some cases, it is possible to feed more than one transformer into the desired-state input of a syncer.
All of these components are run by the processor runner goroutine, with one exception: syncers internally have additional goroutines that execute the actual update work (create, delete, and update operations). Those are IO operations, so it is necessary to delegate them away from the processor runner.
An important note: the processor runner MUST NOT execute any IO work and should always be fast. If necessary, the framework allows running additional goroutines in the processor, which can execute longer operations (or those that can return errors).
One final thing to discuss about processors is initial synchronization. When a Node boots up, it has zero processors and the watchers for the observed state are empty. First, the watchers start observing the relevant input collections as instructed. Before they deliver real-time updates, they deliver a current snapshot of the data; only then do we start getting real-time updates that happened after the snapshot point in time. The Node is responsible for creating as many processors as there are tenants it observes. Events from different collections may be out of sync: sometimes we get tenants after other collections, sometimes before, often both. It is also possible for a watcher to lose connectivity with a server; if the disconnection is long enough, it may request a snapshot again after a successful reconnection. A snapshot for each tenant is delivered to the corresponding processor. Therefore, when the Node provides an “event” to a processor, it must include the “Sync” or “LostSync” flag too. In the case of “Sync”, the processor is responsible for generating a diff against the previous snapshot using its internal store.
Note that each observed input of the processor gets its own “sync” event, and we cannot control the ordering here. The rules are:
- Sync/LostSync events must be propagated from inputs/stores to syncers.
- A transformer must send a “Sync” signal when all of its inputs (or the stores it uses) are in sync. If at least one gets a LostSync event, it must propagate LostSync to the syncer's desired state.
- The syncer's desired state is in or out of sync depending on the events from the transformer(s).
- The syncer's observed state is in or out of sync depending on the Sync/LostSync events from the store it observes.
- The syncer's updater executes updates only when both the desired and observed states are in sync. When both gain a Sync event, the syncer executes a fresh snapshot of Create/Update/Delete operations; all previous operations are discarded.
- The syncer's updater must stop its actions when either the observed or the desired state loses sync.
- Transformers may postpone desired-state calculation until all inputs reach the sync state (the developer decides).
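As a mental model of these rules, here is a minimal, framework-agnostic sketch (the type is illustrative, not a Goten API) of how a syncer's updater gates its work on the sync state of both inputs:

package controller

// syncGate captures the gating rule above: the updater may act only while
// BOTH the observed and desired inputs are in sync; on a loss of sync it
// must stop, and when both regain sync a fresh snapshot of operations is
// computed (previous ones are discarded).
type syncGate struct {
	observedInSync bool
	desiredInSync  bool
}

// SetObservedSync and SetDesiredSync are called on Sync / LostSync events.
func (g *syncGate) SetObservedSync(inSync bool) { g.observedInSync = inSync }
func (g *syncGate) SetDesiredSync(inSync bool)  { g.desiredInSync = inSync }

// CanExecute reports whether Create/Update/Delete operations may be issued.
func (g *syncGate) CanExecute() bool {
	return g.observedInSync && g.desiredInSync
}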
Prototyping controllers with proto annotations
In Goten, we first define the structure of the business logic controller (as far as possible) in protobuf files: we define the structure of Nodes, Processors, and their components.
A full reference can be found here: https://github.com/cloudwan/goten/blob/main/annotations/controller.proto. We will discuss some examples to provide more clarity.
By convention, we create a controller subdirectory under proto/$VERSION for these proto files. In regenerate.sh we add the relevant protoc compiler call, like in https://github.com/cloudwan/inventory-manager-example/blob/master/regenerate.sh; look for --goten-controller_out.
When going through examples, we will explore some common patterns and techniques.
Inventory manager example - Processor and Node definitions.
Let's review some examples, starting with the Inventory Manager's Processor definition: https://github.com/cloudwan/inventory-manager-example/blob/master/proto/v1/controller/agent_phantom_processor.proto
We can start from the top of the file (imports and top options):
- See the go_package annotation - this is the location where the generated files will be put. The directory controller/$version/$processor_module is a convention we use and recommend for Processors.
- Imports of goten.proto and controller.proto from goten/annotations are required.
- We need to import the service packages' main files for the versions we intend to use. For this example, we use monitoring in v4 and Inventory Manager in v1; the relevant import files were added.
- We also import “common components”, but we will return to that later.
In this file, we define a processor called “AgentPhantomProcessor”. We MAY then optionally specify the types we want to use. This one (CommonInventoryManagerControllerTypes) is specified in the imported “common components” file mentioned above; let's skip explaining it for now.
The next important part is definitions. In Goten, resource-type names are fully qualified using the format $SERVICE_DOMAIN/$RESOURCE_NAME, and this is how we need to specify resources. Definitions can be used to alias long names into shorter ones. With the next example, we will also demonstrate another use case.
In AgentPhantomProcessor, we would like to generate a single PhantomTimeSerie resource per each ReaderAgent in existence. This is a very simple business logic task: create one additional resource for each member of another collection.
Since both ReaderAgent and PhantomTimeSerie are project-scoped resources, we want processors to operate per project. Therefore, we declare that “Project” is the scope object in the processor. Then we define two inputs: ReaderAgent and PhantomTimeSerie. Code-wise, each protobuf “input” will consist of two components: an Input and a Store object (as described in the architecture overview).
We define a single transformer object: AgentPhantomTransformer. It declares that this transformer produces the desired collection of PhantomTimeSerie instances, each owned by some ReaderAgent. This simplifies cleanup: if a ReaderAgent is deleted, the transformer removes its PhantomTimeSerie from the desired collection. The best transformer type in such a situation is owner_ownee, where each output resource belongs to a separate parent.
After transformers, we define syncers; we have one instance, PhantomTimeSerieSyncer. It takes PhantomTimeSerie from the input list as its observed input, while the desired collection comes from AgentPhantomTransformer.
This Processor definition shows what connects to what; we constructed the structure declaratively.
Now let's come back to types. As we said in the architecture overview, a Processor consists of input, store, transformer, and syncer objects. While transformers can be specified only in Processor definitions, the rest of those smaller elements can be delegated to type sets (here, CommonInventoryManagerControllerTypes). This is optional; type_sets are not needed very often, including here. If they were not defined, the compiler would implicitly generate all necessary components in the same directory indicated by go_package, alongside the processor. If type_sets are defined, it will try to find types there before deciding to generate its own.
Separate type_sets can be used, for example, to reduce unnecessary generated code, especially if multiple processors use similar underlying types. In the Inventory Manager it was done for demonstration purposes only. Let's look at the file anyway: https://github.com/cloudwan/inventory-manager-example/blob/master/proto/v1/controller/common_components.proto. We define input, store, and syncer components here. Note that go_package differs from the one in the processor file, which means the generated components will reside in a different directory than the processor. The only benefit here is this separation; it is not strictly required.
Finally, note that in the processor we only indicated what the controller is doing and how the pieces connect. The implementation is not here yet; it will be in Golang. For now, let's jump to the Node declaration, which can be found here: https://github.com/cloudwan/inventory-manager-example/blob/master/proto/v1/controller/inventory_manager_ctrl_node.proto
A Node is a component that manages Processor instances and is responsible for dispatching real-time updates from all watchers to the processors, which in this example are scoped by an Inventory Manager Project (inventory-manager.edgelq.com/Project).
In this example file, we declare a Node called “InventoryManagerCtrl”. The processors we attach form a one-element array containing AgentPhantomProcessor. We could potentially attach more processors, under one condition: all must be scoped by exactly the same object. Since AgentPhantomProcessor is scoped by Project (inventory-manager.edgelq.com/Project), other processors would need the same scope.
The compiler parsing such a Node definition automatically detects the Scope and all Input resources. What we need to define is:
- The sharding method: since the scope is a Project, the standard sharding for it is “byProjectId”. For an organization it would be “byOrgId”, for a service “byServiceId”. All three can optionally be replaced with “byIamScope”. We will return to this when talking about scaling.
- Dispatchment: when the Node gets the snapshot plus real-time updates from the watchers for all resources (Scope + Input), it also needs to know how the resources should be grouped.
  - The param scope_grouping tells how the Project is identified. Normally, we identify a Project by its name; if you are unsure, just pass method: NAME for scope_grouping. As a result, the Node extracts the name field from a Project and uses it as the Processor identifier.
  - The param input_groupings is defined per detected input resource. In the processor, we defined monitoring.edgelq.com/PhantomTimeSerie and inventory-manager.edgelq.com/ReaderAgent (shortened to PhantomTimeSerie and ReaderAgent). Input groupings instruct the Node how each resource instance of a given type should be classified, which means how to extract the ID of the corresponding processor instance. Resource ReaderAgent is a child of an inventory-manager.edgelq.com/Project instance according to the api-skeleton, so we indicate the “NAME” grouping method and the Node can figure out the rest. Resource PhantomTimeSerie is a bit trickier, because its parent resource is not inventory-manager.edgelq.com/Project, but monitoring.edgelq.com/Project. The Node still needs a way to extract the name of inventory-manager.edgelq.com/Project from the monitoring.edgelq.com/PhantomTimeSerie instance. Because this cannot be done declaratively (as of now, the compiler does not figure out things by string value as the IAM Authorizer does), we must pass the CUSTOM method, which means that in Golang we provide our own function returning the processor ID.
When deciding on the dispatchment annotation, we need to know that the Node has a customizable way of defining the Processor identifier. We must provide a way to map a <Resource Instance> into a <Processor Identifier>, and we must do this for the Scope AND all Input resources. Method NAME passed for either the scope or an input resource means that the Node should just call the GetName() function on the resource instance to get an identifier. It works for same-service resources, but not for others like PhantomTimeSerie: the name it returns would ultimately point to the Project in the monitoring service.
Although the GetName() method on the ReaderAgent instance returns the name of the ReaderAgent rather than a Project, the Node is able to notice that the name of the ReaderAgent also contains the name of the project.
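For illustration, a hypothetical sketch of such a CUSTOM mapping is shown below. The generated interface and the exact PhantomTimeSerie name pattern may differ; treat the function, its signature, and the assumed name layout as assumptions rather than the actual generated API.

package controller

import "strings"

// scopeIdentifierForPhantomTimeSerie is a hypothetical stand-in for the
// generated GetScopeIdentifierForPhantomTimeSerie hook. Plain strings are
// used to keep the sketch standalone, and the assumed name layout
// ("projects/{project}/...") is an assumption, not a quote of the monitoring
// spec. The project ID is shared between monitoring.edgelq.com and
// inventory-manager.edgelq.com, so extracting it identifies the processor.
func scopeIdentifierForPhantomTimeSerie(phantomTimeSerieName string) (string, bool) {
	parts := strings.Split(phantomTimeSerieName, "/")
	for i := 0; i+1 < len(parts); i++ {
		if parts[i] == "projects" {
			// Rebuild the inventory-manager Project name from the shared ID.
			return "projects/" + parts[i+1], true
		}
	}
	return "", false
}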
Applications example - Processor and Node definitions.
We have a more interesting example of a controller in applications.edgelq.com: a controller processor responsible for Pod management - we treat “Pod management” as one business logic task. There are two things we want from such a processor:
- Create a Pod instance for each matched Device and Distribution.
- Whenever the Device goes offline, we want to mark all its Pods as offline.
Business notes:
- One Device can host multiple Pods, and a Distribution can create Pods across many devices. Still, each Device + Distribution pair can have at most one Pod instance.
- A Pod can also be deployed manually, not via a Distribution; not all pods are of the distribution kind.
- When the Device is online, it updates its pod statuses itself, but when it goes offline, the controller needs to do it. This basically means the controller needs to track the pods of offline devices.
- If the device is offline, the pod status should be “Unknown”.
- Pods, Distributions, and Devices are project-scoped resources; however, Device belongs to the devices.edgelq.com service, and the other resources to applications.edgelq.com. Still, the Project is our primary scope.
With this knowledge, we can draft the following Processor declaration: https://github.com/cloudwan/edgelq/blob/main/applications/proto/v1/controller/pods_processor.proto
Compared to the previous example, goten.controller.type_set is declared in the same file, but let's skip this part for now and talk about the processor first. There we have the PodsProcessor type defined. As we can deduce from the business notes, the Scope resource should be “Project”, and the inputs should be clear too. Then we have two transformers, one per business task. You should also note that we have two additional definitions of the applications.edgelq.com/Pod resource: one is DistributionPod, the other UnknownStatePod. As mentioned in the business notes, not all pods belong to a distribution, and pods with unknown state are likewise a subset of all pods. These extra definitions differentiate between the types and help write a proper controller.
The DistributionController transformer is of the already-known owner/ownee type, but in this case each Pod instance is owned by a unique combination of Distribution and Device. When either of the parents is deleted, all associated pods are automatically deleted.
The other transformer, UnknownStateTracker, is of a different kind: generic. This type of transformer takes some number of inputs and produces some number of outputs. In this particular case, the inputs are Devices and Pods, where each Pod belongs to a specific Device, and for each offline Device we want to mark its Pods as having an Unknown state. The generic type requires more code: developers need to handle all input events - additions, updates, and deletions. For each change in the input resources, a new snapshot of the output (or a DIFF against the snapshot) is required.
One alternative we could have used is a Decorator:
{
name : "UnknownStateStatusDecorator"
decorator : {
resource : "Pod"
additional_inputs : [ "Device" ]
}
}
A decorator takes the same resource type on its output: in this case, when a Pod changes, the decorator function is called to decorate the Pod resource. There, we could get the Device record owning the pod, check the Device status, and mark the Pod status accordingly. If the device changes, it triggers a re-computation of all Pods belonging to it (the decorator is called again). We did not use a decorator here, because the controller should mark the Pod status as UNKNOWN only when the Device is offline; when the Device is online, it manages its Pod statuses itself. This “shared” ownership means that the decorator was not exactly suitable; instead, we use the “generic” type and output only those pods that have UNKNOWN status. The controller needs to run UpdatePod only for pods of offline devices. If the device gets online, the controller should “forget” about those pods. By that we mean: UnknownStateTracker DELETES pods from its output collection when the device becomes online (which is not the same as actually deleting the pods!). This is why the output from UnknownStateTracker is UnknownStatePod, not Pod: we want to show that the output contains pods with unknown status, not all pods. We will come back to this when commenting on the implementation in Go.
We will also re-check offline pods periodically, producing snapshots (per scope project) after each period. By default, the transformer would be triggered only when some Pod/Device changes (create, update, delete).
Now going back to goten.controller.type_set: there, we defined only the Store component for the Pod resource, with one custom index, even though we have multiple resource definitions in the processor. As mentioned, this type set is an optional annotation, and the compiler can generate the missing bits on its own. In this particular case, we wanted a custom index for pods: the field path spec.node points to the Device resource. This index is just a convenience in the code later on. Anyway, this is another use case for type sets: the ability to enhance the default types we would get from the code-generation compiler.
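Conceptually, the custom spec.node index gives the transformer a cheap lookup from a Device to its Pods; a simplified, illustrative model (not the generated store API) could look like this:

package pods

// podsByDeviceIndex models what the spec.node index on the Pod store
// provides: a fast lookup from a Device name to the Pods scheduled on it,
// instead of scanning the whole observed Pod collection. The generated store
// exposes typed index accessors; this map is only an illustration.
type podsByDeviceIndex struct {
	byDevice map[string][]string // device name -> pod names
}

func (idx *podsByDeviceIndex) add(podName, deviceName string) {
	if idx.byDevice == nil {
		idx.byDevice = map[string][]string{}
	}
	idx.byDevice[deviceName] = append(idx.byDevice[deviceName], podName)
}

// podsOnDevice is what the transformer needs when a Device changes: only the
// pods that reference that device via spec.node.
func (idx *podsByDeviceIndex) podsOnDevice(deviceName string) []string {
	return idx.byDevice[deviceName]
}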
Node definition can be found here: https://github.com/cloudwan/edgelq/blob/main/applications/proto/v1/controller/applications_ctrl_node.proto
In this case, however, it is pretty much the same as in the Inventory Manager.
Overview of generated code and implementing missing bits
The best way to discuss controller code is again by example; we will check the Inventory Manager example and the Applications service.
Inventory manager
In the Inventory Manager, we want the following feature: a time series showing the history of online/offline changes per agent. Each agent runtime should send an online signal within a fixed interval (1 minute), using a CreateTimeSeries call from monitoring. When an agent goes offline, it cannot send an “offline” signal though - instead, we need to generate a PhantomTimeSerie object per agent, so it can generate data when the original metrics are missing. This is how we obtain the online/offline history: zeroes fill the offline periods, ones the online parts. This is the task we implemented for the Inventory Manager.
The controller code can be found here: https://github.com/cloudwan/inventory-manager-example/tree/master/controller/v1.
As with the rest of the packages, files with .pb. in their names are generated; the others are handwritten. The common directory there contains only generated types, as pointed out by the proto file for type_sets. More interesting is the agent phantom processor, found here: https://github.com/cloudwan/inventory-manager-example/tree/master/controller/v1/agent_phantom. We should start examining the examples from there.
The first file is https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/agent_phantom_processor.pb.go.
It contains the Processor object and all its methods. We can notice the following:
- In the constructor NewAgentPhantomProcessor, we create all processor components as described by the protobuf file for the processor. Connections are made automatically.
- The constructor gets an instance of AgentPhantomProcessorCustomizer, which we will need to implement.
- The processor has a “runner” object; this is the “heart” of the processor, handling all the events.
- The processor has a set of getters for all components, including the runner and the scope object.
- The processor has an AddExtraRunner function, where we can add extra procedures running on separate goroutines, doing extra tasks not predicted by the processor proto definition.
- The interface AgentPhantomProcessorCustomizer has a default partial implementation.
In the customizer, we can:
- Add PreInit and PostInit handlers. PreInit is called on a processor with all internal components not yet initialized; PostInit is called after the initial construction is completed (but not after it runs).
- Use the StoreConfig calls to additionally customize Store objects. Check the code to see the options; one option is to provide an additional filter applied to the store, so we do not see all resources.
- Implement the functions ending with ConfigAndHandlers, which are for Syncer objects. We will have to implement them; this is the place for the final configuration and tuning of syncers.
- Implement the functions ending with ConfigAndImpl, which must be used to customize transformers.
- Hook a handler for the case when the Scope object itself changes (for example, some fields in the Project). Usually it is left empty, but we may still hit some use cases for it.
After reviewing the processor file, look at the processor customizer implementation. This is a handwritten file; here is the example for the Inventory Manager: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/agent_phantom_processor.go.
The constructor can be defined however we want. Implementation notes:
- In PhantomTimeSerieStoreConfig, we make sure to filter out PhantomTimeSeries that are not of the specific metric type or that do not have the specific meta owner types. This may often be redundant, because we can define a proper filter for PhantomTimeSerie objects elsewhere (in a different file; we will come back to it).
- In the function AgentPhantomTransformerConfigAndImpl, we need to return an implementation handler that satisfies the specific interface required by the transformer. In the config, we usually provide a reconciliation mask. These masks prevent triggering the transformer function for non-interesting updates. In this example, we check the field paths online, activation.status, and location. If any of those fields change in a ReaderAgent, we trigger the transformer to recompute the PhantomTimeSerie objects (for this agent only). The reconciliation mask helps reduce unnecessary work: if someone changed, say, the display name of the agent, no work would be triggered.
- In the function PhantomTimeSerieSyncerConfigAndHandlers, we customize the Syncer for PhantomTimeSerie objects. In the config part, we almost always need to provide the update mask: the fields maintained by the controller. We may also specify what to do when duplicated resources are detected - by default we delete them, but it may be good to set this value explicitly (AllowDuplicates is false). Apart from that, there is a quirk of PhantomTimeSerie instances: the fields resource and metric are non-updatable. Because of that, we need to disable updates with UpdatesDisabled. It is recommended to review all options in the code itself to see what else can be changed. The handlers for the syncer are a bit tricky here: we could have just returned NewDefaultPhantomTimeSerieSyncerHandlers, but we need some special handling, which is common for PhantomTimeSerie instances. We will come back to it shortly.
- In the function PostInit, we add an extra goroutine, ConnectionTracker. It does work not covered by the controller framework for now and needs some IO; for those reasons, it is highly recommended to delegate this work to a separate goroutine. This component also gets updates from the ReaderAgent store (create, update, delete).
Let's first discuss the phantomTimeSerieSyncerHandlers object. It extends the generated common.PhantomTimeSerieSyncerHandlers. Custom handlers are quite powerful tools: we can even customize how objects are created/updated/deleted - by default, the standard Create/Update/Delete methods are used, but it does not need to be that way. In this particular case, we want to customize identifier extraction from the PhantomTimeSerie resource instance. We created a key type for this, defined here: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/agent_phantom_key.go.
By default, the identifier of a resource is simply extracted from the name field. However, PhantomTimeSerie is special in this regard: it has a non-predictable name! CreatePhantomTimeSerie requests must not specify a name; it is assigned by the server during creation. This has nothing to do with the controller; it is part of the PhantomTimeSerie spec in monitoring. For this reason, we extract fields that we know will be unique. Since we know that for a given ReaderAgent we generate only one “online” metric, we use the agent name extracted from the metadata along with the metric type value. The customized syncer then matches desired and observed PhantomTimeSerie resources using those custom IDs.
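A hypothetical sketch of such a matching key (the real agent_phantom_key.go uses generated types; plain strings stand in for them here) could look like this:

package agentphantom

// phantomTimeSerieKey replaces the server-assigned name as the identifier
// used to match desired and observed PhantomTimeSerie instances.
type phantomTimeSerieKey struct {
	AgentName  string // from metadata.ownerReferences pointing at the ReaderAgent
	MetricType string // the single "online" metric generated per agent
}

// keyFor builds the identifier from fields known to be unique and stable,
// since the resource name itself is assigned by the monitoring server.
func keyFor(agentOwnerName, metricType string) phantomTimeSerieKey {
	return phantomTimeSerieKey{AgentName: agentOwnerName, MetricType: metricType}
}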
The connection tracker, defined here: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/connection_tracker.go, shows an example of a controller task not covered by the controller framework. It runs on a separate goroutine; however, OnReaderAgentSet and OnReaderAgentDeleted are called by the processor runner, the main goroutine of the processor. This mandates some protection. Golang channels could perhaps have been used, but note that they have limited capacity, and if they get full, the processing thread stalls. Maps with traditional locks are safer in this respect and are often used in SPEKTRA Edge; they solved some issues when there were sudden floods of updates. The benefit of maps is that they merge multiple updates into one (overrides), whereas with channels we would need to process every individual element.
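A minimal sketch of this map-with-lock pattern (illustrative only, not the actual ConnectionTracker code) might look as follows:

package agentphantom

import "sync"

// pendingAgents collects work handed over by the processor runner.
// OnReaderAgentSet is called by the processor runner and must never block;
// the tracker goroutine drains the map at its own pace. Repeated updates for
// the same agent overwrite each other, so event floods collapse naturally
// instead of piling up as they would in a channel.
type pendingAgents struct {
	mu     sync.Mutex
	byName map[string]struct{} // agent names with pending work
}

func (p *pendingAgents) OnReaderAgentSet(name string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.byName == nil {
		p.byName = map[string]struct{}{}
	}
	p.byName[name] = struct{}{} // merge: duplicates override, never queue up
}

// drain is called from the tracker's own goroutine, which may safely do IO.
func (p *pendingAgents) drain() []string {
	p.mu.Lock()
	defer p.mu.Unlock()
	names := make([]string, 0, len(p.byName))
	for name := range p.byName {
		names = append(names, name)
	}
	p.byName = map[string]struct{}{}
	return names
}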
Going back to the implementation: as we said, we ensure that monitoring has a time series per agent showing whether the agent was online or offline at a given point in time. To keep the “online” flag in sync, we periodically query monitoring for the time series of all agents, then flip the flags that mismatch the desired value.
Let’s move forward, to files for the transformer. The generated one can be found here: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/agent_phantom_transformer.pb.go.
Notes:
- The interface to notice is AgentPhantomTransformerImpl; this one needs an implementation from us.
- The config structure AgentPhantomTransformerConfig also needs to be provided by us.
- The transformer code already handles all events related to the input resources, including deletions. This reduces the required interface of AgentPhantomTransformerImpl to a minimum: we just need to compute the desired resources for a given input.
- Note that this config and implementation are provided by your customizer implementation for the processor.
The file with the implementation for the transformer is here: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/agent_phantom/agent_phantom_transformer.go.
Notes:
- For a single ReaderAgent, we may potentially have N output resources (PhantomTimeSerie here).
- When we create the DESIRED PhantomTimeSerie, note that we provide only the parent part of the name field: when we call NewNameBuilder, we are NOT calling SetId. As part of the PhantomTimeSerie spec, we can only provide parent names, never our own ID; it must be generated by the server. Note how this combines with the custom PhantomTimeSerie syncer handlers, where we extract the ID from metadata.ownerReferences and metric.type.
- The PhantomTimeSerie is constructed with service ownership info and the ReaderAgent. This ensures that we, and not another service, own this resource. The metadata also ensures the PhantomTimeSeries will be cleaned up (an additional cleanup guarantee, as a transformer of the owner/ownee type provides the same functionality).
To summarize, when implementing a Processor, it is necessary to (at a minimum):
- Provide all transformer implementations and define their configs.
- Provide an implementation of the processor customizer; at a minimum, it needs to provide objects for syncers and transformers.
We need to provide missing implementations not just for processors, but for nodes too. You can typically find three code-generated files for nodes (for the InventoryManager example):
- https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/inventory_manager_ctrl.pb.node.go
- https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/inventory_manager_ctrl.pb.node_manager.go
- https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/inventory_manager_ctrl.pb.dispatch_helper.go
The main file to review is the one ending with pb.node.go. In its constructor, it creates watcher instances for all scope and input resources of all processors. It manages a set of processors per project (in this case we have one processor, but there could be more). All updates from the watchers are distributed to the relevant processors. It is quite a big file; initially, you may just remember that this component watches all collections in real time and pushes updates to the processors, so they can react. However, at the top of the file there are four types you need to see:
- InventoryManagerCtrlFieldMasks (generic name: <NodeName>FieldMasks)
- InventoryManagerCtrlFilters (generic name: <NodeName>Filters)
- InventoryManagerCtrlCleaner (generic name: <NodeName>Cleaner)
- InventoryManagerCtrlNodeCustomizer (generic name: <NodeName>NodeCustomizer)
Of these types, the most important for developers is NodeCustomizer. Developers should implement its functions:
- Filters() needs to return filters for all input resources (from all processors) and scope resources. This is important: the controller should only know about the resources it needs to know!
- FieldMasks() needs to return field masks for all input resources (from all processors) and scope resources. It is very beneficial to return only the fields the controller needs to know, especially since the controller will keep those objects in RAM! However, be careful to include all needed fields: those needed by dispatchment (typically the name), those needed by transformers and reconciliation masks, and all fields required by syncers (update masks!).
- The function GetScopeIdentifierForPhantomTimeSerie (or GetScopeIdentifierFor<Resource>) was generated because, in the protobuf dispatchment annotation for PhantomTimeSerie, we declared that the identifier uses the CUSTOM method!
- The function CustomizedCleaner should return a cleaner that handles orphaned resources when the Scope resource (here, the Project) is deleted but some child resources still exist. In 99.99% of cases this functionality is not needed: when a Project is deleted, all child resources are cleaned up asynchronously by the db-controller.
- The function AgentPhantomProcessorCustomizer must return a customizer for each Processor and scope object.
Developers need to implement the customizer; for the Inventory Manager we have this file: https://github.com/cloudwan/inventory-manager-example/blob/master/controller/v1/inventory_manager_ctrl_node.go.
Notes:
- For GetScopeIdentifierForPhantomTimeSerie, we need to return the name object of the Inventory Manager project. Extracting it from the name of a PhantomTimeSerie is quite easy, though. We may add some autodetection in the future: if the name pattern matches across resources, the developer will not need to provide these simple functions.
- In the FieldMasks() call, the mask for ReaderAgent needs to be checked against all fields used in the processor: the reconciliation mask and the connection tracker. The name should always be included.
- In the Filters() call, we need to consider a couple of things:
  - We may run in a multi-region environment, where each region has its own controllers. Typically, for regional resources (ReaderAgents or PhantomTimeSeries), we should fetch only those belonging to our region. For Projects, we should fetch those that can be in our region, so we filter by enabled regions.
  - Resources from core SPEKTRA Edge services should be filtered by our service, ideally by ownership; otherwise, we would get PermissionDenied.
- The customizer construction for a processor should be straightforward; any extra params were provided upfront and passed to the node customizer.
To see the final top-level implementation bit of the business logic controller for the Inventory Manager, see https://github.com/cloudwan/inventory-manager-example/blob/master/cmd/inventorymanagercontroller/main.go.
Find the NewInventoryManagerCtrlNodeManager call: this is how we construct the node manager and pass our node customizer to it. This concludes the example.
Applications controller implementation
Controller implementation for applications can be found here: https://github.com/cloudwan/edgelq/tree/main/applications/controller/v1.
It adds some things compared to the Inventory Manager, so let's go through it, skipping the parts it has in common with the previous example.
Starting from the processor, the main files are:
- https://github.com/cloudwan/edgelq/blob/main/applications/controller/v1/pods/pods_processor.pb.go (Processor itself)
- https://github.com/cloudwan/edgelq/blob/main/applications/controller/v1/pods/pods_processor.go (Customizer)
As described in the protobuf part, we have essentially two transformers and two syncers, for two different sub-tasks of general pod processing.
Let's start with the transformer called DistributionController. As a quick recap: this transformer produces Pods based on combined Device and Distribution resources; each matched Device + Distribution pair should produce a Pod, called DistributionPod in the protobuf. Not all pods belong to a Distribution though! Some may be deployed manually by clients.
You should start examining the code from the processor customizer (link above). In the customizer function DistributionControllerConfigAndImpl, we create a config that reacts to specific field path changes for Distributions and Devices. At least for now, a distribution is matched with a device based solely on the metadata.labels field path in the Device, so this is what we check for Devices. For Distributions, we want to recompute pods if the selector or pod template changes; other updates to Distributions should not trigger Pod re-computation! Also note that the implementation object can hold Store instances, so we can access the current state; this will be necessary.
In the transformer, https://github.com/cloudwan/edgelq/blob/main/applications/controller/v1/pods/distribution_controller.go, there are additional things to learn. Since this transformer is of the owner/ownee type but has two owners, we must implement two functions: one computing Pods for a Distribution across all matched devices, the other computing Pods for a Device across Distributions. Note that each DESIRED generated pod does not have a clear, static name value: GenerateResourceIdFromElements may contain non-deterministic elements. We will need to reflect this when configuring the Syncer for the DistributionPod type.
This is how pods are generated. To continue analyzing the behavior of Distribution pods, go back to the processor customizer and find the function PodFactoryConfigAndHandlers. The config with an update mask looks ordinary, but here there is an example of limits integration. First, we construct the default handlers; then we attach the limits guard. Note that Pods are subject to limits! It is possible that Devices/Distributions fit within the plan, but Pods would exceed it. In such a situation, the syncer must:
- Stop executing CreatePod if we hit the limit.
- Continue executing UpdatePod as normal.
- Continue executing DeletePod as normal.
Once we have free limit again (as a result of a plan change or other pods being deleted), creation should resume. The limits guard is a component that must be used whenever we may be creating resources that are part of a limits plan! Note also that in the PostInit call, we must additionally configure the limits guard.
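The gating behaviour of the limits guard can be summarized with a small, illustrative sketch (this is not the limits guard API, only its decision rule):

package pods

// operation models the three kinds of syncer actions.
type operation int

const (
	opCreate operation = iota
	opUpdate
	opDelete
)

// limitsGuardAllows restates the rule above: creations stop while the limit
// is exhausted and resume when free limit appears; updates and deletions are
// never blocked.
func limitsGuardAllows(op operation, freePodLimit int64) bool {
	switch op {
	case opCreate:
		return freePodLimit > 0 // hold back CreatePod while the plan is exhausted
	case opUpdate, opDelete:
		return true // UpdatePod and DeletePod proceed as normal
	default:
		return false
	}
}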
For the pod factory syncer, we also provided some other customizations:
- Pod identifiers have a custom implementation, since the pod name may be non-deterministic.
- We must be very careful about what we delete! Note that in the protobuf, the PodFactory syncer's desired state takes pods from the DistributionController transformer, but the observed state contains ALL pods! To prevent wrong deletions, we must provide an additional CanProceedWithDeletion check.
Let's move on to the next transformer and syncer, handling unknown-state pods. As a recap: the controller must mark pods whose device went offline with the UNKNOWN status. The set of unknown pods is a collection of its own (UnknownStatePod); when a Device gets online, we need to remove its pods from this collection. We want to recompute snapshots of unknown-state pods periodically - this is what we declared in the protobuf.
Starting with the transformer: the function UnknownStateTrackerConfigAndImpl, found in the customizer implementation for PodsProcessor, is used to customize it. Note that the config object now has a SnapshotTimeout variable. This timeout decides how often the desired collection is re-computed (in this case!), since we declared this transformer as a periodic-snapshot generic type.
See transformer-generated file and handwritten customization:
- https://github.com/cloudwan/edgelq/blob/main/applications/controller/v1/pods/unknown_state_tracker.pb.go
- https://github.com/cloudwan/edgelq/blob/main/applications/controller/v1/pods/unknown_state_tracker.go
From the PB file, note that the minimum required implementation is CalculateSnapshot, which is called periodically as instructed.
However, if you examine it carefully, you can notice code like this:
onDeviceSetExec, ok := t.Implementation.(interface {
OnDeviceSet(
ctx context.Context,
current, previous *device.Device,
) *UnknownStateTrackerDiffActions })
if ok {
t.ProcessActions(
ctx,
onDeviceSetExec.OnDeviceSet(ctx, current, previous),
)
}
Basically, all generic transformers allow additional optional interfaces on their implementations: On<Resource>Set and On<Resource>Deleted calls for each input resource. These allow us to update the desired collections much faster!
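A hedged sketch of the decision logic behind such an OnDeviceSet fast path is shown below; the real implementation wraps the result into the generated UnknownStateTrackerDiffActions type, which is omitted here:

package pods

// unknownStatePodDiff captures the fast-path decision: when a device comes
// back online its pods leave the desired UnknownStatePod collection; when it
// goes offline they enter it with status UNKNOWN.
func unknownStatePodDiff(deviceOnline bool, podNamesOnDevice []string) (markUnknown, forget []string) {
	if deviceOnline {
		// The device manages its own pod statuses again: drop its pods from
		// the desired UnknownStatePod collection (NOT a real Pod deletion).
		return nil, podNamesOnDevice
	}
	// The device is offline: every pod on it should be reported as UNKNOWN.
	return podNamesOnDevice, nil
}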
There is an additional benefit of implementing those optional methods:
- For generic transformers without periodic snapshots, this avoids the CalculateSnapshot call entirely. In regular generic transformers, if the implementation does not implement the On<Resource><SetOrDeleted> call, a snapshot is triggered with a delay specified by the SnapshotTimeout variable (a different behavior than for periodic snapshots!). To avoid extra CPU work, it is recommended to implement the optional methods.
For this particular transformer, in the file https://github.com/cloudwan/edgelq/blob/main/applications/controller/v1/pods/unknown_state_tracker.go, we implemented the basic snapshot computation, where we collect all pods with unknown statuses based on the last heartbeat from devices. However, we also implemented OnDeviceSet and OnDeviceDeleted. The Set variant is especially important: when a Device gets online, we want to remove its pods with unknown state from the desired collection ASAP. If we waited for the timeout (more than 10 seconds), the Device might mark its pods online while our controller still marked them unknown until the timeout passed. This mechanism may be improved in the future; even now we risk two to three unnecessary extra updates.
Going back to the customizer (file pods_processor.go), see finally UnknownStateMaintainerConfigAndHandlers. We again use a Syncer for pods, but it is a separate instance with a different update mask: we want to control only specific fields, related to the status. Note that, as with Distribution pods, the observed state contains ALL pods, but the desired state contains only those with unknown status. To avoid bad deletions, we disable deletions entirely, and creations too, as we do not need them.
We can now leave the processor type and examine the node customizer, which can be seen here: https://github.com/cloudwan/edgelq/blob/main/applications/controller/v1/applications_ctrl_node.go.
It is very similar to the customizer for the Inventory Manager, with some additional points:
- Note that we pass a limits observer instance for the limits guard integration. We will return to it shortly.
- For the Filters() call, note that the Distribution resource is non-regional - in fact, its instances are copied to all regions where the project is present in a multi-region environment. In such situations, we should filter by the metadata.syncing.regions field. This returns all distributions of all projects enabled in our region, which is basically what we need.
For the limits guard integration, also see the controller's main.go file:
https://github.com/cloudwan/edgelq/blob/main/applications/cmd/applicationscontroller/main.go.
Note that there we construct the limits observer:
limitsObserver := v1ctrllg.NewLimitTrackersObserver(ctx, envRegistry)
We also need to run it:
g.Go(func() error {
return limitsObserver.Run(gctx)
})
The limits observer instance should be global for the whole node manager and must be declared earlier, in main!
Scaling considerations
Note that processors, to work properly, need to have:
- Scope object (Project typically)
- All input collections (snapshot of each of them)
- All desired collections
With multiple processors, input collections may be shared, but that is not the point; the point is that the controller needs sufficient RAM, at least for now. This may be improved in the future, for example with disk usage, but it will not change the fact that a controller node needs to handle all its assigned scope objects and their collections.
First, to minimize the memory footprint, provide field masks for all collections - but be careful to include all necessary paths; there have been bugs caused by missing fields! Then we need horizontal scaling. For this, we use sharding.
Sharding divides resources into groups. Note that in both examples we used byProjectId, declared explicitly in the protobuf files for the Inventory Manager and the Applications controller. Sharding by project ID means that each Node instance gets a share of the projects, not all of them. If we have only one Node, it contains data for all projects; with multiple Nodes, projects are spread across them. Sharding by project also guarantees that resources belonging to one project always land in the same shard - this is why it is called byProjectId. For each resource, we take its name, extract the project ID part, and hash it. The hash is a large integer value, like an int64. We also need to know how big the ring is: 16, 256, 4096… We take the hash modulo the ring size and get the shard number; for example, byProjectId hash mod 16 gives us the byProjectIdMod16 shard key. Those values are saved in metadata.shards of each resource by the sharding store plugins on the server side. Note that the field metadata.shards is a map<string, int64>. See https://github.com/cloudwan/goten/blob/main/types/meta.proto.
The ring size we use everywhere is 16 for now, meaning we can potentially divide the work across up to 16 controller nodes.
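Putting the pieces together, a sketch of the byProjectId computation could look like this (the actual hash function used by the sharding store plugin may differ; FNV-1a is only an assumption for the example):

package sharding

import (
	"fmt"
	"hash/fnv"
)

const ringSize = 16

// byProjectIdShard hashes the project ID, takes it modulo the ring size, and
// returns the ring-specific key plus shard number. The server-side sharding
// store plugin saves such pairs into the resource's metadata.shards map
// (map<string, int64>).
func byProjectIdShard(projectId string) (key string, shard int64) {
	h := fnv.New64a()
	h.Write([]byte(projectId))
	shard = int64(h.Sum64() % ringSize)
	key = fmt.Sprintf("byProjectIdMod%d", ringSize) // e.g. "byProjectIdMod16"
	return key, shard
}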
When the first node starts, it gets assigned all 16 shards, values 0 to 15. When a second node starts, it takes over from some random starting point, say shards 10-15, while the first node keeps 0-9. When a third node starts, it grabs another random range, like 0-3, leaving the other nodes with 4-9 and 10-15. This can continue until we are blocked by the ring size and scaling is no longer effective.
Note that when a new node starts, it can lower the pressure on at most two existing nodes, not all of them. For this reason, all controllers in all examples have Node Managers; we build node managers in the main.go files in the first place. Node managers start with one to four virtual node instances, two being the most common. This way, when a new runtime starts, we have a good chance of taking pressure off more instances.
Node managers are responsible for communicating with each other and assigning shards to their nodes; as of now, we use a Redis instance for this purpose. If you examined the generated files for nodes, you saw that each Node has a method for updating its shard range. Shard ranges add extra filter conditions on top of the filters passed from the node customizer instance.
With the Kubernetes Horizontal Pod Autoscaler we solve some scaling issues by splitting projects across more instances. This gives us some breathing room, but two issues remain:
- A super-large project could potentially outgrow the controller.
- Super-large shards (lots of projects hashed to the same value) can be too massive.
For the first issue, we can leverage a multi-region environment, as we already did in the examples: we fetch resources mostly from our own region, so a large project can be further split across regions. Still, we may get hit by a large project-region.
For the second issue, we could switch to a larger ring size, like 256. However, that implies many controller instances, 20 or more, and each controller adds its own overhead, meaning we would waste plenty of resources just on the large number of instances.
The presented techniques still give us some flexibility and horizontal scaling. To scale further, we can:
- Improve the framework so it can compress data, use disk, or even “forget” data and retrieve it on demand.
- Use diagonal scaling: horizontal autoscaling first (as in Kubernetes), and then, when the number of instances hits some alert threshold (say, 4 pods), increase the assigned memory in the YAML declaration and redeploy.
Diagonal scaling with automation on one axis may be the most efficient, even though it requires a small reaction from the operator: handling the alert and increasing the values in the YAML. Note, however, that this simple action could also be automated.