随笔分类
RateLimiter
限流是保护高并发系统的三大利器之一,另外两个便是缓存与降级
顾名思义,限流即限定一定时间内处理请求的数量,限流业务场景十分普遍,如限定连接池指定时间内可获取的数量,限制瞬时并发数,也可以相应地来去做些预热操作,实现方案很多,Java中 Semaphore便是一种不错的可行方案
而 Guava中的 RateLimiter便是一种限流方案的实现,其实现思想便是基于令牌桶算法:往固定大小的一个桶中按指定速率存放令牌,桶有容量限制,溢出的令牌会被拒绝或者丢弃,到来的每一个网络请求都需要获取到桶中的一个令牌,获取到令牌则可以执行,获取不到则会阻塞等待或者请求直接被拒绝,具体操作取决于实现,但如果是笔者的话,则会倾向于阻塞请求,可以将其存放于队列中,以来保证请求得到处理的同时保证了公平性,避免饥饿线程的发生,而这带来的缺点便是系统整体吞吐量不会很高
令牌桶区别于漏桶算法,而这虽然在实现设计上类似,但还是需要区别对待:
- 漏桶是容器按照指定速率出水,而进水的速率随意,当进水溢出时,新的进水请求将被拒绝,对于漏桶而言,其注重在于出水口的限制固定速率;令牌桶则是按照指定速率往桶内添加令牌,请求是否能得到处理取决于桶内是否有令牌,获取不到令牌后的操作依据具体实现不同而不同
- 漏桶是按照指定出水速率出水,无法去处理突发请求量;而令牌桶可以,只要桶内存在一定数量令牌,可以去预支付一定数目的令牌,这实际上也是 RateLimiter的特性,即对于令牌桶而言有能力去处理一定程度的突发请求,而漏桶无法做到
常见的限流算法实际上还有:计数器算法、滑动窗口算法等
笔者认为 Guava的 RateLimiter便是令牌桶算法比较优秀的一种实现思路,RateLimiter体现了令牌桶的特性:能够预先支付一定数量的令牌,并且这支付的代价由下一个请求的线程来承担,当前线程不需要做其他额外的事情
/**
* 2021-11-12 20:16:34.483 INFO 31162 --- [ main] c.e.j.g.g.concurrent.RateLimiterTest : 第0次获取结果
* 2021-11-12 20:16:34.882 INFO 31162 --- [ main] c.e.j.g.g.concurrent.RateLimiterTest : 第1次获取结果
* 2021-11-12 20:16:35.282 INFO 31162 --- [ main] c.e.j.g.g.concurrent.RateLimiterTest : 第2次获取结果
* 1s内获取令牌次数:3
*/
@Test
public void testForSimple() {
RateLimiter rateLimiter = RateLimiter.create(3L);
Stopwatch stopwatch = Stopwatch.createStarted();
AtomicLong cnt = new AtomicLong();
Boolean loop = Boolean.TRUE;
while (loop) {
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) % 200 == 0) {
boolean getBucket = rateLimiter.tryAcquire();
if (getBucket) {
log.info("第{}次获取结果", cnt.getAndIncrement());
}
}
if (stopwatch.elapsed(TimeUnit.SECONDS) == 1L) loop = Boolean.FALSE;
}
System.out.println(StrUtil.format("1s内获取令牌次数:{}", cnt.get()));
}
限流器能以可配置的速率去下发许可证, 这不一定能够保证公平, 可能会出现饥饿现象
上述例子便是我们使用 RateLimiter常用的姿势,我们着重于源码的讲解:
// 参数 - 每秒允许下发的许可证数量
// 创建出具有稳定吞吐量的 RateLimiter
public static RateLimiter create(double permitsPerSecond) {
/*
* The default RateLimiter configuration can save the unused permits of up to one second. This
* is to avoid unnecessary stalls in situations like this: A RateLimiter of 1qps, and 4 threads,
* all calling acquire() at these moments:
*
* T0 at 0 seconds
* T1 at 1.05 seconds
* T2 at 2 seconds
* T3 at 3 seconds
*
* Due to the slight delay of T1, T2 would have to sleep till 2.05 seconds, and T3 would also
* have to sleep till 3.05 seconds.
*/
// 创建默认的限流器
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
SleepingStopWatch里头封装着 StopWatch
abstract static class SleepingStopwatch {
/**
* Constructor for use by subclasses.
*/
protected SleepingStopwatch() {}
/*
* We always hold the mutex when calling this. TODO(cpovirk): Is that important? Perhaps we need
* to guarantee that each call to reserveEarliestAvailable, etc. sees a value >= the previous?
* Also, is it OK that we don't hold the mutex when sleeping?
*/
protected abstract long readMicros();
protected abstract void sleepMicrosUninterruptibly(long micros);
// SleepingStopWatch里面封装着 StopWatch
public static SleepingStopwatch createFromSystemTimer() {
return new SleepingStopwatch() {
// 创建并开始计时
final Stopwatch stopwatch = Stopwatch.createStarted();
// 读取计时
@Override
protected long readMicros() {
return stopwatch.elapsed(MICROSECONDS);
}
@Override
protected void sleepMicrosUninterruptibly(long micros) {
if (micros > 0) {
// 非中断的进行 sleep - 这更安全以及保证中断状态的原有性
Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);
}
}
};
}
}
然后接着便是重载 create()的调用
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
// 默认去处理的便是一个具有丝滑地去处理突发流量的限流器
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
// 设置每秒的吞吐量
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
可见,默认去创建的限流器便是 SmoothBursty,这是一个能够平滑去处理突发流量的限流器
RateLimiter提供给我们两种可选择的限流器:平滑突发限流器、平滑预热限流器
而这两种限流器实际上都是 SmoothRateLimiter的扩展子类
有些字段我们需要注意下其含义:
/** The currently stored permits. */
// 当前桶内存储的令牌数目
double storedPermits;
/** The maximum number of stored permits. */
// 桶内最大可存储的令牌数目
double maxPermits;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
*/
// 获取令牌的间隔段
double stableIntervalMicros;
/**
* The time when the next request (no matter its size) will be granted. After granting a request,
* this is pushed further in the future. Large requests push this further than small requests.
*/
// 下一次请求可以获取到令牌的时间, 每次请求获取到令牌后该时间都会推后到未来的某一时刻
private long nextFreeTicketMicros = 0L; // could be either in the past or future
SmoothBursty
平滑突发限流器
case:
/**
* 可以保证一秒内获取的令牌不会超过 5个
* 此次获取令牌耗时:0.0
* 此次获取令牌耗时:0.160247
* 此次获取令牌耗时:0.193877
* 此次获取令牌耗时:0.197077
* 此次获取令牌耗时:0.197204
* 总耗时:803
*/
@Test
public void testForBurstyAcquire() {
RateLimiter rateLimiter = RateLimiter.create(5);
Stopwatch stopwatch = Stopwatch.createStarted();
Long cnt = 0L;
while (true) {
System.out.println(StrUtil.format("此次获取令牌耗时:{}", rateLimiter.acquire()));
cnt ++;
if (cnt == 5) {
System.out.println("总耗时:" + stopwatch.elapsed(TimeUnit.MILLISECONDS));
return;
}
}
}
// 创建一个能够丝滑地去处理突发流量的限流器
static final class SmoothBursty extends SmoothRateLimiter {
/** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
// 默认会是 1.0
final double maxBurstSeconds;
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}
private SmoothRateLimiter(SleepingStopwatch stopwatch) {
super(stopwatch);
}
RateLimiter(SleepingStopwatch stopwatch) {
this.stopwatch = checkNotNull(stopwatch);
}
默认去创建的便是一个平滑突发限流器,当请求到来时,acquire()获取令牌会发生什么?
// 返回值:获取当前令牌耗时
@CanIgnoreReturnValue
public double acquire() {
return acquire(1);
}
acquire()返回值是一个获取到令牌的耗时,这可以借鉴下这种设计
/**
* Acquires the given number of permits from this {@code RateLimiter}, blocking until the request
* can be granted. Tells the amount of time slept, if any.
*
* @param permits the number of permits to acquire
* @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
* @throws IllegalArgumentException if the requested number of permits is negative or zero
* @since 16.0 (present in 13.0 with {@code void} return type})
*/
@CanIgnoreReturnValue
public double acquire(int permits) {
// 获取得到令牌需要等待的时间
long microsToWait = reserve(permits);
// 线程等待指定时长
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 计算出此次获取令牌等待时间并且返回
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
首先计算出获取到令牌需要等待的时间:reverse()
/**
* Reserves the given number of permits from this {@code RateLimiter} for future use, returning
* the number of microseconds until the reservation can be consumed.
*
* @return time in microseconds to wait until the resource can be acquired, never negative
*/
final long reserve(int permits) {
checkPermits(permits);
// 可能存在并发获取锁, 此处加上互斥锁
synchronized (mutex()) {
// 参数一:此次请求想要获取的令牌数
// 参数二:当前时间(作为一个起始点)
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
// Can't be initialized in the constructor because mocks don't call the constructor.
@CheckForNull
private volatile Object mutexDoNotUseDirectly;
// double lock
// 懒加载方式的一个互斥锁, 锁的本质是一个对象
private Object mutex() {
Object mutex = mutexDoNotUseDirectly;
if (mutex == null) {
synchronized (this) {
mutex = mutexDoNotUseDirectly;
if (mutex == null) {
mutexDoNotUseDirectly = mutex = new Object();
}
}
}
return mutex;
}
这里采取的便是基于懒加载的单例模式,使用锁是为了保证并发操作的安全性
/**
* Reserves next ticket and returns the wait time that the caller must wait for.
*
* @return the required wait time, never negative
*/
final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 以当前时间为基准, 计算获取指定令牌需要等待的时间
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 返回需要等待的时间
return max(momentAvailable - nowMicros, 0);
}
reverseEarliestAvaliable()是个抽象方法,对于 SmoothBursty和 SmoothyWarmUp具体实现不同
对于 SmoothBursty:
/**
* // 计算获取指定令牌需要等待的时间
* @param requiredPermits
* @param nowMicros
* @return
*/
// 参数一:需要获取的令牌数
// 参数二:当前基准时间
// 总结下这里做的事情:刷新桶内令牌(增加令牌数)以及下一次获取令牌的时间, 计算预支付令牌需要额外等待的时间, 然后更新桶内令牌数以及下一次获取令牌的时间
// 这里体现了 RateLimiter的特性:允许预支付令牌
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 刷新令牌与下一次请求获取令牌的预估时间(对于后者, 其实就是本次)
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
// 比较, 得到此次请求可以得到的最大许可数
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// freshPermits - 需要预先支付的令牌
// 可能存在某个时刻突然涌现大量的请求, 此时会先去支付一定的令牌数
double freshPermits = requiredPermits - storedPermitsToSpend;
// waitMicros - 产生预先支付一定数量令牌的时间
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 更新下一次获取令牌的时间, 这里实际上便体现了预支付的设计落地
// 比如桶内有 10个令牌, 但此次请求想要获取的是 20个令牌, rateLimiter实现是先去支付 10个令牌, 然后这十个令牌的产出时间由下一个线程来去承担着!
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 更新桶内令牌数
this.storedPermits -= storedPermitsToSpend;
// 返回旧的 nectFreeTicketMicros, 无需为预支付的令牌增加等待时间
return returnValue;
}
上述做了三件事:刷新桶内令牌(增加令牌数)以及下一次获取令牌的时间, 计算预支付令牌需要额外等待的时间, 然后更新桶内令牌数以及下一次获取令牌的时间
resync()做的便是刷新操作
/** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
// 当前时间如果晚于预计下一次请求(其实便是此次)获取令牌时间, 此时去刷新令牌数以及下一次请求获取令牌的预估时间
if (nowMicros > nextFreeTicketMicros) {
// 当前时间减去 nextFreeTicketMicros, 获取间隔段, 然后 / coolDownIntervalMicros, 获取这段时间内需要往桶内添加的令牌数
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
// 更新桶内令牌数的同时确保令牌数不会溢出
storedPermits = min(maxPermits, storedPermits + newPermits);
// 更新当前时间便是下一次(其实也就是此次)的获取令牌时间
nextFreeTicketMicros = nowMicros;
}
// 如果 nowMicros < nextFreeTicketMicros, 那么线程需要等待到预计的下一次令牌获取时间, 而这时间间隔由下一个请求处理线程去等待
}
在上述我们观察到一个方法:coolDownIntervalMicros(),其返回的便是冷却期间获取令牌需要等待的时间,而其实现这两个限流器也不不同,而平滑突发限流器返回的便是:
/**
* Returns the number of microseconds during cool down that we have to wait to get a new permit.
*/
// 返回冷却期间获取新许可需要等待的毫秒数
// 平滑突发限流与平滑预热限流的时间不同
abstract double coolDownIntervalMicros();
# SmoothBursty
@Override
double coolDownIntervalMicros() {
// 对于平滑突发限流器, 返回的便是获取令牌的间隔时间
return stableIntervalMicros;
}
而在 reserveEarliestAvailable()方法实现上,充分也体现了 RateLimiter的特性:支持预先支付一定数量的令牌,而对于突发流量的处理是比较友好的,其次支付令牌付出的代价(等待时长)由下一个处理请求的线程去承担,当前线程不去承担任何代价
将视角转回到 acquire(),获取到需要等待的时长后,接下来便是去进行等待,调用了 sleepMicrosUninterruptibly(),然后将最终等待的时长返回去,此时令牌便获取成功,然后去做自己关心的操作即可
SmoothWarmUp
预热限流器
所谓预热,其实便是平滑上升的一个过程,速率由慢转快,然后到达我们所设定的固定速率
实现预热缓冲的关键在于其发放令牌的速率会随着时间和存储的令牌数目改变,有点自适应的味道
当存储令牌数减小至令牌阈值 thresholdPermits时,生成令牌的时间间隔会减少依赖实现快速增加令牌的需要,反之随着存储令牌数的增多,生成令牌数需要间隔也会越来越长,这实际上便是平滑预热限流器的实现思想(想一想其实也挺有道理,令牌数的快速减少可以通过令牌阈值来去判断,如果是的话则说明当前生成令牌的速率小于令牌流失的速率,此时要做的便是剪短令牌生成的速率)
看个简单 case:
/**
* 预热限流器 - 平滑上升, 然后达到指定固定速率的限流器
* 此次获取令牌耗时:0.0
* 此次获取令牌耗时:1.311546
* 此次获取令牌耗时:0.996465
* 此次获取令牌耗时:0.66343
* 此次获取令牌耗时:0.497813
* 总耗时:3503
*/
@Test
public void testForWarmUpAcquire() {
// 预热限流器
RateLimiter rateLimiter = RateLimiter.create(2, 3L, TimeUnit.SECONDS);
Stopwatch stopwatch = Stopwatch.createStarted();
Long cnt = 0L;
while (true) {
System.out.println(StrUtil.format("此次获取令牌耗时:{}", rateLimiter.acquire()));
cnt ++;
if (cnt == 10) {
System.out.println("总耗时:" + stopwatch.elapsed(TimeUnit.MILLISECONDS));
return;
}
}
}
关注于此方法:
/**
* <pre>
* ^ throttling
* |
* cold + /
* interval | /.
* | / .
* | / . ← "warmup period" is the area of the trapezoid between
* | / . thresholdPermits and maxPermits
* | / .
* | / .
* | / .
* stable +----------/ WARM .
* interval | . UP .
* | . PERIOD.
* | . .
* 0 +----------+-------+--------------→ storedPermits
* 0 thresholdPermits maxPermits
*/
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
// 获取当前桶中超出阈值的令牌数
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
// 超出了阈值
if (availablePermitsAboveThreshold > 0.0) {
// permitsAboveThresholdToTake: 超出阈值令牌数其实便是请求最大可获取的令牌数目
/**
* 下面去计算的其实便是梯形的面积
*/
// 其实便是梯形的高
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// TODO(cpovirk): Figure out a good name for this variable.
// permitsToTime(availablePermitsAboveThreshold):计算出梯形较长的边
// permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake):计算出梯形较短的 边
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
// 这里来计算出梯形的面积
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
// 减去已经获取的阈值右侧的令牌数
permitsToTake -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
// 此时可能桶内令牌数可能已经刚好是阈值了, 然此次请求的令牌数还未完全满足, 需要以平稳时期 stable interval计算出获取令牌仍需要等待的时长
micros += (long) (stableIntervalMicros * permitsToTake);
// 返回需要等待耗时
return micros;
}
最后,需要注意一点:RateLimiter适用于单机的限流,如果想要分布式下集群的限流的话,可以去参考下 Sentinel中间件的实现方案(滑动窗口限流)