Documentation

Reactive Implementation

Brian Sam-Bodden
Author
Brian Sam-Bodden, Developer Advocate at Redis

A basic Spring Web Flux App#

  1. In your browser, launch the Spring Initializr
  2. Fill out the form using the following settings:
  • Project: Maven Project
  • Language: Java
  • Spring Boot: 2.5.4
  • Project Metadata:
    • Group: com.redis.rl
    • Artifact: fixed-window-rate-limiter
    • Name: fixed-window-rate-limiter
    • Description: Redis Fixed Window Rate Limiter
    • Package Name: com.redis.rl
    • Packaging: JAR
    • Java: 11
  • Dependencies:
    • Spring Reactive Web
    • Spring Data Reactive Redis
    • Spring Boot DevTools
cd ~/my-directory
unzip ~/Downloads/fixed-window-rate-limiter.zip
package com.redis.rl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the fixed-window-rate-limiter project.
 */
@SpringBootApplication
public class FixedWindowRateLimiterApplication {

    /**
     * Starts the application by handing this class and the command-line
     * arguments to {@link SpringApplication#run}.
     *
     * @param args command-line arguments, forwarded unchanged
     */
    public static void main(String[] args) {
        final Class<?> primarySource = FixedWindowRateLimiterApplication.class;
        SpringApplication.run(primarySource, args);
    }

}
/**
 * Declares the functional route table: a single {@code GET /api/ping}
 * endpoint that replies 200 OK with the plain-text body {@code PONG}.
 */
@Bean
RouterFunction<ServerResponse> routes() {
  return route()
      .GET("/api/ping",
          request -> ok().contentType(TEXT_PLAIN).body(BodyInserters.fromValue("PONG")))
      .build();
}
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import static org.springframework.http.MediaType.TEXT_PLAIN;

import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.BodyInserters;
/>curl http://localhost:8080/api/ping
PONG

Spring WebFlux Filters#

/**
 * Skeleton of a handler filter for routes that produce a {@code ServerResponse}.
 * The body below is the unmodified IDE-generated stub.
 */
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {

  /**
   * Placeholder implementation.
   *
   * @param request the incoming request (unused by the stub)
   * @param next    the next handler in the chain (never invoked by the stub)
   * @return {@code null} for now
   */
  @Override
  public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
    // TODO Auto-generated method stub
    // NOTE(review): null is only a placeholder; `next` is never called, so no response is produced.
    return null;
  }
}
/**
 * Pass-through filter: forwards the request to the next handler untouched
 * and returns whatever that handler produces.
 */
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
  final Mono<ServerResponse> downstream = next.handle(request);
  return downstream;
}
/**
 * Declares the {@code GET /api/ping} route (plain-text {@code PONG}) and
 * attaches a {@code RateLimiterHandlerFilterFunction} to the route table.
 */
@Bean
RouterFunction<ServerResponse> routes() {
  RateLimiterHandlerFilterFunction rateLimiter = new RateLimiterHandlerFilterFunction();
  return route()
      .GET("/api/ping",
          request -> ok().contentType(TEXT_PLAIN).body(BodyInserters.fromValue("PONG")))
      .filter(rateLimiter)
      .build();
}

Reactive Redis Template#

/**
 * Builds a {@link ReactiveRedisTemplate} with {@code String} keys and {@code Long} values.
 *
 * <p>Keys use the UTF-8 string serializer and values use a to-string serializer for
 * {@code Long}; a JDK serializer is supplied as the context's default serializer.
 *
 * @param factory the reactive connection factory the template operates on
 * @return the configured template
 */
@Bean
ReactiveRedisTemplate<String, Long> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
  GenericToStringSerializer<Long> valueSerializer = new GenericToStringSerializer<>(Long.class);
  RedisSerializationContext<String, Long> context = RedisSerializationContext
      .<String, Long>newSerializationContext(new JdkSerializationRedisSerializer())
      .key(StringRedisSerializer.UTF_8)
      .value(valueSerializer)
      .build();
  return new ReactiveRedisTemplate<>(factory, context);
}

// Reactive Redis template (String keys, Long values), field-injected via @Autowired.
@Autowired
private ReactiveRedisTemplate<String, Long> redisTemplate;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import org.springframework.beans.factory.annotation.Autowired;
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {

  /** Reactive Redis access used by this filter; assigned exactly once, in the constructor. */
  private final ReactiveRedisTemplate<String, Long> redisTemplate;

  /**
   * Creates the filter around the given Redis template.
   *
   * @param redisTemplate reactive template with String keys and Long values; must not be null
   * @throws NullPointerException if {@code redisTemplate} is null
   */
  public RateLimiterHandlerFilterFunction(ReactiveRedisTemplate<String, Long> redisTemplate) {
    // Fail at construction time rather than on the first filtered request.
    this.redisTemplate = java.util.Objects.requireNonNull(redisTemplate, "redisTemplate");
  }
/**
 * Declares the {@code GET /api/ping} route (plain-text {@code PONG}) and attaches a
 * {@code RateLimiterHandlerFilterFunction} constructed with the Redis template.
 */
@Bean
RouterFunction<ServerResponse> routes() {
  RateLimiterHandlerFilterFunction rateLimiter = new RateLimiterHandlerFilterFunction(redisTemplate);
  return route()
      .GET("/api/ping",
          request -> ok().contentType(TEXT_PLAIN).body(BodyInserters.fromValue("PONG")))
      .filter(rateLimiter)
      .build();
}

Identifying the Requester#

/**
 * Returns the host name of the given socket address, or the empty string
 * when no address is present.
 *
 * @param maybeAddress the (possibly absent) remote address of the request
 * @return the address's host name, or {@code ""} if absent
 */
private String requestAddress(Optional<InetSocketAddress> maybeAddress) {
  if (!maybeAddress.isPresent()) {
    return "";
  }
  InetSocketAddress remote = maybeAddress.get();
  return remote.getHostName();
}
/**
 * Builds the per-requester, per-minute key, prints it to standard output,
 * and then delegates to the next handler without any rate limiting.
 *
 * @param request the incoming request; its remote address feeds the key
 * @param next    the handler the request is forwarded to
 * @return the response produced by {@code next}
 */
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
  String clientHost = requestAddress(request.remoteAddress());
  int minuteOfHour = LocalTime.now().getMinute();
  String key = String.format("rl_%s:%s", clientHost, minuteOfHour);
  System.out.println(">>>> key " + key);

  return next.handle(request);
}
>>>> key rl_localhost:34

Increment and Expire Key#

MULTI
INCR [user-api-key]:[current minute number]
EXPIRE [user-api-key]:[current minute number] 59
EXEC
/**
 * Increments the counter stored under {@code key}, sets its time-to-live to
 * 59 seconds, and then hands the request to the next handler.
 *
 * @param key     Redis key of the counter to increment
 * @param request the incoming request, passed to {@code next} unchanged
 * @param next    the handler invoked once the Redis commands have completed
 * @return the response produced by {@code next}
 */
private Mono<ServerResponse> incrAndExpireKey(String key, ServerRequest request,
  HandlerFunction<ServerResponse> next) {
  return redisTemplate.execute(new ReactiveRedisCallback<List<Object>>() {
    @Override
    public Publisher<List<Object>> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
      // Encode explicitly as UTF-8 instead of the platform default charset, so the raw key
      // bytes do not depend on the JVM's default encoding.
      ByteBuffer bbKey = ByteBuffer.wrap(key.getBytes(java.nio.charset.StandardCharsets.UTF_8));
      return Mono.zip( //
          connection.numberCommands().incr(bbKey), //
          connection.keyCommands().expire(bbKey, Duration.ofSeconds(59L)) //
      ).then(Mono.empty());
    }
    // Defer the handler call so it runs only after the Redis work has completed,
    // not when this pipeline is assembled.
  }).then(Mono.defer(() -> next.handle(request)));
}
  1. The method returns an asynchronous (0-1) result, a Mono (a specialized Publisher<T> that emits at most one item, in this case a ServerResponse).
  2. The method takes the calculated rate limiting key, the original server request and the next handler function.
  3. In the doInRedis method we turn the key into a ByteBuffer in order to use it with the ReactiveRedisConnection commands.
  4. The zip method waits for all the sources to emit one element and combines these elements into an output value, which we are ignoring since all we are after is sequential execution of the INCR and EXPIRE commands.
  5. The method then returns a Mono.empty().
  6. Finally, we handle the request.

Completing the filter implementation#

/**
 * Maximum number of requests allowed per key before requests are rejected.
 *
 * <p>Declared as an instance field because Spring does not inject {@code @Value} into
 * static fields. The placeholder carries a default of 20 so a missing
 * {@code MAX_REQUESTS_PER_MINUTE} property does not fail resolution. The upper-case name
 * is kept because other code refers to the field by it.
 *
 * <p>NOTE(review): injection only happens when the owning object is created by the Spring
 * container; an instance created with {@code new} keeps the initializer value of 20.
 */
@Value("${MAX_REQUESTS_PER_MINUTE:20}")
private Long MAX_REQUESTS_PER_MINUTE = 20L;
  1. We use the Redis template opsForValue() to retrieve the value stored under the calculated key.
  2. If the value...
    • Is greater than or equal to the max quota, we deny the request with a 429 (Too Many Requests) response.
    • Otherwise, we call incrAndExpireKey.
    • Is empty/key not found (first request in this window), we call incrAndExpireKey.
/**
 * Applies the fixed-window rate limit to a request.
 *
 * <p>The counter key combines the requester's address with the current minute of the hour.
 * If the stored count has reached {@code MAX_REQUESTS_PER_MINUTE}, the request is answered
 * with {@code TOO_MANY_REQUESTS}. Otherwise, including when no counter exists yet, the
 * counter is incremented and the request is passed to {@code next}.
 *
 * @param request the incoming request
 * @param next    the handler to invoke when the request is within quota
 * @return the downstream response, or an empty-bodied {@code TOO_MANY_REQUESTS} response
 */
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
  int currentMinute = LocalTime.now().getMinute();
  String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);

  return redisTemplate //
      .opsForValue().get(key) //
      .flatMap( //
          value -> value >= MAX_REQUESTS_PER_MINUTE ? //
              ServerResponse.status(TOO_MANY_REQUESTS).build() : //
              incrAndExpireKey(key, request, next) //
      )
      // switchIfEmpty evaluates its argument when the pipeline is assembled; deferring keeps
      // incrAndExpireKey (and the handler call inside it) from running unless the key is
      // actually missing.
      .switchIfEmpty(Mono.defer(() -> incrAndExpireKey(key, request, next)));
}

Testing with curl#

for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.393156 s
PONG :: HTTP 200, 4 bytes, 0.019530 s
PONG :: HTTP 200, 4 bytes, 0.023677 s
PONG :: HTTP 200, 4 bytes, 0.019922 s
PONG :: HTTP 200, 4 bytes, 0.025573 s
PONG :: HTTP 200, 4 bytes, 0.018916 s
PONG :: HTTP 200, 4 bytes, 0.019548 s
PONG :: HTTP 200, 4 bytes, 0.018335 s
PONG :: HTTP 200, 4 bytes, 0.010105 s
PONG :: HTTP 200, 4 bytes, 0.008416 s
PONG :: HTTP 200, 4 bytes, 0.009829 s
PONG :: HTTP 200, 4 bytes, 0.011766 s
PONG :: HTTP 200, 4 bytes, 0.010809 s
PONG :: HTTP 200, 4 bytes, 0.015483 s
PONG :: HTTP 200, 4 bytes, 0.009732 s
PONG :: HTTP 200, 4 bytes, 0.009970 s
PONG :: HTTP 200, 4 bytes, 0.008696 s
PONG :: HTTP 200, 4 bytes, 0.009176 s
PONG :: HTTP 200, 4 bytes, 0.009678 s
PONG :: HTTP 200, 4 bytes, 0.012497 s
:: HTTP 429, 0 bytes, 0.010071 s
:: HTTP 429, 0 bytes, 0.006625 s
1630366639.188290 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366639.200956 [0 172.17.0.1:65016] "INCR" "rl_localhost:37"
1630366639.202372 [0 172.17.0.1:65016] "EXPIRE" "rl_localhost:37" "59"
...
1630366649.891110 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366650.417131 [0 172.17.0.1:65016] "GET" "rl_localhost:37"