在服务中需要通过企微的接口向指定的群发送消息。但是企业微信对接口的调用有频率限制,超过这个限制之后它默认就把消息给丢弃了。微信这么做是出于自保,防止自己的接口被无限制的调用,从而导致资源被消耗殆尽。而我们的业务现实是,再短时间内可能会突发大量的消息需要发送,但是不能因为这些消息太多,导致消息丢失。两者的根本矛盾就在这里,因此我们需要对消息发送的频率进行控制。
一开始我想的是使用mq先把消息存储起来,这样可以防止消息丢失,方便以后出现异常之后的消息重试。但是就算消息存储到mq中也只是解决了消息丢失的问题,对于消费速率的控制问题还是没有解决。mq的本质是因为需要处理的东西太多,消费者的处理能力处理不过来,需要先把东西临时存储起来,存储起来之后,消费者再慢慢消费,或者通过增加消费者的方式提高处理的效率,它本身不是为了限制效率的,而是因为效率被限制了才想到的解决方案。因此我们必须使用一个限制速率的手段,来保证调用企微接口的频率不超过企微的频率限制。
这里我们为了快速的限流防止消息丢失,我最先想到的是先上一个单机的限流器,原因是因为我们的并发并不高,但是会有陡增的请求,因此为了快度的解决,我们就先用单机限流器来解决。后续如果还有问题,我们再上分布式的限流器。
问题就变成如何快速实现一个单机的限流器或者找到一个已经开源的好用的限流器了。我以前使用过 guava 的 RateLimiter。 现在有了强大的AI ,我又搜索到了其他几个限流器。在这里再记录并回顾一下关于限流方面的一些知识。
/**
* 测试限流的场景。1分钟最多两个请求,超过之后剩下的直接丢弃
*/
@Test
public void test01() throws InterruptedException {
// 每秒创建2个令牌
RateLimiter limiter = RateLimiter.create(2);
CompletableFuture[] futures = IntStream.range(0, 5).mapToObj(i -> CompletableFuture.runAsync(() -> {
if (limiter.tryAcquire()) {
log.info("我拿到令牌,我执行了{}", i);
} else {
log.info("我没有令牌,我放弃了{}", i);
}
})).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
}
上面会跟你想象的不太一样
2024-12-01 16:23:17 [ForkJoinPool.commonPool-worker-4] INFO com.seed.server.utils.GuavaRateLimiterTest - 我没有令牌,我放弃了3
2024-12-01 16:23:17 [ForkJoinPool.commonPool-worker-11] INFO com.seed.server.utils.GuavaRateLimiterTest - 我没有令牌,我放弃了2
2024-12-01 16:23:17 [ForkJoinPool.commonPool-worker-2] INFO com.seed.server.utils.GuavaRateLimiterTest - 我拿到令牌,我执行了1
2024-12-01 16:23:17 [ForkJoinPool.commonPool-worker-13] INFO com.seed.server.utils.GuavaRateLimiterTest - 我没有令牌,我放弃了4
2024-12-01 16:23:17 [ForkJoinPool.commonPool-worker-9] INFO com.seed.server.utils.GuavaRateLimiterTest - 我没有令牌,我放弃了0