At the end of Part 2 I had a simple persistent aggregation storage.

In this part, I’ll cover transactional aggregation, i.e. processing the event within a transaction, so that we can be confident that if we are unable to act on an event, an error will be reported.

A rough sketch of the transactional boundary involved:

Transsactional Aggregator

Within the scope of a database transaction for the store the following functionality is needed:

  • Load the existing state from the store - if I get an error fail the transaction
  • Pass the event and the existing state to the normal aggregation strategy
  • If the aggregator returns a new event, try and publish the event - on error fail the transaction
  • Save the new state to the database, fail the transaction if this result in an error

There are two ways to think about this, ask the store for a transaction, and then execute while keeping the transaction open, and finally close the transaction out.

Alternatively, I can provide some code and have it executed within the context of a transaction.

This is my preferered approach, and it’s fairly easy to drive through tests:

I have an interface for the code that is executed within the transaction.

// Processor handles incoming notifications and previous state.
//
// Executed within the context of a transaction, receives the incoming event and
// the current aggregate state for that event, returns the new state and an
// error if any.
type Processor interface {
    Process(*SecurityNotification, Aggregation) (Aggregation, error)
}

It’s important that this is able to return an error, publishing the aggregate notification, if any, can result in an error, and by returning this, I can prevent the transaction being committed.

func TestProcessNotificationWithNoPreviousState(t *testing.T) {
    store, cleanup := createBadgerStore(t)
    defer cleanup()

    processor := &mockProcessor{}
    notification := makeNotification(testEmail)

    err := store.ProcessNotification(notification, processor)
    fatalIfError(t, err)

    if processor.processedNotification == nil {
        t.Fatalf("processor did not receive notification")
    }
}

By creating a mock implementation of the Processor, I can test the behaviour fairly thoroughly.

type mockProcessor struct {
    processedNotification *SecurityNotification
    processedAggregation  Aggregation
    returnAggregation     Aggregation
    err                   error
}

func (p *mockProcessor) Process(n *SecurityNotification, a Aggregation) (Aggregation, error) {
    p.processedNotification = n
    p.processedAggregation = a
    return p.returnAggregation, p.err
}

Implementing the transactional method

The implementation is fairly simple, it is doing the same thing as the Get and Save methods, and also calling the processor in the middle.

func (a *AggregateStore) ProcessNotification(n *SecurityNotification, p Processor) error {
    return a.db.Update(func(txn *badger.Txn) error {
        id := n.Email
        previous, err := getOrEmpty(txn, id)
        if err != nil {
            return err
        }
        newState, err := p.Process(n, previous)
        if err != nil {
            return err
        }
        b, err := marshal(newState)
        if err != nil {
            return err
        }
        return txn.Set(keyForId(defaultPrefix, id), b)
    })
}

Additional testing drives out the behaviour:

func TestProcessNotificationUpdatesExistingState(t *testing.T) {
    store, cleanup := createBadgerStore(t)
    defer cleanup()

    notifications := Aggregation{makeNotification(testEmail)}
    err := store.Save(testEmail, notifications)
    fatalIfError(t, err)
    processor := &mockProcessor{}
    notification := makeNotification(testEmail)
    processor.returnAggregation = Aggregation{makeNotification(testEmail), makeNotification(testEmail)}

    err = store.ProcessNotification(notification, processor)
    fatalIfError(t, err)

    if processor.processedNotification == nil {
        t.Fatalf("processor did not receive notification")
    }
    loaded, err := store.Get(testEmail)
    fatalIfError(t, err)
    if !reflect.DeepEqual(processor.returnAggregation, loaded) {
        t.Fatalf("got %#v, wanted %#v", loaded, processor.returnAggregation)
    }
}

func TestExecuteAggregationErrorPublishing(t *testing.T) {
    store, cleanup := createBadgerStore(t)
    defer cleanup()
    testError := errors.New("this is a test")

    processor := &mockProcessor{}
    processor.err = testError
    notification := makeNotification(testEmail)

    err := store.ProcessNotification(notification, processor)
    if err != testError {
        t.Fatalf("got error %s, wanted %s", err, testError)
    }

    loaded, err := store.Get(testEmail)
    fatalIfError(t, err)
    if loaded != nil {
        t.Fatalf("saved aggregate despite error: %#v", loaded)
    }
}

This allows trimming the code in the HTTP handler from the code, as it was in part 1.

func makeHandler(store *aggregator.AggregateStore) func(http.ResponseWriter, *http.Request) {
    return func(w http.ResponseWriter, r *http.Request) {
        var sn aggregator.SecurityNotification
        dec := json.NewDecoder(r.Body)
        err := dec.Decode(&sn)
        if err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        correlationID := sn.Email
        existingState, err := store.Get(correlationID)
        if err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        n, s := aggregator.Strategy(&sn, existingState)
        err = store.Save(correlationID, s)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
    }
}

Losing seven lines in the changes:

func makeHandler(store *aggregator.AggregateStore, processor aggregator.Processor) func(http.ResponseWriter, *http.Request) {
    return func(w http.ResponseWriter, r *http.Request) {
        var sn aggregator.SecurityNotification
        dec := json.NewDecoder(r.Body)
        err := dec.Decode(&sn)
        if err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        err = store.ProcessNotification(&sn, processor)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
    }
}

The implementation of the Processor used here is easily understood:

type PublishingProcessor struct {
}

func (p *PublishingProcessor) Process(evt *SecurityNotification, oldState Aggregation) (Aggregation, error) {
    notification, newState := Strategy(evt, oldState)
    if notification == nil {
        return newState, nil
    }
    log.Printf("publishing %#v\n", notification)
    return newState, nil
}

In a real implementation, this would contain something that could actually publish the notification.