Aggregator Pattern - part 3
At the end of Part 2 I had a simple persistent aggregation storage.
In this part, I’ll cover transactional aggregation, i.e. processing the event within a transaction, so that we can be confident that if we are unable to act on an event, an error will be reported.
A rough sketch of the transactional boundary involved:
Within the scope of a database transaction for the store the following functionality is needed:
- Load the existing state from the store - if I get an error fail the transaction
- Pass the event and the existing state to the normal aggregation strategy
- If the aggregator returns a new event, try and publish the event - on error fail the transaction
- Save the new state to the database, fail the transaction if this result in an error
There are two ways to think about this, ask the store for a transaction, and then execute while keeping the transaction open, and finally close the transaction out.
Alternatively, I can provide some code and have it executed within the context of a transaction.
This is my preferered approach, and it’s fairly easy to drive through tests:
I have an interface for the code that is executed within the transaction.
// Processor handles incoming notifications and previous state.
//
// Executed within the context of a transaction, receives the incoming event and
// the current aggregate state for that event, returns the new state and an
// error if any.
type Processor interface {
Process(*SecurityNotification, Aggregation) (Aggregation, error)
}
It’s important that this is able to return an error, publishing the aggregate notification, if any, can result in an error, and by returning this, I can prevent the transaction being committed.
func TestProcessNotificationWithNoPreviousState(t *testing.T) {
store, cleanup := createBadgerStore(t)
defer cleanup()
processor := &mockProcessor{}
notification := makeNotification(testEmail)
err := store.ProcessNotification(notification, processor)
fatalIfError(t, err)
if processor.processedNotification == nil {
t.Fatalf("processor did not receive notification")
}
}
By creating a mock implementation of the Processor
, I can test the behaviour
fairly thoroughly.
type mockProcessor struct {
processedNotification *SecurityNotification
processedAggregation Aggregation
returnAggregation Aggregation
err error
}
func (p *mockProcessor) Process(n *SecurityNotification, a Aggregation) (Aggregation, error) {
p.processedNotification = n
p.processedAggregation = a
return p.returnAggregation, p.err
}
Implementing the transactional method
The implementation is fairly simple, it is doing the same thing as the Get
and
Save
methods, and also calling the processor in the middle.
func (a *AggregateStore) ProcessNotification(n *SecurityNotification, p Processor) error {
return a.db.Update(func(txn *badger.Txn) error {
id := n.Email
previous, err := getOrEmpty(txn, id)
if err != nil {
return err
}
newState, err := p.Process(n, previous)
if err != nil {
return err
}
b, err := marshal(newState)
if err != nil {
return err
}
return txn.Set(keyForId(defaultPrefix, id), b)
})
}
Additional testing drives out the behaviour:
func TestProcessNotificationUpdatesExistingState(t *testing.T) {
store, cleanup := createBadgerStore(t)
defer cleanup()
notifications := Aggregation{makeNotification(testEmail)}
err := store.Save(testEmail, notifications)
fatalIfError(t, err)
processor := &mockProcessor{}
notification := makeNotification(testEmail)
processor.returnAggregation = Aggregation{makeNotification(testEmail), makeNotification(testEmail)}
err = store.ProcessNotification(notification, processor)
fatalIfError(t, err)
if processor.processedNotification == nil {
t.Fatalf("processor did not receive notification")
}
loaded, err := store.Get(testEmail)
fatalIfError(t, err)
if !reflect.DeepEqual(processor.returnAggregation, loaded) {
t.Fatalf("got %#v, wanted %#v", loaded, processor.returnAggregation)
}
}
func TestExecuteAggregationErrorPublishing(t *testing.T) {
store, cleanup := createBadgerStore(t)
defer cleanup()
testError := errors.New("this is a test")
processor := &mockProcessor{}
processor.err = testError
notification := makeNotification(testEmail)
err := store.ProcessNotification(notification, processor)
if err != testError {
t.Fatalf("got error %s, wanted %s", err, testError)
}
loaded, err := store.Get(testEmail)
fatalIfError(t, err)
if loaded != nil {
t.Fatalf("saved aggregate despite error: %#v", loaded)
}
}
This allows trimming the code in the HTTP handler from the code, as it was in part 1.
func makeHandler(store *aggregator.AggregateStore) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var sn aggregator.SecurityNotification
dec := json.NewDecoder(r.Body)
err := dec.Decode(&sn)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
correlationID := sn.Email
existingState, err := store.Get(correlationID)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
n, s := aggregator.Strategy(&sn, existingState)
err = store.Save(correlationID, s)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
Losing seven lines in the changes:
func makeHandler(store *aggregator.AggregateStore, processor aggregator.Processor) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var sn aggregator.SecurityNotification
dec := json.NewDecoder(r.Body)
err := dec.Decode(&sn)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err = store.ProcessNotification(&sn, processor)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
The implementation of the Processor used here is easily understood:
type PublishingProcessor struct {
}
func (p *PublishingProcessor) Process(evt *SecurityNotification, oldState Aggregation) (Aggregation, error) {
notification, newState := Strategy(evt, oldState)
if notification == nil {
return newState, nil
}
log.Printf("publishing %#v\n", notification)
return newState, nil
}
In a real implementation, this would contain something that could actually publish the notification.