Wednesday, 7 December 2011

Eventual Consistency in MySQL Cluster - using epochs




Before getting to the details of how eventual consistency is implemented, we need to look at epochs. Ndb Cluster maintains an internal distributed logical clock known as the epoch, represented as a 64-bit number. This epoch serves a number of internal functions, and is atomically advanced across all data nodes.

Epochs and consistent distributed state

Ndb is a parallel database, with multiple internal transaction coordinator components starting, executing and committing transactions against rows stored in different data nodes. Concurrent transactions only interact where they attempt to lock the same row. This design minimises unnecessary system-wide synchronisation, enabling linear scalability of reads and writes.

The stream of changes made to rows stored at a data node is written to a local Redo log for node and system recovery. The change stream is also published to NdbApi event listeners, including MySQLD servers recording Binlogs. Each node's change stream contains the row changes it was involved in, as committed by multiple transactions, and coordinated by multiple independent transaction coordinators, interleaved in a partial order.

  Incoming independent transactions
affecting multiple rows

T3 T4 T7
T1 T2 T5

| | |
V V V

-------- -------- --------
| 1 | | 2 | | 3 |
| TC | | TC | | TC | Data nodes with multiple
| |--| |--| | transaction coordinators
|------| |------| |------| acting on data stored in
| | | | | | different nodes
| DATA | | DATA | | DATA |
-------- -------- --------

| | |
V V V

t4 t4 t3
t1 t7 t2
t2 t1 t7
t5

Outgoing row change event streams, labelled by causing transaction


These row event streams are generated independently by each data node in the cluster, but to be useful they need to be correlated together. For system recovery from a crash, the data nodes need to recover to a cluster-wide consistent state: a state which contains only whole transactions, and which, logically at least, existed at some point in time. This correlation could be done by analysing the transaction ids and row dependencies of each recorded row change to determine a valid order for the merged event streams, but that would add significant overhead. Instead, the Cluster uses the epoch, its distributed logical clock, to group large sets of committed transactions together.

Each epoch contains zero or more committed transactions. Each committed transaction is in exactly one epoch. The epoch clock advances periodically, every 100 milliseconds by default. When it is time for a new epoch to start, a distributed protocol known as the Global Commit Protocol (GCP) results in all of the transaction coordinators in the Cluster agreeing on a point in the flow of committing transactions at which to change epoch. This epoch boundary, between the commit of the last transaction in epoch n and the commit of the first transaction in epoch n+1, is a cluster-wide consistent point in time. Obtaining this consistent point requires cluster-wide synchronisation between all transaction coordinators, but it need only happen periodically.
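
To make this concrete, here is a small standalone sketch of the observable behaviour: every commit is stamped with the current epoch, and the epoch advances periodically. All names here are invented, and the periodic tick is a local timer, whereas the real GCP is a distributed agreement between transaction coordinators, so treat this purely as a model:

// Illustrative model only -- not the real Global Commit Protocol.
// It shows the behaviour described above: every commit is stamped with
// the current epoch, the epoch advances roughly every 100 milliseconds,
// and so each committed transaction belongs to exactly one epoch.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

std::atomic<std::uint64_t> g_epoch{22};   // the cluster-wide logical clock

// A committing transaction simply reads the epoch at commit time.
std::uint64_t commit_transaction(const char* name) {
    std::uint64_t e = g_epoch.load();
    std::cout << name << " committed in epoch " << e << "\n";
    return e;
}

int main() {
    // Periodic "GCP" tick: in the real cluster this is a distributed
    // agreement between all transaction coordinators, not a local timer.
    std::thread ticker([] {
        for (int i = 0; i < 3; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            g_epoch.fetch_add(1);
        }
    });

    commit_transaction("T1");
    std::this_thread::sleep_for(std::chrono::milliseconds(120));
    commit_transaction("T2");   // lands in a later epoch than T1
    ticker.join();
    return 0;
}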

Furthermore, each node ensures that all events for epoch n are published before any events for epoch n+1. Effectively, each node's event stream is sorted by epoch number, and the first occurrence of a new epoch number marks a precise epoch boundary.

 Incoming independent transactions

T3 T4 T7
T1 T2 T5

| | |
V V V

-------- -------- --------
| 1 | | 2 | | 3 |
| TC | | TC | | TC | Data nodes with multiple
| |--| |--| | transaction coordinators
|------| |------| |------| acting on data stored in
| | | | | | different nodes
| DATA | | DATA | | DATA |
-------- -------- --------

| | |
V V V

t4(22) t4(22) t3(22) Epoch 22
...... ...... ......
t1(23) t7(23) t2(23) Epoch 23
t2(23) t1(23) t7(23)
......
t5(24) Epoch 24

Outgoing row change event streams, labelled by causing transaction, with epoch numbers in ()



When these independent streams are merge-sorted by epoch number we get a unified change stream. Multiple possible orderings can result; one partial ordering is shown here:

Events                    Transactions contained in epoch

t4(22)
t4(22) {T4,T3}
t3(22)

......

t1(23)
t2(23)
t7(23)
t1(23) {T1, T2, T7}
t2(23)
t7(23)

......

t5(24) {T5}



Note that we can state from this that T4 -> T1 (T4 happened before T1), and that T1 -> T5. However, we cannot say whether T4 -> T3 or T3 -> T4. In epoch 23 we see that the row events resulting from T1, T2 and T7 are interleaved.

Epoch boundaries act as markers in the flow of row events generated by each node, which are then used as consistent points to recover to. Epoch boundaries also allow a single system wide unified transaction log to be generated from each node's row change stream, by merge-sorting the per-node row change streams by epoch number. Note that the order of events within an epoch is still not tightly constrained. As concurrent transactions can only interact via row locks, the order of events on a single row (Table and Primary key value) signifies transaction commit order, but there is by definition no order between transactions affecting independent row sets.
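
A minimal sketch of such a merge-sort, using invented types rather than NdbApi structures, might look like this:

// Sketch of merge-sorting per-node row change streams by epoch number.
// Types and names are illustrative only.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <queue>
#include <string>
#include <utility>
#include <vector>

struct RowEvent {
    std::uint64_t epoch;
    std::string   txn;   // causing transaction, e.g. "t4"
};

// Each data node delivers its own stream already sorted by epoch.
using NodeStream = std::vector<RowEvent>;

std::vector<RowEvent> merge_by_epoch(const std::vector<NodeStream>& nodes) {
    // (stream index, position within stream), ordered by the epoch of the
    // event at that position: a standard k-way merge.
    using Cursor = std::pair<std::size_t, std::size_t>;
    auto epoch_of = [&](const Cursor& c) { return nodes[c.first][c.second].epoch; };
    auto cmp = [&](const Cursor& a, const Cursor& b) { return epoch_of(a) > epoch_of(b); };
    std::priority_queue<Cursor, std::vector<Cursor>, decltype(cmp)> heap(cmp);

    for (std::size_t i = 0; i < nodes.size(); ++i)
        if (!nodes[i].empty()) heap.push({i, 0});

    std::vector<RowEvent> merged;
    while (!heap.empty()) {
        Cursor c = heap.top(); heap.pop();
        merged.push_back(nodes[c.first][c.second]);
        if (c.second + 1 < nodes[c.first].size()) heap.push({c.first, c.second + 1});
    }
    return merged;   // globally ordered by epoch; order within an epoch is arbitrary
}

int main() {
    std::vector<NodeStream> nodes = {
        {{22, "t4"}, {23, "t1"}, {23, "t2"}},             // node 1
        {{22, "t4"}, {23, "t7"}, {23, "t1"}, {24, "t5"}}, // node 2
        {{22, "t3"}, {23, "t2"}, {23, "t7"}},             // node 3
    };
    for (const RowEvent& e : merge_by_epoch(nodes))
        std::cout << e.txn << "(" << e.epoch << ")\n";
    return 0;
}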

To record a Binlog of Ndb row changes, MySQLD listens to the row change streams arriving from each data node and merge-sorts them by epoch into a single, epoch-ordered stream. When all events for a given epoch have been received, MySQLD records a single Binlog transaction containing all row events for that epoch. This Binlog transaction is referred to as an 'Epoch transaction' as it describes all row changes that occurred in an epoch.
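
Roughly sketched, that buffering behaviour looks something like the following. The EpochBuffer type and its methods are invented for illustration; this is not the actual MySQLD Binlog injector code:

// Sketch: buffer row events per epoch and write a single "epoch
// transaction" once every data node has delivered its part of that epoch.
#include <cstdint>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

struct RowEvent { std::uint64_t epoch; std::string desc; };

class EpochBuffer {
public:
    explicit EpochBuffer(std::set<int> nodes) : expected_nodes_(std::move(nodes)) {}

    void on_event(int /*node*/, const RowEvent& ev) { events_[ev.epoch].push_back(ev); }

    // A node reports that it has published everything for `epoch`.
    void on_epoch_complete(int node, std::uint64_t epoch) {
        done_[epoch].insert(node);
        if (done_[epoch] == expected_nodes_) write_epoch_transaction(epoch);
    }

private:
    void write_epoch_transaction(std::uint64_t epoch) {
        std::cout << "BEGIN\n"
                  << "WRITE_ROW mysql.ndb_apply_status epoch=" << epoch << "\n";
        for (const RowEvent& ev : events_[epoch]) std::cout << ev.desc << "\n";
        std::cout << "COMMIT\n";
        events_.erase(epoch);
        done_.erase(epoch);
    }

    std::set<int> expected_nodes_;
    std::map<std::uint64_t, std::vector<RowEvent>> events_;
    std::map<std::uint64_t, std::set<int>> done_;
};

int main() {
    EpochBuffer buf({1, 2, 3});
    buf.on_event(1, {23, "UPDATE_ROW t1 ..."});
    buf.on_event(2, {23, "WRITE_ROW t7 ..."});
    buf.on_epoch_complete(1, 23);
    buf.on_epoch_complete(2, 23);
    buf.on_epoch_complete(3, 23);   // last node -> one Binlog transaction for epoch 23
    return 0;
}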

Epoch transactions in the Binlog

Epoch transactions in the Binlog have some interesting properties :
  • Efficiency : They can be considered a kind of Binlog group commit, where multiple user transactions are recorded in one Binlog (epoch) transaction. As an epoch normally contains 100 milliseconds of row changes from a cluster, this is a significant amortisation.
  • Consistency : Each epoch transaction contains the row operations which occurred when moving the cluster from epoch boundary consistent state A to epoch boundary consistent state B.
    Therefore, when applied as a transaction by a slave, the slave will atomically move from consistent state A to consistent state B.
  • Inter-epoch ordering : Any row event recorded in epoch n+1 logically happened after every row event in epoch n.
  • Intra-epoch disorder : Any two row events recorded in epoch n, affecting different rows, may have happened in any order.
  • Intra-epoch key-order : Any two row events recorded in epoch n, affecting the same row, happened in the order they are recorded.

The ordering properties show that epochs give only a partial order, enough to subdivide the row change streams into self-consistent chunks. Within an epoch, row changes may be interleaved in any way, except that multiple changes to the same row will be recorded in the order they were committed.
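
These rules define a partial order that can be sketched as a simple comparison function. The types below are invented for illustration, not NdbApi structures:

// Sketch of the partial order that epochs give over recorded row events.
#include <cstdint>
#include <iostream>
#include <string>

struct RowKey {
    std::string table;
    std::string pk;       // primary key value(s), serialised
    bool operator==(const RowKey& o) const { return table == o.table && pk == o.pk; }
};

struct RowEvent {
    std::uint64_t epoch;
    std::uint64_t seq;    // position in the recorded (Binlog) order
    RowKey        key;
};

enum class Order { Before, After, Unordered };

// Did a happen before b?
Order compare(const RowEvent& a, const RowEvent& b) {
    if (a.epoch != b.epoch)                  // inter-epoch ordering
        return a.epoch < b.epoch ? Order::Before : Order::After;
    if (a.key == b.key)                      // intra-epoch key-order
        return a.seq < b.seq ? Order::Before : Order::After;
    return Order::Unordered;                 // intra-epoch disorder
}

int main() {
    RowEvent a{22, 1, {"t1", "pk=7"}};
    RowEvent b{23, 5, {"t1", "pk=9"}};
    RowEvent c{23, 2, {"t1", "pk=9"}};
    std::cout << (compare(a, b) == Order::Before) << "\n";  // 1 : earlier epoch
    std::cout << (compare(b, c) == Order::After)  << "\n";  // 1 : same row, b recorded later
    return 0;
}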

Each epoch transaction contains the row changes for a particular epoch, and the epoch number itself is recorded in the epoch transaction, as an extra WRITE_ROW event on a system table called mysql.ndb_apply_status. This WRITE_ROW event contains the binlogging MySQLD's server id and the epoch number. This event is added so that it will be atomically applied by the Slave along with the rest of the row changes in the epoch transaction, giving an atomically reliable indicator of the replication 'position' of the Slave relative to the Master Cluster in terms of epoch number. As the epoch number is abstracted from the details of a particular Master MySQLD's binlog files and offsets, it can be used to fail over to an alternative Master.
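
For illustration, the failover lookup can be modelled as finding the first epoch after the Slave's last applied epoch. In practice this information comes from mysql.ndb_apply_status on the Slave and the mysql.ndb_binlog_index table on the candidate Master; the structures below are just a standalone model, not the real tables or a working failover script:

// Sketch of epoch-based failover position lookup.
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <string>

struct BinlogPos {
    std::string file;      // e.g. "binlog.000007"
    std::uint64_t offset;  // byte offset of the epoch transaction
};

// epoch -> position of that epoch's transaction in the candidate Master's Binlog
using BinlogIndex = std::map<std::uint64_t, BinlogPos>;

std::optional<BinlogPos> resume_position(const BinlogIndex& index,
                                         std::uint64_t last_applied_epoch) {
    // First epoch strictly after the last one the Slave applied.
    auto it = index.upper_bound(last_applied_epoch);
    if (it == index.end()) return std::nullopt;   // Slave is already up to date
    return it->second;
}

int main() {
    BinlogIndex index = {
        {6998, {"binlog.000007", 1200}},
        {6999, {"binlog.000007", 3400}},
        {7000, {"binlog.000008",  400}},
    };
    if (auto pos = resume_position(index, 6999))
        std::cout << "resume from " << pos->file << ":" << pos->offset << "\n";
    return 0;
}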

We can visualise a MySQL Cluster Binlog as looking something like this. Each Binlog transaction contains one 'artificially generated' WRITE_ROW event at the start, and then RBR row events for all row changes that occurred in that epoch.

BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6998
WRITE_ROW ...
UPDATE_ROW ...
DELETE_ROW ...
...
COMMIT # Consistent state of the database

BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6999
...
COMMIT # Consistent state of the database

BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=7000
...
COMMIT # Consistent state of the database
...


A series of epoch transactions, each with a special WRITE_ROW event for recording the epoch on the Slave. You can see this structure using the mysqlbinlog tool with the --verbose option.

Rows tagged with last-commit epoch

Each row in a MySQL Cluster stores a hidden metadata column which contains the epoch at which a write to the row was last committed. This information is used internally by the Cluster during node recovery and other operations. The ndb_select_all tool can be used to see the epoch numbers for rows in a table by supplying the --gci or --gci64 options. Note that the per-row epoch is not a row version, as two updates to a row in reasonably quick succession will have the same commit epoch.

Epochs and eventual consistency

Reviewing epochs from the point of view of my previous posts on eventual consistency, we see that:
  • Epochs provide an incrementing logical clock
  • Epochs are recorded in the Binlog, and therefore shipped to Slaves
  • Epoch boundaries imply happened-before relationships between events before and after them in the Binlog

These properties mean that epochs are almost perfect for monitoring conflict windows in an active-active circular replication setup, with only a few enhancements required.

I'll describe these enhancements in the next post.

Edit 23/12/11 : Added index

6 comments:

Max said...

Very nice and clear post Frazer, you should become a trainer :)

A question I have regarding parallel processing of the transactions on the slave side. How do you know which transactions within an epoch are operating on the same primary key values? Or do you?

Frazer Clement said...

Hi Max,
Good to hear from you!
Thanks for the compliments,

Regarding the slave, each epoch transaction is applied as a single transaction via the Ndb table handler.
When tracking inter-transaction dependencies (independent transactions accessing the same primary key), the Ndb handler uses an internal hash table. This hash table is maintained for the duration of the (epoch) transaction. The keys of the hash are the table id and primary key values. For each key, the hash stores a transaction id.
As each row event is applied, the Ndb handler inserts an entry into the hash with the event's table id and primary key values, and the event's user transaction id. If a hash entry already exists then the Ndb handler knows that some row event has already been applied to that row in this epoch transaction. If that event had a different transaction id to the current event then there is an inter-transaction dependency between the existing entry and the new entry.
So dependency tracking within an epoch is done using an in-memory hash table.
I hope that answers your question. Let me know if it's not clear.
Frazer
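
A rough standalone sketch of the scheme Frazer describes above, with invented types and names rather than the Ndb handler's actual code:

// Hash keyed on (table id, primary key) storing the transaction id of the
// last event applied to that row within the current epoch transaction.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

struct RowId {
    std::uint32_t table_id;
    std::string   pk;        // serialised primary key value(s)
    bool operator==(const RowId& o) const { return table_id == o.table_id && pk == o.pk; }
};

struct RowIdHash {
    std::size_t operator()(const RowId& r) const {
        return std::hash<std::string>()(r.pk) ^ (std::size_t(r.table_id) << 1);
    }
};

class DependencyTracker {
public:
    // Returns true if this event depends on an event from a different
    // user transaction already applied to the same row in this epoch.
    bool on_event(const RowId& row, std::uint64_t txn_id) {
        auto [it, inserted] = rows_.emplace(row, txn_id);
        if (inserted) return false;            // first touch of this row
        bool dependency = (it->second != txn_id);
        it->second = txn_id;                   // remember the latest writer
        return dependency;
    }
    void clear() { rows_.clear(); }            // called at epoch transaction boundary

private:
    std::unordered_map<RowId, std::uint64_t, RowIdHash> rows_;
};

int main() {
    DependencyTracker dt;
    std::cout << dt.on_event({10, "pk=1"}, 100) << "\n";  // 0 : first event on the row
    std::cout << dt.on_event({10, "pk=1"}, 101) << "\n";  // 1 : T101 depends on T100
    return 0;
}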

Max said...

Thanks Frazer, this does partly answer my question.

The remaining question would be that using this hash table you could then execute the queries within an epoch in parallel threads. Is this done? Or has any work been done to this end?

Frazer Clement said...

Hi Max,
You're correct that the transaction dependencies can be used to find sets of independent transactions which can be executed in parallel.
For Cluster, all user transactions recorded in a single epoch transaction on the Master are applied as a single transaction on the slave. The parallelism of application on the slave is therefore only limited by the Slave execution batch size. In other words, we already have parallel application of all user transaction operations within an epoch on the Slave. These parallel operations share a single TC instance thread, but are fully parallel in terms of LQH execution (and are asynchronous with respect to each other).
Monitoring dependencies between separate Binlog transactions (epoch or otherwise) could be used to increase parallelism on the Slave. For Cluster it could be used to apply epochs in parallel, and for other engines such as InnoDB, it would give the inter-transaction parallelism which is currently missing.
I believe that the replication team are working on increasing the utility of the multithreaded slave work they have started (currently only multithreaded between DBs). I'm not sure if they're looking at key-based dependency analysis yet.
Frazer

Max said...

Thanks Frazer. Yes, my assumption was that since there would only be one binlog applier on the slave cluster that could easily become a bottleneck. I mean on the MySQL server side, not the data nodes, as the binlog execution thread is also single-threaded on the server. But it might be that the load on the MySQL server is so small compared to the data nodes that this isn't an issue?

Frazer Clement said...

Hi Max,
Yes asynchronous replication through MySQL is single threaded in a number of places including the Binlog writing, the Slave IO thread and the Slave SQL thread. For Ndb, the TC thread handling each epoch transaction is also a single-threaded choke point. The Slave SQL thread is often the biggest bottleneck as it executes and commits transactions from the Binlog serially.
For Ndb I am not sure what the first bottleneck would be. Due to the batching and intra-epoch parallelism we rarely hear of users with replication performance problems.
If we were to have a performance problem due to single threading in the slave SQL thread then we could attempt to parallelise some part of the Slave work from a single serial Binlog, such as executing multiple epochs concurrently.

However, perhaps a better fix would be to parallelise across multiple replication 'channels', e.g. Binlogging Master MySQLDs and Applying Slave MySQLDs. This could be done by splitting the row change events by some PK hash on the Master Cluster, and having each of n Binlogging MySQLDs only receive and record a subset of the row changes in an epoch. These partial Binlogs can then be pulled and applied on the Slave in parallel by independent Slave MySQLDs. The problem then becomes ensuring consistency on the Slave Cluster if required. One option is to support a single transaction with multiple contributing clients, but a single commit point. This is not too tricky to implement in Ndb, but would still have a single TC thread as a potential bottleneck. Another option is to support a kind of distributed commit of independent partial-epoch transactions so that everything is parallel up to the point of commit.

The cost of synchronising multiple processes at commit time can be amortised across one or more epochs. The balance here is to reduce synchronisation as much as possible without replication latency becoming excessive. I suppose an ideal system would 'change gears' depending on the load + size of epochs received.

Anyway, this is all theory as currently async replication performance is not a big problem for our users.

Frazer