回顾
上一篇聊到了如何去保证转账功能的唯一性,和浅谈了一下加入了cas提升了转账行为的效率,这期我们来谈一谈在延时队列的任务加入后我们做了什么。
我们预估的系统的使用人数如果是一百万的话,如果每个人在使用这个平台完成了五笔合同,(都是按月转账),那么系统每个月需要处理的转账到达五百万笔,那么平均每秒需要完成的交易金额在19290笔左右,这时候我们单纯的开辟一个线程池去直接消费,线程池压力会非常大,同时使用线程池也不能够支持削峰,面对突然激增的活动肯定会出现故障,因此我在这个过程中牺牲了一定的转账的准确性。
实现
在我的实现中一共设计了三个消费主题
@KafkaListener(topics = "contract-scheduled", groupId = "contract-group")
延时队列发送过来的消息进行消费
@KafkaListener(topics = "contract-scheduled-retries", groupId = "contract-group-retries")
如果消费失败就进行重试
@KafkaListener(topics = "contract-scheduled-dlq", groupId = "contract-group-dlq")
如果重试次数大于3次,就进入死信队列等待管理员操作或者
另外再提一嘴kafka的消费模式,kafka使用的是被动拉的模式,kafka会在被注册后一直执行poll()操作进行轮询,但是
主消费队列
1
| processTransferAsync(contractId, merchantId, unitAmount, transaction);
|
重试消息队列
尝试消息队列需要注意幂等性设计,因为我们的xxl-job也在做任务补偿,很可能造成重复转账
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| try { RLock merchantLock = redissonClient.getLock("merchantLock" + merchantId); try{ merchantLock.lock(); if (transaction.getRetryCount() < MAX_RETRY_COUNT) { processTransferAsync(contractId, merchantId, unitAmount, transaction); } } finally { merchantLock.unlock(); } merchantServiceFeign.receiveTransferAccount(new TransferRequest(merchantId, unitAmount));
transaction.setStatus(Transaction.TransactionStatus.SUCCESS); transactionDao.insertOrUpdateTransactionSuccess(transaction); logger.info("Retry SUCCESS for contract:{} merchant:{} amount:{}", contractId, merchantId, unitAmount); } catch (Exception e) { logger.error("Retry failed for contract:{} merchant:{}, scheduling another retry", contractId, merchantId, e); kafkaTemplate.send( "contract-scheduled-retries", null, System.currentTimeMillis(), null, transaction ).addCallback( result -> {}, ex -> logger.error("Retry send failed", ex) ); }
|
同时也需要进行注意使用退避策略,防止调用时间过长
死信队列
1 2 3 4 5 6 7 8 9 10 11 12 13
|
@KafkaListener(topics = "contract-scheduled-dlq", groupId = "contract-group-dlq") public void handleDeadLetterTransaction(ConsumerRecord<String, Transaction> record) { Transaction transaction = record.value(); logger.error("Dead letter transaction for contract:{} merchant:{}", transaction.getContractId(), transaction.getMerchantId()); }
|
Q&A
另外考虑一下
- 如果kafka消费者宕机了怎么办
- 在高并发场景下,kafka性能如何保证
- 以及一些其他的问题的理解我放在了我的kafka笔记篇了