Event-driven systems are becoming more and more common.

Systems that receive events, process them, and then emit new events to drive further processing, synchronously or asynchronously, usually around an “event bus” (often Kafka), are a major architectural style, if not a new one.

For me, the “bible” of designing event-driven systems is “Enterprise Integration Patterns”.

The website only has a subset of the information in the book, so it’s well worth buying the book.

One of the most useful patterns is the “Aggregator”.

How do we combine the results of individual, but related messages so that they can be processed as a whole?

The basic Aggregator looks like this:

[Figure: Aggregator]

The Aggregator has many uses, from rolling up notifications, to reducing the number of lines emitted to a log, to providing a rendezvous for multiple events to trigger a further event.

I’ll show how to implement a simple Aggregator in Go.

The problem

A common use case is rolling up notifications: rather than sending a separate email for each notification received, grouping them into a single email can be less annoying for users.

I’ll implement a simple notification roll-up service.

This is the model.

const (
    LOW    = iota
    MEDIUM
    HIGH
)

type SecurityNotification struct {
    Email        string // recipient; also used later as the correlation ID
    Notification string
    Timestamp    time.Time
    Priority     int
}

Each notification is destined for a user, and carries a message, a timestamp, and a priority. If you want to see a real-world notification, see note 1.
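
The code that follows also uses an AggregateNotification type, which isn't listed in this post. Here is a minimal sketch of what it might look like, with the Email and Notifications fields inferred from how the tests and handlers use them; the real definition may carry more.

```go
package main

import (
    "fmt"
    "time"
)

// Priority levels, as in the model.
const (
    LOW = iota
    MEDIUM
    HIGH
)

// SecurityNotification is a single event destined for one user.
type SecurityNotification struct {
    Email        string
    Notification string
    Timestamp    time.Time
    Priority     int
}

// AggregateNotification is an assumed shape for the rolled-up event:
// the recipient plus every notification collected for them.
type AggregateNotification struct {
    Email         string
    Notifications []*SecurityNotification
}

func main() {
    now := time.Now().UTC()
    agg := AggregateNotification{
        Email: "a@example.com",
        Notifications: []*SecurityNotification{
            {Email: "a@example.com", Notification: "first", Timestamp: now, Priority: LOW},
            {Email: "a@example.com", Notification: "second", Timestamp: now, Priority: HIGH},
        },
    }
    fmt.Printf("%d notifications for %s\n", len(agg.Notifications), agg.Email)
}
```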

I’ll start with a test.

func TestAggregation(t *testing.T) {
    n, s := Strategy(makeNotification(testEmail), make(Aggregation, 0))
    if n != nil {
        t.Fatalf("unexpectedly received a notification: got %#v", n)
    }
    n, s = Strategy(makeNotification(testEmail), s)
    if n != nil {
        t.Fatalf("unexpectedly received a notification: got %#v", n)
    }
    n, s = Strategy(makeNotification(testEmail), s)
    if n == nil {
        t.Fatal("expected a notification, got nil")
    }
    if n.Email != testEmail {
        t.Fatalf("incorrect aggregate email: got %s, wanted %s", n.Email, testEmail)
    }
    if l := len(n.Notifications); l != 3 {
        t.Fatalf("expected 3 messages in the aggregation, got %d", l)
    }
}

I’ve compressed this into a single test for brevity, but it basically sends events until it receives a notification.

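A small helper creates SecurityNotification values:

// testEmail is the address used throughout the tests; any fixed value works.
const testEmail = "a@example.com"

func makeNotification(email string) *SecurityNotification {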
    return &SecurityNotification{
        Email:        email,
        Notification: "testing",
        Timestamp:    time.Now().UTC(),
        Priority:     LOW,
    }
}

And the actual aggregation function:

type Aggregation []*SecurityNotification

func Strategy(evt *SecurityNotification, s Aggregation) (*AggregateNotification, Aggregation) {
    s = append(s, evt)
    if len(s) == 3 {
        return &AggregateNotification{
            Email:         evt.Email, // the test checks the aggregate's Email
            Notifications: s,
        }, nil
    }
    return nil, s
}

Starting with the signature, this function takes a new event *SecurityNotification and the current state, and returns an aggregated notification, along with the new state.

The logic begins by appending the new event to the state. If there are then 3 elements, it emits a new event, an AggregateNotification, along with the new state, nil.

If there are not 3 elements in the state, no AggregateNotification is emitted, and the updated state is returned.

Looking at the function, it’s fairly obvious how it works: it relies only on the new event and the existing state to return a value.

This means it’s referentially transparent: the same event and state always produce the same result.
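
One consequence is that a whole stream of events can be replayed as a simple fold over the function. Below is a small sketch of that idea. Replay is a hypothetical helper, not part of the original code, and the types are trimmed-down copies so the example stands alone.

```go
package main

import "fmt"

type SecurityNotification struct {
    Email        string
    Notification string
}

type AggregateNotification struct {
    Email         string
    Notifications []*SecurityNotification
}

type Aggregation []*SecurityNotification

// Strategy is a copy of the aggregation function: emit after three events.
func Strategy(evt *SecurityNotification, s Aggregation) (*AggregateNotification, Aggregation) {
    s = append(s, evt)
    if len(s) == 3 {
        return &AggregateNotification{Email: evt.Email, Notifications: s}, nil
    }
    return nil, s
}

// Replay (hypothetical) folds Strategy over a slice of events, threading
// the state through each call and collecting whatever gets emitted.
func Replay(events []*SecurityNotification) ([]*AggregateNotification, Aggregation) {
    var emitted []*AggregateNotification
    var state Aggregation
    for _, evt := range events {
        var n *AggregateNotification
        n, state = Strategy(evt, state)
        if n != nil {
            emitted = append(emitted, n)
        }
    }
    return emitted, state
}

func main() {
    events := make([]*SecurityNotification, 0, 7)
    for i := 0; i < 7; i++ {
        events = append(events, &SecurityNotification{
            Email:        "a@example.com",
            Notification: fmt.Sprintf("event %d", i),
        })
    }
    emitted, pending := Replay(events)
    fmt.Printf("%d aggregates emitted, %d events pending\n", len(emitted), len(pending))
}
```

Seven events produce two aggregates of three, with one event left pending, and replaying the same slice gives the same answer every time.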

So, how does this work in practice?

var existingState aggregator.Aggregation

func main() {
    existingState = make(aggregator.Aggregation, 0)
    http.HandleFunc("/notifications", aggregatorHandler)

    fmt.Printf("receiving on http://localhost:8080/notifications\n")
    log.Fatal(http.ListenAndServe(":8080", nil))

}

func aggregatorHandler(w http.ResponseWriter, r *http.Request) {
    var sn aggregator.SecurityNotification
    dec := json.NewDecoder(r.Body)
    err := dec.Decode(&sn)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    var n *aggregator.AggregateNotification
    n, existingState = aggregator.Strategy(&sn, existingState)
    if n != nil {
        log.Printf("new event emitted for user %s\n", n.Email)
        return
    }
    log.Println("event processed - no event emitted")
}

Note: THIS IS NOT goroutine-safe. net/http serves requests on separate goroutines, so access to the global existingState should be guarded by a mutex.

This is a really simple HTTP endpoint for testing: it receives JSON blobs, decodes them, and then aggregates them.

I can test this really easily by posting a JSON blob to the test endpoint.

 $ curl -H "Content-Type: application/json" \
     -d '{"Email":"a@example.com","Notification":"test notification","Timestamp":"2019-08-10T15:31:47.808596Z","Priority":0}' \
     http://localhost:8080/notifications

Executing this repeatedly gets the expected output.

receiving on http://localhost:8080/notifications
2019/08/11 15:38:43 event processed - no event emitted
2019/08/11 15:38:44 event processed - no event emitted
2019/08/11 15:38:45 new event emitted for user a@example.com
2019/08/11 15:38:45 event processed - no event emitted
2019/08/11 15:38:46 event processed - no event emitted
2019/08/11 15:38:47 new event emitted for user a@example.com
2019/08/11 15:38:47 event processed - no event emitted

But this is aggregating all events into batches; it’s not splitting them per user.

Correlation IDs

If I want to group by the email address of the user, then it’s as easy as maintaining a state per email address (the correlation ID).

The actual aggregator remains unchanged.

An in-memory implementation might look like this:

var aggregationStore map[string]aggregator.Aggregation

func main() {
    aggregationStore = make(map[string]aggregator.Aggregation)
    http.HandleFunc("/notifications", aggregatorHandler)

    fmt.Printf("receiving on http://localhost:8080/notifications\n")
    log.Fatal(http.ListenAndServe(":8080", nil))

}

func aggregatorHandler(w http.ResponseWriter, r *http.Request) {
    var sn aggregator.SecurityNotification
    dec := json.NewDecoder(r.Body)
    err := dec.Decode(&sn)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    correlationID := sn.Email
    log.Printf("processing event for %s\n", correlationID)

    existingState, ok := aggregationStore[correlationID]
    if !ok {
        existingState = make(aggregator.Aggregation, 0)
    }

    n, newState := aggregator.Strategy(&sn, existingState)
    aggregationStore[correlationID] = newState

    if n != nil {
        log.Printf("new event emitted for user %s\n", n.Email)
        return
    }
    log.Println("event processed - no event emitted")
}

Note: THIS IS NOT goroutine-safe. net/http serves requests on separate goroutines, so access to the global aggregationStore should be guarded by a mutex.
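
One way to add that protection is to wrap the map and its mutex in a small type. This is only a sketch: Store, NewStore and Add are hypothetical names, and the types are trimmed-down copies so the example stands alone.

```go
package main

import (
    "fmt"
    "sync"
)

type SecurityNotification struct {
    Email        string
    Notification string
}

type AggregateNotification struct {
    Email         string
    Notifications []*SecurityNotification
}

type Aggregation []*SecurityNotification

// Strategy is a copy of the aggregation function: emit after three events.
func Strategy(evt *SecurityNotification, s Aggregation) (*AggregateNotification, Aggregation) {
    s = append(s, evt)
    if len(s) == 3 {
        return &AggregateNotification{Email: evt.Email, Notifications: s}, nil
    }
    return nil, s
}

// Store (hypothetical) keeps one Aggregation per correlation ID behind a mutex.
type Store struct {
    mu     sync.Mutex
    states map[string]Aggregation
}

func NewStore() *Store {
    return &Store{states: make(map[string]Aggregation)}
}

// Add reads the state, runs Strategy and writes the new state back while
// holding the lock, so concurrent requests cannot interleave those steps.
func (st *Store) Add(evt *SecurityNotification) *AggregateNotification {
    st.mu.Lock()
    defer st.mu.Unlock()
    n, next := Strategy(evt, st.states[evt.Email])
    if next == nil {
        delete(st.states, evt.Email) // nothing pending, drop the key
    } else {
        st.states[evt.Email] = next
    }
    return n
}

func main() {
    store := NewStore()
    emitted := make(chan *AggregateNotification, 9)
    var wg sync.WaitGroup
    for i := 0; i < 9; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            evt := &SecurityNotification{Email: "a@example.com", Notification: "test"}
            if n := store.Add(evt); n != nil {
                emitted <- n
            }
        }()
    }
    wg.Wait()
    close(emitted)
    fmt.Printf("%d aggregates emitted\n", len(emitted))
}
```

The handler would then call something like store.Add(&sn) instead of touching the map directly. A single lock around the whole read-aggregate-write step is the simplest option; per-key locking would only matter under much heavier load.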

This change is easy to test:

receiving on http://localhost:8080/notifications
2019/08/11 16:29:32 processing event for a@example.com
2019/08/11 16:29:32 event processed - no event emitted
2019/08/11 16:29:37 processing event for b@example.com
2019/08/11 16:29:37 event processed - no event emitted
2019/08/11 16:29:41 processing event for a@example.com
2019/08/11 16:29:41 event processed - no event emitted
2019/08/11 16:29:44 processing event for a@example.com
2019/08/11 16:29:44 new event emitted for user a@example.com

After 3 events for a@example.com, an event is emitted for a@example.com, even though an event for b@example.com is mixed in.

Changing the business logic

I now have code that aggregates notifications per user, but perhaps I want to trigger delivery immediately if I receive a HIGH priority notification.

This is as easy as updating the aggregation function.

First off, a test:

func TestAggregationPublishesOnHighPriorityEvent(t *testing.T) {
    n, s := Strategy(makeNotification(testEmail), make(Aggregation, 0))
    if n != nil {
        t.Fatalf("unexpectedly received a notification: got %#v", n)
    }
    evt2 := makeNotification(testEmail)
    evt2.Priority = HIGH
    n, s = Strategy(evt2, s)
    if n == nil {
        t.Fatal("expected a notification, got nil")
    }

    if l := len(n.Notifications); l != 2 {
        t.Fatalf("expected 2 messages in the aggregation, got %d", l)
    }
}

Then a quick tweak to the strategy function:

func Strategy(evt *SecurityNotification, s Aggregation) (*AggregateNotification, Aggregation) {
    s = append(s, evt)
    if len(s) == 3 || evt.Priority == HIGH {
        return &AggregateNotification{
            Email:         evt.Email,
            Notifications: s,
        }, nil
    }
    return nil, s
}

This changes the logic to emit the aggregation if we have 3 events, or if the incoming event is HIGH priority.

Running the same curl posts, but varying the priority value, gives:

receiving on http://localhost:8080/notifications
2019/08/11 16:39:31 processing low priority event for a@example.com
2019/08/11 16:39:31 event processed - no event emitted
2019/08/11 16:39:35 processing high priority event for a@example.com
2019/08/11 16:39:35 new event emitted for user a@example.com

The collation logic doesn’t need to change at all.

Some things are still missing from this example:

  • Persisting the aggregation data across process restarts.
  • What happens if we never receive the third notification, or a HIGH priority notification?
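
For the second point, one possible direction is a completion timeout: a companion function that emits whatever has been collected once the oldest pending notification is too old. The sketch below is an assumption about how that could look, not a finished design. Flush and maxAge are hypothetical names, and the current time is passed in so the function stays as easy to test as Strategy.

```go
package main

import (
    "fmt"
    "time"
)

type SecurityNotification struct {
    Email        string
    Notification string
    Timestamp    time.Time
    Priority     int
}

type AggregateNotification struct {
    Email         string
    Notifications []*SecurityNotification
}

type Aggregation []*SecurityNotification

// Flush (hypothetical) emits the pending notifications if the oldest one
// is at least maxAge old at time now; otherwise it returns the state unchanged.
func Flush(s Aggregation, now time.Time, maxAge time.Duration) (*AggregateNotification, Aggregation) {
    if len(s) == 0 {
        return nil, s
    }
    oldest := s[0].Timestamp
    for _, evt := range s[1:] {
        if evt.Timestamp.Before(oldest) {
            oldest = evt.Timestamp
        }
    }
    if now.Sub(oldest) < maxAge {
        return nil, s
    }
    // The state is kept per correlation ID, so every entry shares the Email.
    return &AggregateNotification{Email: s[0].Email, Notifications: s}, nil
}

func main() {
    now := time.Now().UTC()
    pending := Aggregation{{
        Email:        "a@example.com",
        Notification: "stale",
        Timestamp:    now.Add(-10 * time.Minute),
    }}

    n, rest := Flush(pending, now, 5*time.Minute)
    if n != nil {
        fmt.Printf("flushed %d notification(s) for %s, %d left pending\n",
            len(n.Notifications), n.Email, len(rest))
    }
}
```

Something would still have to call this periodically for each stored state, for example from a ticker.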

I’ll address these issues in a subsequent post.

  1. https://developer.github.com/v3/activity/events/types/#securityadvisoryevent