Thursday, 22 December 2011

Eventual Consistency in MySQL Cluster - implementation part 3

As promised, this is the final post in a series looking at eventual consistency with MySQL Cluster asynchronous replication. This time I'll describe the transaction dependency tracking used with NDB$EPOCH_TRANS and review some of the implementation properties.

Transaction based conflict handling with NDB$EPOCH_TRANS

NDB$EPOCH_TRANS is almost exactly the same as NDB$EPOCH, except that when a conflict is detected on a row, the whole user transaction which made the conflicting row change is marked as conflicting, along with any dependent transactions. All of these rejected row operations are then handled using inserts to an exceptions table and realignment operations. This helps avoid the row-shear problems described here.

Including user transaction ids in the Binlog

Ndb Binlog epoch transactions contain row events from all the user transactions which committed in an epoch. However, there is no information in the Binlog indicating which user transaction caused each row event. To allow detected conflicts to 'roll back' the other rows modified in the same user transaction, the Slave applying an epoch transaction needs to know which user transaction was responsible for each of the row events in the epoch transaction. This information can now be recorded in the Binlog by using the --ndb-log-transaction-id MySQLD option. Logging Ndb user transaction ids against rows in turn requires a v2 format RBR Binlog, enabled with the --log-bin-use-v1-row-events=0 option. The mysqlbinlog --verbose tool can be used to see per-row transaction information in the Binlog.

User transaction ids in the Binlog are useful for NDB$EPOCH_TRANS and more. One interesting possibility is to use the user transaction ids and same-row operation dependencies to sort the row events inside an epoch into a partial order. This could enable recovery to a consistent point other than an epoch boundary. A project for a rainy day perhaps?

NDB$EPOCH_TRANS multiple slave passes

Initially, NDB$EPOCH_TRANS proceeds in the same way as NDB$EPOCH, attempting to apply replicated row changes, with interpreted code attached to detect conflicts. If no row conflicts are detected, the epoch transaction is committed as normal with the same minimal overhead as NDB$EPOCH. However if a row conflict is detected, the epoch transaction is rolled back, and reapplied. This is where NDB$EPOCH_TRANS starts to diverge from NDB$EPOCH.

In this second pass, the user transaction ids of rows with detected conflicts are tracked, along with any inter-transaction dependencies detectable from the Binlog. At the end of the second pass, prior to commit, the set of conflicting user transactions is combined with the user transaction dependency data to get a complete set of conflicting user transactions. The epoch transaction initiated in the second pass is then rolled back and a third pass begins.

In the third pass, only row events for non-conflicting transactions are applied, though these are still applied with conflict detecting interpreted programs attached in case a further conflict has arisen since the second pass. Conflict handling for row events belonging to conflicting transactions is performed in the same way as NDB$EPOCH. Prior to commit, the applied row events are checked for further conflicts. If further conflicts have occurred then the epoch transaction is rolled back again and we return to the second pass. If no further conflicts have occurred then the epoch transaction is committed.

These three passes, and the associated rollbacks, are only externally visible via new counters added to the MySQLD server. From an external observer's point of view, only non-conflicting transactions are committed, and all row events associated with conflicting transactions are handled as conflicts. As an optimisation, once transactional conflicts have been detected, further epochs are handled with just two passes (second and third) to improve efficiency. Once an epoch transaction with no conflicts has been applied, further epochs are again initially handled with the more optimistic and efficient first pass.
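The pass structure described above can be sketched as a small Python model. This is a toy under stated assumptions, not the MySQLD implementation: `EpochTransApplier`, `apply_pass` and `closure` are hypothetical names, a "pass" is reduced to a callback that returns the ids of user transactions whose rows conflicted, and rollback, exceptions-table inserts and realignment are not modelled. It also assumes that in two-pass mode the third pass always runs, even if the second pass finds no conflicts.

```python
from dataclasses import dataclass, field
from typing import Callable, FrozenSet, List

TxnSet = FrozenSet[int]


@dataclass
class EpochTransApplier:
    """Toy model of the NDB$EPOCH_TRANS pass structure (hypothetical names)."""

    optimistic: bool = True                          # start with the cheap first pass?
    passes: List[int] = field(default_factory=list)  # which passes ran, in order

    def apply_epoch(self,
                    apply_pass: Callable[[TxnSet], TxnSet],
                    closure: Callable[[TxnSet], TxnSet]) -> TxnSet:
        """Apply one epoch; return the user transactions handled as conflicts.

        apply_pass(skip) stands in for applying every row event whose user
        transaction is not in `skip`, with conflict detection attached; it
        returns the ids of transactions whose rows conflicted.
        closure(ids) returns `ids` plus every transaction depending on them.
        """
        if self.optimistic:
            self.passes.append(1)
            if not apply_pass(frozenset()):
                return frozenset()  # commit: no conflicts, same cost as NDB$EPOCH
            # A row conflict: roll back and continue with the second pass.
        while True:
            self.passes.append(2)
            # Track conflicts and dependencies, then roll back.
            rejected = closure(apply_pass(frozenset()))
            self.passes.append(3)
            # Apply only non-conflicting transactions; rejected ones would be
            # sent to conflict handling instead.
            if not apply_pass(rejected):
                # Commit. Stay in two-pass mode only while epochs keep conflicting.
                self.optimistic = not rejected
                return rejected
            # A further conflict arose since the second pass: roll back, retry.
```

In a toy world where transaction 2 conflicts and transaction 3 depends on it, the first epoch runs passes 1, 2 and 3 and rejects {2, 3}; a following conflict-free epoch runs only passes 2 and 3, after which the optimistic first pass is used again.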

Dependency tracking implementation

To build the set of inter-transaction dependencies and conflicts, two hash tables are used. The first is a unique hashmap mapping row event tables and primary keys to transaction ids. If two events for the same table and primary key are found in a single epoch transaction then there is a dependency between those events, specifically the second event depends on the first. If the events belong to different user transactions then there is a dependency between the transactions.

Transaction dependency detection hash :
{Table, Primary keys} -> {Transaction id}
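A minimal Python sketch of the dependency detection hash, assuming the epoch's row events are available in Binlog order as (table, primary key, user transaction id) tuples. The function name and tuple layout are illustrative only, and the choice to remember only the most recent writer of each row (so that a chain of writes yields a chain of dependencies) is an assumption about how later events are linked.

```python
from typing import Dict, Hashable, List, Set, Tuple

RowEvent = Tuple[str, Hashable, int]  # (table, primary key, user transaction id)


def detect_dependencies(events: List[RowEvent]) -> List[Tuple[int, int]]:
    """Return (earlier_txn, later_txn) pairs found in one epoch transaction.

    `events` must be in Binlog order. A later event on the same
    {table, primary key} depends on the most recent earlier one.
    """
    last_writer: Dict[Tuple[str, Hashable], int] = {}  # {Table, PK} -> txn id
    seen: Set[Tuple[int, int]] = set()
    deps: List[Tuple[int, int]] = []
    for table, pk, txn in events:
        prev = last_writer.get((table, pk))
        if prev is not None and prev != txn and (prev, txn) not in seen:
            seen.add((prev, txn))
            deps.append((prev, txn))    # txn depends on prev via this row
        last_writer[(table, pk)] = txn  # remember the latest writer of this row
    return deps
```

Events from the same user transaction on the same row do not create a dependency, and a pair of transactions sharing several rows is reported once.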

The second hash table is a hashmap of transaction id to an in-conflict marker and a list of dependent user transactions. When transaction dependencies are discovered using the first dependency detection hash, the second hash is modified to reflect the dependency. By the end of processing the epoch transaction, all dependencies detectable from the Binlog are described.

Transaction dependency tracking and conflict marking hash :
{Transaction id} -> {in_conflict, List of dependent transaction ids}

As epoch operations are applied and row conflicts are detected, the operation's user transaction id is marked in the dependency hash as in-conflict. When marking a transaction as in-conflict, all of its dependent transactions must also be transitively marked as in-conflict. This is done by a traversal of the dependency tree of the in-conflict transaction. Due to slave batching, the addition of new dependencies and the marking of conflicting transactions are interleaved, so adding a dependency can result in a sub-tree being marked as in-conflict.
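The second hash and its transitive marking can be sketched in Python as follows. `DependencyTracker` and its methods are hypothetical names; the sketch assumes the invariant that every dependent of an in-conflict transaction is itself in-conflict, which lets the traversal stop at already-marked nodes and lets a newly added dependency inherit an existing conflict.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class TxnEntry:
    in_conflict: bool = False
    dependents: List[int] = field(default_factory=list)


class DependencyTracker:
    """{Transaction id} -> {in_conflict, dependent transaction ids}."""

    def __init__(self) -> None:
        self.txns: Dict[int, TxnEntry] = {}

    def _entry(self, txn: int) -> TxnEntry:
        return self.txns.setdefault(txn, TxnEntry())

    def _mark_subtree(self, txn: int) -> None:
        # Iterative traversal; already-marked nodes have marked subtrees.
        stack = [txn]
        while stack:
            entry = self._entry(stack.pop())
            if entry.in_conflict:
                continue
            entry.in_conflict = True
            stack.extend(entry.dependents)

    def mark_in_conflict(self, txn: int) -> None:
        """Called when a row conflict is detected for `txn`."""
        self._mark_subtree(txn)

    def add_dependency(self, earlier: int, later: int) -> None:
        """Record that `later` depends on `earlier`."""
        self._entry(earlier).dependents.append(later)
        if self.txns[earlier].in_conflict:
            # Interleaving: a dependent added late inherits the conflict.
            self._mark_subtree(later)

    def is_in_conflict(self, txn: int) -> bool:
        entry = self.txns.get(txn)
        return entry is not None and entry.in_conflict
```

For example, recording 2 as dependent on 1, marking 1 in-conflict, and only then recording 3 as dependent on 2 leaves 1, 2 and 3 all in-conflict.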

After the second pass is complete, the transaction dependency hash is used as a simple hash for looking up whether a particular transaction id is in conflict or not :

Transaction in-conflict lookup hash :
{Transaction id} -> {in_conflict}

This is used in the third pass to determine whether to apply each row event, or to proceed straight to conflict handling.
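As a rough Python sketch, the third pass can use that lookup to split the epoch's row events into those to apply and those to send straight to conflict handling. The function name and the (table, primary key, transaction id) tuple layout are assumptions for illustration, as is treating a transaction absent from the hash as non-conflicting.

```python
from typing import Dict, Hashable, List, Tuple

RowEvent = Tuple[str, Hashable, int]  # (table, primary key, user transaction id)


def split_for_third_pass(events: List[RowEvent],
                         in_conflict: Dict[int, bool]
                         ) -> Tuple[List[RowEvent], List[RowEvent]]:
    """Partition one epoch's row events using the in-conflict lookup hash.

    Returns (to_apply, to_conflict_handle); Binlog order is kept in each list.
    """
    to_apply: List[RowEvent] = []
    to_handle: List[RowEvent] = []
    for event in events:
        txn = event[2]
        if in_conflict.get(txn, False):
            to_handle.append(event)  # whole user transaction is in conflict
        else:
            to_apply.append(event)   # still applied with conflict detection
    return to_apply, to_handle
```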

The size of these hashes, and the complexity of the dependency graph is bounded by the size of the epoch transaction. There is no need to track dependencies across the boundary of two epoch transactions, as any dependencies will be discovered via conflicts on the data committed by the first epoch transaction when attempting to apply the second epoch transaction.

Event counters

Like the existing conflict detection functions, NDB$EPOCH_TRANS has a row-conflict detection counter called ndb_conflict_epoch_trans.

Additional counters have been added which specifically track the different events associated with transactional conflict detection. These can be seen with the usual SHOW GLOBAL STATUS LIKE syntax, or via the INFORMATION_SCHEMA tables.

  • ndb_conflict_trans_row_conflict_count
    This is essentially the same as ndb_conflict_epoch_trans - the number of row events with conflict detected.
  • ndb_conflict_trans_row_reject_count
    The number of row events which were handled as in-conflict. It will be at least as large as ndb_conflict_trans_row_conflict_count, and will be higher if other rows are implicated by being in a conflicting transaction, or by being dependent on a row in a conflicting transaction.
    A separate ndb_conflict_trans_row_implicated_count could be constructed as ndb_conflict_trans_row_reject_count - ndb_conflict_trans_row_conflict_count.
  • ndb_conflict_trans_reject_count
    The number of discrete user transactions detected as in-conflict.
  • ndb_conflict_trans_conflict_commit_count
    The number of epoch transactions which had transactional conflicts detected during application.
  • ndb_conflict_trans_detect_iter_count
    The number of iterations of the three-pass algorithm that have occurred. Each set of passes counts as one. Normally this would be the same as ndb_conflict_trans_conflict_commit_count. Where further conflicts are found on the third pass, another iteration may be required, which would increase this count. So if this count is larger than ndb_conflict_trans_conflict_commit_count then there have been some conflicts generated concurrently with conflict detection, perhaps suggesting a high conflict rate.
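The derived values mentioned above can be computed from the counters with a little arithmetic. This Python sketch assumes the counters have already been fetched (for example from SHOW GLOBAL STATUS) into a dict of name to value; it lower-cases the names and converts values with int(), since status values usually arrive as strings. The function and result key names are hypothetical.

```python
from typing import Dict, Union


def summarize_trans_counters(status: Dict[str, Union[int, str]]) -> Dict[str, object]:
    """Derive the implicated-row count and extra-iteration count."""
    s = {name.lower(): int(value) for name, value in status.items()}
    row_conflicts = s["ndb_conflict_trans_row_conflict_count"]
    row_rejects = s["ndb_conflict_trans_row_reject_count"]
    iterations = s["ndb_conflict_trans_detect_iter_count"]
    commits = s["ndb_conflict_trans_conflict_commit_count"]
    return {
        # Rows rejected only because of their transaction or a dependency.
        "row_implicated": row_rejects - row_conflicts,
        # Iterations beyond one per conflicting epoch.
        "extra_iterations": iterations - commits,
        # True if conflicts arose concurrently with conflict detection.
        "concurrent_conflicts_seen": iterations > commits,
    }
```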

Performance properties of NDB$EPOCH and NDB$EPOCH_TRANS

I have tried to avoid getting involved in an explanation of Ndb replication in general, which would probably fill a terabyte of posts. Comparing replication using NDB$EPOCH and NDB$EPOCH_TRANS with Ndb replication using no conflict detection, what can we say?

  • Conflict detection logic is pushed down to data nodes for execution
    Minimising extra data transfer + locking
  • Slave operation batching is preserved
    Multiple row events are applied together, saving MySQLD <-> data node round trips, using data node parallelism
    For both algorithms, one extra MySQLD <-> data node round-trip is required in the no-conflicts case (best case)
  • NDB$EPOCH : One extra MySQLD <-> data node round-trip is required per *batch* in the all-conflicts case (worst case)
  • NDB$EPOCH : Minimal impact to Binlog sizes - one extra row event per epoch.
  • NDB$EPOCH : Minimal overhead to Slave SQL CPU consumption
  • NDB$EPOCH_TRANS : One extra MySQLD <-> data node round-trip is required per *batch* per *pass* in the all-conflicts case (worst case)
  • NDB$EPOCH_TRANS : One round of two passes is required for each conflict newly created since the previous pass.
  • NDB$EPOCH_TRANS : Small impact to Binlog sizes - one extra row event per epoch plus one user transaction id per row event.
  • NDB$EPOCH_TRANS : Small overhead to Slave SQL CPU consumption in no-conflict case

Current and intrinsic limitations

These functions support automatic conflict detection and handling without schema or application changes, but there are a number of limitations. Some limitations are due to the current implementation, some are just intrinsic in the asynchronous distributed consistency problem itself.

Intrinsic limitations
  • Reads from the Secondary are tentative
    Data committed on the secondary may later be rolled back. The window of potential rollback is limited, after which Secondary data can be considered stable. This is described in more detail here.
  • Writes to the Secondary may be rolled back
    If this occurs, the fact will be recorded on the Primary. Once a committed write is stable it will not be rolled back.
  • Out-of-band dependencies between transactions are out-of-scope
    For example direct communication between two clients creating a dependency between their committed transactions, not observable from their database footprints.

Current implementation limitations

  • Detected transaction dependencies are limited to dependencies between binlogged writes (Insert, Update, Delete)
    Reads are not currently included.
  • Delete vs Delete+Insert conflicts risk data divergence
    Delete vs Delete conflicts are detected, but currently do not result in conflict handling, so that Delete vs Delete + Insert can result in data divergence.
  • With NDB$EPOCH_TRANS, unplanned Primary outages may require manual steps to restore Secondary consistency
    With multiple pending, time-spaced, non-overlapping transactional conflicts, an unexpected failure may require some Binlog processing to ensure consistency.

Want to try it out?

Andrew Morgan has written a great post showing how to set up NDB$EPOCH_TRANS. He's even included non-ascii art. This is probably the easiest way to get started. NDB$EPOCH is slightly easier to get started with, as the --ndb-log-transaction-id (and Binlog v2) options are not required.

Edit 23/12/11 : Added index


mihasusaec said...

Thanks for detailed explanations and answers!

Sorry, I have many questions.

You wrote:
>With NDB$EPOCH_TRANS, unplanned Primary outages may require manual steps to restore Secondary consistency

I can't understand it.

1. If I have active-active replication, both servers are slave and primary at the same time (or maybe I'm wrong?). How can I set the role of a server (primary or secondary) for NDB$EPOCH_TRANS() conflict resolution? I know that there is a new variable for NDB$EPOCH2_TRANS(), ndb_slave_conflict_role, but conflict resolution and replication must work even with NDB$EPOCH_TRANS(). Must I use different configs on the primary and secondary clusters? In Andrew Morgan's post the configs are different, but I'm not sure that they must be.

2. Andrew Morgan fills the mysql.ndb_replication table with rows containing only one server_id, but the documentation says:
>Because the conflict detection algorithms employed by NDB$EPOCH() and NDB$EPOCH_TRANS() are asymmetric, you must use different values for the primary slave's and secondary slave's server_id entries.

I suppose I must also insert the same data with different server ids, but I'm not sure.

3. Is it necessary to create the additional some_table$EX tables for replication and conflict resolution to work, or are they just for users of the database?

4. Also, I don't understand whether it is normal to get the " The incident LOST_EVENTS occured on the master." error and, if it is normal, how to fix it.

I have two servers, let's say with server-id 1 and 3. For now the configs are the same:

log-bin-trust-function-creators = 1

I also have these rows in mysql.ndb_replication:
*************************** 1. row ***************************
db: devel
table_name: %
server_id: 1
binlog_type: 0
conflict_fn: NDB$EPOCH_TRANS()
*************************** 2. row ***************************
db: devel
table_name: %
server_id: 3
binlog_type: 0
conflict_fn: NDB$EPOCH_TRANS()

Everything seems to work. After that I restart the server with id 3. The slave is working on this server after the restart, but it is stopped on the server with id 1 (error "LOST EVENTS ..."). So I try to fix it as described in the documentation:
First, I run

SELECT @latest:=MAX(epoch) FROM mysql.ndb_apply_status WHERE server_id NOT IN (1);

on the server with id 1


SELECT @file:=SUBSTRING_INDEX(next_file, '/', -1), @pos:=next_position FROM mysql.ndb_binlog_index WHERE epoch = 5370031969927176 ORDER BY epoch ASC LIMIT 1;

on the server with id 3 (5370031969927176 is the value of @latest)

And after that I run

with the @file and @pos variables on server 1, but I still get "LOST EVENTS ...".

It seems to me that this happens because the log file and position for the local server id are not changing in mysql.ndb_binlog_index for some reason. I have seen this behaviour several times. If I run

SELECT * from ndb_apply_status;

I get the correct file and position for the remote server (for example, 3), but an old position for the local one (for example, 1). If I run the same query on the remote server (3), I get the correct file and position for 1, but an old one for 3. It is strange.

What am I doing wrong?

5. Returning to the first quote: even if I somehow set up working active-active replication with conflict resolution and failover, are there still cases where I must restore consistency by hand?

I have asked some questions here, but nobody answered.

Sorry for such a big comment and so many questions in it.


mihasusaec said...

Oh, sorry, it seems from your post "implementation part 1" that the configs must be different and mysql.ndb_apply_status must only be on the PRIMARY cluster.

mihasusaec said...

I have read the official documentation again, along with some of your posts and Andrew Morgan's posts, and I suppose that only one server must resolve conflicts and that I need the $EX tables for resolving conflicts. But I'm still not sure about the configs and the ndb_replication table.

Do I understand correctly that ndb-log-apply-status must be set to 1 only on the SECONDARY cluster?

And I must not replicate the $EX tables to the other cluster, but must replicate mysql.ndb_replication, am I right?


Frazer Clement said...

1 + 2
For NDB$EPOCH and NDB$EPOCH_TRANS, the conflict algorithm must be specified in the ndb_replication table for the Slave Server on the cluster which is to be PRIMARY. The SECONDARY cluster needs no special entry.
For NDB$EPOCH2 and NDB$EPOCH2_TRANS, the conflict algorithm is specified for the Slave Servers on both clusters, and the ndb_slave_conflict_role variable is used to move the PRIMARY role between them.

The $EX tables are for collecting details about conflicts discovered. You may or may not wish to do this, depending on your schema, application, needs etc... They are not needed for the NDB$EPOCH algorithms to work.

LOST EVENTS occurs whenever a MySQLD server is restarted, or is disconnected from the MySQL Cluster data nodes. It inserts a 'LOST EVENTS' event into the Binlog, which causes any Slave MySQLD to stop. This is done so that 'gaps' in the flow of events are not ignored.
Normally we recommend that there are at least two separate binlogging MySQLDs, so that there is always a binlog containing events even when one is restarted for whatever reason. In this scenario, you should attempt to use the 'other' Binlogging MySQLD, which we refer to as 'replication channel cutover'. Of course, if some problem affects both MySQLDs then there may be an overlapping gap in both Binlogs, and this requires a Backup + Restore from Master -> Slave to resolve.
In your testing case I guess that you know that the gap has nothing relevant in it, and so you need to manually 'step over' it using SQL_SLAVE_SKIP_COUNTER. Another alternative is to flush the binary log so that it rotates, and then use CHANGE MASTER to position on the new log etc...

to be continued...

Frazer Clement said...


The NDB$EPOCH* algorithms work by having the PRIMARY cluster decide whether SECONDARY changes are in conflict, and reverting them if necessary. This is done asynchronously. Once a conflict is detected on some set of rows, any further incoming SECONDARY changes to those rows will be considered to be in conflict until the PRIMARY's corrective action has been replicated to the SECONDARY and back.
This means that changes committed at different times by the SECONDARY, affecting the same rows (or with the _TRANS algorithms, having some dependency on the same rows), can be found to be in-conflict. They will be reverted in the order they were committed, and this reversion can occur over time as well, subject to asynchronous replication latencies in both directions.
Consider a scenario where
- The SECONDARY has committed a conflicting transaction, followed by a number of dependent (and therefore also conflicting) transactions over time.
- The PRIMARY detects the first one or more of these and issues corrective actions, which are applied on the SECONDARY.
- The PRIMARY fails
- It will not issue corrective actions for the dependent SECONDARY transactions.
- But the SECONDARY has been partially corrected.

This is the 'unplanned PRIMARY outage' scenario that I described.
In 7.4, some new status variables were added to help with detecting this scenario including :
- The last epoch in which a conflict was detected (PRIMARY) or corrected (SECONDARY)
- The last epoch in which no corrective actions were known to be in-flight.

If the PRIMARY fails, then, for example, the ndb_conflict_last_stable_epoch on the SECONDARY Slave can be compared with the ndb_slave_max_replicated_epoch.

If ndb_slave_max_replicated_epoch > ndb_conflict_last_stable_epoch then there were corrective actions in-flight and the SECONDARY may have conflict issues, due to changes applied since the last_stable_epoch.

If ndb_slave_max_replicated_epoch = ndb_conflict_last_stable_epoch then there were no corrective actions in-flight and the SECONDARY state is good.
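The comparison above can be written as a small Python check, assuming the two status values have already been read from the SECONDARY Slave as integers. The function name is hypothetical, and since the rule above only covers the "greater than" and "equal" cases, the sketch raises an error for anything else rather than guessing.

```python
def corrective_actions_in_flight(max_replicated_epoch: int,
                                 last_stable_epoch: int) -> bool:
    """Return True if the SECONDARY may have conflict issues after a PRIMARY failure.

    max_replicated_epoch: ndb_slave_max_replicated_epoch on the SECONDARY Slave.
    last_stable_epoch:    ndb_conflict_last_stable_epoch on the SECONDARY Slave.
    """
    if max_replicated_epoch > last_stable_epoch:
        # Changes applied since the last stable epoch may need review.
        return True
    if max_replicated_epoch == last_stable_epoch:
        # No corrective actions were in flight: SECONDARY state is good.
        return False
    raise ValueError("case not covered by the comparison rule")
```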

Probably you would prefer a simpler answer, but this is a limitation of the system today.

Other questions :
> Am I understand correctly that ndb-log-apply-status must be set to 1 only at SECONDARY cluster?

It is only necessary to set it there, but it can be set at both without ill-effects. I would recommend setting it at both.

> And I must not replicate $EX tables to another cluster, but must replicate mysql.ndb_replication, am I right?

I think the $EX tables will not replicate - I think this is disabled. mysql.ndb_replication can be replicated if you wish, or not.

Hope this helps,