Skip to content

Commit

Permalink
fix rate limit issue
Browse files Browse the repository at this point in the history
  • Loading branch information
chenby committed Feb 21, 2024
1 parent 258dcfc commit ee85942
Showing 1 changed file with 16 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package com.moilioncircle.redis.replicator.io;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.System.currentTimeMillis;

import java.io.IOException;
import java.io.InputStream;

import static java.lang.System.currentTimeMillis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Leon Chen
Expand All @@ -32,7 +32,8 @@ public class RateLimitInputStream extends InputStream {
private static final Logger logger = LoggerFactory.getLogger(RateLimitInputStream.class);

private static final int DEFAULT_PERMITS = 100 * 1024 * 1000; // 97.65MB/sec


private final int unit;
private final int permits;
private RateLimiter limiter;
private final InputStream in;
Expand All @@ -48,6 +49,8 @@ public RateLimitInputStream(InputStream in, int permits) {

this.in = in;
this.permits = permits;
// permits per ms
this.unit = this.permits / 1000;
this.limiter = new TokenBucketRateLimiter(this.permits);
}

Expand All @@ -66,7 +69,7 @@ public int read(byte[] b) throws IOException {
public int read(byte[] b, int offset, int length) throws IOException {
int total = length, index = offset;
while (total > 0) {
int len = Math.min(permits, total);
int len = Math.min(unit, total);
limiter.acquire(len);
int r = in.read(b, index, len);
index += r;
Expand All @@ -82,7 +85,7 @@ public int read(byte[] b, int offset, int length) throws IOException {
public long skip(long length) throws IOException {
long total = length;
while (total > 0) {
int skip = (int) Math.min(permits, total);
int skip = (int) Math.min(unit, total);
limiter.acquire(skip);
long r = in.skip(skip);
total -= r;
Expand All @@ -109,17 +112,14 @@ private interface RateLimiter {
private class TokenBucketRateLimiter implements RateLimiter {

private long access;
private long borrow;
private long permits;
private final long size;
private final double sleep;

private TokenBucketRateLimiter(int permits) {
this.access = currentTimeMillis();
this.size = this.permits = permits;
this.sleep = 1 * this.size / 1000d;
}

@Override
public void acquire(long permits) {
try {
Expand All @@ -131,17 +131,13 @@ public void acquire(long permits) {
this.permits -= permits;
return;
}
double r = permits / sleep;
if (r < 1) {
this.borrow += permits;
while (this.borrow >= sleep) {
Thread.sleep(1);
this.borrow -= sleep;
}
return;
} else {
Thread.sleep((int) r);
long x = permits / unit;
long y = permits % unit;
if (y != 0) {
x += 1;
this.permits -= (unit - y);
}
Thread.sleep(x);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down

0 comments on commit ee85942

Please sign in to comment.