SPEKTRA Edge Wide-Column Usage Example

This example walks through typical usage of the SPEKTRA Edge wide-column store.
package example

import (
  "context"
  "fmt"
  "time"

  wcde "github.com/cloudwan/edgelq/common/widecolumn/dataentry"
  wcstore "github.com/cloudwan/edgelq/common/widecolumn/store"
  wcuid "github.com/cloudwan/edgelq/common/widecolumn/uid"
)

// RunDemo walks through a complete wide-column round trip against store:
// it resolves a fields descriptor, writes two temperature points for a
// single sensor key, queries them back by serial number, and prints the
// temperatures found for each returned key.
//
// Errors from the store, from building the key, or from validating the
// resolved keys are wrapped with a short description of the failing step
// and returned.
func RunDemo(ctx context.Context, store wcstore.WCStoreV3) error {
  // Create a reusable fields descriptor with two key fields. Both are added
  // with no flags (0), no default value (""), required (true), and negative
  // filters not allowed (false):
  //   - serial_number
  //   - product
  fieldsDesc := wcuid.NewKeyDescriptor().
    AddSKId("serial_number", 0, "", true, false).
    AddSKId("product", 0, "", true, false)

  // Resolve (or allocate) the string <-> int mappings for the descriptor.
  // This can be done once, e.g. at startup; fieldsDesc is then kept and
  // reused.
  strUIDs := fieldsDesc.GetUnresolvedStrUIDs()
  uids, err := store.ResolveOrAllocateStrUIDs(ctx, strUIDs)
  if err != nil {
    return fmt.Errorf("resolving string UIDs: %w", err)
  }
  fieldsDesc.ResolveSKVsInBulk(strUIDs, uids)

  // Build a concrete key. Build fails if, for example, a key is repeated or
  // a required key is missing. Keys not defined in the descriptor are stored
  // as unclassified and can be classified later with more field descriptors.
  describedSKey, err := fieldsDesc.NewDescribedSKeyBuilder().
    Add("serial_number", "1234").
    Add("product", "temp_sensor").
    Build()
  if err != nil {
    return fmt.Errorf("building key: %w", err)
  }

  // Indices by which the data is split. Here we index by serial_number only.
  // The same indices must be supplied both when writing and when querying.
  indices := []*wcuid.TableIndex{
    wcuid.NewTableIndex([]wcuid.UID{
      wcuid.KIdUID(fieldsDesc.GetFieldDescriptorBySKey("serial_number").GetKey(), 0),
    }),
  }

  // Prepare two points, one minute apart, in the "temperatures" family.
  // The key is written in raw (formatted) form; it does not need to be
  // allocated into a Key first.
  t0 := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
  indexedValues := []wcstore.IndexedKeyedValues[*wcde.ColumnValueV3]{
    {
      Indices: indices,
      KeyedValues: wcstore.KeyedValues[*wcde.ColumnValueV3]{
        SKey: describedSKey.FormatRawKey(),
        Values: []*wcde.ColumnValueV3{
          {Family: "temperatures", Time: t0, Data: marshalDouble(123.4)},
          {Family: "temperatures", Time: t0.Add(time.Minute), Data: marshalDouble(124.4)},
        },
      },
    },
  }

  if err := store.Write(ctx, indexedValues); err != nil {
    return fmt.Errorf("writing values: %w", err)
  }

  // Query a window that starts and ends at t0, so only one of the two saved
  // points is expected back (NOTE(review): relies on how the store bounds
  // EndTime — confirm). The filter pins serial_number; the true flag asks
  // for all keys to be validated automatically.
  filter := wcstore.NewFilterFromSKVs([]wcuid.SKV{
    wcuid.NewSKIV("serial_number", "1234"),
  }, true)
  query := &wcstore.QueryV3{
    StartTime: t0,
    EndTime:   t0,
    OrGroups: []*wcstore.QueryV3OrGroup{
      {
        Indices:      indices,
        Filter:       filter,
        ColumnFamily: "temperatures",
      },
    },
  }

  // Collect temperatures per key, remembering the order in which distinct
  // keys were first seen so they can be resolved in one batch afterwards.
  tempsByKey := make(map[wcuid.Key][]float64)
  uniqueKeys := make([]*wcuid.Key, 0)
  err = store.Query(
    ctx,
    query,
    func(
      _ context.Context,
      _ wcde.RowSequenceV3,
      entries []wcstore.KeyedValues[*wcde.ColumnValueV3],
      _ bool,
    ) (bool, error) {
      for _, entry := range entries {
        temps, seen := tempsByKey[*entry.Key]
        if !seen {
          uniqueKeys = append(uniqueKeys, entry.Key)
        }
        for _, v := range entry.Values {
          temps = append(temps, unmarshalDouble(v.Data))
        }
        tempsByKey[*entry.Key] = temps
      }
      // Return true to continue, false to stop.
      return true, nil
    },
  )
  if err != nil {
    return fmt.Errorf("querying values: %w", err)
  }

  skeys, err := store.ResolveKeys(ctx, uniqueKeys)
  if err != nil {
    return fmt.Errorf("resolving keys: %w", err)
  }
  // skeys is indexed in parallel with uniqueKeys below; fail cleanly rather
  // than panic if the store returned a different number of keys.
  if len(skeys) != len(uniqueKeys) {
    return fmt.Errorf("resolving keys: got %d keys, want %d", len(skeys), len(uniqueKeys))
  }
  for i, key := range uniqueKeys {
    temps := tempsByKey[*key]
    descSKey, err := fieldsDesc.ValidateSKey(skeys[i], false)
    if err != nil {
      return fmt.Errorf("validating key %d: %w", i, err)
    }
    serialNumber := descSKey.GetSVid("serial_number")
    // Printf (not Sprintf): the result must actually be emitted.
    fmt.Printf("Serial number %s has %d temperatures: %v\n", serialNumber, len(temps), temps)
  }
  return nil
}

It is important to use indices efficiently. To query data successfully, a filter must specify every field of at least one index — much like selecting the right data partition. Conditions on any remaining (tail) keys are evaluated in memory, so the matching data is still pulled from storage before it is filtered.

Note also an important quirk of wide-column storage: indices must be specified both when writing and when querying. Wide-column can detect which index is optimal for a query, but it does not remember them, and there are no functions for saving or retrieving indices. It is the user's responsibility to provide the indices when saving, and the list of available indices when querying!

In practice this is fine: all our implementations use descriptors, which we must fetch from the local database anyway to validate user input. Because those descriptors are already loaded, separate database lookups just to read indices would be redundant. The indices are then used for writing, querying, and validating.