Идемпотентная обработка сообщений

Код

@Service
@Slf4j
public class SpendingServiceImpl implements SpendingService {
    private final SpendingGroupRepository spendingGroupRepository;
    private final SpendingRepository spendingRepository;
    private final IdempotencyRepository idempotencyRepository;
    private final TransactionTemplate transactionTemplate;

    public SpendingServiceImpl(SpendingGroupRepository spendingGroupRepository, SpendingRepository spendingRepository, IdempotencyRepository idempotencyRepository, PlatformTransactionManager platformTransactionManager) {
        this.spendingGroupRepository = spendingGroupRepository;
        this.spendingRepository = spendingRepository;
        this.idempotencyRepository = idempotencyRepository;
        this.transactionTemplate = new TransactionTemplate(platformTransactionManager);
        this.transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    }

    @Override
    public void addSpending(SpendingMessageDto spendingMessageDto) {
        SpendingGroup spendingGroup;
        try {
            spendingGroup = spendingGroupRepository
                    .findByExternalId(spendingMessageDto.getSpendingGroupId())
                    .orElseGet(() -> spendingGroupRepository.save(SpendingGroup.builder()
                            .externalId(spendingMessageDto.getSpendingGroupId())
                            .description("Banana").build()));
        } catch (DataIntegrityViolationException e) {
            // Parallel Insert Violates Unique Key, get the entry
            spendingGroup = spendingGroupRepository.findByExternalId(spendingMessageDto.getSpendingGroupId()).orElseThrow();
        }

        if (spendingGroup == null) {
            throw new RuntimeException(""); // TODO: Real exception
        }

        Idempotency idempotency;
        try {
            // Needs transaction for LOB (TEXT)
            idempotency = transactionTemplate.execute(transactionStatus -> idempotencyRepository.findByIdempotencyKey(spendingMessageDto.getIdempotencyKey())
                    .orElseGet(() -> idempotencyRepository.save(Idempotency.builder()
                            .idempotencyKey(spendingMessageDto.getIdempotencyKey())
                            .status(IdempotencyStatus.CREATED)
                            .build())));
        } catch (DataIntegrityViolationException e) {
            // Parallel Insert Violates Unique Key, get the entry
            idempotency = idempotencyRepository.findByIdempotencyKey(spendingMessageDto.getIdempotencyKey()).orElseThrow();
        }

        if (idempotency == null) {
            throw new RuntimeException(""); // TODO: Real exception
        }

        switch (idempotency.getStatus()) {
            case ERROR_NOT_RETRYABLE, FINISHED -> {
                log.info("Returning response: " + idempotency.getStatus() + " - " + idempotency.getResponse());
                return;
            }
        }

        // Now try to lock the row
        final SpendingGroup spendingGroup1 = spendingGroup;
        transactionTemplate.executeWithoutResult(transactionStatus -> {
            // PESSIMISTIC_WRITE, SHOULD block all others with SELECT FOR UPDATE
            Idempotency idm = idempotencyRepository.findByIdempotencyKeyPW(spendingMessageDto.getIdempotencyKey()).orElseThrow();
            switch (idm.getStatus()) {
                case ERROR_NOT_RETRYABLE, FINISHED -> {
                    log.info("Returning response: " + idm.getStatus() + " - " + idm.getResponse());
                    return;
                }
            }
            log.warn("-----------PROCESSING----------");

            Spending spending = Spending.builder()
                    .spendingGroupId(spendingGroup1.getId())
                    .amount(new BigDecimal(spendingMessageDto.getAmount()))
                    .build();
            spendingRepository.save(spending);
            idm.setStatus(IdempotencyStatus.FINISHED);
            idm.setCode(200);
            idm.setResponse("Ok");
        });
    }
}

Какая цель?

Из очереди сообщений доставляется много параллельных сообщений. Эти сообщения содержат «платеж» и следуют шаблону amount=11.11;spending_group=adeu47;idempotency_key={uuid}

Так что по сути Payment Group — это сборщик платежей. И каждый платеж нужно вводить только один раз. Но брокер сообщений имеет механизм повтора, когда сообщение не подтверждено вовремя.

Текущий дизайн

  1. Для каждого сообщения проверяйте, существует ли группа расходов. Если не вставьте.
    • Это может потерпеть неудачу, если параллельный процесс также читает и пытается вставить. Тогда решение — прочитать написанное. Правильно ли здесь поймать UniqueKeyViolation?
  2. То же самое с ключом идемпотентности, получить или создать, поймать UniqueKeyViolation
  3. Если запрос идемпотентности находится в конечном состоянии (в основном он уже был отправлен), верните сохраненный ответ.
  4. Если запрос идемпотентности имеет статус новый или повторный, выберите снова, на этот раз с пессимистической блокировкой записи.
    • Есть ли лучший способ сделать это? Двойной выбор кажется странным. Но мне нужна блокировка только тогда, когда состояние не завершено.
  5. Удерживая блокировку строки, проделываем обработку. затем сохраните ответ и установите статус FINISHED.

Дополнительный вопрос

  1. Я попробовал дизайн в параллельном тестовом примере, и он, похоже, сработал. Есть ли явный недосмотр?

0

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *