场景

在我的项目中,有遇到一个需要定期给用户转账的场景,但是最开始的想法是使用spring schduled来进行做定时任务,但是由于做逻辑的时候发现如果使用schduled来进行延迟性比较大,最后在网上查阅资料后发现可以使用redis的delayed queue来做定时任务,但是redis只能在单机节点上进行,如果在分布式场景下会导致重复消费,因此使用了redission来进行实现,同时在使用redission后为了保证交易的准确性,加入了xxl-job来做分布式分片广播定时任务来确保交易的完整性

实践

  1. 在使用redission的时候需要实现两个队列
1
2
RBlockingQueue<T> rBlockingQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);

第一个阻塞队列是添加已经到时间的资源
第二个是一个延迟队列接受传入的资源,并且进行到点弹出

  1. 启动一个监听延迟队列的任务
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    RBlockingQueue<T> queue = redissonClient.getBlockingQueue(listenerName);
    Thread thread = new Thread(() -> {
    logger.info("启动监听队列线程" + listenerName);
    while(true) {
    try {
    //用于从阻塞队列获取并且移除对头元素
    T t = queue.take();
    logger.info("获取到队列线程:{},获取到队列值{}", listenerName, t);
    new Thread(() ->{
    taskEventListenerEntry.invoke(t);
    }).start();
    } catch (InterruptedException e) {
    logger.warn("监听到线程错误",e);
    try {
    Thread.sleep(10000);
    } catch (InterruptedException ex) {
    }
    }
    }
    });
  2. 如果take后数据后那么就加入对应的监听队列的invoke方法
    1
    2
    3
    4
    5
    @Override
    public void invoke(ContractScheduledRequest contractScheduledRequest) {
    logger.info("fulfill contract, contractId:{}, merchantId:{}", contractScheduledRequest.getContractId(), contractScheduledRequest.getMerchantId());
    kafkaTemplate.send(TOPIC, contractScheduledRequest);
    }
    我们可以注意到我们将定时任务执行的任务发送到了kafka进行消费,这里我们为了不阻塞任务的执行选择异步去进行消费

同时我们为了解决如果转账失败我们应该怎么办,我们需要引入xxl-job,但是有一个问题就是如果我们为什么选择xxl-job,不选择spring scheduler,为了方便分布式环境使用,为什么我们要在补偿的时候选择分片广播呢,因为我们可以发现,在redis delayed queue中我们使用的是主动拉的模式,我们不需要去做重复消费的问题,而在分布环境中我们被动扫描的话需要解决重复消费的问题,因此需要引入分片的做法。同时如果发生故障,我们应该需要支持故障转移。

xxl-job

使用起来很简单,配好config后添加一个handler就可以

1
2
3
4
5
6
7
8
9
10
11
// 当前分片
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
List<Transaction> transactions = transactionDao.getFailedTransactionsByShard(
Transaction.TransactionStatus.FAILED, shardIndex, shardTotal
);

for(Transaction transaction : transactions) {
kafkaTemplate.send(TOPIC, transaction);
}
XxlJobHelper.log("分片处理完成: shardIndex={}, 处理条数={}", shardIndex, transactions.size())

process diagram

Q&A

  1. 为什么转账的过程放在kafka中?

    考虑到转账任务的数量比较大,不应该阻塞在转账阶段,介于Kafka的高吞吐量以及kafka的并发处理能力较大,同时kafka支持消息持久化,即使系统发生故障也不会消失。

  2. kafka进行处理会降低转账速度吗?如何应对
    会,但也比阻塞take的操作好,同时,我的建议是到期转账的时候可以在发送一个消息给user,先告知转账已经启动,让转账系统进行流转。