Monday 19 December 2011

Eventual consistency in MySQL Cluster - implementation part 2




In previous posts I described how row conflicts are detected using epochs. In this post I describe how they are handled.

Row based conflict handling with NDB$EPOCH


Once a row conflict is detected, as well as rejecting the row change, row based conflict handling in the Slave will :
  • Increment conflict counters
  • Optionally insert a row into an exceptions table
For NDB$EPOCH, conflict detection and handling operates on one Cluster in an Active-Active pair designated as the Primary. When a Slave MySQLD attached to the Primary Cluster detects a conflict between data stored in the Primary and a replicated event from the Secondary, it needs to realign the Secondary to store the same values for the conflicting data. Realignment involves injecting an event into the Primary Cluster's Binlog which, when applied idempotently on the Secondary Cluster, will force the row on the Secondary Cluster to take the supplied values. This requires either a WRITE_ROW event, with all columns, or a DELETE_ROW event with just the primary key columns. These events can be thought of as compensating events used to revert the original effect of the rejected events.
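
As a concrete illustration of how this is set up (a minimal sketch only; the database and table names are hypothetical, and the exact mysql.ndb_replication and exceptions table layouts should be checked against the MySQL Cluster documentation for your version) :

  -- Hypothetical table mydb.t1 with a single primary key column 'id'.
  -- Select NDB$EPOCH() as the conflict function for this table on all
  -- servers (server_id = 0); the binlog_type value shown is illustrative.
  INSERT INTO mysql.ndb_replication
    (db, table_name, server_id, binlog_type, conflict_fn)
  VALUES
    ('mydb', 't1', 0, 7, 'NDB$EPOCH()');

  -- Optional exceptions table, named <table>$EX : four bookkeeping columns
  -- followed by copies of the original table's primary key columns.
  CREATE TABLE mydb.t1$EX (
    server_id        INT UNSIGNED NOT NULL,
    master_server_id INT UNSIGNED NOT NULL,
    master_epoch     BIGINT UNSIGNED NOT NULL,
    count            INT UNSIGNED NOT NULL,
    id               INT NOT NULL,    -- t1's primary key column
    PRIMARY KEY (server_id, master_server_id, master_epoch, count)
  ) ENGINE=NDB;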

Conflicts are detected by a Slave MySQLD attached to the Primary Cluster, and realignment events must appear in Binlogs recorded by the same MySQLD and/or other Binlogging MySQLDs attached to the Primary Cluster. This is achieved using a new NdbApi primary key operation type called refreshTuple.

When a refreshTuple operation is executed it will :
  1. Lock the affected row/primary key until transaction commit time, even if it does not exist (much as an Insert would).
  2. Set the affected row's author metacolumn to 0
    The refresh is logically a local change
  3. On commit
    - Row exists case : Set the row's last committed epoch to the current epoch
    - Cause a WRITE_ROW (row exists case) or DELETE_ROW (no row exists) event to be generated by attached Binlogging MySQLDs.

Locking the row as part of refreshTuple serialises the conflicting epoch transaction with other potentially conflicting local transactions. Updating the stored epoch and author metacolumns means that the refreshed row will again be found in conflict with any further replicated changes that arrive while the realignment event is 'in flight'. The compensating row events are effectively new row changes originating at the Primary cluster which need to be monitored for conflicts in the same way as normal row changes.

It is important that the Slave running at the Secondary Cluster, where the realignment events will be applied, is running in idempotent mode so that it can handle the realignment events correctly. If this is not the case then WRITE_ROW realignment events may hit 'Row already exists' errors, and DELETE_ROW realignment events may hit 'Row does not exist' errors.
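
For example (a minimal sketch; exactly where the option is set depends on how the Slave is configured), idempotent mode can be enabled on the Secondary Cluster's Slave MySQLD with :

  SET GLOBAL slave_exec_mode = 'IDEMPOTENT';

  -- or, in the Slave MySQLD's my.cnf :
  --   [mysqld]
  --   slave-exec-mode = IDEMPOTENT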

Observations on conflict windows and consistency

When a conflict is detected, the refresh process results in the row's epoch and author metacolumns being modified so that the window of potential conflict is extended, until the epoch in which the refresh operation was recorded has itself been reflected. If ongoing updates at both clusters continually conflict then refresh operations will continue to be generated, and the conflict window will remain open until a refresh operation manages to propagate with no further conflicts occurring. As with any eventually consistent system, consistency is only guaranteed when the system (or at least the data of interest) is quiescent for a period.

From the Primary cluster's point of view, the conflict window length is the time between committing a local transaction in epoch n, and the attached Slave committing a replicated epoch transaction indicating that epoch n has been applied at the Secondary. Any Secondary-sourced overlapping change applied in this time is in-conflict.

This Cluster conflict window length comprises :

  • Time between commit of transaction, and next Primary Cluster epoch boundary
    (Worst = 1 * TimeBetweenEpochs, Best = 0, Avg = 0.5 * TimeBetweenEpochs)
  • Time required to log event in Primary Cluster's Binlogging MySQLDs Binlog (~negligible)
  • Time required for Secondary Slave MySQLD IO thread to
    - Minimum : Detect new Binlog data - negligible
    - Maximum : Consume queued Binlog prior to the new data - unbounded
    - Pull new epoch transaction
    - Record in Relay log
  • Time required for Secondary Slave MySQLD SQL thread to
    - Minimum : Detect new events in relay log
    - Maximum : Consume queued Relay log prior to new data - unbounded
    - Read and apply events
    - Potentially multiple batches.
    - Commit epoch transaction at Secondary
  • Time between commit of replicated epoch transaction and next Secondary Cluster epoch boundary
    (Worst = 1 * TimeBetweenEpochs, Best = 0, Avg = 0.5 * TimeBetweenEpochs)
  • After this point a Secondary-local commit on the data is possible without conflict
  • Time required to log event in Secondary Cluster's Binlogging MySQLDs Binlog (~negligible)
  • Time required for Primary Slave MySQLD IO thread to
    - Minimum : Detect new Binlog data
    - Maximum : Consume queued Binlog data prior to the new data - unbounded
    - Pull new epoch transaction
    - Record in Relay log
  • Time required for Primary Slave MySQLD SQL thread to
    - Minimum : Detect new events in relay log
    - Maximum : Consume queued Relay log prior to new data - unbounded
    - Read and apply events
    - Potentially multiple batches.
    - For NDB$EPOCH_TRANS, potentially multiple passes
    - Commit epoch transaction
    - Update max replicated epoch to reflect new maximum.
  • Further Secondary sourced modifications to the rows are now considered not-in-conflict

From the point of view of an external client with access to both Primary and Secondary clusters, the conflict window only extends from the time transaction commit occurs at the Primary to the time the replicated operations are applied at the Secondary and the Secondary epoch in which they committed ends. Changes committed at the Secondary after this will clearly appear to the Primary to have occurred after its epoch was applied on the Secondary and therefore are not in-conflict.

Assuming that both Clusters have the same TimeBetweenEpochs, we can simplify the Cluster conflict window to :
  Cluster_conflict_window_length = EpochDelay +
                                   P_Binlog_lag +
                                   S_Relay_lag +
                                   S_Binlog_lag +
                                   P_Relay_lag

Where
  EpochDelay minimum is 0
  EpochDelay average is TimeBetweenEpochs
  EpochDelay maximum is 2 * TimeBetweenEpochs


Substituting the default value of TimeBetweenEpochs of 100 millis, we get :
  EpochDelay minimum is 0
  EpochDelay average is 100 millis
  EpochDelay maximum is 200 millis


Note that TimeBetweenEpochs is an epoch-increment trigger delay. The actual experienced time between epochs can be longer depending on system load. The various Binlog and Relay log delays can vary from close to zero up to infinity. Infinity occurs when replication stops in either direction.
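
As a purely illustrative worked example (the lag figures below are assumptions for the sake of arithmetic, not measurements), take the default TimeBetweenEpochs of 100 millis and a lightly loaded replication channel :

  Assume : P_Binlog_lag ~ 10 millis, S_Relay_lag ~ 40 millis,
           S_Binlog_lag ~ 10 millis, P_Relay_lag ~ 40 millis

  Cluster_conflict_window_length avg ~ 100 + 10 + 40 + 10 + 40 = 200 millis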

The Cluster conflict window length can be thought of as both
  • The time taken to detect a conflict with a Primary transaction
  • The time taken for a committed Secondary transaction to become stable or be reverted

We can define a Client conflict window length as either :

  Primary->Secondary

  Client_conflict_window_length = EpochDelay +
                                  P_Binlog_lag +
                                  S_Relay_lag +
                                  EpochDelay

or

  Secondary->Primary

  Client_conflict_window_length = EpochDelay +
                                  S_Binlog_lag +
                                  P_Relay_lag

Where EpochDelay is defined as above.
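
Using the same illustrative lag assumptions as before (assumed figures, not measurements), the asymmetry shows up directly in the numbers :

  Assume : P_Binlog_lag ~ 10 millis, S_Relay_lag ~ 40 millis,
           S_Binlog_lag ~ 10 millis, P_Relay_lag ~ 40 millis

  Client_conflict_window_length(Primary->Secondary) avg ~ 100 + 10 + 40 + 100 = 250 millis
  Client_conflict_window_length(Secondary->Primary) avg ~ 100 + 10 + 40 = 150 millis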


These definitions are asymmetric. They represent the time taken by the system to determine that a particular change at one cluster definitely happened-before another change at the other cluster. The asymmetry is due to the need for the Secondary part of a Primary->Secondary conflict to be recorded in a different Secondary epoch. The first definition considers an initial change at the Primary cluster, and a following change at the Secondary. The second definition is for the inverse case.

An interesting observation is that for a single pair of near-concurrent updates at different clusters, happened-before depends only on latencies in one direction. For example, an update to the Primary at time Ta, followed by an update to the Secondary at time Tb will not be considered in conflict if:

 Tb - Ta > Client_conflict_window_length(Primary->Secondary)


Client_conflict_window_length(Primary->Secondary) depends on the EpochDelay, the P_Binlog_lag and S_Relay_lag, but not on the S_Binlog_lag or P_Relay_lag. This can mean that high replication latency, or a complete outage in one direction does not always result in increased conflict rates. However, in the case of multiple sequences of near-concurrent updates at different sites, it probably will.

A general property of the NDB$EPOCH family is that the conflict rate has some dependency on the replication latency. Whether two updates to the same row at times Ta and Tb are considered to be in conflict depends on the relationship between those times and the current system replication latencies. This can remove the need for highly synchronised real-time clocks as recommended for NDB$MAX, but can mean that the observed conflict rate increases when the system is lagging. This also implies that more work is required to catch up, which could further affect lag. NDB$MAX requires manual timestamp maintenance, and will not detect incorrect behaviour, but the basic decision on whether two updates are in-conflict is decided at commit time and is independent of the system replication latency.

In summary :
  • The Client_conflict_window_length in either direction will on average not be less than the EpochDelay (100 millis by default)
  • Clients racing against replication to update both clusters need only beat the current Client_conflict_window_length to cause a conflict
  • Replication latencies in either direction are potentially independent
  • Detected conflict rates partly depend on replication latencies
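
In practice (a rough sketch; variable names and availability should be checked for your version), the interplay between replication lag and detected conflicts can be observed on a Slave MySQLD attached to the Primary Cluster by watching the slave lag alongside the NDB conflict counters :

  SHOW GLOBAL STATUS LIKE 'Ndb_conflict%';   -- e.g. Ndb_conflict_fn_epoch
  SHOW SLAVE STATUS\G                        -- e.g. Seconds_Behind_Master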

Stability of reads from the Primary Cluster

In the case of a conflict, the rows at the Primary Cluster will tentatively have replicated operations applied against them by a Slave MySQLD. These conflicting operations will fail prior to commit as their interpreted precondition checks will fail, therefore the conflicting rows will not be modified on the Primary. One effect of this is that a read from the Primary Cluster only ever returns stable data, as conflicting changes are never committed there. In contrast, a read from the Secondary Cluster returns data which has been committed, but may be subject to later 'rollback' via refresh operations from the Primary Cluster.

The same stability of reads observation applies to a row change event stream on the Primary Cluster - events received for a single key will be received in the order they were committed, and no later-to-be-rolled-back events will be observed in the stream.

Stability of reads from the Secondary Cluster

If the Secondary Cluster is also receiving reflected applied epoch information back from the Primary then it will know when its epoch x has been applied successfully at the Primary. Therefore a read of some row y on the Secondary can be considered tentative while Max_Replicated_Epoch(Secondary) < row_epoch(y), but once Max_Replicated_Epoch(Secondary) >= row_epoch(y) then the read can be considered stable. This is because if the Primary were going to detect a conflict with a Secondary change committed in epoch x, then the refresh events associated with the conflict would be recorded in the same Primary epoch as the notification of the application of epoch x. So if the Secondary observes the notification of epoch x (and updates Max_Replicated_Epoch accordingly), and row y is not modified in the same epoch transaction, then it is stable. The time taken to reach stability after a Secondary Cluster commit will be the Cluster conflict window length.

Perhaps some applications can make better use of the potentially transiently inconsistent Secondary data by categorising their reads from the Secondary as either potentially-inconsistent or stable. To do this, they need to maintain Max_Replicated_Epoch(Secondary) (by listening to row change events on the ndb_apply_status table) and read the NDB$GCI_64 metacolumn when reading row data. A read from the Secondary is stable if all the NDB$GCI_64 values for all rows read are <= the Secondary's Max_Replicated_Epoch.
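
A minimal sketch of the stable-read bookkeeping follows. Assumptions : the standard mysql.ndb_apply_status layout, applied-epoch information being reflected back to the Secondary as described above (e.g. via --ndb-log-apply-status on the Primary), and <secondary_server_ids> as a placeholder for the Secondary's own Binlogging MySQLDs' server ids. Reading a row's NDB$GCI_64 value requires the NdbApi rather than plain SQL.

  -- On the Secondary : the highest Secondary-sourced epoch known to have
  -- been applied (and therefore conflict-checked) at the Primary.
  SELECT MAX(epoch) AS max_replicated_epoch
    FROM mysql.ndb_apply_status
   WHERE server_id IN (<secondary_server_ids>);

  -- A read of row y from the Secondary is stable if
  --   row_epoch(y) <= max_replicated_epoch
  -- where row_epoch(y) is the row's NDB$GCI_64 value.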

In the next post (final post I promise!) I will describe the implementation of the transaction dependency tracking in NDB$EPOCH_TRANS, and review the implementation of both NDB$EPOCH and NDB$EPOCH_TRANS.

Edit 23/12/11 : Added index
