Aggregator Pattern - part 6
In Part 5 I implemented the Aggregator in Java, with the proviso that aggregators in Java are best left to Camel.
Implementing this in Camel is fairly easy: it takes care of a lot of the infrastructure, providing an aggregate store (with options for persistence) and many ways to publish the event, with transactionality included.
There are two parts to this in Java: an implementation of the aggregator, for which Camel provides an interface (AggregationStrategy), and a route that uses the aggregator to route events.
Mocking Camel Routes
Camel provides a lot of functionality in its CamelTestSupport base class, including logic for mocking routes. This makes the test a bit more complex:
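import java.time.ZoneId;
import java.time.ZonedDateTime;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Before;

public class AggregationRouteBuilderTest extends CamelTestSupport {
    protected static final String TEST_EMAIL = "a@example.com";

    // Entry point of the route under test.
    @EndpointInject(uri = "direct:notifications")
    private Endpoint notifications;

    // Mock endpoint that stands in for the downstream route.
    @EndpointInject(uri = "mock:aggregations")
    private MockEndpoint aggregations;

    @Before
    public void setupMockRoutes() throws Exception {
        // Replace the "direct:aggregations" output with the mock endpoint.
        context.getRouteDefinition("security-notifications")
            .adviceWith(context, new AdviceWithRouteBuilder() {
                @Override
                public void configure() throws Exception {
                    weaveByToUri("direct:aggregations")
                        .replace()
                        .to("mock:aggregations");
                }
            });
    }

    @Override
    public RoutesBuilder createRouteBuilder() {
        return new AggregationRouteBuilder();
    }

    protected SecurityNotification makeNotification() {
        return new SecurityNotification(
            TEST_EMAIL,
            "testing",
            ZonedDateTime.now(ZoneId.of("UTC")),
            Priority.LOW);
    }
}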
Testing the aggregation route
@Test
public void checkAggregationOfEvents() throws InterruptedException {
    var notification = makeNotification();
    template.sendBody(notifications, notification);
    template.sendBody(notifications, notification);
    template.sendBody(notifications, notification);
    aggregations.expectedMessageCount(1);
    aggregations.message(0).body().isInstanceOf(AggregateNotification.class);
    assertMockEndpointsSatisfied();
}
With all that test infrastructure in place, the test itself is fairly understandable: it sends a notification three times, sets up a couple of expectations, and checks that they are satisfied.
Nothing in this test is particularly remarkable; the hardest part is working out the syntax for “weaving” the mock endpoints into the routes.
Building the route
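import org.apache.camel.builder.RouteBuilder;

public class AggregationRouteBuilder extends RouteBuilder {
    @Override
    public void configure() {
        // Group notifications by email address and release each group of three.
        from("direct:notifications").id("security-notifications")
            .aggregate(simple("${body.getEmail}"), new NotificationAggregator())
                .completionSize(3)
            .to("direct:aggregations");

        // Receives completed aggregations; here it only logs them.
        from("direct:aggregations").id("aggregation-notifications")
            .log("Got ${body}");
    }
}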
There are two routes here. The first, security-notifications, aggregates using ${body.getEmail} as the correlation ID, passes the current state to a NotificationAggregator, and has a completionSize of three. This is slightly different from the previous implementations: the decision about when to emit the aggregation is decoupled from the process of aggregating. The completion condition could also be a simple expression that checks the current aggregation state, for example to implement the “or HIGH priority” requirement.
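To make that decoupling concrete, here is a minimal plain-Java sketch with no Camel dependency. The class name CompletionSketch, the cut-down Notification type and the threeOrHigh rule are all made up for illustration; only the idea, a completion rule supplied separately from the aggregation step, comes from the route above. In the Camel DSL the corresponding hook is completionPredicate on the aggregate definition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Illustration only: none of these names exist in Camel or in the earlier parts.
class CompletionSketch {
    enum Priority { LOW, HIGH }

    // Hypothetical, cut-down stand-in for SecurityNotification.
    static final class Notification {
        final String email;
        final Priority priority;

        Notification(String email, Priority priority) {
            this.email = email;
            this.priority = priority;
        }
    }

    // Pending groups, keyed by correlation ID (the email address).
    private final Map<String, List<Notification>> pending = new HashMap<>();
    // The "when to emit" rule, supplied separately from the aggregation step.
    private final Predicate<List<Notification>> isComplete;

    CompletionSketch(Predicate<List<Notification>> isComplete) {
        this.isComplete = isComplete;
    }

    // Adds the notification to its group; returns the group if it is now complete.
    Optional<List<Notification>> add(Notification n) {
        List<Notification> group = pending.computeIfAbsent(n.email, k -> new ArrayList<>());
        group.add(n);
        if (isComplete.test(group)) {
            pending.remove(n.email);
            return Optional.of(group);
        }
        return Optional.empty();
    }

    // Completion rule: three notifications, or any HIGH priority one.
    static boolean threeOrHigh(List<Notification> group) {
        return group.size() >= 3
                || group.stream().anyMatch(n -> n.priority == Priority.HIGH);
    }

    public static void main(String[] args) {
        CompletionSketch aggregator = new CompletionSketch(CompletionSketch::threeOrHigh);
        aggregator.add(new Notification("a@example.com", Priority.LOW));
        // A single HIGH priority notification releases the group early.
        Optional<List<Notification>> released =
                aggregator.add(new Notification("a@example.com", Priority.HIGH));
        System.out.println(released.map(List::size).orElse(0)); // prints 2
    }
}
```

Changing the rule only means passing a different predicate to the constructor; the aggregation step itself stays the same.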
One of the nice things about this approach is that it’s trivial to implement the requirement to emit the current state of the aggregation “if there are unreleased notifications after three hours” with Camel’s completionTimeout mechanism.
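As I read the Camel documentation, completionTimeout takes a value in milliseconds and behaves as an inactivity timeout per correlation key, so each new message for a key restarts the wait. The plain-Java sketch below models only that behaviour and is not how Camel implements it; the class InactivityTimeoutSketch and its methods are hypothetical, and the clock is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Illustration only: a hand-rolled inactivity timeout, not Camel's implementation.
class InactivityTimeoutSketch {
    private final long timeoutMillis;
    // Time (in millis) of the most recent message for each correlation key.
    private final Map<String, Long> lastSeen = new HashMap<>();

    InactivityTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Records activity for a key, which restarts its timeout.
    void onMessage(String key, long nowMillis) {
        lastSeen.put(key, nowMillis);
    }

    // Returns, and forgets, every key that has been idle for at least the timeout.
    List<String> releaseExpired(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        InactivityTimeoutSketch sketch = new InactivityTimeoutSketch(TimeUnit.HOURS.toMillis(3));
        sketch.onMessage("a@example.com", 0L);
        // A second message two hours in restarts the three-hour wait.
        sketch.onMessage("a@example.com", TimeUnit.HOURS.toMillis(2));
        System.out.println(sketch.releaseExpired(TimeUnit.HOURS.toMillis(3))); // prints []
        System.out.println(sketch.releaseExpired(TimeUnit.HOURS.toMillis(5))); // prints [a@example.com]
    }
}
```

If the three hours should instead be counted from the first unreleased notification, the rule would need to track the first arrival time rather than the latest one.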
The second route is aggregation-notifications. Completed aggregations from the security-notifications route are emitted here. It simply logs the body, but it could easily use another processor to turn the aggregation into an email and send it with Camel’s SMTP component, or with my own SendGrid-based component.
Implementing the Aggregation
import lombok.extern.java.Log;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

@Log
public class NotificationAggregator implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        var notification = (SecurityNotification) newExchange.getIn().getBody();
        AggregateNotification an;
        if (oldExchange == null) {
            // First notification for this correlation ID: start a new aggregate.
            an = new AggregateNotification(notification.getEmail());
        } else {
            // Otherwise carry on with the aggregate built so far.
            an = (AggregateNotification) oldExchange.getIn().getBody();
        }
        an.addMessage(notification);
        newExchange.getIn().setBody(an);
        return newExchange;
    }
}
This is different from the previous versions: it just collects the events into an AggregateNotification.
The aggregation data looks very similar to the previous versions…
import lombok.ToString;
import java.util.ArrayList;
import java.util.List;

@ToString
public class AggregateNotification {
    private String to;
    private List<SecurityNotification> notifications;

    public AggregateNotification(String to) {
        this.to = to;
        this.notifications = new ArrayList<>();
    }

    public void addMessage(SecurityNotification notification) {
        notifications.add(notification);
    }

    public int size() {
        return notifications.size();
    }
}
The AggregationStrategy builds up the aggregate notification data, and that’s what the aggregator route emits.
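The shape of that strategy, where the first call for a correlation ID receives no previous state and every later call receives the result of the previous one, is essentially a fold. A minimal sketch of just that shape, using plain strings instead of exchanges (FoldSketch and its aggregate method are hypothetical names, not part of Camel):

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: mirrors the shape of aggregate(oldExchange, newExchange).
class FoldSketch {
    // 'previous' is null on the first call, just as oldExchange is in the strategy.
    static List<String> aggregate(List<String> previous, String incoming) {
        List<String> state = (previous == null) ? new ArrayList<>() : previous;
        state.add(incoming);
        return state;
    }

    public static void main(String[] args) {
        List<String> state = null;
        for (String message : List.of("first", "second", "third")) {
            // Each result is fed back in as the previous state.
            state = aggregate(state, message);
        }
        System.out.println(state); // prints [first, second, third]
    }
}
```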
Persistence of the aggregation data
Finally, in my earlier example I used a Badger key-value store to record the aggregation data. Camel provides a default in-memory AggregationRepository.
But many different implementations are available: