Wednesday, 7 December 2011

Eventual Consistency in MySQL Cluster - using epochs




Before getting to the details of how eventual consistency is implemented, we need to look at epochs. Ndb Cluster maintains an internal distributed logical clock known as the epoch, represented as a 64-bit number. This epoch serves a number of internal functions, and is atomically advanced across all data nodes.

Epochs and consistent distributed state

Ndb is a parallel database, with multiple internal transaction coordinator components starting, executing and committing transactions against rows stored in different data nodes. Concurrent transactions only interact where they attempt to lock the same row. This design minimises unnecessary system-wide synchronisation, enabling linear scalability of reads and writes.

The stream of changes made to rows stored at a data node is written to a local Redo log for node and system recovery. The change stream is also published to NdbApi event listeners, including MySQLD servers recording Binlogs. Each node's change stream contains the row changes it was involved in, as committed by multiple transactions and coordinated by multiple independent transaction coordinators, interleaved in a partial order.

  Incoming independent transactions
affecting multiple rows

T3 T4 T7
T1 T2 T5

| | |
V V V

-------- -------- --------
| 1 | | 2 | | 3 |
| TC | | TC | | TC | Data nodes with multiple
| |--| |--| | transaction coordinators
|------| |------| |------| acting on data stored in
| | | | | | different nodes
| DATA | | DATA | | DATA |
-------- -------- --------

| | |
V V V

t4 t4 t3
t1 t7 t2
t2 t1 t7
t5

Outgoing row change event streams,
labelled by causing transaction


These row event streams are generated independently by each data node in a cluster, but to be useful they need to be correlated together. For system recovery from a crash, the data nodes need to recover to a cluster-wide consistent state : a state which contains only whole transactions, and which, logically at least, existed at some point in time. This correlation could be done by analysing the transaction ids and row dependencies of each recorded row change to determine a valid order for the merged event streams, but this would add significant overhead. Instead, the Cluster uses a distributed logical clock known as the epoch to group large sets of committed transactions together.

Each epoch contains zero or more committed transactions. Each committed transaction is in exactly one epoch. The epoch clock advances periodically, every 100 milliseconds by default. When it is time for a new epoch to start, a distributed protocol known as the Global Commit Protocol (GCP) results in all of the transaction coordinators in the Cluster agreeing on a point in time in the flow of committing transactions at which to change epoch. This epoch boundary, between the commit of the last transaction in epoch n and the commit of the first transaction in epoch n+1, is a cluster-wide consistent point in time. Obtaining this consistent point in time requires cluster-wide synchronisation between all transaction coordinators, but it need only happen periodically.
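
As a rough mental model, the epoch behaves like a shared counter that is periodically advanced, with every committing transaction stamped with its current value. A minimal sketch (illustrative only; the real epoch is advanced by the distributed GCP protocol, and these types are invented for the example) :

#include <cstdint>

// Illustrative only : in the real cluster the epoch is advanced by the
// distributed Global Commit Protocol, not by a single shared variable.
struct EpochClock {
  uint64_t current = 22;
  void advance() { ++current; }   // the periodic GCP 'tick'
};

struct CommittedTransaction {
  uint64_t transId;
  uint64_t epoch;                 // stamped exactly once, at commit time
};

// Every transaction belongs to exactly one epoch : the one current when
// its commit point is reached.
CommittedTransaction commit(const EpochClock &clock, uint64_t transId) {
  return CommittedTransaction{transId, clock.current};
}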

Furthermore, each node ensures that all events for epoch n are published before any events for epoch n+1 are published. Effectively the event streams are sorted by epoch number, and the first time a new epoch is encountered signifies a precise epoch boundary.

 Incoming independent transactions

T3 T4 T7
T1 T2 T5

| | |
V V V

-------- -------- --------
| 1 | | 2 | | 3 |
| TC | | TC | | TC | Data nodes with multiple
| |--| |--| | transaction coordinators
|------| |------| |------| acting on data stored in
| | | | | | different nodes
| DATA | | DATA | | DATA |
-------- -------- --------

| | |
V V V

t4(22) t4(22) t3(22) Epoch 22
...... ...... ......
t1(23) t7(23) t2(23) Epoch 23
t2(23) t1(23) t7(23)
......
t5(24) Epoch 24

Outgoing row change event streams,
labelled by causing transaction,
with epoch numbers in ()



When these independent streams are merge-sorted by epoch number, we get a unified change stream. Multiple possible orderings can result.
One partial ordering is shown here :

Events      Transactions contained in epoch

t4(22)
t4(22) {T4,T3}
t3(22)

......

t1(23)
t2(23)
t7(23)
t1(23) {T1, T2, T7}
t2(23)
t7(23)

......

t5(24) {T5}



Note that we can state from this that T4 -> T1 (happened before), and T1 -> T5. However we cannot say whether T4 -> T3 or T3 -> T4. In epoch 23 we see that the row events resulting from T1, T2 and T7 are interleaved.

Epoch boundaries act as markers in the flow of row events generated by each node, which are then used as consistent points to recover to. Epoch boundaries also allow a single system-wide unified transaction log to be generated from the per-node row change streams, by merge-sorting them by epoch number. Note that the order of events within an epoch is still not tightly constrained. As concurrent transactions can only interact via row locks, the order of events on a single row (table and primary key value) signifies transaction commit order, but there is by definition no order between transactions affecting independent row sets.
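
A sketch of this epoch merge-sort, as a k-way merge over the per-node streams (the RowEvent type and stream representation are invented for the example; each input stream is assumed already epoch-sorted, as described above) :

#include <cstdint>
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

// Invented event type for the example : each node's stream delivers
// these in non-decreasing epoch order.
struct RowEvent {
  uint64_t epoch;
  uint64_t transId;
  // ... table id, primary key, before/after row images ...
};

using NodeStream = std::vector<RowEvent>;

// k-way merge of per-node streams into one epoch-ordered stream.
// Within an epoch no further ordering is imposed, matching the
// partial order described above.
std::vector<RowEvent> mergeByEpoch(const std::vector<NodeStream> &streams) {
  // (epoch, stream index, position within stream)
  typedef std::tuple<uint64_t, size_t, size_t> Cursor;
  std::priority_queue<Cursor, std::vector<Cursor>,
                      std::greater<Cursor> > heads;

  for (size_t s = 0; s < streams.size(); s++)
    if (!streams[s].empty())
      heads.push(Cursor(streams[s][0].epoch, s, 0));

  std::vector<RowEvent> unified;
  while (!heads.empty()) {
    Cursor c = heads.top();
    heads.pop();
    size_t s = std::get<1>(c), pos = std::get<2>(c);
    unified.push_back(streams[s][pos]);
    if (pos + 1 < streams[s].size())
      heads.push(Cursor(streams[s][pos + 1].epoch, s, pos + 1));
  }
  return unified;
}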

To record a Binlog of Ndb row changes, MySQLD listens to the row change streams arriving from each data node, and merge-sorts them by epoch into a single, epoch-ordered stream. When all events for a given epoch have been received, MySQLD records a single Binlog transaction containing all row events for that epoch. This Binlog transaction is referred to as an 'Epoch transaction' as it describes all row changes that occurred in an epoch.

Epoch transactions in the Binlog

Epoch transactions in the Binlog have some interesting properties :
  • Efficiency : They can be considered a kind of Binlog group commit, where multiple user transactions are recorded in one Binlog (epoch) transaction. As an epoch normally contains 100 milliseconds of row changes from a cluster, this is a significant amortisation.
  • Consistency : Each epoch transaction contains the row operations which occurred when moving the cluster from epoch boundary consistent state A to epoch boundary consistent state B.
    Therefore, when applied as a transaction by a slave, the slave will atomically move from consistent state A to consistent state B.
  • Inter-epoch ordering : Any row event recorded in epoch n+1 logically happened after every row event in epoch n.
  • Intra-epoch disorder : Any two row events recorded in epoch n, affecting different rows, may have happened in any order.
  • Intra-epoch key-order : Any two row events recorded in epoch n, affecting the same row, happened in the order they are recorded.

The ordering properties show that epochs give only a partial order, enough to subdivide the row change streams into self-consistent chunks. Within an epoch, row changes may be interleaved in any way, except that multiple changes to the same row will be recorded in the order they were committed.

Each epoch transaction contains the row changes for a particular epoch, and that epoch number is recorded in the epoch transaction itself, as an extra WRITE_ROW event on a system table called mysql.ndb_apply_status. This WRITE_ROW event contains the binlogging MySQLD's server id and the epoch number. This event is added so that it will be atomically applied by the Slave along with the rest of the row changes in the epoch transaction, giving an atomically reliable indicator of the replication 'position' of the Slave relative to the Master Cluster in terms of epoch number. As the epoch number is abstracted from the details of a particular Master MySQLD's binlog files and offsets, it can be used to fail over to an alternative Master.

We can visualise a MySQL Cluster Binlog as looking something like this. Each Binlog transaction contains one 'artificially generated' WRITE_ROW event at the start, and then RBR row events for all row changes that occurred in that epoch.

BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6998
WRITE_ROW ...
UPDATE_ROW ...
DELETE_ROW ...
...
COMMIT # Consistent state of the database

BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6999
...
COMMIT # Consistent state of the database

BEGIN
WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=7000
...
COMMIT # Consistent state of the database
...


A series of epoch transactions, each with a special WRITE_ROW event for recording the epoch on the Slave. You can see this structure using the mysqlbinlog tool with the --verbose option.

Rows tagged with last-commit epoch

Each row in a MySQL Cluster stores a hidden metadata column which contains the epoch at which a write to the row was last committed. This information is used internally by the Cluster during node recovery and other operations. The ndb_select_all tool can be used to see the epoch numbers for rows in a table by supplying the --gci or --gci64 options. Note that the per-row epoch is not a row version, as two updates to a row in quick succession (within the same epoch) will have the same commit epoch.
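
The same per-row epoch can also be read from NdbApi via the ROW_GCI pseudo-column. A minimal sketch (error handling omitted; the table pointer and the 'pk' key column name are placeholders for the example) :

#include <NdbApi.hpp>

// Sketch : read a row's last-commit epoch via the ROW_GCI pseudo
// column. Error handling omitted; "pk" is a placeholder key column.
Uint64 readRowGci(Ndb *ndb, const NdbDictionary::Table *tab, Uint32 pk) {
  NdbTransaction *trans = ndb->startTransaction();
  NdbOperation *op = trans->getNdbOperation(tab);
  op->readTuple();
  op->equal("pk", pk);
  // ROW_GCI is a hidden pseudo column holding the last-commit epoch
  NdbRecAttr *gci = op->getValue(NdbDictionary::Column::ROW_GCI);
  trans->execute(NdbTransaction::Commit);
  Uint64 rowGci = gci->u_64_value();
  ndb->closeTransaction(trans);
  return rowGci;
}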

Epochs and eventual consistency

Reviewing epochs from the point of view of my previous posts on eventual consistency, we see that :
  • Epochs provide an incrementing logical clock
  • Epochs are recorded in the Binlog, and therefore shipped to Slaves
  • Epoch boundaries imply happened-before relationships between events before and after them in the Binlog

These properties mean that epochs are almost perfect for monitoring conflict windows in an active-active circular replication setup, with only a few enhancements required.

I'll describe these enhancements in the next post.

Edit 23/12/11 : Added index

23 comments:

Max said...

Very nice and clear post Frazer, you should become a trainer :)

A question I have regarding parallel processing of the transactions on the slave side. How do you know which transactions within an epoch are using the same primary key values? Or do you?

Frazer Clement said...

Hi Max,
Good to hear from you!
Thanks for the compliments,

Regarding the slave, each epoch transaction is applied as a single transaction via the Ndb table handler.
When tracking inter-transaction dependencies (independent transactions accessing the same primary key), the Ndb handler uses an internal hash table. This hash table is maintained for the duration of the (epoch) transaction. The keys of the hash are the table id and primary key values. For each key, the hash stores a transaction id.
As each row event is applied, the Ndb handler inserts an entry into the hash with the event's table id and primary key values, and the event's user transaction id. If a hash entry already exists then the Ndb handler knows that some row event has already been applied to that row in this epoch transaction. If that event had a different transaction id to the current event then there is an inter-transaction dependency between the existing entry and the new entry.
So dependency tracking within an epoch is done using an in-memory hash table.
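
In outline it looks something like this (a simplified sketch with invented types, not the actual handler code, which uses a real hash table rather than a std::map) :

#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Key : (table id, packed primary key value)
typedef std::pair<uint32_t, std::string> RowKey;

struct DependencyTracker {
  // Lives for the duration of one epoch transaction
  std::map<RowKey, uint64_t> lastWriter;
  bool dependencyFound;

  DependencyTracker() : dependencyFound(false) {}

  void applyRowEvent(uint32_t tableId, const std::string &packedPk,
                     uint64_t userTransId) {
    RowKey key(tableId, packedPk);
    std::map<RowKey, uint64_t>::iterator it = lastWriter.find(key);
    if (it == lastWriter.end()) {
      // First event on this row in this epoch transaction
      lastWriter[key] = userTransId;
    } else if (it->second != userTransId) {
      // A different user transaction already touched this row :
      // an inter-transaction dependency within the epoch.
      dependencyFound = true;
      it->second = userTransId;   // new event is now the latest writer
    }
  }
};
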
I hope that answers your question. Let me know if it's not clear.
Frazer

Max said...

Thanks Frazer, this does partly answer my question.

The remaining question would be that using this hash table you could then execute the queries within an epoch in parallel threads. Is this done? Or has any work been done to this end?

Frazer Clement said...

Hi Max,
You're correct that the transaction dependencies can be used to find sets of independent transactions which can be executed in parallel.
For Cluster, all user transactions recorded in a single epoch transaction on the Master are applied as a single transaction on the slave. The parallelism of application on the slave is therefore only limited by the Slave execution batch size. In other words, we already have parallel application of all user transaction operations within an epoch on the Slave. These parallel operations share a single TC instance thread, but are fully parallel in terms of LQH execution (and are asynchronous with respect to each other).
Monitoring dependencies between separate Binlog transactions (epoch or otherwise) could be used to increase parallelism on the Slave. For Cluster it could be used to apply epochs in parallel, and for other engines such as InnoDB, it would give the inter-transaction parallelism which is currently missing.
I believe that the replication team are working on increasing the utility of the multithreaded slave work they have started (currently only multithreaded between DBs). I'm not sure if they're looking at key-based dependency analysis yet.
Frazer

Max said...

Thanks Frazer. Yes, my assumption was that since there would only be one binlog applier on the slave cluster, that could easily become a bottleneck. I mean on the MySQL server side, not the data nodes, as the binlog execution thread is also single-threaded on the server. But it might be that the load on the MySQL server is so small compared to the data nodes that this isn't an issue?

Frazer Clement said...

Hi Max,
Yes asynchronous replication through MySQL is single threaded in a number of places including the Binlog writing, the Slave IO thread and the Slave SQL thread. For Ndb, the TC thread handling each epoch transaction is also a single-threaded choke point. The Slave SQL thread is often the biggest bottleneck as it executes and commits transactions from the Binlog serially.
For Ndb I am not sure what the first bottleneck would be. Due to the batching and intra-epoch parallelism we rarely hear of users with replication performance problems.
If we were to have a performance problem due to single threading in the slave SQL thread then we could attempt to parallelise some part of the Slave work from a single serial Binlog, such as executing multiple epochs concurrently.

However, perhaps a better fix would be to parallelise across multiple replication 'channels', e.g. Binlogging Master MySQLDs and Applying Slave MySQLDs. This could be done by splitting the row change events by some PK hash on the Master Cluster, and having each of n Binlogging MySQLDs only receive and record a subset of the row changes in an epoch. These partial Binlogs can then be pulled and applied on the Slave in parallel by independent Slave MySQLDs. The problem then becomes ensuring consistency on the Slave Cluster if required. One option is to support a single transaction with multiple contributing clients, but a single commit point. This is not too tricky to implement in Ndb, but would still have a single TC thread as a potential bottleneck. Another option is to support a kind of distributed commit of independent partial-epoch transactions so that everything is parallel up to the point of commit.

The cost of synchronising multiple processes at commit time can be amortised across one or more epochs. The balance here is to reduce synchronisation as much as possible without replication latency becoming excessive. I suppose an ideal system would 'change gears' depending on the load + size of epochs received.

Anyway, this is all theory as currently async replication performance is not a big problem for our users.

Frazer

Edward Cullen said...
This comment has been removed by the author.
Edward Cullen said...

Hi Frazer
Comprehensive and nice explanation. How do we know which transaction caused a particular event? For example, if multiple transactions are executing and updating the database, when we receive the update event, how do we know the transaction id, or which specific transaction caused this event?

Thanks.

Frazer Clement said...

Hi Edward,
Good question. Each event carries an internally generated 64-bit transaction id, which can be queried using the NdbApi event Api as the events are received.


From storage/ndb/include/ndbapi/NdbEventOperation.hpp

/**
* Retrieve the TransId of the latest retrieved event
*
* Only valid for data events. If the kernel does not
* support transaction ids with events, the max Uint64
* value is returned.
*
* @return TransId
*/
Uint64 getTransId() const;


This 64-bit number is the same one which was visible to the client at transaction prepare/commit time :

storage/ndb/include/ndbapi/NdbTransaction.hpp

/**
* Get transaction identity.
*
* @return Transaction id.
*/
Uint64 getTransactionId();

So the events which result from a transaction commit will have that transaction's id when received back as events from NdbApi.

Things to note :
1. The system natively produces 1 row event per changed row per transaction. Multiple row modifications within a transaction (ins, upd, del) will always be merged into one effective operation (e.g. del,ins -> upd; upd,upd -> upd; ins,del -> NULL)
2. The merge_events NdbApi option causes NdbApi to merge all events on a table per row *per epoch*, e.g. multiple transactions' events are merged together to give one event per row. This option is switched on for tables with Blobs, which is one reason they can't currently be used with NDB$EPOCH et al

From SQL, the NdbApi transaction ids are not really visible. It might be a nice feature to expose the NdbApi transaction id as e.g. a session status variable.
The NdbApi transaction id is put into the Binlog by the MySQLD Binlog Injector. It uses a feature of the 'v2' RBR events where each event can have arbitrary extra meta data to tag each row change with its causing transaction, allowing the slave to determine transaction boundaries.

Hope this helps,
Frazer

Edward Cullen said...

Thanks Frazer for your detailed explanation.

Edward Cullen said...

Hi Frazer
I used getTransId() from the NdbEventOperation class to get the transaction id and compared it with the NdbTransaction getTransactionId() function; the two values are not the same.

I looked into the implementation of NdbEventOperation getTransId(); it was like this :

Uint64
NdbEventOperationImpl::getTransId() const
{
  /* Return 64 bit composite */
  Uint32 transId1 = m_data_item->sdata->transId1;
  Uint32 transId2 = m_data_item->sdata->transId2;
  return Uint64(transId1) << 32 | transId2;
}

NdbTransaction was like this
tTransId1 = (Uint32) theTransactionId;
tTransId2 = (Uint32) (theTransactionId >> 32);

Let's assume the following case :
NdbTransaction's transaction id is 9014895836135425.
According to the implementation, tTransId1 = 1 and tTransId2 = 2098944.

When the event API constructs the transaction id as Uint64(transId1) << 32 | transId2, it will return 4297066240, which is wrong.

// The event API transaction id construction is supposed to be like this, right?

Uint64(transId2) << 32 | transId1

Then the answer will be correct. Please advise on this.

Thanks.

Edward Cullen said...

Uint64 transId = op->getTransId();
// swap the 32-bit halves to recover the original transaction id
Uint64 actualTransId = Uint64((Uint32)transId) << 32 | (transId >> 32);

This would be a temporary solution, but I don't know why the user has to do all this modification?

Thanks.

Frazer Clement said...

Hi Edward,
Yes, you're right. That is a bug. Sorry about that.

I guess that we've only been using the transaction id to uniquely identify transactions in the event stream, rather than correlating with the source transaction, and so had somehow not noticed.

To fix we would probably leave this function as-is (in case someone has discovered and is doing the word-swap like you) and introduce a new function to return the 64-bit transaction id in the correct order.

Thanks for pointing this out, sorry for wasting your time.
Frazer

Edward Cullen said...

Hi Frazer
I agree with what you said here: "correlating with the source transaction is not necessary".
In my case, I was just curious about why these two are not the same.

Now I am clear. Thank you so much.

Frazer Clement said...

No problem.

I've raised an internal bug for this problem you found.

Thanks for getting in touch, good luck!

Edward Cullen said...

Hi Frazer
I would like to know how the NDB event API buffers the subscribed events when the thread which is processing the events is too slow. Is there any way to get a notification saying the NDB Event API buffer is going to drop messages because event processing is slow?

Thanks.

Regards
Edwards.

Edward Cullen said...

Hi Frazer
As you mentioned, transactions are not in order within a particular epoch. Can we arrange those transactions by using transaction ids? I did some experiments and observed that, using transaction ids, we can order the transactions within a specific epoch. Are there any other ways to do that?

thanks.

Frazer Clement said...

Hi Edward,
The Ndb Event API buffer generally does not drop messages because the application is too slow - it continues buffering indefinitely. Recently we have added a maximum buffer size, but the first iteration of this feature will kill the API process when the limit is reached! We have a second iteration in the 7.4 release which will instead drop events and insert a 'TE_INCONSISTENT' event into the event stream.

In terms of notification, there are parameters which can be set which result in the NdbApi generating cluster log events. These are :

void setReportThreshEventGCISlip(unsigned thresh);
void setReportThreshEventFreeMem(unsigned thresh);

The GCISlip is the number of buffered epochs above which the API node will generate a warning log on *every* epoch boundary.
The FreeMem is a remaining-free percent level which will generate a single log when it is passed. Note that the free percent is relative to the amount of allocated buffer, but this can grow.

You can monitor these events using the MGMAPI - they can be parsed to determine the current levels.

From your NdbApi application you can locally monitor the difference between the 'latestGCI' as reported by various Event APIs, and the most recent GCI you have dequeued. There is not currently a way to observe the buffer size from NdbApi.
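
For example (a rough sketch, error handling and event processing omitted) :

#include <NdbApi.hpp>

// Sketch : monitor the gap between the newest buffered epoch and the
// newest epoch this thread has dequeued. Error handling omitted.
void pollAndReportLag(Ndb *ndb) {
  Uint64 latestGCI = 0;
  // pollEvents reports the most recent epoch buffered by NdbApi
  if (ndb->pollEvents(1000 /*ms*/, &latestGCI) > 0) {
    NdbEventOperation *op;
    Uint64 dequeuedGCI = 0;
    while ((op = ndb->nextEvent()) != NULL) {
      dequeuedGCI = op->getGCI();
      // ... process the event ...
    }
    Uint64 lag = latestGCI - dequeuedGCI;
    (void)lag;   // e.g. warn above some application threshold
  }
}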

If the NdbApi internal threads are too slow acknowledging epochs back to the data nodes then eventually (MaxBufferedEpochs), the data nodes will disconnect them to limit the buffering required. This epoch-acknowledgement occurs automatically inside NdbApi and is independent of whether the user threads are fast or slow at dequeueing events from the Event Buffer.

Frazer

Frazer Clement said...

Hi Edward,
Using the transaction ids you can identify which events belong to which transactions.
The ids themselves do not have any meaningful ordering - even from a single client thread they only reflect transaction 'start' order, not commit order.
However you can use the order of events for transactions with same-key dependencies in an epoch to provide a partial ordering of the transactions. This will ensure that two transactions which modify the same row are ordered correctly w.r.t each other. Transactions with no dependencies can be sorted in any order.
Note that as we do not have information about read dependencies in the event stream, we may choose a partial sort order which is in fact incorrect.
e.g. :
Trans 1 reads Row 1 and inserts Row 1000
Trans 2 reads Row 1000 and writes Row 1

We would have no relevant ordering information about these transactions and so could order them (1,2) or (2,1). The (2,1) ordering is not correct, and depending on what you use the ordered sequence of transactions for, may cause problems.
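
A sketch of deriving that partial order from one epoch's events (illustrative only, with invented types; the real event stream carries table ids and key values rather than a string key) :

#include <cstdint>
#include <map>
#include <queue>
#include <set>
#include <string>
#include <vector>

struct RowEvent {
  std::string rowKey;   // stands in for (table id, primary key value)
  uint64_t transId;
};

// Same-key events arrive in commit order, so consecutive events on a
// key from different transactions give an edge "earlier -> later".
// A topological sort then yields one valid transaction order; any
// tie-break between independent transactions is equally valid, and
// read dependencies remain invisible, as noted above.
std::vector<uint64_t> orderTransactions(const std::vector<RowEvent> &epoch) {
  std::map<std::string, uint64_t> lastOnKey;
  std::map<uint64_t, std::set<uint64_t> > after;   // dependency edges
  std::map<uint64_t, int> indegree;

  for (size_t i = 0; i < epoch.size(); i++) {
    const RowEvent &ev = epoch[i];
    if (indegree.find(ev.transId) == indegree.end())
      indegree[ev.transId] = 0;
    std::map<std::string, uint64_t>::iterator it = lastOnKey.find(ev.rowKey);
    if (it != lastOnKey.end() && it->second != ev.transId) {
      if (after[it->second].insert(ev.transId).second)
        indegree[ev.transId]++;
    }
    lastOnKey[ev.rowKey] = ev.transId;
  }

  // Kahn's algorithm over the dependency graph
  std::queue<uint64_t> ready;
  for (std::map<uint64_t, int>::iterator it = indegree.begin();
       it != indegree.end(); ++it)
    if (it->second == 0) ready.push(it->first);

  std::vector<uint64_t> ordered;
  while (!ready.empty()) {
    uint64_t t = ready.front(); ready.pop();
    ordered.push_back(t);
    const std::set<uint64_t> &succ = after[t];
    for (std::set<uint64_t>::const_iterator s = succ.begin();
         s != succ.end(); ++s)
      if (--indegree[*s] == 0) ready.push(*s);
  }
  return ordered;
}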

Frazer

Edward Cullen said...

Thanks Frazer, very nice and clean explanation. In your scenario, using transaction id ordering would be a mess. But how about the following situation?

One application is just inserting values into the database :

for (int i = 0; i < somevalue; ++i)
{
  // start a new transaction
  NdbTransaction *myTransaction = myNdb.startTransaction();
  // define the insert operation on the table
  NdbOperation *myOperation = myTransaction->getNdbOperation(myTable);
  myOperation->insertTuple();
  myOperation->equal("COL0", i);
  myOperation->setValue("COL1", i+1);
  // ..................
  // .................

  if (myTransaction->execute( NdbTransaction::Commit ) == -1)
    APIERROR(myTransaction->getNdbError());

  // close transaction here
  myNdb.closeTransaction(myTransaction);
}
We can't predict how these transactions execute at the database (the order). When another application is listening via the event API, the TransIds might look like :
GCI -1
------------------
time t - {tid - 150, values - 4,....}
time t+1 -{tid - 140, values - 2,....}
time t+2 -{tid - 155, values - 5,....}
time t+3 -{tid - 130, values - 1,....}
time t+4 -{tid - 149, values - 3,....}

In this case, the application which is listening to the event API has to re-order them according to transaction ids, right? Otherwise, how does the application know "this is the first value I should consider, and this is the second value"? If the application thinks 4 is the first value then there will be a problem, right? This is what I was asking in the previous question.

Please explain this.

Thanks.

Frazer Clement said...

Hi Edward,

From a single client (Ndb object), the transaction ids happen to be ordered, but that's really an internal implementation detail rather than a published interface. It could change in a future release.

We do have a mechanism for transferring some arbitrary metadata from NdbApi clients to event subscribers. At the NdbApi layer the metadata is referred to as 'AnyValue'. It can be set per NdbOperation (insert, update, delete), and will be available from the event Api by calling NdbEventOperation::getAnyValue().

Within MySQL Cluster, we generally use this to indicate e.g. the slave serverid in replication, or an indicator that a change should not be binlogged.

If you need to interoperate with attached MySQL servers writing Binlogs then you can configure them to ignore some of the bits in AnyValue, which you can then use for your own purposes. The relevant parameter is --server-id-bits.

This could be used to indicate some ordering information to event Api subscribers.

Alternatively you could of course include ordering information in the data written.
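
For example (a rough sketch, error handling omitted; the table, column and sequence-number scheme are invented for the example) :

#include <NdbApi.hpp>

// Writer side : tag each operation with an application sequence number
void taggedInsert(Ndb *ndb, const NdbDictionary::Table *tab,
                  Uint32 pk, Uint32 seqNo) {
  NdbTransaction *trans = ndb->startTransaction();
  NdbOperation *op = trans->getNdbOperation(tab);
  op->insertTuple();
  op->equal("COL0", pk);
  op->setAnyValue(seqNo);   // carried through to the event stream
  trans->execute(NdbTransaction::Commit);
  ndb->closeTransaction(trans);
}

// Subscriber side : recover the sequence number from each event
void onEvent(NdbEventOperation *evOp) {
  Uint32 seqNo = evOp->getAnyValue();
  (void)seqNo;   // use for application-level ordering
}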

Hope this helps,
Frazer

Edward Cullen said...

Hi Frazer
Thank you so much. As you said, we can simply use setAnyValue to order the events that we receive from the API.

I need to clarify one more thing in the API. When I analyse the EventPAIMemoryUsage allocated and used memory fields, I can't understand why the allocated memory increases every time I poll.

I initially set up a 1024 MB buffer size and tried to poll every millisecond. The result was that the allocated memory keeps increasing (for some periods it is fixed), while the used memory actually fluctuates. Does this mean the API keeps allocating memory until it reaches the maximum event memory limit set by us?

Thanks.

Regards
Edward.

Frazer Clement said...

Hi Edward,
Sorry for the delay in responding. Good that you have the ordering info you need now.
The Event API internally does some 'caching' of allocated memory to minimise dependence on the underlying allocator. This means that memory usage is generally grow-only, and usage peaks are persistent. Additionally, the cached memory is used in a FIFO fashion, which can result in further allocation even though there is sufficient free memory available to the event API. This behaviour is designed for applications with a standard steady-state event flow - eventually memory allocation will plateau and there will be no use made of the underlying allocator.
To better handle more dynamic situations we are looking at improving this in future releases.

Frazer