Events are the core of the Redis server, responsible for two important tasks:

  • File events: receive requests from multiple clients, process them, and respond to the clients.
  • Time events: run the server cron job.

In this post, we will look at how Redis uses multiplexed event handling to process requests from clients. Clients connect to the Redis server via sockets, but the server interacts with a client only when its socket can be read or written without blocking.
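
To make "readable without blocking" concrete, here is a minimal sketch built on the POSIX poll() call. It is not Redis code: Redis goes through its own ae library, and the names is_readable and demo_pipe_readiness are made up for this illustration.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper (not part of Redis): returns 1 if fd can be read
 * without blocking within timeout_ms, 0 on timeout, -1 on error. */
int is_readable(int fd, int timeout_ms) {
    assert(fd >= 0);

    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;   /* we only care about "data available" */
    pfd.revents = 0;

    int n = poll(&pfd, 1, timeout_ms);
    if (n < 0) return -1;
    if (n == 0) return 0;  /* timed out, nothing to read yet */
    return (pfd.revents & POLLIN) ? 1 : 0;
}

/* Usage: the read end of a pipe becomes readable only after a write.
 * Returns 1 if the readiness changed as expected, 0 otherwise. */
int demo_pipe_readiness(void) {
    int fds[2];
    if (pipe(fds) != 0) return 0;

    int before = is_readable(fds[0], 0);    /* nothing written yet */
    int wrote = (write(fds[1], "x", 1) == 1);
    int after = is_readable(fds[0], 100);   /* one byte is pending */

    close(fds[0]);
    close(fds[1]);
    return wrote && before == 0 && after == 1;
}
```

A server built this way never calls read() on a descriptor until the kernel has reported it ready, so one slow client cannot stall the others.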

The code analysed in this post is based on Redis 5.0.0.

Data Structures

redisServer, the global state of the Redis server, has many fields, including:

  • el with type aeEventLoop
  • clients, a list of active clients

    // server.c
    struct redisServer server; /* Server global state */
    
    // server.h
    struct redisServer {
        // ...
        aeEventLoop *el;  /* event loop */
        list *clients;    /* list of active clients */
        // ...
    };

aeEventLoop is defined in ae.h, the header of ae, a simple event-driven programming library.

// ae.h

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

And the client structure:

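// server.h

typedef struct client {
    int fd;        /* client socket */
    sds querybuf;  /* buffer used to accumulate client queries */
    int argc;      /* number of arguments of the current command */
    robj **argv;   /* arguments of the current command */
    redisDb *db;   /* currently selected database */
    int flags;     /* client flags: CLIENT_* macros */
    list *reply;   /* list of reply objects to send to the client */
    char buf[PROTO_REPLY_CHUNK_BYTES]; /* static reply buffer */
    // ...
} client;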

The client structure defines a connected client. The field descriptions below follow Redis Internals.

  • The fd field is the client socket file descriptor.
  • argc and argv are populated with the command the client is executing, so that functions implementing a given Redis command can read the arguments.
  • querybuf accumulates the requests from the client, which are parsed by the Redis server according to the Redis protocol and executed by calling the implementations of the commands the client is executing.
  • reply and buf are dynamic and static buffers that accumulate the replies the server sends to the client. These buffers are incrementally written to the socket as soon as the file descriptor is writable.
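
The interplay of the static and dynamic reply buffers can be sketched as follows. This is a simplified illustration, not the Redis implementation: toy_client, toy_add_reply, and TOY_CHUNK are invented names, and a single heap buffer stands in for the reply list.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define TOY_CHUNK 16  /* tiny stand-in for PROTO_REPLY_CHUNK_BYTES */

typedef struct toy_client {
    char buf[TOY_CHUNK];  /* static buffer, filled first */
    size_t bufpos;        /* bytes used in buf */
    char *overflow;       /* heap buffer standing in for the reply list */
    size_t overflow_len;  /* bytes used in overflow */
} toy_client;

void toy_client_init(toy_client *c) {
    memset(c, 0, sizeof(*c));
}

/* Append a reply. Returns 0 on success, -1 if allocation fails. */
int toy_add_reply(toy_client *c, const char *s, size_t len) {
    assert(c != NULL && s != NULL);
    if (len == 0) return 0;

    /* Use the static buffer only while nothing has spilled over,
     * so the bytes keep their original order on the wire. */
    if (c->overflow_len == 0 && len <= TOY_CHUNK - c->bufpos) {
        memcpy(c->buf + c->bufpos, s, len);
        c->bufpos += len;
        return 0;
    }

    /* Otherwise grow the dynamic buffer and append there. */
    char *p = realloc(c->overflow, c->overflow_len + len);
    if (p == NULL) return -1;
    memcpy(p + c->overflow_len, s, len);
    c->overflow = p;
    c->overflow_len += len;
    return 0;
}

void toy_client_free(toy_client *c) {
    free(c->overflow);
    toy_client_init(c);
}
```

Small replies such as "+OK\r\n" fit in the static buffer and need no allocation; only larger or accumulated output pays for heap memory.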

Create Event Loop

initServer, called from main, creates

  1. an event loop for server.el
  2. a timer callback serverCron for time events
  3. an event handler acceptTcpHandler for accepting TCP connections, which waits for readable events on the listening sockets

    // server.c
    
    int main(int argc, char **argv) {
        // ...
        initServer();
        // ...

        aeMain(server.el);
        // ...
    }
    
    void initServer(void) {
    // ...
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    if (server.el == NULL) {
        serverLog(LL_WARNING,
            "Failed creating the event loop. Error message: '%s'",
            strerror(errno));
        exit(1);
    }
    
    /* Create the timer callback serverCron, which is called periodically to
     * perform background tasks such as checking client timeouts and evicting
     * expired keys. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    }

Let’s look at the implementation of aeCreateEventLoop, which allocates memory for the event loop and initializes it.

// ae.c

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    // allocate memory for event loop
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;

    // initialization
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

aeCreateTimeEvent and aeCreateFileEvent register a handler for time events and file events respectively. Take aeCreateFileEvent as an example:

// ae.c

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;

    // Register the handler for readable and writable events
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
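
The key idea in aeCreateFileEvent is that the events array is indexed directly by file descriptor, so registration and lookup are both O(1). The sketch below reproduces just that idea. All toy_* names and the TOY_* constants are hypothetical, and the call into the polling backend (aeApiAddEvent) is left out.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_NONE     0
#define TOY_READABLE 1
#define TOY_WRITABLE 2
#define TOY_SETSIZE  8   /* max number of descriptors tracked */

typedef void toy_proc(int fd, void *data);

typedef struct toy_event {
    int mask;         /* TOY_READABLE | TOY_WRITABLE, or TOY_NONE */
    toy_proc *rproc;  /* handler for readable events */
    toy_proc *wproc;  /* handler for writable events */
    void *data;       /* opaque pointer passed back to the handler */
} toy_event;

typedef struct toy_loop {
    int maxfd;                      /* highest registered descriptor */
    toy_event events[TOY_SETSIZE];  /* indexed by descriptor */
} toy_loop;

void toy_loop_init(toy_loop *l) {
    l->maxfd = -1;
    for (int i = 0; i < TOY_SETSIZE; i++) {
        l->events[i].mask = TOY_NONE;
        l->events[i].rproc = NULL;
        l->events[i].wproc = NULL;
        l->events[i].data = NULL;
    }
}

/* Register proc for the events in mask. Returns 0 on success, -1 if fd
 * does not fit in the table. */
int toy_register(toy_loop *l, int fd, int mask, toy_proc *proc, void *data) {
    if (fd < 0 || fd >= TOY_SETSIZE) return -1;
    toy_event *e = &l->events[fd];
    e->mask |= mask;
    if (mask & TOY_READABLE) e->rproc = proc;
    if (mask & TOY_WRITABLE) e->wproc = proc;
    e->data = data;
    if (fd > l->maxfd) l->maxfd = fd;
    return 0;
}

/* Call the handlers registered for fd that match ready_mask.
 * Returns the number of handlers called. */
int toy_dispatch(toy_loop *l, int fd, int ready_mask) {
    assert(fd >= 0 && fd < TOY_SETSIZE);
    toy_event *e = &l->events[fd];
    int called = 0;
    if (e->mask & ready_mask & TOY_READABLE) { e->rproc(fd, e->data); called++; }
    if (e->mask & ready_mask & TOY_WRITABLE) { e->wproc(fd, e->data); called++; }
    return called;
}

/* Example handler: counts how many times it was invoked. */
void toy_count_hit(int fd, void *data) {
    (void)fd;
    ++*(int *)data;
}
```

The price of the array layout is that its size must be fixed up front, which is why initServer sizes the loop from server.maxclients plus a safety margin.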

Process Events

aeMain() runs the event loop: it calls aeProcessEvents repeatedly until the stop flag is set, and this is where new connections get picked up.

// ae.c

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
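
The shape of this loop, a stop flag plus a hook that runs before every iteration, can be tried out in isolation. The sketch below is a hypothetical miniature: toy_runner and its functions are invented, and toy_process_events merely counts iterations and requests a stop after the third one instead of polling for real events.

```c
#include <assert.h>
#include <stddef.h>

typedef struct toy_runner toy_runner;
typedef void toy_hook(toy_runner *r);

struct toy_runner {
    int stop;               /* set to non-zero to leave the loop */
    int iterations;         /* how many times events were processed */
    int hook_calls;         /* how many times the hook ran */
    toy_hook *beforesleep;  /* optional hook, may be NULL */
};

/* Stand-in for aeProcessEvents: counts iterations and asks the loop
 * to stop after the third one. */
static void toy_process_events(toy_runner *r) {
    if (++r->iterations == 3) r->stop = 1;
}

/* Example hook: records that it ran. */
void toy_count_hook(toy_runner *r) {
    r->hook_calls++;
}

/* Same control flow as aeMain: reset the flag, then loop until some
 * handler sets it. */
void toy_main(toy_runner *r) {
    assert(r != NULL);
    r->stop = 0;
    while (!r->stop) {
        if (r->beforesleep != NULL)
            r->beforesleep(r);
        toy_process_events(r);
    }
}
```

Because the flag is only checked between iterations, a handler that sets it still lets the current iteration finish cleanly.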

The call stack is

main -> aeMain -> aeProcessEvents -> acceptTcpHandler -> acceptCommonHandler

aeProcessEvents keeps mechanism and policy separate: its second parameter, flags, selects which kinds of events to handle. With AE_ALL_EVENTS it processes every pending file event and then every pending time event.

  1. Find the time event that is due soonest, starting from timeEventHead of aeEventLoop.
  2. Compute how much time remains until that event is due; this becomes the timeout tvp for the poll.
  3. Wait for file events with numevents = aeApiPoll(eventLoop, tvp), which returns the number of ready file events.
  4. Process each ready file event by calling its registered handler, for example the acceptTcpHandler registered by aeCreateFileEvent.
  5. Process the time events that are now due.
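
Steps 1 to 3 boil down to one small calculation: how long may the poll block without making the nearest timer late? A minimal sketch, assuming millisecond timestamps and using the hypothetical name toy_poll_timeout_ms (ae.c itself works with a struct timeval):

```c
#include <assert.h>

/* Returns how long the poll call may block, in milliseconds.
 *   -1  no timer registered: block until a file event arrives
 *    0  nearest timer is already due: do not block at all
 *   >0  time remaining until the nearest timer fires */
long long toy_poll_timeout_ms(int has_timer, long long now_ms,
                              long long when_ms) {
    assert(now_ms >= 0);
    if (!has_timer) return -1;
    long long diff = when_ms - now_ms;
    return diff > 0 ? diff : 0;
}
```

Tying the poll timeout to the nearest timer is what lets a single thread serve both sockets and serverCron without busy-waiting.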

    // ae.c
    
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
    // ...
    
    for (j = 0; j < numevents; j++) {
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        int mask = eventLoop->fired[j].mask;
        int fd = eventLoop->fired[j].fd;
        int fired = 0; /* Number of events fired for current fd. */
    
        int invert = fe->mask & AE_BARRIER;
    
        /* Fire the readable event if the call sequence is not
        * inverted. */
        if (!invert && fe->mask & mask & AE_READABLE) {
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }
    
        /* Fire the writable event. */
        if (fe->mask & mask & AE_WRITABLE) {
            if (!fired || fe->wfileProc != fe->rfileProc) {
                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
        }
    
        /* If we have to invert the call, fire the readable event now
        * after the writable one. */
        if (invert && fe->mask & mask & AE_READABLE) {
            if (!fired || fe->wfileProc != fe->rfileProc) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
        }
    }
    
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    }
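
The effect of AE_BARRIER on call order is easier to see without the surrounding loop. The sketch below records which handler would run first for a given registered mask and fired mask. The TOY_* constants and toy_fire_order are made up for this illustration, and the check that skips a second call when both events share one handler is omitted.

```c
#include <assert.h>
#include <string.h>

#define TOY_READABLE 1
#define TOY_WRITABLE 2
#define TOY_BARRIER  4  /* run the writable handler before the readable one */

/* Writes the handler order into out as a string of 'R' and 'W'.
 * out must have room for at least 3 bytes. */
void toy_fire_order(int registered, int fired, char out[3]) {
    assert(out != NULL);
    int n = 0;
    int invert = registered & TOY_BARRIER;
    int ready = registered & fired;

    /* Normal case: readable first. */
    if (!invert && (ready & TOY_READABLE)) out[n++] = 'R';

    /* Writable always sits in the middle. */
    if (ready & TOY_WRITABLE) out[n++] = 'W';

    /* Inverted case: readable after writable. */
    if (invert && (ready & TOY_READABLE)) out[n++] = 'R';

    out[n] = '\0';
}
```

For a descriptor registered with TOY_READABLE | TOY_WRITABLE and reported ready for both, the function yields "RW"; adding TOY_BARRIER to the registration yields "WR".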

We have already met acceptTcpHandler, which is defined in networking.c.

// networking.c

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        // Accept TCP requests
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);

        // Create a client object and register its read handler with aeEventLoop
        acceptCommonHandler(cfd,0,cip);
    }
}
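
The while(max--) loop is a general pattern: handle as much pending work as possible on a non-blocking descriptor, but cap the work per call so other descriptors are not starved. The sketch below shows the same pattern with read() on a pipe instead of accept() on a listening socket; toy_set_nonblock and toy_drain are hypothetical names.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Put fd into non-blocking mode. Returns 0 on success, -1 on error. */
int toy_set_nonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Consume at most max one-byte items from a non-blocking fd.
 * Stops early when nothing is left (EAGAIN/EWOULDBLOCK), on EOF,
 * or on any other error. Returns the number of items handled. */
int toy_drain(int fd, int max) {
    assert(max >= 0);
    int handled = 0;
    char byte;

    while (max--) {
        ssize_t n = read(fd, &byte, 1);
        if (n != 1) break;  /* would block, EOF, or error: give up for now */
        handled++;
    }
    return handled;
}
```

With five bytes pending and max set to 3, the first call handles three items and returns to the event loop; the remaining two are picked up the next time the descriptor is reported readable.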

acceptCommonHandler calls createClient, which constructs a client object and registers readQueryFromClient with aeEventLoop as the handler for readable events on the client socket.

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);

        // Register readQueryFromClient for readable events
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    
    // ...
}

Read Requests

networking.c defines all the I/O functions with clients, masters and replicas (which in Redis are just special clients):

  • createClient() allocates and initializes a new client.
  • The addReply*() family of functions is used by command implementations to append data to the client structure; that data is then transmitted to the client as the reply to the executed command.
  • writeToClient() transmits the data pending in the output buffers to the client and is called by the writable event handler sendReplyToClient().
  • readQueryFromClient() is the readable event handler and accumulates data read from the client into the query buffer.
  • processInputBuffer() is the entry point in order to parse the client query buffer according to the Redis protocol. Once commands are ready to be processed, it calls processCommand() which is defined inside server.c in order to actually execute the command.
  • freeClient() deallocates, disconnects and removes a client.

The call stack of reading and processing commands from client is

readQueryFromClient -> processInputBufferAndReplicate -> processInputBuffer -> processCommand

readQueryFromClient reads request data from the socket and appends it to the client's querybuf.

// networking.c

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;

    //...
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);

    // Time to process the buffer
    processInputBufferAndReplicate(c);
}
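
Because a single read() may deliver only part of a request, the query buffer has to accumulate bytes until a complete unit is available. The sketch below illustrates that with a growable buffer and "\r\n"-terminated lines. It is a simplification under stated assumptions: toy_qbuf and its functions are invented, realloc stands in for sds, and the actual Redis protocol parsing done by processInputBuffer is more involved.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct toy_qbuf {
    char *data;  /* accumulated, not yet processed bytes */
    size_t len;  /* number of bytes in data */
} toy_qbuf;

/* Append n bytes that were just read. Returns 0 on success, -1 on
 * allocation failure. */
int toy_qbuf_append(toy_qbuf *q, const char *chunk, size_t n) {
    assert(q != NULL && chunk != NULL);
    if (n == 0) return 0;
    char *p = realloc(q->data, q->len + n);
    if (p == NULL) return -1;
    memcpy(p + q->len, chunk, n);
    q->data = p;
    q->len += n;
    return 0;
}

/* If the buffer holds a complete "\r\n"-terminated line, copy it (without
 * the terminator, NUL-terminated) into out, remove it from the buffer and
 * return its length. Returns -1 if no complete line is available yet or
 * if out is too small. */
long toy_qbuf_take_line(toy_qbuf *q, char *out, size_t outcap) {
    for (size_t i = 0; i + 1 < q->len; i++) {
        if (q->data[i] == '\r' && q->data[i + 1] == '\n') {
            if (i + 1 > outcap) return -1;  /* need i bytes plus NUL */
            memcpy(out, q->data, i);
            out[i] = '\0';
            size_t consumed = i + 2;
            /* Keep any bytes that belong to the next request. */
            memmove(q->data, q->data + consumed, q->len - consumed);
            q->len -= consumed;
            return (long)i;
        }
    }
    return -1;
}
```

If "PI" arrives first and "NG\r\nEC" arrives later, the first attempt to take a line fails, the second yields "PING", and "EC" stays in the buffer for the next request.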

processInputBuffer is called whenever the client structure has more data in its query buffer to process. This Chinese post (Redis 启动流程:命令注册和执行, "Redis startup flow: command registration and execution") provides more details about this function.

Reference