Thursday, 22 December 2011

Eventual Consistency in MySQL Cluster - implementation part 3




As promised, this is the final post in a series looking at eventual consistency with MySQL Cluster asynchronous replication. This time I'll describe the transaction dependency tracking used with NDB$EPOCH_TRANS and review some of the implementation properties.

Transaction based conflict handling with NDB$EPOCH_TRANS

NDB$EPOCH_TRANS is almost exactly the same as NDB$EPOCH, except that when a conflict is detected on a row, the whole user transaction which made the conflicting row change is marked as conflicting, along with any dependent transactions. All of these rejected row operations are then handled using inserts to an exceptions table and realignment operations. This helps avoid the row-shear problems described here.

Including user transaction ids in the Binlog

Ndb Binlog epoch transactions contain row events from all the user transactions which committed in an epoch. However there is no information in the Binlog indicating which user transaction caused each row event. To allow detected conflicts to 'rollback' the other rows modified in the same user transaction, the Slave applying an epoch transaction needs to know which user transaction was responsible for each of the row events in the epoch transaction. This information can now be recorded in the Binlog by using the --ndb-log-transaction-id MySQLD option. Logging Ndb user transaction ids against rows in-turn requires a v2 format RBR Binlog, enabled with the --log-bin-use-v1-row-events=0 option. The mysqlbinlog --verbose tool can be used to see per-row transaction information in the Binlog.

User transaction ids in the Binlog are useful for NDB$EPOCH_TRANS and more. One interesting possibility is to use the user transaction ids and same-row operation dependencies to sort the row events inside an epoch into a partial order. This could enable recovery to a consistent point other than an epoch boundary. A project for a rainy day perhaps?

NDB$EPOCH_TRANS multiple slave passes

Initially, NDB$EPOCH_TRANS proceeds in the same way as NDB$EPOCH, attempting to apply replicated row changes, with interpreted code attached to detect conflicts. If no row conflicts are detected, the epoch transaction is committed as normal with the same minimal overhead as NDB$EPOCH. However if a row conflict is detected, the epoch transaction is rolled back, and reapplied. This is where NDB$EPOCH_TRANS starts to diverge from NDB$EPOCH.

In this second pass, the user transaction ids of rows with detected conflicts are tracked, along with any inter-transaction dependencies detectable from the Binlog. At the end of the second pass, prior to commit, the set of conflicting user transactions is combined with the user transaction dependency data to get a complete set of conflicting user transactions. The epoch transaction initiated in the second pass is then rolled-back and a third pass begins.

In the third pass, only row events for non-conflicting transactions are applied, though these are still applied with conflict detecting interpreted programs attached in case a further conflict has arisen since the second pass. Conflict handling for row events belonging to conflicting transactions is performed in the same way as NDB$EPOCH. Prior to commit, the applied row events are checked for further conflicts. If further conflicts have occurred then the epoch transaction is rolled back again and we return to the second pass. If no further conflicts have occurred then the epoch transaction is committed.

These three passes, and associated rollbacks are only externally visible via new counters added to the MySQLD server. From an external observer's point of view, only non-conflicting transactions are committed, and all row events associated with conflicting transactions are handled as conflicts. As an optimisation, when transactional conflicts have been detected, further epochs are handled with just two passes (second and third) to improve efficiency. Once an epoch transaction with no conflicts has been applied, further epochs are initially handled with the more optimistic and efficient first pass.

Dependency tracking implementation

To build the set of inter-transaction dependencies and conflicts, two hash tables are used. The first is a unique hashmap mapping row event tables and primary keys to transaction ids. If two events for the same table and primary key are found in a single epoch transaction then there is a dependency between those events, specifically the second event depends on the first. If the events belong to different user transactions then there is a dependency between the transactions.

Transaction dependency detection hash :
{Table, Primary keys} -> {Transaction id}

The second hash table is a hashmap of transaction id to an in-conflict marker and a list of dependent user transactions. When transaction dependencies are discovered using the first dependency detection hash, the second hash is modified to reflect the dependency. By the end of processing the epoch transaction, all dependencies detectable from the Binlog are described.

Transaction dependency tracking and conflict marking hash :
{Transaction id} -> {in_conflict, List}

As epoch operations are applied and row conflicts are detected, the operation's user transaction id is marked in the dependency hash as in-conflict. When marking a transaction as in-conflict, all of its dependent transactions must also be transitively marked as in-conflict. This is done by a traverse through the dependency tree of the in-conflict transaction. Due to slave batching, the addition of new dependencies and the marking of conflicting transactions is interleaved, so adding a dependency can result in a sub-tree being marked as in-conflict.

After the second pass is complete, the transaction dependency hash is used as a simple hash for looking up whether a particular transaction id is in conflict or not :

Transaction in-conflict lookup hash :
{Transaction id} -> {in_conflict}

This is used in the third pass to determine whether to apply each row event, or to proceed straight to conflict handling.

The size of these hashes, and the complexity of the dependency graph is bounded by the size of the epoch transaction. There is no need to track dependencies across the boundary of two epoch transactions, as any dependencies will be discovered via conflicts on the data committed by the first epoch transaction when attempting to apply the second epoch transaction.

Event counters

Like the existing conflict detection functions, NDB$EPOCH_TRANS has a row-conflict detection counter called ndb_conflict_epoch_trans.

Additional counters have been added which specifically track the different events associated with transactional conflict detection. These can be seen with the usual SHOW GLOBAL STATUS LIKE syntax, or via the INFORMATION_SCHEMA tables.

  • ndb_conflict_trans_row_conflict_count
    This is essentially the same as ndb_conflict_epoch_trans - the number of row events with conflict detected.
  • ndb_conflict_trans_row_reject_count
    The number of row events which were handled as in-conflict. It will be at least as large as ndb_conflict_trans_row_count, and will be higher if other rows are implicated by being in a conflicting transaction, or being dependent on a row in a conflicting transaction.
    A separate ndb_conflict_trans_row_implicated_count could be constructed as ndb_conflict_trans_row_reject_count - ndb_conflict_trans_row_conflict_count
  • ndb_conflict_trans_reject_count
    The number of discrete user transactions detected as in-conflict.
  • ndb_conflict_trans_conflict_commit_count
    The number of epoch transactions which had transactional conflicts detected during application.
  • ndb_conflict_trans_detect_iter_count
    The number of iterations of the three-pass algorithm that have occurred. Each set of passes counts as one. Normally this would be the same as ndb_conflict_trans_conflict_commit_count. Where further conflicts are found on the third pass, another iteration may be required, which would increase this count. So if this count is larger than ndb_conflict_trans_conflict_commit_count then there have been some conflicts generated concurrently with conflict detection, perhaps suggesting a high conflict rate.


Performance properties of NDB$EPOCH and NDB$EPOCH_TRANS

I have tried to avoid getting involved in an explanation of Ndb replication in general which would probably fill a terabyte of posts. Comparing replication using NDB$EPOCH and NDB$EPOCH_TRANS relative to Ndb replication with no conflict detection, what can we can say?

  • Conflict detection logic is pushed down to data nodes for execution
    Minimising extra data transfer + locking
  • Slave operation batching is preserved
    Multiple row events are applied together, saving MySQLD <-> data node round trips, using data node parallelism
    For both algorithms, one extra MySQLD <-> data node round-trip is required in the no-conflicts case (best case)
  • NDB$EPOCH : One extra MySQLD <-> data node round-trip is required per *batch* in the all-conflicts case (worst case)
  • NDB$EPOCH : Minimal impact to Binlog sizes - one extra row event per epoch.
  • NDB$EPOCH : Minimal overhead to Slave SQL CPU consumption
  • NDB$EPOCH_TRANS : One extra MySQLD <-> data node round-trip is required per *batch* per *pass* in the all-conflicts case (worst case)
  • NDB$EPOCH_TRANS : One round of two passes is required for each conflict newly created since the previous pass.
  • NDB$EPOCH_TRANS : Small impact to Binlog sizes - one extra row event per epoch plus one user transaction id per row event.
  • NDB$EPOCH_TRANS : Small overhead to Slave SQL CPU consumption in no-conflict case

Current and intrinsic limitations

These functions support automatic conflict detection and handling without schema or application changes, but there are a number of limitations. Some limitations are due to the current implementation, some are just intrinsic in the asynchronous distributed consistency problem itself.

Intrinsic limitations
  • Reads from the Secondary are tentative
    Data committed on the secondary may later be rolled back. The window of potential rollback is limited, after which Secondary data can be considered stable. This is described in more detail here.
  • Writes to the Secondary may be rolled back
    If this occurs, the fact will be recorded on the Primary. Once a committed write is stable it will not be rolled back.
  • Out-of-band dependencies between transactions are out-of-scope
    For example direct communication between two clients creating a dependency between their committed transactions, not observable from their database footprints.

Current implementation limitations

  • Detected transaction dependencies are limited to dependencies between binlogged writes (Insert, Update, Delete)
    Reads are not currently included.
  • Delete vs Delete+Insert conflicts risk data divergence
    Delete vs Delete conflicts are detected, but currently do not result in conflict handling, so that Delete vs Delete + Insert can result in data divergence.
  • With NDB$EPOCH_TRANS, unplanned Primary outages may require manual steps to restore Secondary consistency
    With pending multiple, time spaced, non-overlapping transactional conflicts, an unexpected failure may need some Binlog processing to ensure consistency.

Want to try it out?

Andrew Morgan has written a great post showing how to setup NDB$EPOCH_TRANS. He's even included non-ascii art. This is probably the easiest way to get started. NDB$EPOCH is slightly easier to get started with as the --ndb-log-transaction-id (and Binlog v2) options are not required.

Edit 23/12/11 : Added index

No comments: