Write-ahead logging

Write-ahead logging (WAL) is one of the most important parts of a database: it is what implements atomicity and durability. The basic idea of logging is to write data to the log on storage before making the change visible to others.

We will reference the 2.x series of the Tarantool source code.

Journal subsystem

The journal subsystem provides the following API calls:

  • journal_create to create a new journal;
  • journal_set to destroy the old journal and set a new one;
  • journal_entry_new to create a new journal entry;
  • journal_write to write a journal entry.

There can be only one active journal at a time, referred to via the current_journal variable (thus we can create as many journals as we need with journal_create, but only one may be activated via journal_set).
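To make the dispatch concrete, here is roughly what journal_set and journal_write look like (a sketch based on journal.h from the 2.x series; treat the exact member names write and destroy as assumptions):

static inline void
journal_set(struct journal *new_journal)
{
    /* Destroy the previously active journal, if any. */
    if (current_journal != NULL && current_journal->destroy != NULL)
        current_journal->destroy(current_journal);
    current_journal = new_journal;
}

static inline int64_t
journal_write(struct journal_entry *entry)
{
    /* Delegate the write to whichever journal is currently active. */
    return current_journal->write(current_journal, entry);
}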

journal_entry_new requires a callback to be associated with the journal entry; the callback is called when entry processing is complete (with success or failure). Since we use the journal to process database transactions, we pass txn_entry_complete_cb as the callback and a pointer to the transaction itself as the data.

To explain in detail how transactions are processed at the journal level we need to look at the transaction structure (a short version, to be precise, with only the members we are interested in).

struct txn {
    /* transaction cache */
    struct stailq_entry in_txn_cache;
    /* memory to keep transaction data */
    struct region region;
    /* transaction ID */
    int64_t id;
    /* LSN assigned by WAL or -1 on error */
    int64_t signature;
    /* The fiber that initiated the transaction */
    struct fiber *fiber;
};

A transaction is allocated via the txn_new call. The memory allocation is a bit tricky here: we create a memory region on the main cord, allocate the txn itself on it, and grow the region if the txn needs to carry additional data. On txn_free the additional memory gets freed and the txn is put into a cache for fast reuse.
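A simplified sketch of this allocation scheme (modelled on txn.c from the 2.x series; the real functions also initialize statements, triggers and error handling):

struct txn *
txn_new(void)
{
    /* Reuse a cached transaction if there is one. */
    if (!stailq_empty(&txn_cache))
        return stailq_shift_entry(&txn_cache, struct txn, in_txn_cache);
    /* Otherwise create a region and place the txn itself on it. */
    struct region region;
    region_create(&region, &cord()->slabc);
    struct txn *txn = region_alloc_object(&region, struct txn);
    if (txn == NULL)
        return NULL;
    txn->region = region;
    return txn;
}

static void
txn_free(struct txn *txn)
{
    /* Drop everything allocated on top of the txn itself ... */
    region_truncate(&txn->region, sizeof(struct txn));
    /* ... and keep the txn in the cache for fast reuse. */
    stailq_add(&txn_cache, &txn->in_txn_cache);
}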

The txn_entry_complete_cb is basically a wrapper over txn_complete.
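Something along these lines (a sketch; the callback name and signature differ slightly between 2.x releases):

static void
txn_entry_complete_cb(struct journal_entry *entry, void *data)
{
    struct txn *txn = (struct txn *)data;
    /* Propagate the WAL result: a vclock-based value or -1 on error. */
    txn->signature = entry->res;
    txn_complete(txn);
}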

WAL thread

Because writing data to real storage such as a hard drive is far slower than plain memory access, the storage operations are implemented in a separate thread.

int
wal_init(enum wal_mode wal_mode, const char *wal_dirname,
         int64_t wal_max_size, const struct tt_uuid *instance_uuid,
         wal_on_garbage_collection_f on_garbage_collection,
         wal_on_checkpoint_threshold_f on_checkpoint_threshold)
{
    /* Initialize the state. */
    struct wal_writer *writer = &wal_writer_singleton;
    wal_writer_create(writer, wal_mode, wal_dirname,
                      wal_max_size, instance_uuid,
                      on_garbage_collection,
                      on_checkpoint_threshold);

    /* Start WAL thread. */
    if (cord_costart(&writer->cord, "wal", wal_writer_f, NULL) != 0)
        return -1;

    /* Create a pipe to WAL thread. */
    cpipe_create(&writer->wal_pipe, "wal");
    cpipe_set_max_input(&writer->wal_pipe, IOV_MAX);
    return 0;
}

The writer pointer refers to the statically allocated wal_writer_singleton, and its cord member is passed to cord_costart, which starts a new cord:

cord_costart
    ...
    cord_start(cord, name, cord_costart_thread_func, ctx)
        ...
        cord->loop = ev_loop_new()
        ...
        tt_pthread_create(cord_thread_func)
            // New thread
            cord_thread_func
                cord_create
                cord_costart_thread_func
                    fiber_new("main", wal_writer_f);
                        wal_writer_f

Once the new event loop is allocated, this thread runs wal_writer_f:

static int
wal_writer_f(va_list ap)
    struct wal_writer *writer = &wal_writer_singleton;
    // Init coio in this thread
    coio_enable();

    // This is a new thread and a new cord, thus
    // we need our own fiber scheduler; this is
    // the event consumer.
    struct cbus_endpoint endpoint;
    cbus_endpoint_create(&endpoint, "wal", fiber_schedule_cb, fiber());

    // This one is an event producer from the wal
    // thread to the main thread.
    cpipe_create(&writer->tx_prio_pipe, "tx_prio");

    // Enter the event loop
    cbus_loop(&endpoint);
    ...

We're running a new thread with its own event loop and fiber scheduler. To communicate with this cord we use the communication bus (cbus) engine. The very rough cbus architecture is the following: there are named endpoints, which are event consumers, and cpipe peers, which are event producers; a producer pushes an event into an endpoint and cbus delivers the message to its destination along a specified route.

cbus_endpoint_create creates the "wal" endpoint, which is an event consumer (i.e. it lives inside the newly created wal thread).

int
cbus_endpoint_create(struct cbus_endpoint *endpoint,
                     const char *name,
                     void (*fetch_cb)(...),
                     void *fetch_data)
{
    ...
    snprintf(endpoint->name, sizeof(endpoint->name), "%s", name);
    endpoint->consumer = loop();
    ...
    ev_async_init(&endpoint->async, fetch_cb);
    endpoint->async.data = fetch_data;
    ev_async_start(endpoint->consumer, &endpoint->async);
}

Right after creating the consumer we create an event producer, writer->tx_prio_pipe.

void
cpipe_create(struct cpipe *pipe, const char *consumer)
{
    ...
    pipe->producer = cord()->loop;

    ev_async_init(&pipe->flush_input, cpipe_flush_cb);
    pipe->flush_input.data = pipe;

    struct cbus_endpoint *endpoint =
        cbus_find_endpoint_locked(&cbus, consumer);
    ...
    pipe->endpoint = endpoint;
}

Note that writer->tx_prio_pipe connects to the endpoint allocated in the main tarantool thread.

box_cfg_xc(void)
    ...
    cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb...);

Thus we have two endpoints: "wal", which sits in the wal thread, and "tx_prio", which sits in the main tarantool thread. This allows us to notify the wal thread from the main thread via the "wal" endpoint and the main thread from the wal thread via the "tx_prio" endpoint.

Back to the wal_writer_f code: we enter the event loop cbus_loop and wait for events to appear (via traditional libev delivery).

void
cbus_loop(struct cbus_endpoint *endpoint)
{
    while (true) {
        cbus_process(endpoint);
        if (fiber_is_cancelled())
            break;
        fiber_yield();
    }
}

The cbus_process above fetches messages from the endpoint queue and processes them via cmsg_deliver, which walks a chain of function pointers and cpipes to notify.
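A simplified sketch of cbus_process (the real one grabs the endpoint queue under a mutex):

static void
cbus_process(struct cbus_endpoint *endpoint)
{
    struct stailq output;
    stailq_create(&output);
    /* Move whatever has accumulated on the endpoint into a local list. */
    cbus_endpoint_fetch(endpoint, &output);
    /* Deliver each message along its route. */
    struct cmsg *msg, *msg_next;
    stailq_foreach_entry_safe(msg, msg_next, &output, fifo)
        cmsg_deliver(msg);
}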

Now back to wal_init. The wal thread is running, but we need to be able to push messages to it from our side. For that we create a communication pipe (cpipe):

wal_init
    ...
    /* Create a pipe to WAL thread. */
    cpipe_create(&writer->wal_pipe, "wal");

Since the endpoint name is "wal", this cpipe will be notifying the wal thread.

In summary we have:

  • endpoint "tx_prio" which listens for events inside main tarantool thread;
  • endpoint "wal" for events inside wal thread;
  • cpipe tx_prio_pipe to notify main thread from inside of wal thread;
  • cpipe wal_pipe to notify wal thread from inside of main thread.

Write data to WAL

When we need to issue a real write we allocate a journal entry, which carries the complete set of data to be written in one pass.

struct journal_entry {
    // To link entries
    struct stailq_entry         fifo;
    // vclock or error code
    int64_t                     res;
    // transaction completions
    journal_entry_complete_cb   on_complete_cb;
    void                        *on_complete_cb_data;
    // real user data to write
    size_t                      approx_len;
    int                         n_rows;
    struct xrow_header          *rows[];
};

We are not interested in the specific data associated with the write, but note that entries are chained via the fifo member and come in strict order, so that we are able to roll back if something goes wrong.
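For reference, here is a sketch of how such an entry is allocated and how the completion callback gets attached (the exact journal_entry_new signature varies between 2.x releases, so take the argument list as an assumption):

struct journal_entry *
journal_entry_new(size_t n_rows, struct region *region,
                  journal_entry_complete_cb on_complete_cb,
                  void *on_complete_cb_data)
{
    struct journal_entry *entry =
        region_alloc(region, sizeof(*entry) +
                             sizeof(entry->rows[0]) * n_rows);
    if (entry == NULL)
        return NULL;
    entry->approx_len = 0;
    entry->n_rows = n_rows;
    entry->res = -1;
    entry->on_complete_cb = on_complete_cb;
    entry->on_complete_cb_data = on_complete_cb_data;
    return entry;
}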

Once allocated, the entry is passed to

static int
wal_write(struct journal *journal, struct journal_entry *entry)
    ...
    batch = (struct wal_msg *)mempool_alloc(&writer->msg_pool);
    wal_msg_create(batch);
    stailq_add_tail_entry(&batch->commit, entry, fifo);
    cpipe_push(&writer->wal_pipe, &batch->base);
    ...
    cpipe_flush_input(&writer->wal_pipe);

Here we allocate the communication record (wal_msg_create), bind the journal entry into it, push it into writer->wal_pipe, and notify the producer that there is data to handle. Note that the notification does not mean the data is going to be handled immediately; it gets queued into the event loop. The loop here is our main cord loop (remember that writer->wal_pipe was created in wal_init, which runs in the main cord, so the pipe's producer is the main loop).
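cpipe_push itself is simple (a sketch based on cbus.h from 2.x; details differ between versions): the message is appended to the pipe's input list and the producer loop is poked when the queue becomes non-empty.

static inline void
cpipe_push(struct cpipe *pipe, struct cmsg *msg)
{
    /* Queue the message in the pipe's input list ... */
    stailq_add_tail_entry(&pipe->input, msg, fifo);
    /* ... and poke the producer loop when the first message appears. */
    if (++pipe->n_input == 1)
        ev_feed_event(pipe->producer, &pipe->flush_input, EV_CUSTOM);
}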

The explicit notification is done by cpipe_flush_input: it feeds the flush_input event into the producer loop (our main cord loop), or invokes the watcher directly when too much input has piled up.

static inline void
cpipe_flush_input(struct cpipe *pipe)
{
    ...
    if (pipe->n_input < pipe->max_input) {
        ev_feed_event(pipe->producer,
                      &pipe->flush_input, EV_CUSTOM);
    } else {
        ev_invoke(pipe->producer,
                  &pipe->flush_input, EV_CUSTOM);
    }
}

Once the main loop gets to that event it runs the watcher callback, cpipe_flush_cb:

static void
cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
{
    struct cpipe *pipe = (struct cpipe *)watcher->data;
    struct cbus_endpoint *endpoint = pipe->endpoint;
    ...
    stailq_concat(&endpoint->output, &pipe->input);
    ...
    ev_async_send(endpoint->consumer, &endpoint->async);
}

The endpoint belongs to the wal thread's event loop, to which we send the notification. Once the notification is received, the message is processed along the route that was set up earlier in wal_write:

wal_write(struct journal *journal, struct journal_entry *entry)
    ...
    wal_msg_create(batch);
    ...

where

static struct cmsg_hop wal_request_route[] = {
    {wal_write_to_disk, &wal_writer_singleton.tx_prio_pipe},
    {tx_schedule_commit, NULL},
};

static void
wal_msg_create(struct wal_msg *batch)
{
    cmsg_init(&batch->base, wal_request_route);
    ...
}

In other words, the cbus_loop inside the wal thread wakes up, fetches the message (memory is shared between the main tarantool thread and the wal thread) and runs the named "route" functions one by one, in order.

The routing hops are a bit tricky: the first field is the function to call and the second is the cpipe to forward the message to once the function is complete.
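A simplified sketch of how a single hop is executed (the real cbus code is more careful about message lifetime):

static void
cmsg_deliver(struct cmsg *msg)
{
    /* Remember the next pipe before running the hop function:
     * on the last hop the handler may free the message. */
    struct cpipe *pipe = msg->hop->pipe;
    msg->hop->f(msg);
    if (pipe != NULL) {
        /* Advance to the next hop and hand the message over. */
        msg->hop++;
        cpipe_push(pipe, msg);
    }
}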

First, wal_write_to_disk tries to write the batched journal entries to the storage. It actually does much more than simply writing to disk, but we are not going to consider that right now. What is important is that each journal entry gets a vclock-based value assigned to its journal_entry->res member (or -1 on failure).

Once everything is written, the tx_prio_pipe is notified and tx_schedule_commit runs inside the main thread.

static void
tx_schedule_queue(struct stailq *queue)
{
    struct journal_entry *req, *tmp;
    stailq_foreach_entry_safe(req, tmp, queue, fifo)
        journal_entry_complete(req);
}

static void
tx_schedule_commit(struct cmsg *msg)
{
    struct wal_writer *writer = &wal_writer_singleton;
    struct wal_msg *batch = (struct wal_msg *) msg;

    if (!stailq_empty(&batch->rollback)) {
        stailq_concat(&writer->rollback, &batch->rollback);
    }

    vclock_copy(&replicaset.vclock, &batch->vclock);
    tx_schedule_queue(&batch->commit);
    mempool_free(&writer->msg_pool, ...);
}

Here we call the callback associated with the journal entry (it was assigned during entry allocation; we will talk about it later) and then return the cbus message to its memory pool.
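journal_entry_complete itself is trivial: it just invokes the callback attached at allocation time (a sketch matching the struct journal_entry members shown above):

static inline void
journal_entry_complete(struct journal_entry *entry)
{
    /* For transactions this is txn_entry_complete_cb with
     * the txn passed as the callback data. */
    entry->on_complete_cb(entry, entry->on_complete_cb_data);
}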

In summary:

  • we notify the wal thread via wal_pipe;
  • wal thread runs wal_write_to_disk and notifies main thread via tx_prio_pipe;
  • main thread runs tx_schedule_commit.

Transaction processing

Transaction processing in the 1.x series

In this series all transactions are processed synchronously. The journal entry carries no callbacks. We allocate the journal entry and bind the transaction into it, all from inside the main cord; the transaction itself is created by txn_begin on the fiber's own gc region:

struct txn *
txn_begin(bool is_autocommit)
{
    static int64_t txn_id = 0;
    struct txn *txn = region_alloc_object(&fiber()->gc, struct txn);
    txn->id = ++txn_id;
    txn->signature = -1;
    txn->engine = NULL;
    txn->engine_tx = NULL;
    fiber_set_txn(fiber(), txn);
    return txn;
}

which implies that the fiber that issues the transaction must not be freed until the transaction processing is finished. txn->signature is set to -1, indicating that the transaction has not been processed yet (the same value is used when a transaction has failed, though). Upon transaction completion, the wal engine sets the signature to a vclock-based value.

int
txn_commit(struct txn *txn)
{
    if (txn->engine != NULL) {
        if (engine_prepare(txn->engine, txn) != 0)
            goto fail;
    }

    if (txn->n_rows > 0) {
        txn->signature = txn_write_to_wal(txn);
        if (txn->signature < 0)
            goto fail;
    }
    if (txn->engine != NULL)
        engine_commit(txn->engine, txn);

    fiber_gc();
    fiber_set_txn(fiber(), NULL);
    return 0;
fail:
    txn_rollback();
    return -1;
}

The key moment here is the txn_write_to_wal function, which sends the transaction to the journal engine, which in turn passes it to the wal thread.

static int64_t
txn_write_to_wal(struct txn *txn)
{
    struct journal_entry *req = journal_entry_new(txn->n_rows);
    ...
    int64_t res = journal_write(req);
    ...
    if (res < 0)
        txn_rollback();
    ...
    return res;
}

journal_write sends the entry to the wal thread and, importantly, yields the current fiber. Unlike in the 2.x series there are no callbacks associated with the journal entry; we just wake up the fiber that initiated the transaction (the fiber initiating the transaction saves a pointer to itself in the journal_entry structure).

// main cord thread
wal_write
    // notify wal thread about queued data
    cpipe_flush_input(&writer->wal_pipe);
    ...
    bool cancellable = fiber_set_cancellable(false);
    fiber_yield();
    fiber_set_cancellable(cancellable);
    return entry->res;

// wal thread
wal_write_to_disk
    ...
    // notify main thread
    wal_writer_singleton.tx_prio_pipe

// main thread
tx_schedule_queue
    stailq_foreach_entry(req, queue, fifo)
        fiber_wakeup(req->fiber);

Thus the fiber is woken up once the wal thread has finished processing the transaction and has written the entry to the storage.

Then the fiber sitting in wal_write is woken up, tests the write result via txn->signature, and either passes the commit to the engine or calls txn_rollback to roll back the transaction on failure.

Transaction processing in the 2.x series

Transaction processing in the 2.x series is almost the same as in 1.x, with one significant exception: journal writes became asynchronous. We bind the txn_entry_complete_cb callback to the journal entry, and it completes the transaction. This was done for the sake of the parallel applier (which is heavily used in the replication engine).

Thus the txn_write routine does not wait for the transaction to complete; still, for synchronous transactions we explicitly wait until the journal callback has finished:

int
txn_commit(struct txn *txn)
{
    txn->fiber = fiber();

    // Async processing
    if (txn_write(txn) != 0)
        return -1;

    // Wait for completion
    if (!txn_has_flag(txn, TXN_IS_DONE)) {
        bool cancellable = fiber_set_cancellable(false);
        fiber_yield();
        fiber_set_cancellable(cancellable);
    }

    int res = txn->signature >= 0 ? 0 : -1;
    txn_free(txn);
    return res;
}
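
For completeness, here is a simplified view of the completion side that wakes the waiting fiber: a sketch of what txn_complete does in 2.x (the real function also runs commit and rollback triggers and handles a few more cases):

void
txn_complete(struct txn *txn)
{
    if (txn->signature < 0) {
        /* The WAL write failed: undo the transaction. */
        if (txn->engine != NULL)
            engine_rollback(txn->engine, txn);
    } else {
        /* The WAL write succeeded: commit in the engine. */
        if (txn->engine != NULL)
            engine_commit(txn->engine, txn);
    }
    if (txn->fiber == NULL) {
        /* Nobody waits for the result: free the txn right away. */
        txn_free(txn);
    } else {
        txn_set_flag(txn, TXN_IS_DONE);
        if (txn->fiber != fiber())
            /* Wake up the fiber blocked in txn_commit above. */
            fiber_wakeup(txn->fiber);
    }
}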