Aggregator Pattern - part 4
At the end of Part 3, my aggregation implementation was processing events within a transaction.
Processing incomplete aggregations
My simple aggregation function, which has been the same since Part 1, looks like this:
func Strategy(evt *SecurityNotification, s Aggregation) (*AggregateNotification, Aggregation) {
    s = append(s, evt)
    if len(s) == 3 || evt.Priority == HIGH {
        return &AggregateNotification{
            Notifications: s,
        }, nil
    }
    return nil, s
}
This emits a notification once the aggregation has accumulated 3 notifications, or if the incoming event is HIGH priority.
This means I can have aggregations that are not complete, and may never complete. Depending on your requirements, this might not be an issue, but for the purposes of this example I'm going to say that I want a notification if there are unreleased notifications after three hours. There are lots of options here: it might be that incomplete aggregations should be deleted if they never complete, or perhaps not receiving an aggregation for a period is a notifiable error case.
First off, I need to extend the model a bit to make things easier.
Extending the aggregation model
This requires some changes to the Aggregation model I've been using since Part 1.
type Aggregation struct {
    Email         string
    LastUpdated   time.Time
    Notifications []*SecurityNotification
}
I could work out the LastUpdated time by sorting the Timestamp fields on the included notifications (I could do this for the Email too), but this is a bit more explicit.
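For reference, deriving the time from the notifications would only need a small helper; this is just a sketch, assuming each SecurityNotification carries a Timestamp field, and scanning for the latest value rather than sorting:
// lastUpdated is a sketch of deriving the most recent update time from the
// notification Timestamps, instead of storing LastUpdated explicitly.
func lastUpdated(s *Aggregation) time.Time {
    var latest time.Time
    for _, n := range s.Notifications {
        if n.Timestamp.After(latest) {
            latest = n.Timestamp
        }
    }
    return latest
}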
With this, I need to write the new business logic to handle what to do when checking an Aggregation without an event.
func Strategy(evt *SecurityNotification, s *Aggregation) (*AggregateNotification, *Aggregation) {
    s.Notifications = append(s.Notifications, evt)
    if len(s.Notifications) == 3 || evt.Priority == HIGH {
        return aggregationToNotification(s), nil
    }
    return nil, s
}
func StrategyWithoutNotification(s *Aggregation) (*AggregateNotification, *Aggregation) {
    cutOffTime := clock().Add(time.Hour * -3)
    if cutOffTime.Before(s.LastUpdated) {
        return nil, s
    }
    if len(s.Notifications) > 0 {
        return aggregationToNotification(s), nil
    }
    return nil, nil
}
func aggregationToNotification(s *Aggregation) *AggregateNotification {
    return &AggregateNotification{
        Email:         s.Email,
        Notifications: s.Notifications,
    }
}
StrategyWithoutNotification checks whether the last update time was more than 3 hours ago; if it wasn't, it returns the existing state and no notification. If there are unsent notifications, it emits them as an AggregateNotification and sets the new state to nil; otherwise it just sets the new state to nil.
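The clock function used in the cut-off calculation isn't shown here; I'm assuming it's just a swappable wrapper around time.Now, which keeps the three-hour check easy to test:
// clock is assumed to be a package-level variable so tests can substitute
// a fixed time when exercising the cut-off logic.
var clock = time.Now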
Processing aggregations without events
With badger, I can iterate over the aggregations fairly easily: grab a read/write transaction and iterate over the matching keys. This is where having a key prefix is really useful.
func (a *AggregateStore) ProcessAggregations(p AggregationProcessor) error {
    err := a.db.Update(func(txn *badger.Txn) error {
        it := txn.NewIterator(badger.DefaultIteratorOptions)
        defer it.Close()
        prefix := []byte(defaultPrefix)
        for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
            item := it.Item()
            err := processAggregation(txn, item, item.Key(), p)
            if err != nil {
                return err
            }
        }
        return nil
    })
    return err
}
AggregationProcessor is an interface that looks like this:
// AggregationProcessor handles bulk aggregation processing.
//
// There is no new event in this case; if it returns nil, then the existing
// aggregation state can be removed from the aggregation store.
type AggregationProcessor interface {
    ProcessWithoutEvent(*Aggregation) (*Aggregation, error)
}
Most of the magic in ProcessAggregations is in the processAggregation function:
func processAggregation(txn *badger.Txn, item *badger.Item, key []byte, p AggregationProcessor) error {
    err := item.Value(func(v []byte) error {
        state, err := unmarshal(v)
        if err != nil {
            return err
        }
        newState, err := p.ProcessWithoutEvent(state)
        if err != nil {
            return err
        }
        if newState == nil {
            return txn.Delete(key)
        }
        return nil
    })
    return err
}
This unmarshals the current state and passes it to the AggregationProcessor implementation; if the returned state is nil, it deletes the entry.
There’s no facility in this implementation for updating the state, but this could easily save the returned state back to the store.
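As a sketch of that extension, the tail of the Value callback could write non-nil state back with txn.Set, assuming a marshal function that is the counterpart of the unmarshal used above:
// Sketch: persist updated state instead of only deleting completed entries.
// marshal is assumed to be the counterpart of the unmarshal function above.
if newState == nil {
    return txn.Delete(key)
}
b, err := marshal(newState)
if err != nil {
    return err
}
return txn.Set(key, b)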
So, detecting aggregations that have timed out is as easy as triggering ProcessAggregations on the store on a regular basis. This could be driven by a Goroutine and a Ticker in a select loop. At scale, this might need to be split into key shards, with a routine processing each shard individually; other options are indexing the last-updated timestamps, or switching to a more appropriate store.
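A minimal sketch of that driver, assuming the AggregateStore and AggregationProcessor above, plus a stop channel to shut it down cleanly:
// processPeriodically triggers ProcessAggregations on every tick until the
// stop channel is closed. Sketch only: failures are just logged.
func processPeriodically(store *AggregateStore, p AggregationProcessor, interval time.Duration, stop <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            if err := store.ProcessAggregations(p); err != nil {
                log.Printf("failed to process aggregations: %s", err)
            }
        case <-stop:
            return
        }
    }
}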
A simple extension to the PublishingProcessor from Part 3 makes it possible to use the same processor for each:
func (p *PublishingProcessor) ProcessWithoutEvent(existingState *Aggregation) (*Aggregation, error) {
    notification, newState := StrategyWithoutNotification(existingState)
    if notification == nil {
        return newState, nil
    }
    log.Printf("publishing %#v\n", notification)
    return newState, nil
}
Again, if this fails to publish, then it should return an error.
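A sketch of what that might look like, assuming a hypothetical publisher field on PublishingProcessor whose Publish method returns an error (the version above just logs):
// Sketch: propagate publish failures and keep the existing state so nothing
// is lost. The publisher field and its Publish method are assumptions here.
func (p *PublishingProcessor) ProcessWithoutEvent(existingState *Aggregation) (*Aggregation, error) {
    notification, newState := StrategyWithoutNotification(existingState)
    if notification == nil {
        return newState, nil
    }
    if err := p.publisher.Publish(notification); err != nil {
        return existingState, err
    }
    return newState, nil
}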
Duplicate aggregation prevention
In the book description of the Aggregator pattern, there's the following advice:
Depending on the aggregation strategy, the Aggregator may have to deal with the situation that an incoming message belongs to an aggregate that has already been closed out—that is, the incoming message arrives after the aggregate message has been published. In order to avoid starting a new aggregate, the Aggregator must keep a list of aggregates that have been closed out. This list should be purged periodically so that it does not grow indefinitely. However, we need to be careful not to purge closed-out aggregates too soon because that would cause any messages that are delayed to start a new aggregate.
This is easy enough to do in the ProcessNotification implementation; adding a field to the Aggregation to reflect the fact that this aggregation has been sent would be enough, and the ProcessAggregations mechanism will take care of cleaning up the data.
func Strategy(evt *SecurityNotification, s *Aggregation) (*AggregateNotification, *Aggregation) {
    if s.Sent {
        return nil, s
    }
    s.Notifications = append(s.Notifications, evt)
    if len(s.Notifications) == 3 || evt.Priority == HIGH {
        return aggregationToNotification(s), nil
    }
    return nil, s
}
With this approach, it's necessary to return an Aggregation that retains those properties, rather than nil; so, when emitting new notifications, the existing ones need to be truncated.
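A sketch of that variant of the strategy, assuming the Sent flag above, and bumping LastUpdated so that the periodic purge can clean the record up later:
// Sketch: mark the aggregation as sent and truncate its notifications instead
// of returning nil, so late-arriving events don't start a new aggregation.
func Strategy(evt *SecurityNotification, s *Aggregation) (*AggregateNotification, *Aggregation) {
    if s.Sent {
        return nil, s
    }
    s.Notifications = append(s.Notifications, evt)
    if len(s.Notifications) == 3 || evt.Priority == HIGH {
        n := aggregationToNotification(s)
        s.Sent = true
        s.Notifications = nil
        s.LastUpdated = clock() // assumption: the purge keys off LastUpdated
        return n, s
    }
    return nil, s
}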
Rendezvous aggregation
Sometimes, you want to wait for two (or more) different types of events that share a collation ID. In this case, the ProcessNotification method takes an interface, and you need to do a type switch in the code and update the aggregation appropriately.
func Strategy(evt interface{}, s *Aggregation) (*AggregateNotification, *Aggregation) {
    switch evt.(type) {
    case *SecurityNotification:
        s.SecurityReceived = true
    case *TeamNotification:
        s.TeamReceived = true
    }
    if s.TeamReceived && s.SecurityReceived {
        // return new notification
        return aggregationToNotification(s), nil
    }
    return nil, s
}