Heart and Soul - part 2
In part 1 of this series, I started digging into https://github.com/alphagov/pay-ledger.
At the end, I had uncovered the following application classes:
- EventResource, Event and EventDao
- TransactionResource, Transaction, TransactionService, TransactionView, TransactionSearchResponse, TransactionEventResponse and TransactionsForTransactionResponse
- HealthCheckResource
- SQSHealthCheck
- QueueMessageReceiver
Starting with QueueMessageReceiver
things start to get more detailed:
public class QueueMessageReceiver implements Managed {
@Inject
public QueueMessageReceiver(
Environment environment...EventMessageHandler eventMessageHandler) {
this.eventMessageHandler = eventMessageHandler;
scheduledExecutorService = environment
.lifecycle()
.scheduledExecutorService(QUEUE_MESSAGE_RECEIVER_THREAD_NAME)
.threads(queueReadScheduleNumberOfThreads)
.build();
}
@Override
public void start() {
long initialDelay = config.getThreadDelayInMilliseconds();
long delay = config.getThreadDelayInMilliseconds();
scheduledExecutorService.scheduleWithFixedDelay(
this::receive,
initialDelay,
delay,
TimeUnit.MILLISECONDS
);
}
private void receive() {
eventMessageHandler.handle();
}
@Override
public void stop() {
scheduledExecutorService.shutdown();
}
}
I’ve ellided some of the code for clarity.
This sets up a scheduled poll of something called an EventMessageHandler
and calls handle
on it. The executor is provided by the Dropwizard environment.
The Managed
interface gets started and stopped by Dropwizard.
EventMessageHandler
public class EventMessageHandler {
@Inject
public EventMessageHandler(EventQueue eventQueue, EventService eventService, TransactionService transactionService) {
this.eventQueue = eventQueue;
this.eventService = eventService;
this.transactionService = transactionService;
}
public void handle() throws QueueException {
List<EventMessage> eventMessages = eventQueue.retrieveEvents();
for (EventMessage message : eventMessages) {
processSingleMessage(message);
}
}
void processSingleMessage(EventMessage message) throws QueueException {
Event event = message.getEvent();
CreateEventResponse response = eventService.createIfDoesNotExist(event);
if(response.isSuccessful()) {
EventDigest eventDigest = eventService.getEventDigestForResource(event.getResourceExternalId());
transactionService.upsertTransactionFor(eventDigest);
eventQueue.markMessageAsProcessed(message);
} else {
eventQueue.scheduleMessageForRetry(message);
}
}
}
This is fairly easy to understand, the poller calls handle
which fetches
events from an EventQueue
, and then processes them individually.
It uses an EventService
and TransactionService
to get a digest and then save
the Event
, and then mark the event as processed, or, failing that, schedule it
to be retried.
EventQueue
EventQueue
is a wrapper around an SqsQueueService
:
public class EventQueue {
private SqsQueueService sqsQueueService;
@Inject
public EventQueue(SqsQueueService sqsQueueService, LedgerConfig configuration, ObjectMapper objectMapper) {
}
public List<EventMessage> retrieveEvents() throws QueueException {
List<QueueMessage> queueMessages = sqsQueueService
.receiveMessages(this.eventQueueUrl, EVENT_MESSAGE_ATTRIBUTE_NAME);
return queueMessages
.stream()
.map(this::getMessage)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
public void markMessageAsProcessed(EventMessage message) throws QueueException {
sqsQueueService.deleteMessage(this.eventQueueUrl, message.getQueueMessageReceiptHandle());
}
public void scheduleMessageForRetry(EventMessage message) throws QueueException {
sqsQueueService.deferMessage(this.eventQueueUrl, message.getQueueMessageReceiptHandle(), retryDelayInSeconds);
}
private EventMessage getMessage(QueueMessage queueMessage) {
EventMessageDto eventDto = objectMapper.readValue(queueMessage.getMessageBody(), EventMessageDto.class);
return EventMessage.of(eventDto, queueMessage);
}
}
This is just fetching events from SQS, and parsing them, using an ObjectMapper
, which means that the events are coming in in JSON.
I made a rough sketch as I went along of the SQS subsystem.
Essentially, the EventMessageHandler
polls SQS using the EventQueue
, which decodes the messages from JSON, it then uses the EventService
to create a record, and then records it using the TransactionService
, which the API can query for.
I need to understand a few more bits to fully understand the code:
EventService
public class EventService {
private EventDao eventDao;
@Inject
public EventService(EventDao eventDao) {
this.eventDao = eventDao;
}
public EventDigest getEventDigestForResource(String resourceExternalId) {
List<Event> events = eventDao.getEventsByResourceExternalId(resourceExternalId);
return EventDigest.fromEventList(events);
}
public CreateEventResponse createIfDoesNotExist(Event event) {
try {
Optional<Long> status = eventDao.insertEventIfDoesNotExistWithResourceTypeId(event);
return new CreateEventResponse(status);
} catch (Exception e) {
return new CreateEventResponse(e);
}
}
}
This is a simple wrapper around the EventDao
, this must be where the magic
is…
EventDao
public interface EventDao {
@CreateSqlObject
ResourceTypeDao getResourceTypeDao();
@SqlQuery("SELECT e.id, e.sqs_message_id, rt.name AS resource_type_name, e.resource_external_id, " +
"e.parent_resource_external_id, e.event_date," +
"e.event_type, e.event_data FROM event e, resource_type rt WHERE e.resource_external_id = :resourceExternalId" +
" AND e.resource_type_id = rt.id ORDER BY e.event_date DESC")
List<Event> getEventsByResourceExternalId(@Bind("resourceExternalId") String resourceExternalId);
@SqlUpdate("INSERT INTO event(sqs_message_id, resource_type_id, resource_external_id, parent_resource_external_id, " +
"event_date, event_type, event_data) " +
"VALUES (:sqsMessageId, :resourceTypeId, :resourceExternalId, :parentResourceExternalId, " +
":eventDate, :eventType, CAST(:eventData as jsonb))")
@GetGeneratedKeys
Long insert(@BindBean Event event, @Bind("resourceTypeId") int resourceTypeId);
@Transaction
default Long insertEventWithResourceTypeId(Event event) {
int resourceTypeId = getResourceTypeDao().getResourceTypeIdByName(event.getResourceType().name());
return insert(event, resourceTypeId);
}
}
Again, I’ve ellided some of the code, and chopped some of the methods out for now…
First off, this is a JDBI interface, and the second thing I notice is that this is using jsonb
to store the event data.
At this point, I want to know what the event data looks like, first off I look to see if there are any JSON fixtures in the test/resources
directory of the project, but nothing.
A look at the tests, finds code that looks like this:
import static uk.gov.pay.ledger.util.fixture.EventFixture.anEventFixture;
Event event = anEventFixture()
.withEventDate(CREATED_AT)
.withParentResourceExternalId("parent-resource-id")
.toEntity();
Which leads to a class called EventFixture
, which is a Builder
for event fixtures.
This gets me back to the Event
class which originally I didn’t look too carefully at:
public class Event {
@JsonIgnore
private Long id;
private String sqsMessageId;
private ResourceType resourceType;
private String resourceExternalId;
private String parentResourceExternalId;
@JsonSerialize(using = MicrosecondPrecisionDateTimeSerializer.class)
private ZonedDateTime eventDate;
private String eventType;
private String eventData;
}
TransactionService
As expected, based on the more complex query endpoints in the TransactionResource
, the TransactionService
has more complexity, this definitely appears to be the reporting endpoint.
public class TransactionService {
public Optional<TransactionView> getTransactionForGatewayAccount(String gatewayAccountId, String transactionExternalId, TransactionType transactionType, String parentTransactionExternalId) {}
public Optional<TransactionView> getTransaction(String transactionExternalId) {}
public TransactionsForTransactionResponse getTransactions(String parentTransactionExternalId, String gatewayAccountId) {}
public TransactionSearchResponse searchTransactions(TransactionSearchParams searchParams, UriInfo uriInfo) {}
public TransactionSearchResponse searchTransactions(String gatewayAccountId, TransactionSearchParams searchParams, UriInfo uriInfo) {}
private List<TransactionView> mapToTransactionViewList(List<Transaction> transactionList) {}
public void upsertTransactionFor(EventDigest eventDigest) {}
public TransactionEventResponse findTransactionEvents(String externalId, String gatewayAccountId,
boolean includeAllEvents) {}
private Map<String, TransactionEntity> getTransactionsAsMap(String externalId, String gatewayAccountId) {}
private List<TransactionEvent> getTransactionEventsFor(Map<String, TransactionEntity> transactionEntityMap) {}
private List<TransactionEvent> mapToTransactionEvent(Map<String, TransactionEntity> transactionEntityMap, List<Event> eventList) {}
private List<TransactionEvent> removeDuplicates(List<TransactionEvent> transactionEvents) {}
private TransactionsForTransactionResponse findTransactionsForParentExternalId(String parentTransactionExternalId, String gatewayAccountId) {}
}
Looking at this, I want to see a Transaction
.
public abstract class Transaction {
}
There are two subclasses Payment
and Refund
which contain details of the transactions for example:
public class Payment extends Transaction{
private String reference;
private String description;
@JsonSerialize(using = ToStringSerializer.class)
private TransactionState state;
private String language;
private String returnUrl;
private String email;
private String paymentProvider;
@JsonSerialize(using = ApiResponseDateTimeSerializer.class)
private ZonedDateTime createdDate;
private CardDetails cardDetails;
private Boolean delayedCapture;
private Map<String, Object> externalMetaData;
@JsonIgnore
private Integer eventCount;
private String gatewayTransactionId;
private Long corporateCardSurcharge;
private Long fee;
private Long netAmount;
private Long totalAmount;
private RefundSummary refundSummary;
private SettlementSummary settlementSummary;
This pretty much completes a high-level overview of how this service works, there are Events, which refer to Transactions, and there is a Web API for querying Transactions in reports.
There’s an event-driven mechanism for inserting new Events (with Transactions) into a database.
As I’m building understanding of how a service works, I like to keep a rough sketch of the key parts:
Completed Diagram
This makes it easier to tidy it up for use later:
This distills the 61 classes in the original service, down to just 9, illustrating how transactions enter and are retrieved, I left out the Payment
and Refund
subclasses of the Transaction, and most of the SQS subsystem, but it reflects the logical design.