常见流控算法及Java实现

现在主要的工作是处理一些中间件,所以流控必然是需要去考虑的东西。

流控更专业的叫法是:流量整形(traffic shaping),典型作用是限制流出某一网络的某一连接的流量与突发,使这类报文以比较均匀的速度向外发送。

常见算法

通常的做法就是通过建立一个缓存区或是令牌桶来实现。更具体的算法是:漏斗算法和桶令牌算法。

漏斗算法就是有一个斗:数据往这个斗中流入,然后开一口,以一定的速度将这个斗中的数据流出,不支持任持续突发和最大突发,至于这个斗满了如何处理再说。
桶令牌算法:一个存放令牌的桶,以一定的速度往这个桶生成令牌,数据流出先从这个桶中拿令牌,若是拿不到令牌就另行处理(具体自己设定)。

桶令牌跟漏斗最大的区别在于可以支撑一个突然的流量变化,就是满桶令牌数的峰值。
具体代码


package com.netease.datastream.util.flowcontrol;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.base.Preconditions;
import com.netease.datastream.util.framework.LifeCycle;

/**
 * <pre>
 * Created by inter12 on 15-3-18.
 * </pre>
 */
public class TokenBucket implements LifeCycle {

// 默认桶大小个数 即最大瞬间流量是64M
 private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;

// 一个桶的单位是1字节
 private int everyTokenSize = 1;

// 瞬间最大流量
 private int maxFlowRate;

// 平均流量
 private int avgFlowRate;

// 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
 private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);

private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

private volatile boolean isStart = false;

private ReentrantLock lock = new ReentrantLock(true);

private static final byte A_CHAR = 'a';

public TokenBucket() {
 }

public TokenBucket(int maxFlowRate, int avgFlowRate) {
 this.maxFlowRate = maxFlowRate;
 this.avgFlowRate = avgFlowRate;
 }

public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
 this.everyTokenSize = everyTokenSize;
 this.maxFlowRate = maxFlowRate;
 this.avgFlowRate = avgFlowRate;
 }

public void addTokens(Integer tokenNum) {

// 若是桶已经满了,就不再家如新的令牌
 for (int i = 0; i < tokenNum; i++) {
 tokenQueue.offer(Byte.valueOf(A_CHAR));
 }
 }

public TokenBucket build() {

start();
 return this;
 }

/**
 * 获取足够的令牌个数
 *
 * @return
 */
 public boolean getTokens(byte[] dataSize) {

Preconditions.checkNotNull(dataSize);
 Preconditions.checkArgument(isStart, "please invoke start method first !");

int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数

final ReentrantLock lock = this.lock;
 lock.lock();
 try {
 boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
 if (!result) {
 return false;
 }

int tokenCount = 0;
 for (int i = 0; i < needTokenNum; i++) {
 Byte poll = tokenQueue.poll();
 if (poll != null) {
 tokenCount++;
 }
 }

return tokenCount == needTokenNum;
 } finally {
 lock.unlock();
 }
 }

@Override
 public void start() {

// 初始化桶队列大小
 if (maxFlowRate != 0) {
 tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
 }

// 初始化令牌生产者
 TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
 scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
 isStart = true;

}

@Override
 public void stop() {
 isStart = false;
 scheduledExecutorService.shutdown();
 }

@Override
 public boolean isStarted() {
 return isStart;
 }

class TokenProducer implements Runnable {

private int avgFlowRate;
 private TokenBucket tokenBucket;

public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
 this.avgFlowRate = avgFlowRate;
 this.tokenBucket = tokenBucket;
 }

@Override
 public void run() {
 tokenBucket.addTokens(avgFlowRate);
 }
 }

public static TokenBucket newBuilder() {
 return new TokenBucket();
 }

public TokenBucket everyTokenSize(int everyTokenSize) {
 this.everyTokenSize = everyTokenSize;
 return this;
 }

public TokenBucket maxFlowRate(int maxFlowRate) {
 this.maxFlowRate = maxFlowRate;
 return this;
 }

public TokenBucket avgFlowRate(int avgFlowRate) {
 this.avgFlowRate = avgFlowRate;
 return this;
 }

private String stringCopy(String data, int copyNum) {

StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);

for (int i = 0; i < copyNum; i++) {
 sbuilder.append(data);
 }

return sbuilder.toString();

}

public static void main(String[] args) throws IOException, InterruptedException {

tokenTest();
 }

private static void arrayTest() {
 ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
 tokenQueue.offer(1);
 tokenQueue.offer(1);
 tokenQueue.offer(1);
 System.out.println(tokenQueue.size());
 System.out.println(tokenQueue.remainingCapacity());
 }

private static void tokenTest() throws InterruptedException, IOException {
 TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();

BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
 String data = "xxxx";// 四个字节
 for (int i = 1; i <= 1000; i++) {

Random random = new Random();
 int i1 = random.nextInt(100);
 boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
 TimeUnit.MILLISECONDS.sleep(100);
 if (tokens) {
 bufferedWriter.write("token pass --- index:" + i1);
 System.out.println("token pass --- index:" + i1);
 } else {
 bufferedWriter.write("token rejuect --- index" + i1);
 System.out.println("token rejuect --- index" + i1);
 }

bufferedWriter.newLine();
 bufferedWriter.flush();
 }

bufferedWriter.close();
 }

}

后来查了资料:google的guava也有一个类似的流控类:RateLimiter。不过这个不是基于流量的控制,更多是速度的控制,有点像TPS。
RateLimiter的具体使用:http://java.dzone.com/articles/ratelimiter-discovering-google
参考资料:
http://7658423.blog.51cto.com/7648423/1576118
http://colobu.com/2014/11/13/rate-limiting/
http://baike.baidu.com/view/2530454.htm

作者: inter12

在这苦短的人生中,追求点自己的简单快乐

发表评论

电子邮件地址不会被公开。 必填项已用*标注