At the end of Part 3 my aggregation implementation was being processed within a transaction.

Processing incomplete aggregations

My simple aggregation function, which has been the same since Part 1 looks like this:

func Strategy(evt *SecurityNotification, s Aggregation) (*AggregateNotification, Aggregation) {
    s = append(s, evt)
    if len(s) == 3  OR evt.PRIORITY == HIGH {
        return &AggregateNotification{
            Notifications: s,
        }, nil
    }
    return nil, s
}

This emits a notification if it receives 3 notifications, or if the event it receives is HIGH priority.

This means I can have aggregations that are not complete, and may never complete, depending on your requirements, this might not be an issue, but for the purposes of this example, I’m going to say that I want a notification if there are unreleased notifications after three hours, there are lots of options here, it might be that incomplete aggregations should be deleted if they never complete, or perhaps not receiving an aggregation for a period is a notifiable error case.

First off, I need to make some changes to extend the model to make things a bit easier.

Extending the aggregation model

This requires some changes in the Aggregation model I’ve been using since part 1.

type Aggregation struct {
    Email         string
    LastUpdated     time.Time
    Notifications []*SecurityNotification
}

I could work out the LastUpdated time by sorting the Timestamp fields on the included notifications (I could do this for the Email too), but this is a bit more explicit.

With this, I need to write the new business logic to handle what to do when checking an Aggregation without an event.

func Strategy(evt *SecurityNotification, s *Aggregation) (*AggregateNotification, *Aggregation) {
    s.Notifications = append(s.Notifications, evt)
    if len(s.Notifications) == 3 || evt.Priority == HIGH {
        return aggregationToNotification(s), nil
    }
    return nil, s
}

func StrategyWithoutNotification(s *Aggregation) (*AggregateNotification, *Aggregation) {
    cutOffTime := clock().Add(time.Hour * -3)
    if cutOffTime.Before(s.LastUpdated) {
        return nil, s
    }
    if len(s.Notifications) > 0 {
        return aggregationToNotification(s), nil
    }
    return nil, nil
}


func aggregationToNotification(s *Aggregation) *AggregateNotification {
    return &AggregateNotification{
        Email:         s.Email,
        Notifications: s.Notifications,
    }
}

StrategyWithoutNotification looks to see if the last update time was more than 3 hours ago, if it’s not, then it can return the existing state, and no notification.

If there are unsent notifications, then it sends them, and updates the new state to nil, otherwise it just updates the new state to nil.

Processing aggregations without events

With badger, I can iterate over the aggregations fairly easily, it grabs a read/write transaction and iterates over the matching keys, this is why having a key prefix is really useful.

func (a *AggregateStore) ProcessAggregations(p AggregationProcessor) error {
    err := a.db.Update(func(txn *badger.Txn) error {
        it := txn.NewIterator(badger.DefaultIteratorOptions)
        defer it.Close()
        prefix := []byte(defaultPrefix)
        for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
            item := it.Item()
            err := processAggregation(txn, item, item.Key(), p)
            if err != nil {
                return err
            }
        }
        return nil
    })
    return err
}

AggregationProcessor is an interface that looks like this:

// AggregationProcessor handles bulk aggregation processing.
//
// There is no new event in this case, if it returns nil, then the existing
// aggregation state can be removed from the aggregation store.
type AggregationProcessor interface {
    ProcessWithoutEvent(*Aggregation) (*Aggregation, error)
}

Most of the magic in ProcessAggregations is in the processAggregation function:

func processAggregation(txn *badger.Txn, item *badger.Item, key []byte, p AggregationProcessor) error {
    err := item.Value(func(v []byte) error {
        state, err := unmarshal(v)
        if err != nil {
            return err
        }
        newState, err := p.Process(state)
        if err != nil {
            return err
        }
        if newState == nil {
            return txn.Delete(key)
        }
        return nil
    })
    return err
}

This unmarshals the current state, calls the AggregationProcessor implementation, passing it the current state, and if it returns nil, it deletes the state entry.

There’s no facility in this implementation for updating the state, but this could easily save the returned state back to the store.

So, detecting aggregations that have timed out is as easy as triggering ProcessAggregations on the store on a regular basis, this could be easily driven by a Goroutine and a Ticker in a select loop, at scale, this might need to be split into key-shards, with routines processing each shard individually, or perhaps indexing the last-updated timestamps, or even switching to a more appropriate store.

A simple extension to the PublishingProcessor from part-3 makes it possible to use the same processor for each:

func (p *PublishingProcessor) ProcessWithoutEvent(existingState *Aggregation) (*Aggregation, error) {
    notification, newState := StrategyWithoutEvent(oldState)
    if notification == nil {
        return newState, nil
    }
    log.Printf("publishing %#v\n", notification)
    return newState, nil
}

Again, if this fails to publish, then it should return an error.

Duplicate aggregation prevention

In the book description of the Aggregator pattern, there’s the following advice.

Depending on the aggregation strategy, the Aggregator may have to deal with the situation that an incoming message belongs to an aggregate that has already been closed out—that is, the incoming message arrives after the aggregate message has been published. In order to avoid starting a new aggregate, the Aggregator must keep a list of aggregates that have been closed out. This list should be purged periodically so that it does not grow indefinitely. However, we need to be careful not to purge closed-out aggregates too soon because that would cause any messages that are delayed to start a new aggregate.

This is easy enough to do in the ProcessNotification implementation, adding a field to the Aggregation to reflect the fact that this aggregation had been sent would be enough, and the ProcessAggregations mechanism will take care of cleaning up the data.

func Strategy(evt *SecurityNotification, s *Aggregation) (*AggregateNotification, *Aggregation) {
    if s.Sent {
        return nil, s
    }
    s.Notifications = append(s.Notifications, evt)
    if len(s.Notifications) == 3 || evt.Priority == HIGH {
        return aggregationToNotification(s), nil
    }
    return nil, s
}

With this approach, it’s necessary to return an Aggregation that retains those properties, rather than nil, so, when emitting new notifications, the existing ones need to be truncated.

Rendezvous aggregation

Sometimes, you want to wait for two (or more) different types of events that have a collation ID, in this case, the ProcessNotification method takes an interface, and you need to do a type switch in the code and update the aggregation appropriately.

func Strategy(evt interface{}, s *Aggregation) (*AggregateNotification, *Aggregation) {
    switch evt.(type) {
        case *SecurityNotification:
            s.TeamReceived = true
        case *TeamNotification:
            s.TeamReceived = true
    }
    if s.TeamReceived && s.SecurityReceived {
        // return new notification
    }
}