Atomicity with Gears

Improving atomicity and performance with RedisGears#

What is RedisGears?#

RedisGears is a dynamic server-side data processing engine, where the "server" part is Redis itself. RedisGears is distributed as a Redis module. You can start a Redis instance preconfigured with Gears using the official Docker image:

docker run -p 6379:6379 redislabs/redisgears:latest

Or, as I do most of the time, using the "redismod" image which include Gears and all the other Redis, Inc. supported modules:

docker run -p 6379:6379 redislabs/redismod

RedisGears was built with the purpose of providing a data processing engine inside of Redis, with more formal semantics than the simpler Lua server-side scripting. Think of it as a more flexible map-reduce engine for Redis. It supports supports transaction, batch, and event-driven processing of Redis data. Gears allow you to localize computation and data provides a built-in coordinator to facilitate processing distributed data in a clustered environment.

In RedisGears, the main unit of processing is the RedisGears function, which can be (currently) written in Python (more languages are being worked on). These functions run on their own thread, separate from Redis' main thread and can be executed in response to keyspace events or imperatively as a result of external commands. The functions are "registered" (deployed) to the Gears engine, and have an associated name and a registration Id.

During registration we pick a specific reader for our function which defines how the function gets its initial data:

  • KeysReader: Redis keys and values.
  • KeysOnlyReader: Redis keys.
  • StreamReader: Redis Stream messages.
  • PythonReader: Arbitrary Python generator.
  • ShardsIDReader: Shard ID.
  • CommandReader: Command arguments from application client.

A Rate-Limiting RedisGears Function#

Depending on the reader type, Gear Functions can either be run immediately, on demand, as batch jobs or in an event-driven manner by registering it to trigger automatically on various types of events.

The Python function rate_limit takes 3 parameters:

  • key: The Redis key backing the counter for a given user.
  • max_request: The request quota for the user.
  • expiry: The number of seconds in the future to set the counter TTL.
def rate_limit(key, max_requests, expiry):
requests = execute('GET', key)
requests = int(requests) if requests else -1
max_requests = int(max_requests)
expiry = int(expiry)
if (requests == -1) or (requests < max_requests):
with atomic():
execute('INCR', key)
execute('EXPIRE', key, expiry)
return False
else:
return True
# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')

Place the script under src/main/resources/scripts. Now, Let's break it down:

The rate_limit function#

Similarly to what we did in the previous implementation, we:

  1. Retrieve the current number of requests for the passed key by execute-ing the GET command.
  2. Cast the result to an int and if not found, default to -1
  3. Cast max_requests and expiry to int
  4. If the quota hasn't been exceeded, perform the INCR/EXPIRE commands in a transactions (with atomic():) and return False (no rate limiting - request is allowed)
  5. Otherwise, return True (deny the request)

Function Registration#

  1. At the bottom of the script, in the # Function registration section, we instantiate the GearsBuilder(GB) using the CommandReader reader. The GearsBuilder "builds" the context of the function, in parameters, transformations, triggers, etc.
  2. We use the map method to performs a one-to-one mapping of records to the params of the rate_limit function via a mapper function callback.
  3. We can now invoke the register action to register the function as an event handler. The event in our case is the trigger 'RateLimiter'.

RedisGears in SpringBoot#

In order to use our RedisGear function from our SpringBoot application we need to do a few things:

  1. Deploy the function to the Redis server
  2. Execute the function to get a yay/nay answer on each request

Lettuce Mod#

LettuceMod is a Java client for Redis Modules based on Lettuce created by Julien Ruaux . It supports the following modules in standalone or cluster configurations:

  • RedisGears
  • RedisJSON
  • RediSearch
  • RedisTimeSeries

To use LettuceMod we'll add the dependency to the Maven POM as shown:

<dependency>
<groupId>com.redis</groupId>
<artifactId>spring-lettucemod</artifactId>
<version>1.7.0</version>
</dependency>

Accessing Gears Commands in SpringBoot#

To access any of the LettuceMod supported modules we will inject a StatefulRedisModulesConnection in our FixedWindowRateLimiterApplication class as follows:

@Autowired
StatefulRedisModulesConnection<String, String> connection;

Add the matching import statement:

import com.redis.lettucemod.api.StatefulRedisModulesConnection;

Registering the Gears function#

We'll start by writing a function to determine whether the function with the trigger RateLimiter has been registered. It takes a List of Registrations and digs deep to extract the value of the trigger argument using the Java Streams API:

private Optional<String> getGearsRegistrationIdForTrigger(List<Registration> registrations, String trigger) {
return registrations.stream().filter(r -> r.getData().getArgs().get("trigger").equals(trigger)).findFirst().map(Registration::getId);
}

In the @PostConstruct annotated method loadGearsScript method:

  1. We retrieve an instance of the RedisGearsCommands from the previously injected StatefulRedisModulesConnection
  2. We get the currently registered Gears functions via the dumpregistrations method
  3. We pass the list of registrations to our getGearsRegistrationIdForTrigger
  4. If we don't find the registration we proceed to register the function:
    • Load the function from the classpath into a String named py
    • Use the pyexecute method passing the py script payload
@PostConstruct
public void loadGearsScript() throws IOException {
String py = StreamUtils.copyToString(new ClassPathResource("scripts/rateLimiter.py").getInputStream(),
Charset.defaultCharset());
RedisGearsCommands<String, String> gears = connection.sync();
List<Registration> registrations = gears.dumpregistrations();
Optional<String> maybeRegistrationId = getGearsRegistrationIdForTrigger(registrations, "RateLimiter");
if (maybeRegistrationId.isEmpty()) {
try {
ExecutionResults er = gears.pyexecute(py);
if (er.isOk()) {
logger.info("RateLimiter.py has been registered");
} else if (er.isError()) {
logger.error(String.format("Could not register RateLimiter.py -> %s", Arrays.toString(er.getErrors().toArray())));
}
} catch (RedisCommandExecutionException rcee) {
logger.error(String.format("Could not register RateLimiter.py -> %s", rcee.getMessage()));
}
} else {
logger.info("RateLimiter.py has already been registered");
}
}

Modifying the Filter to use the Gears function#

Next, we'll modify the filter to include the StatefulRedisModulesConnection as well as the quota; the value that we need to pass to the function:

class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
private StatefulRedisModulesConnection<String, String> connection;
private Long maxRequestPerMinute;
public RateLimiterHandlerFilterFunction(StatefulRedisModulesConnection<String, String> connection,
Long maxRequestPerMinute) {
this.connection = connection;
this.maxRequestPerMinute = maxRequestPerMinute;
}

Now we can modify the filter method to use the function. Gears functions are invoked by triggering the correct event RateLimiter and passing the parameters required by the function; the key, the quota and the TTL seconds in the future.

As we've have done previously, if the function returns false we let the request through, otherwise we return an HTTP 429:

@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
int currentMinute = LocalTime.now().getMinute();
String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
RedisGearsCommands<String, String> gears = connection.sync();
List<Object> results = gears.trigger("RateLimiter", key, Long.toString(maxRequestPerMinute), "59");
if (!results.isEmpty() && !Boolean.parseBoolean((String) results.get(0))) {
return next.handle(request);
} else {
return ServerResponse.status(TOO_MANY_REQUESTS).build();
}
}

Testing with curl#

Once again, we use curl loop to test the limiter:

for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done

You should see the 21st request being rejected:

โžœ for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.064786 s
PONG :: HTTP 200, 4 bytes, 0.009926 s
PONG :: HTTP 200, 4 bytes, 0.009546 s
PONG :: HTTP 200, 4 bytes, 0.010189 s
PONG :: HTTP 200, 4 bytes, 0.009399 s
PONG :: HTTP 200, 4 bytes, 0.009210 s
PONG :: HTTP 200, 4 bytes, 0.008333 s
PONG :: HTTP 200, 4 bytes, 0.008009 s
PONG :: HTTP 200, 4 bytes, 0.008919 s
PONG :: HTTP 200, 4 bytes, 0.009271 s
PONG :: HTTP 200, 4 bytes, 0.007515 s
PONG :: HTTP 200, 4 bytes, 0.007057 s
PONG :: HTTP 200, 4 bytes, 0.008373 s
PONG :: HTTP 200, 4 bytes, 0.007573 s
PONG :: HTTP 200, 4 bytes, 0.008209 s
PONG :: HTTP 200, 4 bytes, 0.009080 s
PONG :: HTTP 200, 4 bytes, 0.007595 s
PONG :: HTTP 200, 4 bytes, 0.007955 s
PONG :: HTTP 200, 4 bytes, 0.007693 s
PONG :: HTTP 200, 4 bytes, 0.008743 s
:: HTTP 429, 0 bytes, 0.007226 s
:: HTTP 429, 0 bytes, 0.007388 s

If we run Redis in monitor mode, we should see the Lua calls to RG.TRIGGER and under that you should see the calls to GET, INCR and EXPIRE for allowed requests:

1631249244.006212 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.006995 [0 ?:0] "GET" "rl_localhost:47"
1631249244.007182 [0 ?:0] "INCR" "rl_localhost:47"
1631249244.007269 [0 ?:0] "EXPIRE" "rl_localhost:47" "59"

And for rate limited request you should see only the call to GET:

1631249244.538478 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.538809 [0 ?:0] "GET" "rl_localhost:47"

The complete code for this implementation is under the branch with_gears.