In part 1 of this series, I started digging into https://github.com/alphagov/pay-ledger.

At the end, I had uncovered the following application classes:

  • EventResource, Event and EventDao
  • TransactionResource, Transaction, TransactionService, TransactionView, TransactionSearchResponse, TransactionEventResponse and TransactionsForTransactionResponse
  • HealthCheckResource
  • SQSHealthCheck
  • QueueMessageReceiver

Starting with QueueMessageReceiver things start to get more detailed:

public class QueueMessageReceiver implements Managed {
    @Inject
    public QueueMessageReceiver(
            Environment environment...EventMessageHandler eventMessageHandler) {
        this.eventMessageHandler = eventMessageHandler;
        scheduledExecutorService = environment
                .lifecycle()
                .scheduledExecutorService(QUEUE_MESSAGE_RECEIVER_THREAD_NAME)
                .threads(queueReadScheduleNumberOfThreads)
                .build();
    }

    @Override
    public void start() {
        long initialDelay = config.getThreadDelayInMilliseconds();
        long delay = config.getThreadDelayInMilliseconds();
        scheduledExecutorService.scheduleWithFixedDelay(
                this::receive,
                initialDelay,
                delay,
                TimeUnit.MILLISECONDS
        );
    }

    private void receive() {
        eventMessageHandler.handle();
    }

    @Override
    public void stop() {
        scheduledExecutorService.shutdown();
    }
}

I’ve ellided some of the code for clarity.

This sets up a scheduled poll of something called an EventMessageHandler and calls handle on it. The executor is provided by the Dropwizard environment.

The Managed interface gets started and stopped by Dropwizard.

EventMessageHandler

public class EventMessageHandler {
    @Inject
    public EventMessageHandler(EventQueue eventQueue, EventService eventService, TransactionService transactionService) {
        this.eventQueue = eventQueue;
        this.eventService = eventService;
        this.transactionService = transactionService;
    }

    public void handle() throws QueueException {
        List<EventMessage> eventMessages = eventQueue.retrieveEvents();

        for (EventMessage message : eventMessages) {
            processSingleMessage(message);
        }
    }

    void processSingleMessage(EventMessage message) throws QueueException {
        Event event = message.getEvent();
        CreateEventResponse response = eventService.createIfDoesNotExist(event);

        if(response.isSuccessful()) {
            EventDigest eventDigest = eventService.getEventDigestForResource(event.getResourceExternalId());
            transactionService.upsertTransactionFor(eventDigest);
            eventQueue.markMessageAsProcessed(message);
        } else {
            eventQueue.scheduleMessageForRetry(message);
        }
    }
}

This is fairly easy to understand, the poller calls handle which fetches events from an EventQueue, and then processes them individually.

It uses an EventService and TransactionService to get a digest and then save the Event, and then mark the event as processed, or, failing that, schedule it to be retried.

EventQueue

EventQueue is a wrapper around an SqsQueueService:

public class EventQueue {
    private SqsQueueService sqsQueueService;

    @Inject
    public EventQueue(SqsQueueService sqsQueueService, LedgerConfig configuration, ObjectMapper objectMapper) {
    }

    public List<EventMessage> retrieveEvents() throws QueueException {
        List<QueueMessage> queueMessages = sqsQueueService
                .receiveMessages(this.eventQueueUrl, EVENT_MESSAGE_ATTRIBUTE_NAME);

        return queueMessages
                .stream()
                .map(this::getMessage)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public void markMessageAsProcessed(EventMessage message) throws QueueException {
        sqsQueueService.deleteMessage(this.eventQueueUrl, message.getQueueMessageReceiptHandle());
    }

    public void scheduleMessageForRetry(EventMessage message) throws QueueException {
        sqsQueueService.deferMessage(this.eventQueueUrl, message.getQueueMessageReceiptHandle(), retryDelayInSeconds);
    }

    private EventMessage getMessage(QueueMessage queueMessage) {
        EventMessageDto eventDto = objectMapper.readValue(queueMessage.getMessageBody(), EventMessageDto.class);

        return EventMessage.of(eventDto, queueMessage);
    }
}

This is just fetching events from SQS, and parsing them, using an ObjectMapper, which means that the events are coming in in JSON.

I made a rough sketch as I went along of the SQS subsystem.

Sketch of event receiver

Essentially, the EventMessageHandler polls SQS using the EventQueue, which decodes the messages from JSON, it then uses the EventService to create a record, and then records it using the TransactionService, which the API can query for.

I need to understand a few more bits to fully understand the code:

EventService

public class EventService {
    private EventDao eventDao;

    @Inject
    public EventService(EventDao eventDao) {
        this.eventDao = eventDao;
    }

    public EventDigest getEventDigestForResource(String resourceExternalId) {
        List<Event> events = eventDao.getEventsByResourceExternalId(resourceExternalId);
        return EventDigest.fromEventList(events);
    }

    public CreateEventResponse createIfDoesNotExist(Event event) {
        try {
            Optional<Long> status = eventDao.insertEventIfDoesNotExistWithResourceTypeId(event);
            return new CreateEventResponse(status);
        } catch (Exception e) {
            return new CreateEventResponse(e);
        }
    }
}

This is a simple wrapper around the EventDao, this must be where the magic is…

EventDao

public interface EventDao {
    @CreateSqlObject
    ResourceTypeDao getResourceTypeDao();

    @SqlQuery("SELECT  e.id, e.sqs_message_id, rt.name AS resource_type_name, e.resource_external_id, " +
            "e.parent_resource_external_id, e.event_date," +
            "e.event_type, e.event_data FROM event e, resource_type rt WHERE e.resource_external_id = :resourceExternalId" +
            " AND e.resource_type_id = rt.id ORDER BY e.event_date DESC")
    List<Event> getEventsByResourceExternalId(@Bind("resourceExternalId") String resourceExternalId);


    @SqlUpdate("INSERT INTO event(sqs_message_id, resource_type_id, resource_external_id, parent_resource_external_id, " +
                "event_date, event_type, event_data) " +
            "VALUES (:sqsMessageId, :resourceTypeId, :resourceExternalId, :parentResourceExternalId, " +
                ":eventDate, :eventType, CAST(:eventData as jsonb))")
    @GetGeneratedKeys
    Long insert(@BindBean Event event, @Bind("resourceTypeId") int resourceTypeId);

    @Transaction
    default Long insertEventWithResourceTypeId(Event event) {
        int resourceTypeId = getResourceTypeDao().getResourceTypeIdByName(event.getResourceType().name());
        return insert(event, resourceTypeId);
    }
}

Again, I’ve ellided some of the code, and chopped some of the methods out for now…

First off, this is a JDBI interface, and the second thing I notice is that this is using jsonb to store the event data.

At this point, I want to know what the event data looks like, first off I look to see if there are any JSON fixtures in the test/resources directory of the project, but nothing.

A look at the tests, finds code that looks like this:

import static uk.gov.pay.ledger.util.fixture.EventFixture.anEventFixture;

Event event = anEventFixture()
    .withEventDate(CREATED_AT)
    .withParentResourceExternalId("parent-resource-id")
    .toEntity();

Which leads to a class called EventFixture, which is a Builder for event fixtures.

This gets me back to the Event class which originally I didn’t look too carefully at:

public class Event {

    @JsonIgnore
    private Long id;
    private String sqsMessageId;
    private ResourceType resourceType;
    private String resourceExternalId;
    private String parentResourceExternalId;
    @JsonSerialize(using = MicrosecondPrecisionDateTimeSerializer.class)
    private ZonedDateTime eventDate;
    private String eventType;
    private String eventData;
}

TransactionService

As expected, based on the more complex query endpoints in the TransactionResource, the TransactionService has more complexity, this definitely appears to be the reporting endpoint.

public class TransactionService {
    public Optional<TransactionView> getTransactionForGatewayAccount(String gatewayAccountId, String transactionExternalId, TransactionType transactionType, String parentTransactionExternalId) {}

    public Optional<TransactionView> getTransaction(String transactionExternalId) {}

    public TransactionsForTransactionResponse getTransactions(String parentTransactionExternalId, String gatewayAccountId) {}

    public TransactionSearchResponse searchTransactions(TransactionSearchParams searchParams, UriInfo uriInfo) {}

    public TransactionSearchResponse searchTransactions(String gatewayAccountId, TransactionSearchParams searchParams, UriInfo uriInfo) {}

    private List<TransactionView> mapToTransactionViewList(List<Transaction> transactionList) {}

    public void upsertTransactionFor(EventDigest eventDigest) {}
    public TransactionEventResponse findTransactionEvents(String externalId, String gatewayAccountId,
                                                          boolean includeAllEvents) {}

    private Map<String, TransactionEntity> getTransactionsAsMap(String externalId, String gatewayAccountId) {}

    private List<TransactionEvent> getTransactionEventsFor(Map<String, TransactionEntity> transactionEntityMap) {}

    private List<TransactionEvent> mapToTransactionEvent(Map<String, TransactionEntity> transactionEntityMap, List<Event> eventList) {}

    private List<TransactionEvent> removeDuplicates(List<TransactionEvent> transactionEvents) {}

    private TransactionsForTransactionResponse findTransactionsForParentExternalId(String parentTransactionExternalId, String gatewayAccountId) {}
}

Looking at this, I want to see a Transaction.

public abstract class Transaction {
}

There are two subclasses Payment and Refund which contain details of the transactions for example:

public class Payment extends Transaction{

    private String reference;
    private String description;
    @JsonSerialize(using = ToStringSerializer.class)
    private TransactionState state;
    private String language;
    private String returnUrl;
    private String email;
    private String paymentProvider;
    @JsonSerialize(using = ApiResponseDateTimeSerializer.class)
    private ZonedDateTime createdDate;
    private CardDetails cardDetails;
    private Boolean delayedCapture;
    private Map<String, Object> externalMetaData;
    @JsonIgnore
    private Integer eventCount;
    private String gatewayTransactionId;
    private Long corporateCardSurcharge;
    private Long fee;
    private Long netAmount;
    private Long totalAmount;
    private RefundSummary refundSummary;
    private SettlementSummary settlementSummary;

This pretty much completes a high-level overview of how this service works, there are Events, which refer to Transactions, and there is a Web API for querying Transactions in reports.

There’s an event-driven mechanism for inserting new Events (with Transactions) into a database.

As I’m building understanding of how a service works, I like to keep a rough sketch of the key parts:

Heart Sketch

Completed Diagram

This makes it easier to tidy it up for use later:

Heart Diagram

This distills the 61 classes in the original service, down to just 9, illustrating how transactions enter and are retrieved, I left out the Payment and Refund subclasses of the Transaction, and most of the SQS subsystem, but it reflects the logical design.