回顾

上一篇聊到了如何去保证转账功能的唯一性,和浅谈了一下加入了cas提升了转账行为的效率,这期我们来谈一谈在延时队列的任务加入后我们做了什么。

我们预估的系统的使用人数如果是一百万的话,如果每个人在使用这个平台完成了五笔合同,(都是按月转账),那么系统每个月需要处理的转账到达五百万笔,那么平均每秒需要完成的交易金额在19290笔左右,这时候我们单纯的开辟一个线程池去直接消费,线程池压力会非常大,同时使用线程池也不能够支持削峰,面对突然激增的活动肯定会出现故障,因此我在这个过程中牺牲了一定的转账的准确性。

实现

在我的实现中一共设计了三个消费主题

  1. @KafkaListener(topics = "contract-scheduled", groupId = "contract-group")延时队列发送过来的消息进行消费
  2. @KafkaListener(topics = "contract-scheduled-retries", groupId = "contract-group-retries")如果消费失败就进行重试
  3. @KafkaListener(topics = "contract-scheduled-dlq", groupId = "contract-group-dlq")如果重试次数大于3次,就进入死信队列等待管理员操作或者

另外再提一嘴kafka的消费模式,kafka使用的是被动拉的模式,kafka会在被注册后一直执行poll()操作进行轮询,但是

主消费队列

1
processTransferAsync(contractId, merchantId, unitAmount, transaction);

重试消息队列

尝试消息队列需要注意幂等性设计,因为我们的xxl-job也在做任务补偿,很可能造成重复转账

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
try {
RLock merchantLock = redissonClient.getLock("merchantLock" + merchantId);
try{
merchantLock.lock();
if (transaction.getRetryCount() < MAX_RETRY_COUNT) {
processTransferAsync(contractId, merchantId, unitAmount, transaction);
}
} finally {
merchantLock.unlock();
}
// 尝试再次进行转账
merchantServiceFeign.receiveTransferAccount(new TransferRequest(merchantId, unitAmount));

transaction.setStatus(Transaction.TransactionStatus.SUCCESS);
transactionDao.insertOrUpdateTransactionSuccess(transaction);
logger.info("Retry SUCCESS for contract:{} merchant:{} amount:{}", contractId, merchantId, unitAmount);
} catch (Exception e) {
//退避策略增强
logger.error("Retry failed for contract:{} merchant:{}, scheduling another retry", contractId, merchantId, e);
// 将消息再次推送到重试队列,增加延迟(可以增加延迟时间)
// 在重试发送时添加延时
kafkaTemplate.send(
"contract-scheduled-retries",
null,
System.currentTimeMillis(),
null,
transaction
).addCallback(
result -> {},
ex -> logger.error("Retry send failed", ex)
);
}

同时也需要进行注意使用退避策略,防止调用时间过长

死信队列

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 死信队列的设计是有缺陷的
* 是否应该加入自动重试机制,还是直接发送给管理员进行处理
* 日志如何保存,如何通知管理员
* 是否应该加入普罗米修斯+ grafana
*/
// 死信队列消费
@KafkaListener(topics = "contract-scheduled-dlq", groupId = "contract-group-dlq")
public void handleDeadLetterTransaction(ConsumerRecord<String, Transaction> record) {
Transaction transaction = record.value();
logger.error("Dead letter transaction for contract:{} merchant:{}", transaction.getContractId(), transaction.getMerchantId());
// 此处可以将失败的交易记录到数据库或发送通知给管理员等操作
}

Q&A

另外考虑一下

  1. 如果kafka消费者宕机了怎么办
  2. 在高并发场景下,kafka性能如何保证
  3. 以及一些其他的问题的理解我放在了我的kafka笔记篇了