I got some feedback on this series:

How hard is this to do in Kotlin or Java?

I ran two training sessions last year on getting to grips with Aggregators in Java…but, I’ll be honest, I cheated…I used Apache Camel which makes it fairly easy.

Camel has specific support for the Aggregator Pattern as part of its implementation of the EIP patterns, and this is what I’d recommend.

I’ll show a quick implementation in Java, and I think a Kotlin version would look quite similar to the Go version, I’ll see if I can find time to write that up.

But, honestly, both Spring Integration and Camel have great support for powerful Aggregators, writing your should really only be an exercise in learning the pattern.

public class SecurityNotificationStrategy {
    public Optional<AggregateNotification> aggregate(SecurityNotification notification, Aggregation state) {
        state.addNotification(notification);
        if (state.size() < 3) {
            return Optional.empty();
        }

        var aggregateNotification =  Optional.of(makeAggregateNotification(state));
        state.clearNotifications();
        return aggregateNotification;
    }

    private AggregateNotification makeAggregateNotification(Aggregation aggregation) {
        return new AggregateNotification(
                aggregation.getEmail(),
                new HashSet<>(aggregation.getSecurityNotifications()));
    }
}

This uses a Java 8 Optional for the return value, but otherwise it looks fairly similar to the Go version.

@Data
@AllArgsConstructor
public class Aggregation {
    private String email;
    private ZonedDateTime lastUpdated;
    private Set<SecurityNotification> securityNotifications;

    public Aggregation(String email) {
        this.email = email;
        this.securityNotifications = new HashSet<>();
    }

    public int size() {
        return this.securityNotifications.size();
    }

    protected void addNotification(SecurityNotification notification) {
        this.securityNotifications.add(notification);
        this.lastUpdated = ZonedDateTime.now(ZoneId.of("UTC"));

    }
}

I’ll cheat and use Lombok with the Aggregation, this is where the @Data and @AllArgsConstructor annotations come from, Lombok takes a bit of getting used to, but it does tidy up code and remove large sets of getters and setters.

class SecurityNotificationStrategyTest {
    private static final String TEST_EMAIL = "a@example.com";
    private SecurityNotificationStrategy strategy;

    @BeforeEach
    public void setUpStrategy() {
        strategy = new SecurityNotificationStrategy();
    }

    @Test
    public void testAggregationWithEmptyState() {
        var existingState = new Aggregation(TEST_EMAIL);
        var notification = strategy.aggregate(makeNotification(), existingState);

        assertThat(notification).isEmpty();
    }

    @Test
    public void testAggregationUpdating() {
        var existingState = new Aggregation(TEST_EMAIL);
        var notification = strategy.aggregate(makeNotification(), existingState);
        assertThat(notification).isEmpty();

        notification = strategy.aggregate(makeNotification(), existingState);
        assertThat(notification).isEmpty();

        notification = strategy.aggregate(makeNotification(), existingState);
        assertThat(notification).isNotEmpty();
    }

    @Test
    public void testAggregationWithNearlyCompleteState() {
        var existingState = new Aggregation(TEST_EMAIL);
        existingState.addNotification(makeNotification());
        existingState.addNotification(makeNotification());

        var notification = strategy.aggregate(makeNotification(), existingState);

        assertThat(notification).isNotEmpty();
        notification.ifPresent(n -> {
            assertThat(n.getEmail()).isEqualTo(TEST_EMAIL);
            assertThat(n.getNotifications()).hasSize(3);
        });
    }

    private SecurityNotification makeNotification() {
        return new SecurityNotification(
                TEST_EMAIL,
                "testing",
                ZonedDateTime.now(ZoneId.of("UTC")),
                Priority.LOW);
    }
}

The tests are fairly straightforward, the state is passed by reference, so it can be updated in place, rather than returning the new state and an optional notification.

EDIT: Yes, I know that the Go version is passed by reference too, I like the symmetric nature of passing current state, and receiving new state, and if I was to have the aggregate method return a Response type object, with the optional AggregateNotification and the new state, this would be more like the Go approach.

Wrap up

I’ve written a lot about the Aggregator pattern, it’s really versatile for lots of common problems.

I still have a couple of examples I’d like to write up:

  • Benthos processing with Aggregation - I discovered Benthos recently, it describes itself as “The stream processor for mundane tasks”, and it’s quite flexible, implementing an aggregator turned out to be fairly simple in Go.
  • The aforementioned Kotlin version, this shouldn’t be that hard, and I’m curious how it’ll look.
  • A Camel-based implementation of the “Security Notification” aggregation problem.