com.redis.rl
fixed-window-rate-limiter
fixed-window-rate-limiter
com.redis.rl
cd ~/my-directory
unzip ~/Downloads/fixed-window-rate-limiter.zip
package com.redis.rl;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class FixedWindowRateLimiterApplication {
public static void main(String[] args) {
SpringApplication.run(FixedWindowRateLimiterApplication.class, args);
}
}
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).build();
}
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import static org.springframework.http.MediaType.TEXT_PLAIN;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.BodyInserters;
/>curl http://localhost:8080/api/ping
PONG
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
// TODO Auto-generated method stub
return null;
}
}
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
return next.handle(request);
}
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).filter(new RateLimiterHandlerFilterFunction()).build();
}
@Bean
ReactiveRedisTemplate<String, Long> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer();
StringRedisSerializer stringRedisSerializer = StringRedisSerializer.UTF_8;
GenericToStringSerializer<Long> longToStringSerializer = new GenericToStringSerializer<>(Long.class);
ReactiveRedisTemplate<String, Long> template = new ReactiveRedisTemplate<>(factory,
RedisSerializationContext.<String, Long>newSerializationContext(jdkSerializationRedisSerializer)
.key(stringRedisSerializer).value(longToStringSerializer).build());
return template;
}
@Autowired
private ReactiveRedisTemplate<String, Long> redisTemplate;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.beans.factory.annotation.Autowired;
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
private ReactiveRedisTemplate<String, Long> redisTemplate;
public RateLimiterHandlerFilterFunction(ReactiveRedisTemplate<String, Long> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).filter(new RateLimiterHandlerFilterFunction(redisTemplate)).build();
}
private String requestAddress(Optional<InetSocketAddress> maybeAddress) {
return maybeAddress.isPresent() ? maybeAddress.get().getHostName() : "";
}
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
int currentMinute = LocalTime.now().getMinute();
String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
System.out.println(">>>> key " + key);
return next.handle(request);
}
>>>> key rl_localhost:34
MULTI
INCR [user-api-key]:[current minute number]
EXPIRE [user-api-key]:[current minute number] 59
EXEC
private Mono<ServerResponse> incrAndExpireKey(String key, ServerRequest request,
HandlerFunction<ServerResponse> next) {
return redisTemplate.execute(new ReactiveRedisCallback<List<Object>>() {
@Override
public Publisher<List<Object>> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
ByteBuffer bbKey = ByteBuffer.wrap(key.getBytes());
return Mono.zip( //
connection.numberCommands().incr(bbKey), //
connection.keyCommands().expire(bbKey, Duration.ofSeconds(59L)) //
).then(Mono.empty());
}
}).then(next.handle(request));
}
Mono
(a specialized Publisher<T>
that emits at most one item in this case a ServerResponse
)key
, the original server request
and next
handler functiondoInRedis
method we turn the key into a ByteBuffer
in order to use with ReactiveRedisConnection
commandsINCR
and EXPIRE
commands.Mono.empty()
@Value("${MAX_REQUESTS_PER_MINUTE}")
private static Long MAX_REQUESTS_PER_MINUTE = 20L;
opsForValue()
to retrieve the value stored under the calculated key.incrementAndExpireKey
incrementAndExpireKey
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
int currentMinute = LocalTime.now().getMinute();
String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
return redisTemplate //
.opsForValue().get(key) //
.flatMap( //
value -> value >= MAX_REQUESTS_PER_MINUTE ? //
ServerResponse.status(TOO_MANY_REQUESTS).build() : //
incrAndExpireKey(key, request, next) //
).switchIfEmpty(incrAndExpireKey(key, request, next));
}
for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
➜ for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.393156 s
PONG :: HTTP 200, 4 bytes, 0.019530 s
PONG :: HTTP 200, 4 bytes, 0.023677 s
PONG :: HTTP 200, 4 bytes, 0.019922 s
PONG :: HTTP 200, 4 bytes, 0.025573 s
PONG :: HTTP 200, 4 bytes, 0.018916 s
PONG :: HTTP 200, 4 bytes, 0.019548 s
PONG :: HTTP 200, 4 bytes, 0.018335 s
PONG :: HTTP 200, 4 bytes, 0.010105 s
PONG :: HTTP 200, 4 bytes, 0.008416 s
PONG :: HTTP 200, 4 bytes, 0.009829 s
PONG :: HTTP 200, 4 bytes, 0.011766 s
PONG :: HTTP 200, 4 bytes, 0.010809 s
PONG :: HTTP 200, 4 bytes, 0.015483 s
PONG :: HTTP 200, 4 bytes, 0.009732 s
PONG :: HTTP 200, 4 bytes, 0.009970 s
PONG :: HTTP 200, 4 bytes, 0.008696 s
PONG :: HTTP 200, 4 bytes, 0.009176 s
PONG :: HTTP 200, 4 bytes, 0.009678 s
PONG :: HTTP 200, 4 bytes, 0.012497 s
:: HTTP 429, 0 bytes, 0.010071 s
:: HTTP 429, 0 bytes, 0.006625 s
1630366639.188290 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366639.200956 [0 172.17.0.1:65016] "INCR" "rl_localhost:37"
1630366639.202372 [0 172.17.0.1:65016] "EXPIRE" "rl_localhost:37" "59"
...
1630366649.891110 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366650.417131 [0 172.17.0.1:65016] "GET" "rl_localhost:37"