At the end of Part 1 I had a working Aggregator pattern implementation.

There were two major unresolved issues:

  • Persisting the aggregation data across process restarts.
  • What happens if we never receive the third notification, or a HIGH priority notification?

I’ll tackle the first of these in this part.

Persisting the aggregation data across restarts

Aggregation data generally has a correlation ID; in this simple example, it’s the email address of the user.

This lends itself to a key-value store, with the correlation ID as the key and the current state of the aggregation data as the value.
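
The store ends up with just two operations keyed by the correlation ID. The interface itself isn’t shown in this post, but a minimal sketch of the shape the implementations below satisfy might look like this (the name Store is illustrative):

type Store interface {
    // Get returns the aggregation state for the correlation ID,
    // or nil if there is no current state.
    Get(id string) (Aggregation, error)

    // Save records the aggregation state against the correlation ID.
    Save(id string, state Aggregation) error
}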

Implementation using Badger

For this example I’m using Badger, an embeddable on-disk key-value store written in Go, similar to RocksDB; in any case, the API is much the same across key-value stores.

I need to be able to persist and retrieve data using a correlation ID, and in the case of the notification aggregation, the correlation ID is an email address.

Starting with a test.

NOTE: in the code below, I’ve elided some error handling to shorten the code

func TestGetWithUnknownID(t *testing.T) {
    store, cleanup := createBadgerStore(t)
    defer cleanup()

    a, err := store.Get(testEmail)
    fatalIfError(t, err)

    if a != nil {
        t.Fatalf("unknown ID wanted nil, got %#v", a)
    }
}

This is a basic test that ensures that I get a sensible response when there is no value in the database.

func TestSave(t *testing.T) {
    store, cleanup := createBadgerStore(t)
    defer cleanup()
    notifications := Aggregation{makeNotification(testEmail)}

    err := store.Save(testEmail, notifications)
    fatalIfError(t, err)

    loaded, err := store.Get(testEmail)
    fatalIfError(t, err)
    if !reflect.DeepEqual(notifications, loaded) {
        t.Fatalf("save failed to save: wanted %#v, got %#v", notifications, loaded)
    }
}

This case tests that I can write to the store and retrieve the value that was written.
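
The test helpers aren’t shown here; a minimal sketch of createBadgerStore and fatalIfError, assuming each test gets a Badger database in a temporary directory that the cleanup function removes (the names match the tests above, but the bodies are illustrative):

func createBadgerStore(t *testing.T) (*AggregateStore, func()) {
    t.Helper()
    dir, err := ioutil.TempDir("", "badger")
    fatalIfError(t, err)
    db, err := badger.Open(badger.DefaultOptions(dir))
    fatalIfError(t, err)
    // the cleanup function closes the database and removes the files
    return NewStore(db), func() {
        db.Close()
        os.RemoveAll(dir)
    }
}

func fatalIfError(t *testing.T, err error) {
    t.Helper()
    if err != nil {
        t.Fatal(err)
    }
}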

The implementation is not particularly hard. Badger has transactional semantics, so the fetching and the saving are each done within a transaction.

func (a *AggregateStore) Get(id string) (Aggregation, error) {
    var sns Aggregation
    err := a.db.View(func(txn *badger.Txn) error {
        var err error
        sns, err = getOrNil(txn, id)
        return err
    })
    return sns, err
}

func (a *AggregateStore) Save(id string, state Aggregation) error {
    b, err := marshal(state)
    // return if error
    return a.db.Update(func(txn *badger.Txn) error {
        return txn.Set(keyForId(defaultPrefix, id), b)
    })
}

I opted to serialise to JSON for storage in the database, but MessagePack or even Protocol Buffers might be more appropriate.
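
The marshal and unmarshal helpers aren’t shown; with JSON as the storage format they’re thin wrappers around encoding/json, something like:

func marshal(state Aggregation) ([]byte, error) {
    return json.Marshal(state)
}

func unmarshal(b []byte) (Aggregation, error) {
    var sns Aggregation
    err := json.Unmarshal(b, &sns)
    return sns, err
}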

A couple of small helper functions round out the code.

func keyForId(prefix, id string) []byte {
    return []byte(fmt.Sprintf("%s:%s", prefix, id))
}

Prefixing keys in a key-value store is important; it’s the equivalent of the “table name” in a relational database, and keeps different kinds of records from colliding.
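
The prefix also makes it possible to scan just the aggregation records, using Badger’s seek-and-check-prefix iteration pattern; a sketch, with the per-item handling elided:

err := db.View(func(txn *badger.Txn) error {
    it := txn.NewIterator(badger.DefaultIteratorOptions)
    defer it.Close()
    prefix := []byte(defaultPrefix + ":")
    // visit only keys that share the aggregation prefix
    for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
        // it.Item() is one persisted aggregation
    }
    return nil
})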

And the second helper reads from Badger within a provided transaction, and handles the ErrKeyNotFound error by returning nil.

The interface I’ve defined says that Get should return the aggregation state for the provided ID, or nil if there is no current state.

func getOrNil(txn *badger.Txn, id string) (Aggregation, error) {
    item, err := txn.Get(keyForId(defaultPrefix, id))
    if err == badger.ErrKeyNotFound {
        return nil, nil
    }
    // return if error
    var sns Aggregation
    err = item.Value(func(val []byte) error {
        sns, err = unmarshal(val)
        return err
    })
    return sns, err
}

At this point I can read and write to a Badger KV store, and it’s easy to swap out the memory-based version from part 1 for the Badger implementation.

Updating the HTTP handler to use the new persistence mechanism

func main() {
    db, err := badger.Open(badger.DefaultOptions("./tmp"))
    // fail if error
    defer db.Close()

    store := aggregator.NewStore(db)

    http.HandleFunc("/notifications", makeHandler(store))

    fmt.Printf("receiving on http://localhost:8080/notifications\n")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

func makeHandler(store *aggregator.AggregateStore) func(http.ResponseWriter, *http.Request) {
    return func(w http.ResponseWriter, r *http.Request) {
        var sn aggregator.SecurityNotification
        dec := json.NewDecoder(r.Body)
        err := dec.Decode(&sn)
        // return if error

        correlationID := sn.Email
        existingState, err := store.Get(correlationID)
        // return if error

        n, s := aggregator.Strategy(&sn, existingState)

        err = store.Save(correlationID, s)
        // return if error
        if n != nil {
            log.Printf("new event emitted for user %s\n", n.Email)
        }
    }
}

This is fairly straightforward: fetch the existing state from the database, apply the strategy to update it, save the new state, and log if an aggregate notification is emitted.

The problem with this code is that it’s not transactionally safe. Ideally, we’d load, update, possibly publish, and save within the scope of a single transaction, and surface the result of all that to the event processing, so that we could avoid acknowledging the message to the upstream event stream if processing or saving the state failed.
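
Within a single process, the read-modify-write part could at least be made atomic by running the whole strategy inside one Badger update transaction; a rough sketch (processNotification is a hypothetical helper, and the Notification return type is assumed from the handler above; this still doesn’t solve the acking problem):

func processNotification(db *badger.DB, sn *SecurityNotification) (*Notification, error) {
    var n *Notification
    err := db.Update(func(txn *badger.Txn) error {
        // load the existing state inside the transaction
        existing, err := getOrNil(txn, sn.Email)
        if err != nil {
            return err
        }
        // apply the aggregation strategy to produce the new state
        var s Aggregation
        n, s = Strategy(sn, existing)
        b, err := marshal(s)
        if err != nil {
            return err
        }
        // save the new state; if this fails, the whole update rolls back
        return txn.Set(keyForId(defaultPrefix, sn.Email), b)
    })
    return n, err
}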

Generally, when you “ack” an event in a messaging system, the consumer is saying “yes, I have accepted this message and processed it successfully”. I’ll deal with this in part 3.