Wednesday 7 December 2011

Eventual Consistency in MySQL Cluster - using epochs




Before getting to the details of how eventual consistency is implemented, we need to look at epochs. Ndb Cluster maintains an internal distributed logical clock known as the epoch, represented as a 64-bit number. This epoch serves a number of internal functions, and is atomically advanced across all data nodes.

Epochs and consistent distributed state

Ndb is a parallel database, with multiple internal transaction coordinator components starting, executing and committing transactions against rows stored in different data nodes. Concurrent transactions only interact where they attempt to lock the same row. This design minimises unnecessary system-wide synchronisation, enabling linear scalability of reads and writes.

The stream of changes made to rows stored at a data node is written to a local Redo log for node and system recovery. The change stream is also published to NdbApi event listeners, including MySQLD servers recording Binlogs. Each node's change stream contains the row changes it was involved in, as committed by multiple transactions, and coordinated by multiple independent transaction coordinators, interleaved in a partial order.

  Incoming independent transactions
       affecting multiple rows

        T3        T4        T7
        T1        T2        T5

        |         |         |
        V         V         V

     --------  --------  --------
     |  1   |  |  2   |  |  3   |
     |  TC  |  |  TC  |  |  TC  |   Data nodes with multiple
     |      |--|      |--|      |   transaction coordinators
     |------|  |------|  |------|   acting on data stored in
     |      |  |      |  |      |   different nodes
     | DATA |  | DATA |  | DATA |
     --------  --------  --------

        |         |         |
        V         V         V

        t4        t4        t3
        t1        t7        t2
        t2        t1        t7
        t5

     Outgoing row change event
       streams by causing
          transaction


These row event streams are generated independently by each data node in a cluster, but to be useful they need to be correlated together. For system recovery from a crash, the data nodes need to recover to a cluster-wide consistent state: a state which contains only whole transactions, and which, logically at least, existed at some point in time. This correlation could be done by an analysis of the transaction ids and row dependencies of each recorded row change to determine a valid order for the merged event streams, but this would add significant overhead. Instead, the Cluster uses a distributed logical clock known as the epoch to group large sets of committed transactions together.

Each epoch contains zero or more committed transactions. Each committed transaction is in only one epoch. The epoch clock advances periodically, every 100 milliseconds by default. When it is time for a new epoch to start, a distributed protocol known as the Global Commit Protocol (GCP) results in all of the transaction coordinators in the Cluster agreeing on a point of time in the flow of committing transactions at which to change epoch. This epoch boundary, between the commit of the last transaction in epoch n, and the commit of the first transaction in epoch n+1, is a cluster-wide consistent point in time. Obtaining this consistent point in time requires cluster-wide synchronisation, between all transaction coordinators, but it need only happen periodically.

Furthermore, each node ensures that all events for epoch n are published before any events for epoch n+1 are published. Effectively the event streams are sorted by epoch number, and the first time a new epoch is encountered signifies a precise epoch boundary.

  Incoming independent transactions

        T3        T4        T7
        T1        T2        T5

        |         |         |
        V         V         V

     --------  --------  --------
     |  1   |  |  2   |  |  3   |
     |  TC  |  |  TC  |  |  TC  |   Data nodes with multiple
     |      |--|      |--|      |   transaction coordinators
     |------|  |------|  |------|   acting on data stored in
     |      |  |      |  |      |   different nodes
     | DATA |  | DATA |  | DATA |
     --------  --------  --------

        |         |         |
        V         V         V

      t4(22)    t4(22)    t3(22)    Epoch 22
      ......    ......    ......
      t1(23)    t7(23)    t2(23)    Epoch 23
      t2(23)    t1(23)    t7(23)
      ......
      t5(24)                        Epoch 24

     Outgoing row change event
     streams by causing transaction
     with epoch numbers in ()



When these independent streams are merge-sorted by epoch number we get a unified change stream. Multiple possible orderings can result. One possible ordering is shown here :

      Events          Transactions
                   contained in epoch

      t4(22)
      t4(22)          {T4, T3}
      t3(22)

      ......

      t1(23)
      t2(23)
      t7(23)
      t1(23)          {T1, T2, T7}
      t2(23)
      t7(23)

      ......

      t5(24)          {T5}



Note that we can state from this that T4 -> T1 (Happened before), and T1 -> T5. However we cannot say whether T4 -> T3 or T3 -> T4. In epoch 23 we see that the row events resulting from T1, T2 and T7 are interleaved.

Epoch boundaries act as markers in the flow of row events generated by each node, which are then used as consistent points to recover to. Epoch boundaries also allow a single system wide unified transaction log to be generated from each node's row change stream, by merge-sorting the per-node row change streams by epoch number. Note that the order of events within an epoch is still not tightly constrained. As concurrent transactions can only interact via row locks, the order of events on a single row (Table and Primary key value) signifies transaction commit order, but there is by definition no order between transactions affecting independent row sets.
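As a rough illustration of the merge-sort described above, here is a minimal C++ sketch. It is not the Cluster's implementation: the `RowEvent` type and the `mergeByEpoch` function are hypothetical names invented for this example, and the only property it relies on is that each node publishes its events in epoch order.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// One row change event, tagged with its causing transaction and commit epoch.
// (Hypothetical type for illustration only.)
struct RowEvent {
    std::string txn;      // causing transaction, e.g. "T4"
    std::uint64_t epoch;  // epoch in which that transaction committed
};

static bool earlierEpoch(const RowEvent& a, const RowEvent& b) {
    return a.epoch < b.epoch;
}

// Merge per-node streams into one stream ordered by epoch number.
// Each input stream must already be in epoch order. The order of events
// inside one epoch is not meaningful; here they simply appear node by node.
std::vector<RowEvent> mergeByEpoch(const std::vector<std::vector<RowEvent>>& nodes) {
    std::vector<RowEvent> merged;
    for (const auto& stream : nodes) {
        assert(std::is_sorted(stream.begin(), stream.end(), earlierEpoch));
        merged.insert(merged.end(), stream.begin(), stream.end());
    }
    // stable_sort keeps each node's own event order within an epoch,
    // so changes to a single row stay in commit order.
    std::stable_sort(merged.begin(), merged.end(), earlierEpoch);
    return merged;
}
```

Feeding in the three per-node streams from the diagram produces one stream in which every epoch 22 event precedes every epoch 23 event, while the order inside each epoch depends only on how the inputs happened to be listed.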

To record a Binlog of Ndb row changes, MySQLD listens to the row change streams arriving from each data node, and merge-sorts them by epoch into a single, epoch-ordered stream. When all events for a given epoch have been received, MySQLD records a single Binlog transaction containing all row events for that epoch. This Binlog transaction is referred to as an 'Epoch transaction' as it describes all row changes that occurred in an epoch.

Epoch transactions in the Binlog

Epoch transactions in the Binlog have some interesting properties :
  • Efficiency : They can be considered a kind of Binlog group commit, where multiple user transactions are recorded in one Binlog (epoch) transaction. As an epoch normally contains 100 milliseconds of row changes from a cluster, this is a significant amortisation.
  • Consistency : Each epoch transaction contains the row operations which occurred when moving the cluster from epoch boundary consistent state A to epoch boundary consistent state B
    Therefore, when applied as a transaction by a slave, the slave will atomically move from consistent state A to consistent state B
  • Inter-epoch ordering : Any row event recorded in epoch n+1 logically happened after every row event in epoch n
  • Intra-epoch disorder : Any two row events recorded in epoch n, affecting different rows, may have happened in any order.
  • Intra-epoch key-order : Any two row events recorded in epoch n, affecting the same row, happened in the order they are recorded.

The ordering properties show that epochs give only a partial order, enough to subdivide the row change streams into self-consistent chunks. Within an epoch, row changes may be interleaved in any way, except that multiple changes to the same row will be recorded in the order they were committed.
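The three ordering properties can be written down as a small comparison function. The sketch below is only an illustration under assumptions: `LoggedEvent`, `Order` and `relate` are hypothetical names, and a row is identified by a table name plus a primary key string.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

enum class Order { Before, After, Unordered };

// A row event as recorded in the Binlog (hypothetical type for illustration).
struct LoggedEvent {
    std::uint64_t epoch;     // epoch transaction containing the event
    std::size_t position;    // position of the event inside that epoch transaction
    std::string table;
    std::string primaryKey;
};

// What the Binlog lets us conclude about two distinct events a and b.
Order relate(const LoggedEvent& a, const LoggedEvent& b) {
    // Inter-epoch ordering: an earlier epoch always happened first.
    if (a.epoch != b.epoch)
        return a.epoch < b.epoch ? Order::Before : Order::After;
    // Distinct events in one epoch transaction have distinct positions.
    assert(a.position != b.position);
    // Intra-epoch key-order: same row, so recorded order is commit order.
    if (a.table == b.table && a.primaryKey == b.primaryKey)
        return a.position < b.position ? Order::Before : Order::After;
    // Intra-epoch disorder: different rows, nothing can be concluded.
    return Order::Unordered;
}
```

Note that `Unordered` is a real answer here rather than an error: the Binlog simply does not carry enough information to order two events on different rows within one epoch.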

Each epoch transaction contains the row changes for a particular epoch, and that information is recorded in the epoch transaction itself, as an extra WRITE_ROW event on a system table called mysql.ndb_apply_status. This WRITE_ROW event contains the binlogging MySQLD's server id and the epoch number. This event is added so that it will be atomically applied by the Slave along with the rest of the row changes in the epoch transaction, giving an atomically reliable indicator of the replication 'position' of the Slave relative to the Master Cluster in terms of epoch number. As the epoch number is abstracted from the details of a particular Master MySQLD's binlog files and offsets, it can be used to failover to an alternative Master.

We can visualise a MySQL Cluster Binlog as looking something like this. Each Binlog transaction contains one 'artificially generated' WRITE_ROW event at the start, and then RBR row events for all row changes that occurred in that epoch.

    BEGIN
      WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6998
      WRITE_ROW ...
      UPDATE_ROW ...
      DELETE_ROW ...
      ...
    COMMIT # Consistent state of the database

    BEGIN
      WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6999
      ...
    COMMIT # Consistent state of the database

    BEGIN
      WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=7000
      ...
    COMMIT # Consistent state of the database
    ...


This is a series of epoch transactions, each with a special WRITE_ROW event for recording the epoch on the Slave. You can see this structure using the mysqlbinlog tool with the --verbose option.
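To make the structure concrete, here is a toy C++ model of an epoch transaction with its leading ndb_apply_status write, plus a lookup showing why an epoch number is a convenient replication position. Everything in it is hypothetical (`BinlogEvent`, `EpochTransaction`, `makeEpochTransaction`, `firstUnapplied`); it models the idea only and says nothing about how MySQLD stores or searches its Binlog.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified Binlog row event.
struct BinlogEvent {
    std::string type;    // e.g. "WRITE_ROW", "UPDATE_ROW", "DELETE_ROW"
    std::string target;  // free-form description of the affected row
};

struct EpochTransaction {
    std::uint64_t epoch;
    std::vector<BinlogEvent> events;  // events[0] is the ndb_apply_status write
};

// Wrap all row events of one epoch in a single transaction, prefixed with
// the generated WRITE_ROW carrying the server id and the epoch number.
EpochTransaction makeEpochTransaction(std::uint32_t serverId, std::uint64_t epoch,
                                      const std::vector<BinlogEvent>& rowEvents) {
    EpochTransaction txn{epoch, {}};
    txn.events.push_back({"WRITE_ROW",
                          "mysql.ndb_apply_status server_id=" + std::to_string(serverId) +
                          ", epoch=" + std::to_string(epoch)});
    txn.events.insert(txn.events.end(), rowEvents.begin(), rowEvents.end());
    return txn;
}

// Index of the first epoch transaction a Slave still needs, given the last
// epoch it applied. The answer does not depend on file names or offsets.
std::size_t firstUnapplied(const std::vector<EpochTransaction>& binlog,
                           std::uint64_t lastAppliedEpoch) {
    for (std::size_t i = 0; i < binlog.size(); ++i) {
        assert(i == 0 || binlog[i - 1].epoch < binlog[i].epoch);  // epochs ascend
        if (binlog[i].epoch > lastAppliedEpoch) return i;
    }
    return binlog.size();
}
```

Because the Slave's position is expressed as an epoch, the same `firstUnapplied` lookup could be run against the epoch transactions recorded by a different Master MySQLD, which is the property that makes failover possible.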

Rows tagged with last-commit epoch

Each row in a MySQL Cluster stores a hidden metadata column which contains the epoch at which a write to the row was last committed. This information is used internally by the Cluster during node recovery and other operations. The ndb_select_all tool can be used to see the epoch numbers for rows in a table by supplying the --gci or --gci64 options. Note that the per-row epoch is not a row version, as two updates to a row in reasonably quick succession will have the same commit epoch.

Epochs and eventual consistency

Reviewing epochs from the point of view of my previous posts on eventual consistency we see that :
  • Epochs provide an incrementing logical clock
  • Epochs are recorded in the Binlog, and therefore shipped to Slaves
  • Epoch boundaries imply happened-before relationships between events before and after them in the Binlog

These properties mean that epochs are almost perfect for monitoring conflict windows in an active-active circular replication setup, with only a few enhancements required.

I'll describe these enhancements in the next post.

Edit 23/12/11 : Added index

50 comments:

Max said...

Very nice and clear post Frazer, you should become a trainer :)

I have a question regarding parallel processing of the transactions on the slave side. How do you know which transactions within an epoch are using the same primary key values? Or do you?

Frazer Clement said...

Hi Max,
Good to hear from you!
Thanks for the compliments,

Regarding the slave, each epoch transaction is applied as a single transaction via the Ndb table handler.
When tracking inter-transaction dependencies (independent transactions accessing the same primary key), the Ndb handler uses an internal hash table. This hash table is maintained for the duration of the (epoch) transaction. The keys of the hash are the table id and primary key values. For each key, the hash stores a transaction id.
As each row event is applied, the Ndb handler inserts an entry into the hash with the event's table id and primary key values, and the event's user transaction id. If a hash entry already exists then the Ndb handler knows that some row event has already been applied to that row in this epoch transaction. If that event had a different transaction id to the current event then there is an inter-transaction dependency between the existing entry and the new entry.
So dependency tracking within an epoch is done using an in-memory hash table.
I hope that answers your question. Let me know if it's not clear.
Frazer
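The hash-based tracking described in the reply above might look roughly like the following C++ sketch. This is not the Ndb handler's code: the class and member names are hypothetical, the primary key is reduced to a string, and replacing the stored transaction id with the most recent writer is an assumption made for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using RowKey = std::pair<std::uint32_t, std::string>;  // (table id, primary key)

struct RowKeyHash {
    std::size_t operator()(const RowKey& k) const {
        return std::hash<std::uint32_t>()(k.first) ^
               (std::hash<std::string>()(k.second) << 1);
    }
};

struct Dependency {
    std::uint64_t earlierTxn;
    std::uint64_t laterTxn;
};

// Lives for the duration of one epoch transaction (hypothetical class).
class EpochDependencyTracker {
public:
    // Record one applied row event. Returns true when a different user
    // transaction already touched the same row in this epoch transaction.
    bool apply(std::uint32_t tableId, const std::string& pk, std::uint64_t txnId) {
        RowKey key(tableId, pk);
        auto it = lastWriter_.find(key);
        if (it == lastWriter_.end()) {
            lastWriter_.emplace(key, txnId);
            return false;
        }
        const bool dependent = it->second != txnId;
        if (dependent) deps_.push_back({it->second, txnId});
        it->second = txnId;  // assumption: remember the most recent writer
        return dependent;
    }

    const std::vector<Dependency>& dependencies() const { return deps_; }

private:
    std::unordered_map<RowKey, std::uint64_t, RowKeyHash> lastWriter_;
    std::vector<Dependency> deps_;
};
```

Each lookup and insert is expected constant time, so the cost of tracking grows with the number of row events in the epoch rather than with the number of transactions squared.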

Max said...

Thanks Frazer, this does partly answer my question.

The remaining question would be that using this hash table you could then execute the queries within an epoch in parallel threads. Is this done? Or has any work been done to this end?

Frazer Clement said...

Hi Max,
You're correct that the transaction dependencies can be used to find sets of independent transactions which can be executed in parallel.
For Cluster, all user transactions recorded in a single epoch transaction on the Master are applied as a single transaction on the slave. The parallelism of application on the slave is therefore only limited by the Slave execution batch size. In other words, we already have parallel application of all user transaction operations within an epoch on the Slave. These parallel operations share a single TC instance thread, but are fully parallel in terms of LQH execution (and are asynchronous with respect to each other).
Monitoring dependencies between separate Binlog transactions (epoch or otherwise) could be used to increase parallelism on the Slave. For Cluster it could be used to apply epochs in parallel, and for other engines such as InnoDB, it would give the inter-transaction parallelism which is currently missing.
I believe that the replication team are working on increasing the utility of the multithreaded slave work they have started (currently only multithreaded between DBs). I'm not sure if they're looking at key-based dependency analysis yet.
Frazer

Max said...

Thanks Frazer. Yes, my assumption was that since there would only be one binlog applier on the slave cluster, that could easily become a bottleneck. I mean on the MySQL server side, not the data nodes, as the binlog execution thread is also monothreaded on the server. But it might be that the load on the MySQL server is so small compared to the data nodes that this isn't an issue?

Frazer Clement said...

Hi Max,
Yes asynchronous replication through MySQL is single threaded in a number of places including the Binlog writing, the Slave IO thread and the Slave SQL thread. For Ndb, the TC thread handling each epoch transaction is also a single-threaded choke point. The Slave SQL thread is often the biggest bottleneck as it executes and commits transactions from the Binlog serially.
For Ndb I am not sure what the first bottleneck would be. Due to the batching and intra-epoch parallelism we rarely hear of users with replication performance problems.
If we were to have a performance problem due to single threading in the slave SQL thread then we could attempt to parallelise some part of the Slave work from a single serial Binlog, such as executing multiple epochs concurrently.

However, perhaps a better fix would be to parallelise across multiple replication 'channels', e.g. Binlogging Master MySQLDs and Applying Slave MySQLDs. This could be done by splitting the row change events by some PK hash on the Master Cluster, and having each of n Binlogging MySQLDs only receive and record a subset of the row changes in an epoch. These partial Binlogs can then be pulled and applied on the Slave in parallel by independent Slave MySQLDs. The problem then becomes ensuring consistency on the Slave Cluster if required. One option is to support a single transaction with multiple contributing clients, but a single commit point. This is not too tricky to implement in Ndb, but would still have a single TC thread as a potential bottleneck. Another option is to support a kind of distributed commit of independent partial-epoch transactions so that everything is parallel up to the point of commit.

The cost of synchronising multiple processes at commit time can be amortised across one or more epochs. The balance here is to reduce synchronisation as much as possible without replication latency becoming excessive. I suppose an ideal system would 'change gears' depending on the load + size of epochs received.

Anyway, this is all theory as currently async replication performance is not a big problem for our users.

Frazer

Unknown said...

Hi Frazer
Comprehensive and nice explanation. How do we know which transaction caused a particular event? For example, if multiple transactions are executing and updating the database, when we receive the update event, how do we know the transaction id, or which specific transaction caused this event?

Thanks.

Frazer Clement said...

Hi Edward,
Good question. Each event carries an internally generated 64-bit transaction id, which can be queried using the NdbApi event Api as the events are received.


From storage/ndb/include/ndbapi/NdbEventOperation.hpp

/**
* Retrieve the TransId of the latest retrieved event
*
* Only valid for data events. If the kernel does not
* support transaction ids with events, the max Uint64
* value is returned.
*
* @return TransId
*/
Uint64 getTransId() const;


This 64-bit number is the same one which was visible to the client at transaction prepare/commit time :

storage/ndb/include/ndbapi/NdbTransaction.hpp

/**
* Get transaction identity.
*
* @return Transaction id.
*/
Uint64 getTransactionId();

So the events which result from a transaction commit will have that transaction's id when received back as events from NdbApi.

Things to note :
1. The system natively produces 1 row event per changed row per transaction. Multiple row modifications within a transaction (ins, upd, del) will always be merged into one effective operation (e.g. del, ins -> upd; upd,upd -> upd, ins,del -> NULL)
2. The merge_events NdbApi option causes NdbApi to merge all events on a table per row *per epoch*. e.g. multiple transaction's events are merged together to give one event per row. This option is switched on for tables with Blobs, which is one reason they can't currently be used with NDB$EPOCH et al

From SQL, the NdbApi transaction ids are not really visible. It might be a nice feature to expose the NdbApi transaction id as e.g. a session status variable.
The NdbApi transaction id is put into the Binlog by the MySQLD Binlog Injector. It uses a feature of the 'v2' RBR events where each event can have arbitrary extra meta data to tag each row change with its causing transaction, allowing the slave to determine transaction boundaries.

Hope this helps,
Frazer

Unknown said...

Thanks Frazer for your detailed explanation.

Unknown said...

Hi Frazer
I used getTransId() from the NdbEventOperation class to get the transaction id and compared it with the NdbTransaction getTransactionId() function. The two values are not the same.

I looked into the implementation of NdbEventOperation getTransId(); it was like this:

Uint64
NdbEventOperationImpl::getTransId() const
{
/* Return 64 bit composite */
Uint32 transId1 = m_data_item->sdata->transId1;
Uint32 transId2 = m_data_item->sdata->transId2;
return Uint64(transId1) << 32 | transId2;
}

NdbTransaction was like this
tTransId1 = (Uint32) theTransactionId;
tTransId2 = (Uint32) (theTransactionId >> 32);

Let's assume the following case:
NdbTransaction's transaction id is 9014895836135425
According to the implementation, tTransId1 = 1 and tTransId2 = 2098944

When the event API constructs the transaction id as Uint64(transId1) << 32 | transId2, it returns 4297066240, which is wrong.

// The event API transaction id construction is supposed to be like this, right?

Uint64(transId2) << 32 | transId1?

Then the answer will be correct. Please advise on this.

Thanks.

Unknown said...

Uint64 transId = op->getTransId();
Uint64 actualTransId = Uint64((Uint32)transId) << 32 | transId >> 32;

This would be a temporary solution, but I don't know why the user has to do all this modification?

Thanks.

Frazer Clement said...

Hi Edward,
Yes, you're right. That is a bug. Sorry about that.

I guess that we've only been using the transaction id to uniquely identify transactions in the event stream, rather than correlating with the source transaction, and so had somehow not noticed.

To fix we would probably leave this function as-is (in case someone has discovered and is doing the word-swap like you) and introduce a new function to return the 64-bit transaction id in the correct order.

Thanks for pointing this out, sorry for wasting your time.
Frazer
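The word-order mismatch discussed in this exchange can be reproduced without a cluster. The sketch below is standalone C++ using only the arithmetic quoted in the comments above; the function names are hypothetical and none of them are NdbApi calls.

```cpp
#include <cassert>
#include <cstdint>

// Split a 64-bit transaction id the way the quoted NdbTransaction lines do:
// transId1 receives the low word, transId2 the high word.
inline void splitTransId(std::uint64_t id, std::uint32_t& transId1, std::uint32_t& transId2) {
    transId1 = static_cast<std::uint32_t>(id);
    transId2 = static_cast<std::uint32_t>(id >> 32);
}

// Recombine the words as in the quoted getTransId() body: transId1 ends up
// in the high word, which is the opposite of how it was split.
inline std::uint64_t composeAsEventApi(std::uint32_t transId1, std::uint32_t transId2) {
    return (static_cast<std::uint64_t>(transId1) << 32) | transId2;
}

// The workaround from the comment above: swap the two 32-bit words.
inline std::uint64_t swapWords(std::uint64_t v) {
    return (v << 32) | (v >> 32);
}
```

With the transaction id from the example, 9014895836135425, the split gives transId1 = 1 and transId2 = 2098944, the event-side composition gives 4297066240, and swapping the words of that value recovers the original id.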

Unknown said...

Hi Frazer
I agree with what you said here: "correlating with the source transaction is not necessary".
In my case, I was just curious why these two are not the same.

Now I am clear. Thank you so much.

Frazer Clement said...

No problem.

I've raised an internal bug for this problem you found.

Thanks for getting in touch, good luck!

Unknown said...

Hi Frazer
I would like to know how the NDB event API buffers the subscribed events when one of the threads processing the events is too slow. Is there any way to get a notification saying that the NDB Event API buffer is going to drop messages because event processing is slow?

Thanks.

Regards
Edwards.

Unknown said...

Hi Frazer
As you mentioned, transactions are not in order within a particular epoch frame. Can we arrange those transactions by using transaction ids? I did some experiments and observed that, using transaction ids, we can order the transactions in a specific epoch frame. Are there any other ways to do that?

thanks.

Frazer Clement said...

Hi Edward,
The Ndb Event API buffer generally does not drop messages because the
application is too slow - it continues buffering indefinitely. Recently
we have added a maximum buffer size, but the first iteration of this
feature will kill the API process when the limit is reached! We have a
second iteration in the 7.4 release which will instead drop events and
insert a 'TE_INCONSISTENT' event into the event stream.

In terms of notification, there are parameters which can be set which
result in the NdbApi generating cluster log events. These are :

void setReportThreshEventGCISlip(unsigned thresh);
void setReportThreshEventFreeMem(unsigned thresh);

The GCISlip is the number of buffered epochs above which the API node
will generate a warning log on *every* epoch boundary.
The FreeMem is a remaining-free percent level which will generate a
single log when it is passed. Note that the free percent is relative
to the amount of allocated buffer, but this can grow.

You can monitor these events using the MGMAPI - they can be parsed to determine the current levels.

From your NdbApi application you can locally monitor the difference between
the 'latestGCI' as reported by various Event APIs, and the most recent
GCI you have dequeued. There is not currently a way to observe the
buffer size from NdbApi.

If the NdbApi internal threads are too slow acknowledging epochs back
to the data nodes then eventually (MaxBufferedEpochs), the data nodes
will disconnect them to limit the buffering required. This
epoch-acknowledgement occurs automatically inside NdbApi and is
independent of whether the user threads are fast or slow at dequeueing
events from the Event Buffer.

Frazer

Frazer Clement said...

Hi Edward,
Using the transaction ids you can identify which events belong to which transactions.
The ids themselves do not have any meaningful ordering - even from a single client thread they only reflect transaction 'start' order, not commit order.
However you can use the order of events for transactions with same-key dependencies in an epoch to provide a partial ordering of the transactions. This will ensure that two transactions which modify the same row are ordered correctly w.r.t each other. Transactions with no dependencies can be sorted in any order.
Note that as we do not have information about read dependencies in the event stream, we may choose a partial sort order which is in fact incorrect.
e.g. :
Trans 1 reads Row 1 and inserts Row 1000
Trans 2 reads Row 1000 and writes Row 1

We would have no relevant ordering information about these transactions and so could order them (1,2) or (2,1). The (2,1) ordering is not correct, and depending on what you use the ordered sequence of transactions for, may cause problems.

Frazer
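One way to derive such a partial ordering from the events of a single epoch is a topological sort over same-row dependencies. The C++ sketch below is illustrative only: `KeyedEvent` and `orderTransactions` are hypothetical names, a row is reduced to a single string key, and, as the reply above points out, read dependencies are invisible to it.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <queue>
#include <set>
#include <string>
#include <vector>

// A row event within one epoch (hypothetical type for illustration).
struct KeyedEvent {
    std::uint64_t txn;   // causing transaction id
    std::string rowKey;  // table + primary key, flattened to one string
};

// Returns one ordering of the transactions in an epoch that respects every
// same-row dependency visible in the event order. Transactions with no
// dependencies between them may come out in any relative order.
std::vector<std::uint64_t> orderTransactions(const std::vector<KeyedEvent>& events) {
    std::map<std::string, std::uint64_t> lastWriter;
    std::map<std::uint64_t, std::set<std::uint64_t>> successors;
    std::map<std::uint64_t, int> indegree;

    for (const auto& e : events) {
        indegree.emplace(e.txn, 0);
        auto it = lastWriter.find(e.rowKey);
        if (it != lastWriter.end() && it->second != e.txn) {
            // An earlier writer of the same row must be ordered first.
            if (successors[it->second].insert(e.txn).second) ++indegree[e.txn];
        }
        lastWriter[e.rowKey] = e.txn;
    }

    // Kahn's algorithm: repeatedly emit a transaction with no pending predecessor.
    std::queue<std::uint64_t> ready;
    for (const auto& kv : indegree)
        if (kv.second == 0) ready.push(kv.first);

    std::vector<std::uint64_t> order;
    while (!ready.empty()) {
        const std::uint64_t t = ready.front();
        ready.pop();
        order.push_back(t);
        for (std::uint64_t s : successors[t])
            if (--indegree[s] == 0) ready.push(s);
    }
    // Assumes per-row commit order never forms a cycle between transactions.
    assert(order.size() == indegree.size());
    return order;
}
```

The result is only as good as its input: two transactions linked purely by reads, as in the example above, produce no edge and may be emitted in either order.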

Unknown said...

Thanks Frazer, very nice and clean explanation. In your scenario, using transaction id ordering would be a mess. But how about the following situation?

One application is just inserting values into the database:
for (int i = 0; i < somevalue; ++i)
{
  // start new transaction
  NdbTransaction *myTransaction = myNdb.startTransaction();
  myOperation->insertTuple();
  myOperation->equal("COL0", i);
  myOperation->setValue("COL1", i+1);
  ..................
  .................

  if (myTransaction->execute( NdbTransaction::Commit ) == -1)
    APIERROR(myTransaction->getNdbError());
  // close transaction here
  myNdb.closeTransaction(myTransaction);
}
We can't predict how (in what order) these transactions are executed at the database. When another application is listening to the event API, the TransIDs might be like:
GCI -1
------------------
time t - {tid - 150, values - 4,....}
time t+1 -{tid - 140, values - 2,....}
time t+2 -{tid - 155, values - 5,....}
time t+3 -{tid - 130, values - 1,....}
time t+4 -{tid - 149, values - 3,....}

In this case, the application listening to the event API has to re-order them according to transaction ids, right? Otherwise, how does the application know which value to consider first and which second? If the application thinks 4 is the first value then there will be a problem, right? This is what I was asking in the previous question.

Please explain this.

Thanks.

Frazer Clement said...

Hi Edward,

From a single client (Ndb object), the transaction ids happen to be ordered, but that's really an internal implementation detail rather than a published interface. It could change in a future release.

We do have a mechanism for transferring some arbitrary metadata from NdbApi clients to event subscribers. At the NdbApi layer the metadata is referred to as 'AnyValue'. It can be set per NdbOperation (insert, update, delete), and will be available from the event Api by calling NdbEventOperation::getAnyValue().

Within MySQL Cluster, we generally use this to indicate e.g. the slave serverid in replication, or an indicator that a change should not be binlogged.

If you need to interoperate with attached MySQL servers writing Binlogs then you can configure them to ignore some of the bits in AnyValue, which you can then use for your own purposes. The relevant parameter is --server-id-bits.

This could be used to indicate some ordering information to event Api subscribers.

Alternatively you could of course include ordering information in the data written.

Hope this helps,
Frazer
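As a sketch of how application data might share the 32-bit AnyValue with a server id, consider the C++ below. It rests on assumptions that should be checked against the documentation for your release: that the server id occupies the least significant bits counted by --server-id-bits, and that leaving the most significant bit clear is a sensible precaution because MySQL Cluster uses some AnyValue patterns for its own purposes. The constant and function names are hypothetical, and the code only does the bit arithmetic; it does not call NdbApi.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical setting, standing in for --server-id-bits=7 on the MySQLDs.
constexpr unsigned kServerIdBits = 7;
// Assumption: keep the top bit clear, leaving 31 - kServerIdBits user bits.
constexpr unsigned kUserBits = 31 - kServerIdBits;

// Pack a server id and some application data (e.g. a sequence number)
// into one 32-bit value suitable for use as an AnyValue.
inline std::uint32_t packAnyValue(std::uint32_t serverId, std::uint32_t userData) {
    assert(serverId < (1u << kServerIdBits));
    assert(userData < (1u << kUserBits));
    return (userData << kServerIdBits) | serverId;
}

// Recover the application data on the event subscriber side.
inline std::uint32_t userDataOf(std::uint32_t anyValue) {
    return (anyValue & 0x7FFFFFFFu) >> kServerIdBits;
}

// Recover the server id part.
inline std::uint32_t serverIdOf(std::uint32_t anyValue) {
    return anyValue & ((1u << kServerIdBits) - 1u);
}
```

With 7 bits reserved for the server id this leaves 24 bits of application data, so a sequence number stored this way wraps after about 16 million values and the subscriber would need to allow for that.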

Unknown said...

Hi Frazer
Thank you so much. As you said, we can simply use setAnyValue to order the events that we receive from the API.

I need to clarify one more thing in the API. When I analyse the EventPAIMemoryUsage allocated and used memory fields, I can't understand why the allocated memory increases every time I poll.

I initially set up a 1024 MB buffer size and tried to poll every millisecond. The result was that allocated memory keeps increasing (for some periods it is fixed), while used memory fluctuates. Does this mean the API keeps allocating memory until it reaches the maximum event memory limit set by us?

Thanks.

Regards
Edward.

Frazer Clement said...

Hi Edward,
Sorry for the delay in responding. Good that you have the ordering info you need now.
The Event API internally does some 'caching' of allocated memory to minimise dependencies in the underlying allocator. This means that memory usage is generally grow-only, and usage peaks are persistent. Additionally, the cached memory is used in a FIFO fashion which can result in further allocation even though there is sufficient free memory available to the event API. This behaviour is designed for applications with a standard steady state event flow - eventually memory allocation will plateau and there will be no use made of the underlying allocator.
To better handle more dynamic situations we are looking at improving this in future releases.

Frazer

Alex Ou said...

Hi Frazer,
What algorithm do you use for the internal logical clock? It looks like a variant of Chandy-Lamport's snapshot algorithm to me, but I am not sure.

David

Frazer Clement said...

Hi David,
Interesting question. I am not sure of the theoretic underpinnings of our 'Global checkpoint' algorithm, but perhaps I can explain it a little more, and you can consider yourself how similar it is to the Chandy-Lamport, or other algorithms?

To increment the 'logical clock', we use a 2-phase protocol called Global Checkpoint. This protocol is initiated periodically by a node in the cluster currently 'elected' as DIH Master. A new round of the protocol can only begin when the previous round has completed. The DIH Master role is moved if the current DIH Master fails. Inter-node communication channels in MySQL Cluster are FIFO channels with guaranteed delivery.

DIH Master first sends a GCP_PREPARE_REQ signal to all nodes including itself.

On receiving a GCP_PREPARE signal, the node immediately blocks any prepared transactions from *beginning to commit*. This causes the Transaction Coordinator (TC) blocks in each node to queue transactions which request commit. Already committing transactions can continue to commit, transactions can continue to be prepared and aborted. Once commit is blocked, the node immediately sends a GCP_PREPARE_CONF back to the DIH Master.

When all GCP_PREPARE_CONF signals have been received, the DIH Master immediately sends a GCP_COMMIT signal to all nodes, which unblocks COMMIT request processing and increments the Global Checkpoint Index (Number) (GCI) associated with every transaction which commits after this point. So the duration of commit blocking is related to the worst case round trip between DIH Master and the other data nodes.

After unblocking Commit initiation, GCP_COMMIT processing then waits until all of the transactions with the previous GCI have completed, before sending GCP_NODEFINISH back to the DIH Master.

Once all GCP_NODEFINISH are received, the DIH Master considers the GCP round complete.
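The round described above can be modelled in a few lines of single-threaded C++. This is a toy model only, built from the description in this comment: the struct, its fields and the helper functions are hypothetical, signals are replaced by direct function calls, and real signal delivery, failures and DIH Master election are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Toy model of one data node's view of the protocol (hypothetical names).
struct GcpNode {
    std::uint32_t gci = 0;       // GCI given to transactions that start committing now
    bool commitBlocked = false;  // true between GCP_PREPARE and GCP_COMMIT
    int committingPrevGci = 0;   // commits begun under the previous GCI, not yet complete
    int committingThisGci = 0;   // commits begun under the current GCI
    int queuedCommits = 0;       // commit requests held back while blocked

    // A prepared transaction asks to commit; returns false if it was queued.
    bool requestCommit() {
        if (commitBlocked) { ++queuedCommits; return false; }
        ++committingThisGci;
        return true;
    }
    // GCP_PREPARE: stop new commits from starting (then reply with CONF).
    void onPrepare() { commitBlocked = true; }
    // GCP_COMMIT: unblock, and tag everything that commits from now on with newGci.
    void onCommit(std::uint32_t newGci) {
        assert(committingPrevGci == 0);  // the previous round must have completed
        committingPrevGci = committingThisGci;
        gci = newGci;
        commitBlocked = false;
        committingThisGci = queuedCommits;  // queued requests start under newGci
        queuedCommits = 0;
    }
    void completePrevGciCommit() {
        assert(committingPrevGci > 0);
        --committingPrevGci;
    }
    // GCP_NODEFINISH may be sent once all previous-GCI commits are done.
    bool readyForNodeFinish() const { return !commitBlocked && committingPrevGci == 0; }
};

// The DIH Master's two broadcast phases, and its completion check.
void sendPrepare(std::vector<GcpNode>& nodes) {
    for (auto& n : nodes) n.onPrepare();
}
void sendCommit(std::vector<GcpNode>& nodes, std::uint32_t newGci) {
    for (auto& n : nodes) n.onCommit(newGci);
}
bool roundComplete(const std::vector<GcpNode>& nodes) {
    for (const auto& n : nodes)
        if (!n.readyForNodeFinish()) return false;
    return true;
}
```

In this model a commit requested between the two phases is queued and ends up in the new GCI, while a commit that had already begun stays in the old one and delays the end of the round until it completes.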

Observations :
- The GCP_PREPARE phase is not like Chandy-Lamport in that it is synchronously blocking COMMIT start across all nodes to get a consistent set of transactions.
- The GCP_COMMIT phase is somewhat like Chandy-Lamport in that it is causing a 'marker' (new GCI) to be injected into a stream (of committing transactions) which represents a consistent cut in the parallel flow of committing transactions. However this is more abstract than getting a consistent cut in the distributed flow of inter-node messages. The flow of committing transactions can be considered a message stream flowing 'down' from the TC component.
- The GCP protocol was probably designed as a '2 phase commit' for the job of incrementing the GCI.
- The GCI increment must occur so that the set of transactions included in a GCP are consistent - e.g. they have no missing dependencies, where a dependency is a previously committed transaction which writes to some subset of the same rows. This is not really the same as a consistent view of a message passing system which is interested in capturing the total system state.
- Stopping commit from starting, node by node, is kind of like 'freezing' the set of committed transactions. As each node freezes, it guarantees that its as-yet-uncommitted transactions will not commit. Therefore the rows that they have locked+modified, but not begun committing, will not be committed, and therefore no new dependencies, not included in the already-begun committing set, can be committed until the unfreeze.
Unfrozen nodes can continue committing as long as they have no dependencies on frozen node's uncommitted transactions.
Therefore whenever all nodes are frozen we are guaranteed that the set of already-begun-committing transactions are consistent (though may exclude many other prepared + committable transactions).

So probably GCP is not much like Chandy-Lamport, as it is related to a specific topic (finding a consistent set of committed transactions), and is rather coarsely implemented by blocking commit-start globally.

Does this clarify things?
Frazer

Anonymous said...

Hello.

You have written:
>Each epoch contains zero or more committed transactions. Each committed transaction is in only one epoch. The epoch clock advances periodically, every 100 milliseconds by default.

What about long transactions? For example, if I want to modify several thousand rows it will take about one second. What if the whole cluster fails in the middle of a transaction?

Frazer Clement said...

Hello,
Transactions are included in an epoch at commit time. So transactions can prepare for a long time while many epochs complete without any problem.
Once a transaction begins committing, it is mapped into the current open epoch.
This is where the timing gets interesting. The open epoch cannot be fully completed until the transactions which started committing have completed committing.
For transactions affecting in-memory data, commit involves buffering Redo log entries and making memory state changes which generally is quite fast.
For transactions affecting on-disk data, commit involves writing to the disk pages in the buffer pool, which may in turn require page reading + flushing, requiring Undo log flushing. This can take some time.
If commit takes longer than the Epoch period, then epochs become larger / longer.

Timing summary :
- Transaction commit should fit within an Epoch period
- Transaction prepare / execution time need not fit within an Epoch period.

Regarding node failures / cluster failure, this is not affected by the duration or size of epochs or transactions. In a survivable node failure case, the TC role(s) of the failed node are taken over by a surviving node, and it will query the surviving data nodes to determine whether the transaction began committing. If it did then it will continue the commit, otherwise it will rollback the transaction.

For cluster failure, recovery will be from disk, and the redo log is relevant here. Redo logs are made durable periodically, by default every two seconds. An epoch boundary is chosen to be made durable. When recovering a cluster from disk, recovery will attempt to recover to the last durable epoch in Redo, which by definition is a point in time not including any transactions in the process of committing at the cluster failure time.

Durability summary
- Transactions become network-durable as part of commit processing
- Node failures rollback any in-progress transactions that had not begun committing
- Transactions become disk-durable after commit, periodically
- Full Cluster failure and recovery recovers the cluster to some point in time preceding the failure time. Any transaction committing at cluster failure time will not be retained.
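
The cluster-failure rule in the summary above can be illustrated with a small sketch. This is a toy model rather than Ndb code: the `Txn` tuple and the `recover` function are hypothetical names, and each transaction is simply tagged with the epoch it was mapped into at commit time.

```python
from typing import List, NamedTuple

class Txn(NamedTuple):
    name: str
    epoch: int  # epoch the transaction was mapped into when it began committing

def recover(committed: List[Txn], last_durable_epoch: int) -> List[str]:
    """Return the names of transactions retained after a full cluster restart.

    Toy model: only whole epochs up to the last epoch made durable in the
    Redo log are restored; anything in a later epoch is not retained.
    """
    return [t.name for t in committed if t.epoch <= last_durable_epoch]

txns = [Txn("T1", 10), Txn("T2", 10), Txn("T3", 11), Txn("T4", 12)]
survivors = recover(txns, last_durable_epoch=11)
print(survivors)
```

Here T4 committed in an epoch that had not yet been made durable, so it is the only transaction missing after recovery.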

Hope this helps,
Frazer

Alex Ou said...

Hi Frazer,

Thanks for the detailed response. The logical clock in MySQL Cluster is not Chandy-Lamport's, since Chandy-Lamport's is not a centralized algorithm and every participating process can initiate it at any time. I was asking this question because I was trying to better understand the Read-Committed isolation level MySQL Cluster implements. Your comments answered a few questions, but one remains: let transactions T1 and T2 commit in the same epoch e, where T1 modifies a row r and commits. After that (in the sense of the local clock, say T1 and T2 are started on the same TC), T2 starts, reads r and commits. Naturally T2 should read the version of r that T1 just installed. To do so, having just the epoch e in the last-modified field of r is not enough, since both transactions are in the same epoch. Does MySQL Cluster use something like the concatenation of e and the local time of transaction commit in the last-modified field to decide that T1 commits r before T2 reads it?

cheers,
David

Frazer Clement said...

Hi David,
Ok thanks for the feedback.

In terms of visibility of versions, this is handled entirely by the Local Data Manager (LDM) / Local Query Handler (LQH) component, on a row-by-row basis. There is no attempt to use a distributed versioning system (yet ;) ).

LDM maintains per row lock information (Shared, Exclusive) and also supports lock-free reads (Read-Committed) of the last committed version of a row. While a transaction has uncommitted changes affecting a row, LDM maintains 2 copies - the last committed version, and the uncommitted changed version.

When a read request arrives at LDM, the steps are something like this :

1. If the request requires an Exclusive or Shared lock, attempt to claim it.
If it is already claimed by a different transaction (Transaction Id), the request must wait until the lock is free, or it is aborted. If the lock is held due to uncommitted writes, then when the commit occurs, the lock will be released and the changes will be visible.

2. If the request is lock-free (Read-Committed), we read the 'last-committed' version, according to our state at the time of execution.

In both cases, if the current transaction has some uncommitted changes made to the row, then these are visible to the transaction itself, but not other transactions. There is also a concept of a 'savepoint' within a transaction allowing reads to read different uncommitted versions within a transaction, avoiding e.g. the Halloween problem.

So the visibility of a change to Read-Committed comes down to whether the row is locked at LDM when the read request arrives.
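
As a rough illustration of the two read paths described above, here is a minimal sketch. It is a toy model, not LDM code: the `Row` class and both functions are hypothetical, a single `lock_owner` field stands in for the real Shared/Exclusive lock queue, and waiting is signalled by returning `None`.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Row:
    committed: str                     # last committed version
    uncommitted: Optional[str] = None  # pending version, if any
    lock_owner: Optional[int] = None   # id of the transaction holding the row lock

def read_committed(row: Row, txn_id: int) -> str:
    """Lock-free read: a transaction sees its own pending write,
    every other transaction sees the last committed version."""
    if row.lock_owner == txn_id and row.uncommitted is not None:
        return row.uncommitted
    return row.committed

def locking_read(row: Row, txn_id: int) -> Optional[str]:
    """Locking read: returns None (meaning 'must wait') if another
    transaction holds the lock, otherwise claims the lock and reads."""
    if row.lock_owner not in (None, txn_id):
        return None
    row.lock_owner = txn_id
    return row.uncommitted if row.uncommitted is not None else row.committed

row = Row(committed="v1", uncommitted="v2", lock_owner=7)
print(read_committed(row, txn_id=7))  # the writer sees its own change
print(read_committed(row, txn_id=8))  # other transactions see the committed version
print(locking_read(row, txn_id=8))    # a locking read from another transaction must wait
```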

With our commit protocol we have the following effects w.r.t. locking + writes :

1. Prepare phase : Locks are claimed
2. Commit phase : Locks are held at Backup replicas, but released at Primary replicas
3. Complete phase : Locks are released at Backup replicas

We currently route all Read-Committed reads to Primary fragment replicas, so row changes become visible to further Read-Committed reads after step 2, which is also when we send a Commit acknowledgement to the API node. This ensures that any properly 'following' transaction can read all the writes of the first transaction. (Here I assume that to follow requires waiting until the Commit phase completes). Another possible implementation is to defer sending a Commit acknowledgement to the API node until after phase 3, which would enable Read-Committed requests to be serviced by Backup replicas, at the cost of write commit latency.

continued...

Frazer Clement said...

...

If some transaction attempts to read rows written by a transaction at Read-Committed level while it is committing (Step 2) then each row read will refer to either the before or after commit version of the row, but it is not guaranteed that *all* row reads will be from before or after.

The Commit phase involves distributed commit and is a per-row operation, so it is possible for a concurrent transaction to read some pre-commit and some post-commit row versions. This is sometimes surprising for users used to implementations which install committed changes atomically and so never expose partially committed states to Read-Committed reads.
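
A small sketch can make the mixed view concrete. It is a deliberately simplified model with hypothetical names: the writes of one transaction are applied one row at a time, and a lock-free reader looks at the rows after `reader_at` of them have been committed.

```python
def commit_per_row(rows: dict, writes: dict, reader_at: int) -> dict:
    """Apply `writes` to `rows` one row at a time and return what a
    lock-free reader sees once `reader_at` rows have been committed."""
    snapshot = None
    for i, (key, value) in enumerate(writes.items()):
        if i == reader_at:
            snapshot = dict(rows)  # reader arrives part-way through the commit
        rows[key] = value          # per-row commit
    if snapshot is None:           # reader arrived after the last row committed
        snapshot = dict(rows)
    return snapshot

rows = {"r1": "old", "r2": "old"}
seen = commit_per_row(rows, {"r1": "new", "r2": "new"}, reader_at=1)
print(seen)
```

With `reader_at=1` the reader sees the new version of r1 but the old version of r2, which is exactly the partially committed state described above.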

Of course if it is necessary to see only fully stable states then row locks can be used. These protect against seeing partial states, but not against phantoms.

It would be interesting to explore alternative implementations which support a higher level of consistency on reads without excessive locking. As you hint, each TC could export information about its view of the set of committed transactions which LDMs could use to give consistent views if they maintained > 2 versions per row.

However it seems that a surprising number of users are happy with the consistency offered by our current Read-Committed implementation. Whether this is a conscious state is debatable! Some users use explicit locking of 'guard' rows to get a more serialised experience.

Summary

- Read-Committed is handled entirely at LDM using transaction id info, per-row locks and a last-committed version

- Read-Committed is consistent only on a per-row basis. Each row read is as-of some consistent point in time, but different row reads may be from different times.

- Row locks are available to obtain higher consistency.

Frazer

Alex Ou said...

Thanks Frazer, I really appreciate you typing all this to answer my question; it does clarify things. Speaking of using a 'savepoint' to resolve the Halloween problem, is this 'savepoint' the same as the one in the 'ROLLBACK TO SAVEPOINT' statement? Also, are the four types of Halloween problem in MS SQL Server described in the following article also resolved in the query optimizer in MySQL Cluster?
http://sqlperformance.com/2013/02/t-sql-queries/halloween-problem-part-1

Cheers,
David

Frazer Clement said...

Hi David,

The 'savepoints' I mentioned are an internal concept only, MySQL Cluster does not support the ROLLBACK TO SAVEPOINT statement currently. However they are similar in some ways as they represent stable states within the execution of a single transaction.

I would need to check more thoroughly to be sure how well we cover all of the 'Halloween problem' examples in the article. MySQL does not support the SQL Server MERGE syntax, so it's not an exact mapping. MySQL Cluster is acting as a multi-version engine (but within a single transaction), so the solution is different to the SQL Server solution, and avoids the 'spooling / blocking' operator.

Frazer

Alex Ou said...

Hi Frazer,
Thanks. I can wait. I heard that INSERT ... ON DUPLICATE KEY UPDATE in MySQL is an equivalent of MERGE in MS SQL Server.

Cheers,
David

Alex Ou said...

Hi Frazer,
I have a further question about the Read-Committed isolation level. Say transaction T1 executes the following two statements: update table t1 set...; update table t2 set..., and transaction T2 executes the following two statements: select something from t1 ... lock in share mode; select something else from t2. Let's assume that T1 and T2 both access rows r1 in t1 and r2 in t2, so that two conflicts arise between T1 and T2, and further assume that the locking read in T2 claimed the lock for r1 after T1 released it. Do you think T2 would see the version of r2 installed by T1? From your answer to my earlier question about Read-Committed, either the before or the after version of T1's write to r2 is possible, since it only depends on the local time when the Read-Committed read arrives. But now the second read in T2 is after the complete phase of T1 (in the sense of the local clock of T2's coordinator, not necessarily the local clock where r2 resides), so isn't that a little unreasonable?

If the answer is still possible to see either the before or the after version of T1's write to r2, could you please describe how other people achieve a "more serialized" version of a Mysql Cluster program as you told me in your answers? I am really having some hard time about it.

Regards,
David

Alex Ou said...

Hi Frazer,
I may not have explained very clearly what I meant in my last comment. I want to confirm the following: if I have a way to know that a transaction T has entered the complete phase (say, by claiming a lock that T acquired), is it guaranteed that all distributed parts of T have finished the commit phase, so that if I now read a row modified by T without claiming a lock, I read the version after T's modification, not a version before it?

Cheers,
David

Frazer Clement said...

Hi David,
Your comments and questions are very good as usual!
Sorry for the slow reply.

Your second comment captures the problem well : a read lock is available when the relevant operations have committed, which may be before all operations have committed, which allows subsequent reads to race with the remaining operation commits, reading a mix of pre + post commit values.

On a system with 2 or more replicas of every fragment, the locks on the Backup replicas are not released until the Complete phase. This can be used to 'detect' when the Complete phase has started (and therefore all operations are committed + visible at the primary replicas). For example an update operation will need locks at all replicas and so will not become prepared until any holding transaction has entered its complete phase. This would guarantee a view of the post-commit row versions, or later.

More generally a transaction needs to take row locks to get a consistent view across rows. MySQL Cluster supports 'short' shared read locks (aka SimpleRead) which are used in some cases to get a consistent view of data - a shared 'root' row is locked with an exclusive lock of some sort, and then all other rows are read with the short shared read locks, so that the reads wait for any concurrent commits to complete.

Perhaps it would be useful to support a type of read lock that is only claimed after the complete phase of a lock holding operation - giving a visibility guarantee for the other rows in the transaction via CommittedRead which could be consistent if all modifications to the set of rows are serialised in the same way.

Frazer

Alex Ou said...

Hi Frazer,
If a locking read is routed to the primary replica like a committed read (is it always routed to the primary replica?), it can claim the lock while the same lock is still being held on the secondary replica (since the write is just committed, not completed). Is this correct?

Thanks,
David

Frazer Clement said...

Hi David,
There are 3 types of locking read defined in NdbApi :

- SimpleRead
This takes a shared read lock for the duration of the read, then releases it, i.e. a Short Lock.
This can be routed to the primary or the backup replica (it will go to any replica co-located with the TC, or else the primary).

- Read*
This takes a shared read lock and holds it until the transaction commits, i.e. a Long Lock.
This is always routed to the primary.

- Exclusive read lock
This takes an exclusive read lock on the primary and holds it until the transaction commits, i.e. a Long Lock.
This is always routed to the primary.

There are also exclusive 'write' locks taken on all replicas.

SimpleRead is generally used to cause a read to occur after any concurrently prepared/committing write, as a slightly stronger version of CommittedRead.

* There is also an 'Unlock' operation which can be used to release a Shared read lock at some discretionary point before a transaction commits. This is used when reading Blobs.

As reads only lock one replica, it can be the case that some other operations lock other replicas.
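
The routing rules above can be summarised in a short sketch. The names here (`ReadType`, `route_read`, the "short"/"long" labels) are illustrative assumptions and not NdbApi identifiers; CommittedRead is included based on the earlier statement that Read-Committed reads are routed to Primary replicas.

```python
from enum import Enum

class ReadType(Enum):
    COMMITTED_READ = "CommittedRead"  # lock-free
    SIMPLE_READ = "SimpleRead"        # short shared lock
    READ = "Read"                     # long shared lock
    EXCLUSIVE = "Exclusive"           # long exclusive read lock

def route_read(read_type: ReadType, primary: int, backups: list, tc_node: int):
    """Return (node, lock_duration) for a read, following the description above."""
    if read_type is ReadType.SIMPLE_READ:
        # Goes to a replica co-located with the TC if there is one, else the primary.
        node = tc_node if tc_node in backups else primary
        return node, "short"
    if read_type is ReadType.COMMITTED_READ:
        return primary, "none"
    return primary, "long"

print(route_read(ReadType.SIMPLE_READ, primary=1, backups=[2], tc_node=2))
print(route_read(ReadType.READ, primary=1, backups=[2], tc_node=2))
```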

Potentially surprising things that can occur :
- Stable : A SimpleRead can be processed on a Backup replica while an Exclusive read lock is held on the Primary replica,
yet a SimpleRead routed to the Primary replica stalls while that same lock is held, and there is no explicit control over whether the Primary or a Backup is read.

- Transient : A SimpleRead, Read or Exclusive read lock can be held on the Primary replica while a Write is committing and still holds its lock on the Backup replicas

Routing locking reads to the Primary gives a simpler set of behaviours, but there can still be temporal overlap of lock claims with other transactions.

Are you attempting to solve a problem with MySQL Cluster, or just analysing the implementation? Always interesting to hear any feedback you have.
Thanks,
Frazer

Anonymous said...

Hello. If I understand this article correctly, the epoch must increment by one every TimeBetweenEpochs. But something strange happens in the real world:

mysql> SET @epoch1=(SELECT MAX(epoch) FROM mysql.ndb_binlog_index WHERE orig_server_id=2); SELECT SLEEP(60); SET @epoch2=(SELECT MAX(epoch) FROM mysql.ndb_binlog_index WHERE orig_server_id=2); SELECT (@epoch2 - @epoch1) / 60;
Query OK, 0 rows affected (0,35 sec)

+-----------+
| SLEEP(60) |
+-----------+
| 0 |
+-----------+
1 row in set (1 min 0,00 sec)

Query OK, 0 rows affected (0,36 sec)

+--------------------------+
| (@epoch2 - @epoch1) / 60 |
+--------------------------+
| 2075900859.8333 |
+--------------------------+
1 row in set (0,00 sec)

I have TimeBetweenEpochs=100 and expect 10 as the result of the query (10 epochs per second).
I want to clean this table periodically (for example every 30 days) but I don't know how to count N days back from the current moment. I see that the gci is incremented every second, and I think that is because I have TimeBetweenGlobalCheckpoints=2000 and two clusters. I get 1 GCP from each of 2 clusters every two seconds, so the mean value is 1 gci per second. Am I right?

What is the best way to calculate a time interval back from the current moment in this table?
Thanks.

Frazer Clement said...

Hello,
The epoch is 'incremented' every TimeBetweenEpochs, but it is not always incremented by 1, sometimes the jump is larger.
This is because the epoch has two 32-bit components - the GCI and the microGCI.
The GCI is the top 32 bits, and this is incremented once every TimeBetweenGlobalCheckpoints. When the GCI is incremented, the microGCI is reset to 0.
Viewing these as separate components the sequence in your cluster might be :

GCI  microGCI  Epoch       0xEpoch      ~elapsed millis
0    0         0           0x0          0
0    1         1           0x1          10
0    2         2           0x2          20
0    3         3           0x3          30
...
0    19        19          0x13         190
1    0         4294967296  0x100000000  200
1    1         4294967297  0x100000001  210
1    2         4294967298  0x100000002  220
1    3         4294967299  0x100000003  230
...
1    19        4294967315  0x100000013  390
2    0         8589934592  0x200000000  400
2    1         8589934593  0x200000001  410
2    2         8589934594  0x200000002  420
2    3         8589934595  0x200000003  430
...

Hopefully you can see the pattern above.

Rather than attempting to track the passage of time using the full epoch, I think it makes more sense to use the gci part (epoch >> 32), assuming that the gci is incremented by 1 every TimeBetweenGlobalCheckpoints. This is generally the case, though there will be some error in the measurement, so don't expect great precision.
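
The split into GCI and microGCI is just bit arithmetic on the 64-bit epoch, which a few lines of Python can show. The function names are made up for this illustration.

```python
def split_epoch(epoch: int):
    """Split a 64-bit epoch into (GCI, microGCI): top and bottom 32 bits."""
    return epoch >> 32, epoch & 0xFFFFFFFF

def make_epoch(gci: int, micro_gci: int) -> int:
    """Combine a GCI and microGCI back into a 64-bit epoch."""
    return (gci << 32) | micro_gci

for e in (19, 4294967296, 4294967315):
    print(e, hex(e), split_epoch(e))
```

Applied to the query result in the question: 2075900859.8333 * 60 is approximately 124554051590, and `divmod(124554051590, 2**32)` gives `(29, 6)`. So over the 60 second sleep the GCI part appears to have advanced by about 29, roughly one increment every two seconds, with the rest of the difference coming from the microGCI.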

If you have TimeBetweenGlobalCheckpoints set to 2000 then I would expect gci to be incremented once per 2000 millis, so I am surprised that you state that it is incremented once per second. Epochs and Gcis are unique to each cluster, so it does not matter how many clusters you have. If your serverids are unique then the data should show this.

With GCI incrementing once every two seconds, and 60*60*24 = 86400s in a day, we can expect roughly 86400 * 30 / 2 = 1,296,000 GCI increments in 30 days. You could use this to determine a cut-off GCI (and therefore epoch) to delete from.
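
The same arithmetic as a small sketch, under the stated assumption that the GCI advances by exactly 1 every TimeBetweenGlobalCheckpoints milliseconds (which is only roughly true in practice). The function name and parameters are hypothetical.

```python
SECONDS_PER_DAY = 60 * 60 * 24

def cutoff_epoch(current_epoch: int, days: int, gcp_interval_ms: int = 2000) -> int:
    """Approximate the first epoch of the GCI that was current `days` ago.

    Assumes one GCI increment per gcp_interval_ms, so the result is an
    estimate and should not be relied on for precise timing.
    """
    gci_now = current_epoch >> 32
    gci_delta = days * SECONDS_PER_DAY * 1000 // gcp_interval_ms
    return max(gci_now - gci_delta, 0) << 32

# Example: a cluster whose current GCI is 2,000,000 (made-up value).
current = 2_000_000 << 32
cutoff = cutoff_epoch(current, days=30)
print(cutoff >> 32)
```

The resulting value can then be compared against the epoch column of ndb_binlog_index to pick out rows older than roughly 30 days.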

Beware that epochs track time to some extent on the cluster where they are generated, but writing them to a binlog + ndb_binlog_index, replicating them to a slave, applying them, and then binlogging them there + writing to the slave's ndb_binlog_index are all asynchronous processes occurring some time later, and subject to arbitrary delays. So e.g. epochs may 'arrive' in a Slave's ndb_binlog_index table at a time + rate decoupled from the time + rate they were generated at.

Hope this helps,
Frazer

Anonymous said...

Hello.
Thanks, I had mistake in my query checking GCI's. You are right about GCI's.

Frazer Clement said...

Late update regarding the Global checkpoint algorithm. It seems to be based on one described here :

Two Epoch Algorithms for Disaster Recovery. Hector Garcia-Molina, Christos A Polyzois and Robert Hagmann. Proceedings of the 16th VLDB conference, Brisbane, Australia 1990.

The paper has a section 8 'Another Application of the Epoch Algorithm : Distributed Group Commit' which describes something similar to the MySQL Cluster Global Checkpoint design, especially the 'Single Mark' variant.

Unknown said...

Hello,

Assume that we have 4 data nodes and the replication factor is 2 ( 2 node groups). I have one API node that subscribes to changes on a table.

- Only one data node from each node group will send its events to the subscribers once the epoch is completed, so 2 data nodes are sending events to the subscribers, right? and if one data node failed, the other data node from the same node group will take over, right?
Or
- Only one data node elected as a leader will collect the events and send them to the subscriber

- Can an API node subscribe for only a partition of a table, to receive events from only one partition?

Frazer Clement said...

Hi Mahmoud,
I will describe how it works :

- Each live node in a nodegroup participates in all changes occurring to data managed by that nodegroup due to NoOfReplicas and 2 phase commit.
- Each live node in a nodegroup sends only a fraction (normally 1/NoOfReplicas) of the changes occurring in the nodegroup to subscribing APIs.
- Therefore a subscribing API is receiving data from all live data nodes
- Therefore all live data nodes are participating in a balanced way

In your example, each data node will send 1/2 of the changes in their nodegroup (aka 1/4 of the changes in the entire cluster) to the subscribing API. The subscribing Api gathers these and releases them to the user interface one epoch at a time.

This mechanism is implemented in the data nodes by (re)hashing the row changes across a set of virtual buckets, and spreading the responsibility for publishing each bucket across the live nodes in a nodegroup. Note that the row->bucket mappings are similar to, but independent of the fragment Primary/Backup roles.

Each data node will therefore have a set of buckets that it is responsible for publishing to the API subscribers, and a set of buckets that it is not.

When a row change is hashed to a 'publish' bucket, it is published. When it is hashed to a '!publish' bucket, it is stored in an in-memory buffer on the data node, organised by epoch#.

All data nodes send 'end of epoch' stream markers to subscribing APIs. When all subscribing APIs acknowledge receipt of all row events in some epoch n, the data nodes release the in-memory buffer content for that epoch (the content in the !publish buckets), so this buffering does not grow without bound. MaxBufferedEpochs et al can be used to configure limits here.

The reason for buffering unpublished buckets in the data nodes is for handling data node failures. As part of data node failure handling, responsibility for publishing the failed node's published buckets is spread across the surviving node(s) in the nodegroup. These nodes then resend all buffered content for unacknowledged epochs to the subscribers, to avoid any gaps in the event stream due to node failures.
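
To make the publish / buffer / resend behaviour concrete, here is a minimal sketch of one data node's view. It is a toy model with hypothetical names (`DataNode`, `on_row_change`, `take_over`), not the SUMA implementation, and it ignores the hashing of rows to buckets.

```python
from collections import defaultdict

class DataNode:
    def __init__(self, node_id, publish_buckets):
        self.node_id = node_id
        self.publish_buckets = set(publish_buckets)
        self.buffer = defaultdict(list)  # epoch -> [(bucket, change)] for !publish buckets

    def on_row_change(self, epoch, bucket, change):
        """Return the change if this node publishes it, else buffer it and return None."""
        if bucket in self.publish_buckets:
            return change
        self.buffer[epoch].append((bucket, change))
        return None

    def on_epoch_acked(self, epoch):
        """All subscribers acknowledged `epoch`: drop its buffered content."""
        self.buffer.pop(epoch, None)

    def take_over(self, failed_buckets):
        """Adopt a failed peer's buckets and return the buffered changes
        for unacknowledged epochs that must be resent, in epoch order."""
        failed = set(failed_buckets)
        self.publish_buckets |= failed
        resend = []
        for epoch in sorted(self.buffer):
            resend += [(epoch, c) for b, c in self.buffer[epoch] if b in failed]
        return resend

n1 = DataNode(1, publish_buckets=[0])
n1.on_row_change(5, 1, "a")  # bucket 1 is published by the peer, so it is buffered here
n1.on_row_change(6, 1, "b")
n1.on_epoch_acked(5)         # epoch 5 acknowledged, its buffer is released
print(n1.take_over([1]))     # peer fails: only epoch 6 needs resending
```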

During node recovery, this responsibility is handed back towards the end of node restart (phase 101).

IIRC there are 24 virtual buckets per nodegroup normally, allowing each node in the nodegroup to have a balanced share with 4,3,2,1 nodes live.
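
The reason 24 works well is that it divides evenly by 1, 2, 3 and 4. A quick sketch shows this, using a simple round-robin assignment that is an assumption for illustration only; the actual assignment policy in the data nodes is not described here.

```python
def assign_buckets(live_nodes, n_buckets=24):
    """Spread publish responsibility for the buckets round-robin over the
    live nodes of a nodegroup (hypothetical policy)."""
    nodes = sorted(live_nodes)
    owners = {node: [] for node in nodes}
    for b in range(n_buckets):
        owners[nodes[b % len(nodes)]].append(b)
    return owners

for live in ([1], [1, 2], [1, 2, 3], [1, 2, 3, 4]):
    shares = {node: len(b) for node, b in assign_buckets(live).items()}
    print(len(live), "live node(s):", shares)
```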

For your second question : "Can an API node subscribe for only a partition of a table, to receive events from only one partition?". That is not currently supported in the API, but it could be. It makes sense to allow a single table's events to be spread over a number of independent listeners, on a partition or other basis, but currently it is only possible to subscribe to a whole table's events.

If this is something that you are interested in then you could help by describing what your preferred API would look like. How would the (set of?) partitions be specified? Is it easy to double subscribe / miss some partitions out? How is online add partition handled? How do redundant listeners ensure that they are redundant for the same subset of partitions etc?

Thanks for the questions!
Frazer

Unknown said...

Hi Frazer,

Great! Thanks for the extensive response.
“the data nodes release the in-memory buffer content for that epoch (the content in the !publish buckets),” I guess both types of buckets, publish and !publish, should be released when the epoch acks are received. Does the bucket assignment happen when the API node creates an event, or is it something fixed when the node joins at startup/restart?

I was thinking more or less of a load balancing scenario where i have a table which is partitioned by some column assuming that the column have a fixed number of values, and then start multiple subscribers with different partition keys set (similar to setting the partition key in a transaction) where each subscriber would watch for only a partition of that table. Also, maybe a more generic way if the subscribers could specify a conditional subscribe based on the partition key maybe something like (key % 5 == 1).

Regarding miss/hit and redundant partitions, i guess it is the same with the current API, if you start multiple subscribers for the same table, it is the subscriber responsibility to handle such situations.

Thanks!
Mahmoud

Unknown said...

Hi Frazer,

Another question: do the subscribers send their acks to all data nodes, or only to one data node in a node group, which then forwards them to the other nodes in the node group? At least that is what I understood from the suma.txt documentation in the code.

USER                        SUMA                     XXX
========================================================
                            SUB_GCP_COMPLETE_REP
                            <----------------------
SUB_GCP_COMPLETE_REP
<-------------------------

for event only:
SUB_GCP_COMPLETE_ACK
------------------------->
when all subscribers have sent ACK on gci
send to all other suma's in node group:
                            SUB_GCP_COMPLETE_ACK
                            ---------------------->

Thanks!
Mahmoud

Frazer Clement said...

Hi Mahmoud,
The 'publish buckets' are not buffered in memory, they are just forwarded on to subscribers, so there is nothing to release for them when the epoch is acknowledged.
The bucket assignment is connected to data node join + leave of the cluster, and is independent of any particular table or event.
Thanks for the detail on the partitioning ideas.
Frazer

Frazer Clement said...

For the SUB_GCP_COMPLETE_REP/ACK stuff :

Within data nodes :

1. When each SUMA instance first hears of a GCI(epoch), it allocates a GCI record internally

2. As row events for the epoch arrive, they are buffered / forwarded according to the bucket responsibilities (publish / !publish above)
Note that row events for different epochs can arrive at SUMA interleaved.
Note that row buffering is distinct from the GCI record

3. Eventually each LDM/TUP instance in a data node finishes processing the COMMIT + COMPLETE stages of transactions in an epoch, and forwards on a SUB_GCP_COMPLETE_REP (Tail end of the GCP_COMMIT protocol)

4. When a SUB_GCP_COMPLETE_REP from every LDM has been gathered for an epoch, a SUMA instance knows that it will receive no more changes for that epoch, and so it sends a SUB_GCP_COMPLETE_REP to each API node which is subscribed

At API nodes :

5. Each API node receives a SUB_GCP_COMPLETE_REP from each live data node and forwards it on to all internal Ndb objects which are receiving events, then responds with SUB_GCP_COMPLETE_ACK to the sending node.

6. Each Ndb object subscribing to events tracks each incoming SUB_GCP_COMPLETE_REP, and when all have been received, publishes the epoch worth of events to the user, who can begin to iterate them via Ndb::nextEvent().

At Data nodes :

7. Each SUMA instance eventually receives a SUB_GCP_COMPLETE_ACK from each API node which it sent a SUB_GCP_COMPLETE_REP to

8. When a SUMA instance has received an ACK for every REP that *it* has sent, it releases the GCI record for the epoch, and then sends a SUB_GCP_COMPLETE_ACK to all the SUMA blocks on the other nodes in its nodegroup

9. On receiving a SUB_GCP_COMPLETE_ACK from another data node's SUMA block, a SUMA instance will release the buffered row content for buckets handled by that node

10. When all nodes in the nodegroup have received SUB_GCP_COMPLETE_ACK from all other nodes in the nodegroup, all buffering for the epoch will be released.

The inter-SUMA sending of SUB_GCP_COMPLETE_ACK is there to allow the buffered data to remain until it is clear that it is no longer needed for handling a potential node failure.
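
The API-side part of this flow (steps 5 and 6) can be sketched as follows. This is a toy model with hypothetical names, not NdbApi code: an event-subscribing object buffers row events per epoch and only releases an epoch once a SUB_GCP_COMPLETE_REP has arrived from every live data node.

```python
class EpochGatherer:
    """Toy model of an event-subscribing Ndb object (steps 5 and 6 above)."""

    def __init__(self, live_data_nodes):
        self.live = set(live_data_nodes)
        self.events = {}  # epoch -> list of buffered row events
        self.reps = {}    # epoch -> set of nodes that sent SUB_GCP_COMPLETE_REP

    def on_row_event(self, epoch, event):
        self.events.setdefault(epoch, []).append(event)

    def on_complete_rep(self, epoch, node):
        """Record a REP. Returns the epoch's events once every live data node
        has reported, otherwise None (the epoch is still incomplete)."""
        seen = self.reps.setdefault(epoch, set())
        seen.add(node)
        if seen >= self.live:
            self.reps.pop(epoch)
            return self.events.pop(epoch, [])
        return None

g = EpochGatherer(live_data_nodes=[1, 2])
g.on_row_event(7, "row change from node 1")
print(g.on_complete_rep(7, node=1))  # still waiting for node 2
g.on_row_event(7, "row change from node 2")
print(g.on_complete_rep(7, node=2))  # epoch 7 is complete and can be published
```

Note that an epoch with no row events still completes: it is released as an empty list rather than `None`.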

The overall design follows from the general Ndb principles of shared nothing + balance, which are especially valuable on the 'data path'. It is quite nice to avoid having special coordinator nodes etc when they are not necessary.

Unknown said...

Great! Thanks for the comprehensive answer. Another issue I was thinking about: how much does TimeBetweenEpochs affect the performance of the whole cluster? For example, if I set it as low as 5 or 10 milliseconds, that will spawn a GCP every 5 or 10 milliseconds.

Frazer Clement said...

IIRC the lower bound in config is 10 millis.

The overheads will scale up as the frequency increases, but they will not necessarily be prohibitive.

The most obvious problem is GCP Prepare stalling commit processing for ~ 1 RTT more frequently. That could affect commit latency variability and potentially commit throughput.

Try it and see!