Thursday, 8 December 2011

Eventual consistency in MySQL Cluster - implementation part 1

The last post described MySQL Cluster epochs and why they provide a good basis for conflict detection, with a few enhancements required. This post describes the enhancements.

The following four mechanisms are required to implement conflict detection via epochs :
  1. Slaves should 'reflect' information about replicated epochs they have applied
    Applied epoch numbers should be included in the Slave Binlog events returning to the originating cluster, in a Binlog position corresponding to the commit time of the replicated epoch transaction relative to Slave local transactions.
  2. Masters should maintain a maximum replicated epoch
    A cluster should use the reflected epoch information to track which of its epochs has been applied by a Slave cluster. This will be the maximum of all epochs applied by the Slave.
  3. Masters should track commit-time epoch per row
    To allow per-row detection of conflicts.
  4. Masters should track commit-authorship per row
    To distinguish recent row epochs caused by applying replicated changes from those caused by potentially conflicting local activity.

'Reflecting' epoch information and maintaining the maximum replicated epoch

Every epoch transaction in the Binlog contains a special WRITE_ROW event on the mysql.ndb_apply_status table which carries the epoch transaction's epoch number. This is designed to give an atomically consistent way to determine a Slave cluster's position relative to a Master cluster. Normally these WRITE_ROW events are applied by the Slave but not logged in the Slave's Binlog, even when --log-slave-updates is ON. A new MySQLD option, --ndb-log-apply-status, causes WRITE_ROW events applied to the mysql.ndb_apply_status table to be binlogged at a Slave, even when --log-slave-updates is OFF. These events are logged with the ServerId of the Slave MySQLD, so that they can be applied on the Master, but will not loop infinitely.

Allowing this applied epoch information to propagate through a Slave Cluster has the following effects :
  1. Downstream Clusters become aware of their position relative to all upstream Master clusters, not just their immediate Master cluster.
    They gain extra mysql.ndb_apply_status entries for all upstream Masters.
  2. Circularly replicating clusters become aware of which of their epochs, and epoch transactions, have been applied to all clusters in the circle.
    They gain extra mysql.ndb_apply_status entries for all Binlogging MySQLDs in the loop

Effect 1 is useful for replication failover with more than two replication-chained clusters where an intermediate cluster is being routed-around (A->B->C) -> (A->C). Cluster C knows the correct Binlog file and position to resume from on A, without consulting B.

Effect 2 could be used to allow clients to wait until their writes have been fully replicated and are globally visible, a kind of synchronous replication. More relevantly, effect 2 allows us to maintain a maximum replicated epoch value for detecting conflicts.

The visible result of using --ndb-log-apply-status on a Slave is that the mysql.ndb_apply_status table on the Master contains extra entries for the Binlogging MySQLDs attached to its Cluster. The maximum replicated epoch is the maximum of these epoch values.

    Cluster 1 Epoch transactions in flight in
    a circular configuration
    (Ignoring Cluster 2 epochs)

                          39  38  37
                        /            \
               Cluster 1              Cluster 2
      (Queued epochs 23,24)           (Queued epochs 36-26)
                        \            /
                          25  26  27

    Current epoch = 40
    Max replicated epoch = 22

A MySQLD acting as a conflict detecting Slave for a cluster needs to know the attached cluster's maximum replicated epoch for conflict detection. On Slave start, before the Slave starts applying replicated changes to the Ndb storage engine, it scans the mysql.ndb_apply_status table to find the highest reflected epoch value. Rows in mysql.ndb_apply_status with server ids in the CHANGE MASTER TO IGNORE_SERVER_IDS list are considered to be local servers, as well as the Slave's own server id, and the maximum replicated epoch is the maximum epoch value from these rows.

@ Slave start

max_replicated_epoch = SELECT MAX(epoch)
                       FROM mysql.ndb_apply_status
                       WHERE server_id IN (<own server id>, <IGNORE_SERVER_IDS list>)
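As a rough illustration, the Slave-start scan can be sketched in Python. This is only a model under stated assumptions: the rows are plain (server_id, epoch) tuples standing in for mysql.ndb_apply_status, the function and parameter names are hypothetical, and returning 0 when no reflected rows exist is an assumption of the sketch rather than documented behaviour.

```python
from typing import Iterable, Tuple


def initial_max_replicated_epoch(
    apply_status_rows: Iterable[Tuple[int, int]],
    own_server_id: int,
    ignore_server_ids: Iterable[int] = (),
) -> int:
    """Return the highest reflected epoch among rows written by 'local' servers.

    apply_status_rows holds (server_id, epoch) pairs, a hypothetical
    in-memory stand-in for the contents of mysql.ndb_apply_status.
    """
    # Local servers: the Slave's own id plus the IGNORE_SERVER_IDS list.
    local_ids = {own_server_id, *ignore_server_ids}
    reflected = [epoch for server_id, epoch in apply_status_rows
                 if server_id in local_ids]
    # Assumption: with no reflected rows, nothing is known to be replicated.
    return max(reflected, default=0)


# Server 10 belongs to the other cluster, so its row is not a reflected epoch.
rows = [(1, 22), (2, 20), (10, 57)]
print(initial_max_replicated_epoch(rows, own_server_id=1, ignore_server_ids=[2]))
```

Rows from servers outside the local set describe how far this cluster has applied the other cluster's epochs, which is why they are excluded here.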

Once the Max_replicated_epoch has been initialised at slave start, it is updated as each reflected epoch event (WRITE_ROW event to mysql.ndb_apply_status) arrives and is processed by the Slave SQL thread. The current Max_replicated_epoch can be seen by issuing the command SHOW STATUS LIKE 'Ndb_slave_max_replicated_epoch';. Note that this is really just a cached copy of the current result of the SELECT MAX(epoch) query from above. One subtlety is that the max_replicated_epoch is only changed when the Slave commits an epoch transaction, as it is only at this point that we know for sure that any event committed on the other cluster before the replicated epoch was applied has been handled.
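The commit-time subtlety can be modelled with a small tracker that stages reflected epochs and only publishes them when the epoch transaction commits. This is a sketch, not the MySQLD implementation; the class and method names are hypothetical.

```python
class MaxReplicatedEpochTracker:
    """Toy model of the Slave's cached maximum replicated epoch."""

    def __init__(self, initial: int) -> None:
        # Value used by conflict checks.
        self.max_replicated_epoch = initial
        # Highest reflected epoch seen so far, not yet published.
        self._pending = initial

    def on_reflected_epoch(self, epoch: int) -> None:
        """Called for each WRITE_ROW event on mysql.ndb_apply_status."""
        self._pending = max(self._pending, epoch)

    def on_epoch_transaction_commit(self) -> None:
        """Publish the staged value once the epoch transaction has committed."""
        self.max_replicated_epoch = self._pending


tracker = MaxReplicatedEpochTracker(initial=22)
tracker.on_reflected_epoch(23)
print(tracker.max_replicated_epoch)  # unchanged until the commit
tracker.on_epoch_transaction_commit()
print(tracker.max_replicated_epoch)  # updated after the commit
```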

Per row last-modified epoch storage

Each row stored in Ndb has a built-in hidden metadata column called NDB$GCI64. This column stores the epoch number at which the row was last modified. For normal system recovery purposes, only the top 32 bits of the 64 bit epoch, called the Global Checkpoint Index or GCI, are used. NDB$EPOCH needs further bits to be stored per-row. Epoch values only use a few of the bits in the bottom 32 bits of the epoch, so by default 6 extra bits per row are used to enable a full 64 bit epoch to be stored for each row. The actual number of bits used can be controlled by a parameter to NDB$EPOCH. Where some epoch is not fully expressible in the number of bits available, the bottom 32 bits are saturated, which again errs on the side of safety, potentially causing false conflicts, but ensuring no real conflicts are missed. The ndb_select_all tool has a --gci64 option which shows each row's stored epoch value.
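To make the saturation behaviour concrete, here is a small Python model of storing a 64 bit epoch as a 32 bit GCI plus a few extra bits. The exact on-disk encoding is an assumption of this sketch: it reserves the all-ones pattern of the extra bits to mean 'saturated' and reads it back as a bottom half of 0xFFFFFFFF. The function names are hypothetical.

```python
LOW_MASK = 0xFFFFFFFF


def store_epoch(epoch: int, extra_bits: int = 6) -> tuple:
    """Split a 64 bit epoch into (GCI, extra bits) as kept per row."""
    gci = epoch >> 32
    low = epoch & LOW_MASK
    saturated = (1 << extra_bits) - 1   # all-ones pattern (sketch assumption)
    return gci, min(low, saturated)


def load_epoch(gci: int, stored_low: int, extra_bits: int = 6) -> int:
    """Rebuild a 64 bit epoch; a saturated value reads back pessimistically."""
    saturated = (1 << extra_bits) - 1
    low = LOW_MASK if stored_low == saturated else stored_low
    return (gci << 32) | low


exact = (5 << 32) | 3        # bottom half fits in 6 bits
too_big = (5 << 32) | 100    # bottom half does not fit in 6 bits

print(load_epoch(*store_epoch(exact)) == exact)      # stored exactly
print(load_epoch(*store_epoch(too_big)) >= too_big)  # never reads back lower
```

Because a saturated value can only read back as greater than or equal to the real epoch, the row can look newer than it is but never older, which is the safe direction for the conflict check.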

A conflict detecting slave detects conflicts between transactions already committed, whose rows have their commit-time epoch numbers, and incoming operations in an epoch transaction, which are considered to have been committed at the epoch given by the current Maximum Replicated Epoch. An incoming operation is considered to be in-conflict if the row it affects has a last-committed epoch that is greater than the current Maximum Replicated Epoch.

  in_conflict = (ndb_gci64 > max_replicated_epoch)

In other words, at the time the change was committed on the other Cluster, that other Cluster was only aware of our changes as-of our epoch (max_replicated_epoch). Therefore it was unaware of any changes committed in more recent epochs. If the row being changed has been locally modified since that epoch then there have been concurrent modifications and a conflict has been discovered.

Note that this mechanism is purely based on monitoring serialisation of updates to rows. No semantic understanding of row data, or the meaning of applied changes is attempted. Even if both clusters update some row to contain exactly the same value it will be considered to be a conflict, as the updates were not serialised with respect to each other.
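A minimal Python sketch of this check follows. The Row type and its field names are hypothetical; the point it illustrates is that only the row's stored epoch takes part in the comparison, never the row's data.

```python
from dataclasses import dataclass


@dataclass
class Row:
    value: str   # user data, ignored by conflict detection
    gci64: int   # epoch at which the row was last committed locally


def in_conflict(row: Row, max_replicated_epoch: int) -> bool:
    """True if the row was modified locally after the last reflected epoch."""
    return row.gci64 > max_replicated_epoch


MAX_REPLICATED_EPOCH = 22

stable_row = Row(value="red", gci64=20)
recent_row = Row(value="blue", gci64=30)
incoming_value = "blue"  # the other cluster wrote the same value

print(in_conflict(stable_row, MAX_REPLICATED_EPOCH))  # False
print(in_conflict(recent_row, MAX_REPLICATED_EPOCH))  # True, despite equal values
```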

Per row hidden Author metacolumn

One advantage of reusing the row's last-modified epoch number for conflict detection is that it is automatically set on every commit. However the downside is that when a replicated modification is found to not be in conflict, and is applied, the row's epoch is automatically set to the current value at commit time as normal. By definition, the current epoch value is always greater than the maximum replicated epoch, and so if a further replicated modification to the same row were to arrive, it would find the row's epoch to be higher than the current maximum replicated epoch, and detect a false conflict.

In theory we could consider the current maximum replicated epoch to be the row's commit time epoch, but as the per-row epoch is used for other more critical DB recovery purposes it's not safe to abuse it in this way. Instead we use the observation that if we found a previous row update from some other cluster to be not-in-conflict, then further updates from it are also not-in-conflict.

To detect this, a new hidden metadata column is introduced called NDB$AUTHOR. This column is set to zero when a row is modified by any unmodified NdbApi client, including MySQLD, but when a row is modified by the MySQLD Slave SQL thread, it is set to one. More generally, NDB$AUTHOR could be set to a non-zero identifier of which other cluster sourced an accepted change. Just setting to one limits us to having one other cluster originating potentially conflicting changes. The ndb_select_all tool has a --author option which shows each row's stored Author value.

By extending the conflict detecting function to examine the NDB$AUTHOR value, we avoid the problem of falsely detecting conflicts when applying consecutive replicated changes.
  in_conflict = (ndb$author != change_author) && (ndb_gci64 > max_replicated_epoch)

We are currently just using 1 to mean 'other author', so this simplifies to :
  in_conflict = (ndb$author != 1) && (ndb_gci64 > max_replicated_epoch)
              = (ndb$author == 0) && (ndb_gci64 > max_replicated_epoch)

This conflict detection function is encoded in an Ndb interpreted program and attached to the replicated DELETE and UPDATE NdbApi operations so that it can be quickly and atomically executed at the Ndb data nodes as a predicate prior to applying the operation.
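The effect of the author column can be seen in a small Python model. In the real system the predicate runs as an interpreted program at the data nodes; this sketch just evaluates the same condition in-process, and the Row type and function names are hypothetical.

```python
from dataclasses import dataclass

LOCAL_AUTHOR = 0     # row last written by an ordinary NdbApi client
REPLICA_AUTHOR = 1   # row last written by the Slave SQL thread


@dataclass
class Row:
    gci64: int
    author: int = LOCAL_AUTHOR


def in_conflict(row: Row, max_replicated_epoch: int) -> bool:
    return row.author == LOCAL_AUTHOR and row.gci64 > max_replicated_epoch


def apply_replicated_change(row: Row, max_replicated_epoch: int,
                            current_epoch: int) -> bool:
    """Apply a replicated UPDATE unless it conflicts; return True if applied."""
    if in_conflict(row, max_replicated_epoch):
        return False
    row.gci64 = current_epoch      # set automatically on every commit
    row.author = REPLICA_AUTHOR    # records that the Slave made this change
    return True


def apply_local_change(row: Row, current_epoch: int) -> None:
    row.gci64 = current_epoch
    row.author = LOCAL_AUTHOR


row = Row(gci64=20)
first = apply_replicated_change(row, max_replicated_epoch=22, current_epoch=40)
# Row epoch is now 40 > 22, but the author column prevents a false conflict.
second = apply_replicated_change(row, max_replicated_epoch=22, current_epoch=41)
apply_local_change(row, current_epoch=42)
# A local write at epoch 42 has not been reflected yet, so this conflicts.
third = apply_replicated_change(row, max_replicated_epoch=22, current_epoch=43)
print(first, second, third)
```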

Ndb binlog row event ordering and false conflicts

The happened-before relationship between reflected epoch events (WRITE_ROW to mysql.ndb_apply_status) and incoming row events is used to determine whether a conflict has occurred. As described in the last post, Ndb offers limited ordering guarantees on the row events within an epoch transaction. The only guarantee is that multiple changes to the same row will be recorded in the order they committed. This implies that the relative ordering of the reflected epoch WRITE_ROW event, on some row in mysql.ndb_apply_status, and other row events on other tables sharing the same epoch transaction is meaningless. The only ordering guarantees between different rows exist at epoch boundaries.

This means that if we see a reflected epoch WRITE_ROW event somewhere in replicated epoch j, then we can only safely assume that this happened before incoming row events in epoch j+1 and later. The row events appearing before and after the reflected epoch WRITE_ROW event in epoch j may have committed before or after the reflected epoch event.

The relaxed relative ordering gives us reduced precision in determining happened-before, and to be safe, we must err on the side of assuming that a conflict exists rather than that it does not. Consider a Master committing a change to row X, recorded in epoch N. This is then applied on the Slave in Slave epoch S. If the Slave then commits a local change affecting the same row X in the same epoch S, this will be returned to the Master in the same Slave epoch transaction, and the Master will be unable to determine whether it occurred before or after its original write to X, so must assume that it occurred before and is therefore in conflict. If the Slave had committed its change in epoch S+1 or later, the happened-before relationship would be clear and the change would not be considered in conflict.
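The row X scenario can be replayed with a toy Python model of the Master applying incoming Slave epoch transactions. The event tuples and function name are hypothetical, and for brevity the sketch only detects conflicts and does not modify the rows it checks.

```python
from typing import Dict, List, Tuple

# Events inside one incoming epoch transaction (hypothetical model):
#   ("reflect", epoch)   reflected epoch WRITE_ROW on mysql.ndb_apply_status
#   ("update", row_key)  replicated change to a row
Event = Tuple[str, object]


def apply_epoch_transaction(events: List[Event],
                            rows: Dict[str, Tuple[int, int]],
                            max_replicated_epoch: int):
    """Return (conflicting row keys, max replicated epoch after commit).

    rows maps a row key to its (gci64, author) pair.
    """
    pending = max_replicated_epoch
    conflicts = []
    for kind, arg in events:
        if kind == "reflect":
            # Order within the transaction is meaningless, so only stage it.
            pending = max(pending, arg)
        else:
            gci64, author = rows[arg]
            # Checked against the value in force before this transaction.
            if author == 0 and gci64 > max_replicated_epoch:
                conflicts.append(arg)
    return conflicts, pending  # staged value takes effect at commit


N = 30
rows = {"X": (N, 0)}   # Master wrote X in epoch N

# Slave changed X in the same Slave epoch S that applied the Master's write.
same_epoch, _ = apply_epoch_transaction([("reflect", N), ("update", "X")], rows, 29)

# Slave changed X in epoch S+1 instead.
_, after_s = apply_epoch_transaction([("reflect", N)], rows, 29)
next_epoch, _ = apply_epoch_transaction([("update", "X")], rows, after_s)

print(same_epoch, next_epoch)
```

The first case reports X as in conflict even though the Slave's change may really have followed the Master's write, which is the false conflict described above.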

These potential false conflicts are the price paid here for the lack of fine grained event ordering in the Ndb Binlog.

I'm lost

There's been a lot of information, or at least a lot of words. Let's summarise how NDB$EPOCH and NDB$EPOCH_TRANS detect row conflicts by following changes around the replication loop :

  • @Cluster A
    Transactions modify rows, automatically setting their hidden NDB$GCI64 column to the current epoch and their NDB$AUTHOR column to 0

    Binlogging MySQLDs record modified rows in epoch transactions in their Binlogs, together with MySQLD generated mysql.ndb_apply_status WRITE_ROW events

  • @Cluster B
    Slave MySQLDs apply replicated epoch transactions along with their generated mysql.ndb_apply_status WRITE_ROW events

    Other clients of Cluster B commit transactions against the same data.

    Binlogging MySQLDs 'reflect' the applied-replicated epoch information by recording the mysql.ndb_apply_status WRITE_ROW events in their Binlogs as a result of --ndb-log-apply-status.

    Binlogging MySQLDs also record the row changes made by local clients.

  • @Cluster A
    Slave MySQLDs track the incoming reflected epoch mysql.ndb_apply_status WRITE_ROW events to maintain their ndb_slave_max_replicated_epoch variables

    Slave MySQLDs attach NdbApi interpreted programs to UPDATE and DELETE operations as they are applied to the database, comparing the row's stored NDB$GCI64 and NDB$AUTHOR columns with constant values supplied in the program.

    If there are no conflicts, the UPDATE and DELETE operations are applied, and the rows' NDB$AUTHOR columns are set to one, indicating a successful Slave modification

    If there are conflicts then conflict handling for the conflicting rows begins.

Now does that make any sense? Assuming it does, then next we look at how detected conflicts are handled.

Once again, another wordy endurance test and we're not finished. Surely the end must be near?

Edit 23/12/11 : Added index


Anonymous said...

Hello. Thanks for excellent information!
I can't understand one detail. In the post about epochs there was this formula:

in_conflict = row_clockval >= maximum_replicated_clockval

But here the conflict detection algorithm uses a slightly different formula:

in_conflict = (ndb_gci64 > max_replicated_epoch)

Please help me to understand conflict resolution properly.
In this picture we have 2 servers in circular replication, A and B, and we have two modifications marked by 'x'. Server A receives the following log:

'0 14x 1 15 2'.

0, 1 and 2 are max_replicated_epoch values. When the server received '14x' it had 'something like ndb_gci64' = 0 and max_replicated_epoch = 0.

We have ndb_gci64 = max_replicated_epoch (not >), so there is no conflict for the cluster. Am I right?

Frazer Clement said...

I will rephrase, which will either clarify, confuse, or both :)

The goal with NDB$EPOCH()* is to have a single, serial timeline for each row, so that writes to the 'same data' occur one after the other, rather than concurrently.

To ensure this, we need to detect concurrent writes.

This is done by attaching a monotonically increasing counter value to (sets of) writes. The Primary cluster tags each set of writes with this counter value, and they keep the tag as they traverse around the replication circle. Additionally, the rows in the Primary cluster's database are tagged with this counter value, kind of like the last-modified-time on a file in Unix.

When the Primary cluster receives one of its own tags 'back' (e.g. it has traversed the replication circle), it increments its 'highest replicated counter value'. The implication is that all rows with counter values <= this value are now considered 'done', so that any changes originating *after* this are not considered to be concurrent.

This 'highest replicated counter' is used in the meantime when applying change sets originating from the other cluster(s). As each row is applied to the primary cluster, we check the 'last-modified-time' counters in the primary cluster database. If they have counter values <= the current 'highest replicated counter value' then they are stable, and these changes are arriving 'after' them, and so are serial w.r.t. them. If they have counter values > the current 'highest replicated counter value' then they are concurrent and therefore in-conflict.

In our case, the highest replicated counter value is called MaxReplicatedEpoch and the stored 'last-modified-time' is called ndb_gci64.

Once MaxReplicatedEpoch >= a particular row's ndb_gci64, further changes to *that row* from other clusters are considered to be not-in-conflict. While MaxReplicatedEpoch < a particular row's ndb_gci64, further changes to *that row* from other clusters are considered to be in-conflict.

You are right that the two formulae you quoted are not quite the same. W.r.t. our implementation, I would say that the second one, in_conflict = (ndb_gci64 > max_replicated_epoch), is correct. The first one is more pessimistic, finding conflicts where there may be none. Sorry for the confusion caused.