Код
@Service
@Slf4j
public class SpendingServiceImpl implements SpendingService {
private final SpendingGroupRepository spendingGroupRepository;
private final SpendingRepository spendingRepository;
private final IdempotencyRepository idempotencyRepository;
private final TransactionTemplate transactionTemplate;
public SpendingServiceImpl(SpendingGroupRepository spendingGroupRepository, SpendingRepository spendingRepository, IdempotencyRepository idempotencyRepository, PlatformTransactionManager platformTransactionManager) {
this.spendingGroupRepository = spendingGroupRepository;
this.spendingRepository = spendingRepository;
this.idempotencyRepository = idempotencyRepository;
this.transactionTemplate = new TransactionTemplate(platformTransactionManager);
this.transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
}
@Override
public void addSpending(SpendingMessageDto spendingMessageDto) {
SpendingGroup spendingGroup;
try {
spendingGroup = spendingGroupRepository
.findByExternalId(spendingMessageDto.getSpendingGroupId())
.orElseGet(() -> spendingGroupRepository.save(SpendingGroup.builder()
.externalId(spendingMessageDto.getSpendingGroupId())
.description("Banana").build()));
} catch (DataIntegrityViolationException e) {
// Parallel Insert Violates Unique Key, get the entry
spendingGroup = spendingGroupRepository.findByExternalId(spendingMessageDto.getSpendingGroupId()).orElseThrow();
}
if (spendingGroup == null) {
throw new RuntimeException(""); // TODO: Real exception
}
Idempotency idempotency;
try {
// Needs transaction for LOB (TEXT)
idempotency = transactionTemplate.execute(transactionStatus -> idempotencyRepository.findByIdempotencyKey(spendingMessageDto.getIdempotencyKey())
.orElseGet(() -> idempotencyRepository.save(Idempotency.builder()
.idempotencyKey(spendingMessageDto.getIdempotencyKey())
.status(IdempotencyStatus.CREATED)
.build())));
} catch (DataIntegrityViolationException e) {
// Parallel Insert Violates Unique Key, get the entry
idempotency = idempotencyRepository.findByIdempotencyKey(spendingMessageDto.getIdempotencyKey()).orElseThrow();
}
if (idempotency == null) {
throw new RuntimeException(""); // TODO: Real exception
}
switch (idempotency.getStatus()) {
case ERROR_NOT_RETRYABLE, FINISHED -> {
log.info("Returning response: " + idempotency.getStatus() + " - " + idempotency.getResponse());
return;
}
}
// Now try to lock the row
final SpendingGroup spendingGroup1 = spendingGroup;
transactionTemplate.executeWithoutResult(transactionStatus -> {
// PESSIMISTIC_WRITE, SHOULD block all others with SELECT FOR UPDATE
Idempotency idm = idempotencyRepository.findByIdempotencyKeyPW(spendingMessageDto.getIdempotencyKey()).orElseThrow();
switch (idm.getStatus()) {
case ERROR_NOT_RETRYABLE, FINISHED -> {
log.info("Returning response: " + idm.getStatus() + " - " + idm.getResponse());
return;
}
}
log.warn("-----------PROCESSING----------");
Spending spending = Spending.builder()
.spendingGroupId(spendingGroup1.getId())
.amount(new BigDecimal(spendingMessageDto.getAmount()))
.build();
spendingRepository.save(spending);
idm.setStatus(IdempotencyStatus.FINISHED);
idm.setCode(200);
idm.setResponse("Ok");
});
}
}
Какая цель?
Из очереди сообщений доставляется много параллельных сообщений. Эти сообщения содержат «платеж» и следуют шаблону amount=11.11;spending_group=adeu47;idempotency_key={uuid}
Так что по сути Payment Group — это сборщик платежей. И каждый платеж нужно вводить только один раз. Но брокер сообщений имеет механизм повтора, когда сообщение не подтверждено вовремя.
Текущий дизайн
- Для каждого сообщения проверяйте, существует ли группа расходов. Если не вставьте.
- Это может потерпеть неудачу, если параллельный процесс также читает и пытается вставить. Тогда решение — прочитать написанное. Правильно ли здесь поймать UniqueKeyViolation?
- То же самое с ключом идемпотентности, получить или создать, поймать UniqueKeyViolation
- Если запрос идемпотентности находится в конечном состоянии (в основном он уже был отправлен), верните сохраненный ответ.
- Если запрос идемпотентности имеет статус новый или повторный, выберите снова, на этот раз с пессимистической блокировкой записи.
- Есть ли лучший способ сделать это? Двойной выбор кажется странным. Но мне нужна блокировка только тогда, когда состояние не завершено.
- Удерживая блокировку строки, проделываем обработку. затем сохраните ответ и установите статус FINISHED.
Дополнительный вопрос
- Я попробовал дизайн в параллельном тестовом примере, и он, похоже, сработал. Есть ли явный недосмотр?