In Part 5 I implemented the Aggregator in Java, with the proviso that aggregators in Java are best left to Camel.

Implementing this in Camel is fairly easy, it takes care of a lot of the infrastructure, providing an aggregate store (with options for persistence), and many ways to publish the event, with transactionality included.

There are two parts to this in Java, an implementation of the Aggregator which Camel has an interface for (AggregationStrategy) and then a route that uses the aggregator to route events.

Mocking Camel Routes

Camel provides a lot of functionality in its CamelTestSupport base class, including logic for mocking routes, this makes the test a bit more complex:

public class AggregationRouteBuilderTest extends CamelTestSupport {
    protected static final String TEST_EMAIL = "a@example.com";

    @EndpointInject(uri = "direct:notifications")
    private Endpoint notifications;
    @EndpointInject(uri = "mock:aggregations")
    private MockEndpoint aggregations;

    @Before
    public void setupMockRoutes() throws Exception {
        context.getRouteDefinition("security-notifications")
                .adviceWith(context, new AdviceWithRouteBuilder() {
                    @Override
                    public void configure() throws Exception {
                        weaveByToUri("direct:aggregations")
                                .replace()
                                .to("mock:aggregations");
                    }
                });
    }

    public RoutesBuilder createRouteBuilder() {
        return new AggregationRouteBuilder();
    }

    protected SecurityNotification makeNotification() {
        return new SecurityNotification(
                TEST_EMAIL,
                "testing",
                ZonedDateTime.now(ZoneId.of("UTC")),
                Priority.LOW);
    }
}

Testing the aggregation route

@Test
public void checkAggregationOfEvents() throws InterruptedException {
    var notification = makeNotification();

    template.sendBody(notifications, notification);
    template.sendBody(notifications, notification);
    template.sendBody(notifications, notification);

    aggregations.expectedMessageCount(1);
    aggregations.message(0).body().isInstanceOf(AggregateNotification.class);

    assertMockEndpointsSatisfied();
}

With all that test infrastructure in place, the test itself is fairly understandable, it sends a notification three times, then sets a couple of expectations up, and ensures that the expectations are satisfied.

Nothing particularly remarkable in this test, the hardest part is working out the syntax for “weaving” the mock URLs into the routes.

Building the route

public class AggregationRouteBuilder extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:notifications").id("security-notifications")
                .aggregate(simple("${body.getEmail}"), new NotificationAggregator()).completionSize(3)
                .to("direct:aggregations");

        from("direct:aggregations").id("aggregation-notifications")
                .log("Got ${body}");
    }
}

There are two routes in this, security-notifications which aggregates using ${body.getEmail} as the correlation ID, passing the current state to a NotificationAggregator, and with a completionSize of three, this is slightly different from the previous implementations, the “when to emit the aggregation” is decoupled from the process of aggregation, this could also be a simple expression, which could check the current aggregation state for completion, for example, to implement the “or HIGH priority” requirement.

One of the nice things about this approach, is that it’s trivial to implement the requirement to emit the current state of the aggregation ” if there are unreleased notifications after three hours” with Camel’s completionTimeout mechanism.

The second route is aggregation-notifications, completed aggregations are emitted here from the security-notifications route, it simply logs the body, but it could easily use another processor to turn the aggregation into an Email, and use Camel’s SMTP component to send the email, or my own SendGrid based component.

Implementing the Aggregation

import lombok.extern.java.Log;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

@Log
public class NotificationAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        var notification = (SecurityNotification) newExchange.getIn().getBody();
        AggregateNotification an;
        if (oldExchange == null) {
            an = new AggregateNotification(notification.getEmail());
        } else {
            an = (AggregateNotification) oldExchange.getIn().getBody();
        }
        an.addMessage(notification);
        newExchange.getIn().setBody(an);
        return newExchange;
    }
}

This is different to the previous versions, it just aggregates the events in an AggregationNotification event.

The aggregation data looks very similar to the previous versions…

import lombok.ToString;

import java.util.ArrayList;
import java.util.List;

@ToString
public class AggregateNotification {
    private String to;
    private List<SecurityNotification> notifications;

    public AggregateNotification(String to) {
        this.to = to;
        this.notifications = new ArrayList<>();
    }

    public void addMessage(SecurityNotification notification) {
        notifications.add(notification);
    }

    public int size() {
        return notifications.size();
    }
}

The AggregationStrategy builds up the aggregation notification data, and that’s what’s emitted by the aggregator route.

Persistence of the aggregation data

Finally, in my earlier example, I used a Badger key-value store to record the aggregation data, Camel provides a default in-memory AggregationRepository.

But many different implementations are available: