Wednesday 26 January 2011

Low latency distributed parallel joins

When MySQL AB bought Sun Microsystems in 2008 (or did Sun buy MySQL?), most of the MySQL team merged with the existing Database Technology Group (DBTG) within Sun. The DBTG group had been busy working on JavaDB, Postgres and other DB related projects as well as 'High Availability DB' (HADB), which was Sun's name for the database formerly known as Clustra.

Clustra originated as a university research project which spun out into a startup company and was then acquired by Sun during the dot-com era. A number of technical papers describing aspects of Clustra's design and history can be found online. It is in many ways similar to Ndb Cluster, and not just in their shared Scandinavian roots: both are shared-nothing parallel databases originally aimed at the telecoms market, supporting high availability and horizontal scalability. Clustra has an impressive feature set and many years of development behind it, but limited exposure to general purpose use.

At the time of the MySQL acquisition, HADB/Clustra was embedded in a number of Sun products as a session store and metadata repository, but was not available for external customers for general purpose use. Shortly afterwards, a decision was made to move HADB into a 'sustaining' model, and most of the ex-HADB team then became available to work on other projects. MySQL has greatly benefited from the injection of skills and enthusiasm from the Sun DBTG across a number of different teams, which is maybe not well known to those outside the company.

In the Cluster team, one project which has really benefited is the SPJ (Select Project Join) project, which couldn't have happened without the expertise and energy of the ex-Clustra/HADB team working on it.

The SPJ project started around the time of the last MySQL Developers conference in Riga in September 2008. The intention at the time was to look at ways of efficiently supporting more complex queries, specifically involving table joins, reducing unnecessary data transfer, communication latencies and context switches and increasing parallelism.

The main insights at the start of the project included :
  • Join mechanisms should be based on linking existing NdbApi single-table access primitives
  • Join result sets need not and should not be fully materialised at the data nodes
  • Join mechanisms need not be fully general or fully capable initially, as full generality/capability is already available with the existing Apis
  • Small joins should be targeted (small in both number of rows and number of tables)

These points have simplified the project scope greatly, and allowed many painful and costly detours to be avoided.

SQL execution in MySQL Cluster

As described in a previous post, Ndb Cluster was originally designed to quickly execute small queries with a high update rate at low latency. Larger, more complex queries were executed by a separate query processor. The emphasis in this design is that complex queries are possible, but not necessarily fast or efficient. A main goal is that complex queries do not adversely affect the properties of the high volume, low latency requests.

All data access in MySQL Cluster is via the NdbApi interface. The NdbApi gives access to data stored in tables in the Cluster via four primitives :

  • Read row by primary key
  • Read row by secondary unique key
  • Scan a range of rows in an ordered index with optional conditions
  • Scan all rows in a table with optional conditions

Each of these primitives operates on a single table, and any table joins must be done by the NdbApi user. Different primary key/unique key operations and individual scan operations run in parallel across the data nodes in a cluster. NdbApi allows different operations and scan requests to be batched together to minimise latency due to communication delays, and it is essential to use this batching to get minimal latencies with Ndb.
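
To make the effect of batching concrete, here is a minimal Python sketch. It is not NdbApi code: the FakeCluster class and its round trip counter are hypothetical stand-ins, and the only thing modelled is that each execute() call costs one round trip regardless of how many operations it carries.

```python
class FakeCluster:
    """Toy stand-in for the data nodes; counts API <-> data node round trips."""

    def __init__(self, tables):
        self.tables = tables          # {table_name: {pk: row_dict}}
        self.round_trips = 0

    def execute(self, operations):
        """Run a batch of (table, pk) reads in one simulated round trip."""
        self.round_trips += 1
        return [self.tables[table].get(pk) for table, pk in operations]


tables = {"t1": {1: {"b": 10}, 2: {"b": 20}, 3: {"b": 30}}}
reads = [("t1", 1), ("t1", 2), ("t1", 3)]

# One execute() per read: three round trips.
unbatched = FakeCluster(tables)
unbatched_rows = [unbatched.execute([op])[0] for op in reads]

# All reads defined first, then a single execute(): one round trip.
batched = FakeCluster(tables)
batched_rows = batched.execute(reads)

print(unbatched.round_trips, batched.round_trips)
```

Both variants return the same rows; only the number of times the client waits on the network differs.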

In MySQL Cluster, attached MySQL Servers act as query processors, and MySQL's SQL execution engine breaks complex queries down into calls to its generic Storage Engine (SE) Api, which also deals with data access one table at a time.

The Ndb storage engine then further decomposes these SE Api calls into NdbApi primitive operations on individual tables.

MySQL supports SQL queries by performing the SE Api calls to read data, then comparing and matching results, sorting, buffering and reformatting. This works very well and gives MySQL Cluster great SQL functionality and compatibility, although users may find that the latency of their individual queries is not as low as with other engines such as MyISAM and InnoDB, which do not have to perform inter-process communication to implement their SE Api calls.

For minimum latency, Ndb requires that the MySQL Server makes efficient requests for data, requesting as much data as possible at once, and not using the data until it is essential to make forward progress - e.g. when there is a real data dependency.

MySQL features such as Insert, Update and Delete batching, and Batched Key Access minimise the MySQLD to data node round trips required to execute certain types of operations, but they are unable to help when there are real data dependencies in a query. For example, when the server needs to read some value from table t1 to know which rows to read from table t2, there is no alternative but to read the t1 rows into memory before issuing any reads from t2.

Linked Operations

To reduce the need for extra Api to Data node round trips for every data dependency, we must allow operations to be linked. If we can describe the data dependency as a link between NdbApi operations, then it can be resolved amongst the data nodes. For example, rather than stating :


SQL    > select t1.b, t2.c from t1,t2 where t1.pk=22 and t1.b=t2.pk;

ndbapi > read column b from t1 where pk = 22;
         [round trip]
         (b = 15)
ndbapi > read column c from t2 where pk = 15;
         [round trip]
         (c = 30)
[ return b = 15, c = 30 ]


We would state the join/operation linkage at the ndbapi level :


ndbapi > read column @b:=b from t1 where pk = 22;
         read column c from t2 where pk=@b;
         [round trip]
         (b = 15, c = 30)
[ return b = 15, c = 30 ]



We allow read operations to be parameterised on the results of previous operations, and have the linking of the operations, the flow of results into parameters, handled by the data nodes. The data dependency still results in some execution serialisation at the data node layer, but not at the api layer, so data dependencies within queries needn't result in extra round trips between the MySQL server and the data nodes. Where the dependent data happens to be on the same data node, the dependency can be resolved with no inter-process communication at all.
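
The t1/t2 example can be simulated in a few lines of Python. This is only a sketch under stated assumptions: DataNodes, read, read_linked and unlinked_join are hypothetical names invented for illustration, not part of NdbApi, and a "round trip" is just a counter increment.

```python
# Toy tables matching the example above: t1(pk -> b), t2(pk -> c).
T1 = {22: {"b": 15}}
T2 = {15: {"c": 30}}


class DataNodes:
    """Hypothetical stand-in for the data node layer; counts round trips."""

    def __init__(self):
        self.round_trips = 0

    def read(self, table, pk):
        # One request and one response: a full API <-> data node round trip.
        self.round_trips += 1
        return table.get(pk)

    def read_linked(self, parent, parent_pk, link_column, child):
        # The parent result is fed into the child key inside the data node
        # layer, so the API side still sees a single round trip.
        self.round_trips += 1
        parent_row = parent.get(parent_pk)
        if parent_row is None:
            return None
        child_row = child.get(parent_row[link_column])
        if child_row is None:
            return None
        return {**parent_row, **child_row}


def unlinked_join(nodes, pk):
    """Resolve the data dependency at the API layer: two round trips."""
    row1 = nodes.read(T1, pk)
    if row1 is None:
        return None
    row2 = nodes.read(T2, row1["b"])   # cannot be issued until b is known
    if row2 is None:
        return None
    return {**row1, **row2}


unlinked_nodes = DataNodes()
linked_nodes = DataNodes()
unlinked_result = unlinked_join(unlinked_nodes, 22)
linked_result = linked_nodes.read_linked(T1, 22, "b", T2)
print(unlinked_result, unlinked_nodes.round_trips)
print(linked_result, linked_nodes.round_trips)
```

The serialisation between the two reads is still there in read_linked; it has simply moved to the layer where it is cheapest to resolve.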

Viewing the database software as a stack, the execution of the join is being 'pushed down' the stack, to a lower layer. For this reason, the SPJ functionality is also sometimes referred to as pushed-down joins or just pushed joins. Pushing functionality closer to the data can result in improved performance due to lower latency, reduced data transfer etc. In the case of MySQL Cluster, it can avoid inter-process communication, as well as enable parallelism across the data nodes.

In theory, linking can occur between any of the 4 primitive operation types :
  • Primary key access (PK)
  • Unique key access (UK)
  • Ordered index range scan (OI)
    (Range bounds and optional conditions parameterised)
  • Table scan (TS)
    (Optional conditions parameterised)

In practice, linking the cardinality (0|1) operations (Primary key, Unique key) together is simpler than linking with the scans. In turn, linking a scan to a cardinality (0|1) operation is simpler than linking a scan to another scan.

Linking a table scan to a table scan results in a cross-join and is probably going to be unpleasantly expensive for anything other than small tables.

The initial SPJ implementation supports combinations of Primary/Unique key operations linked together with at most one ordered index scan.

A future implementation will support multiple ordered index scans in a single request. This is more complex to handle due to the buffering required of the different scan result sets, and the resulting result ordering versus efficiency tradeoffs.

The SPJ Api is implemented as an extension to the existing NdbApi, with similar primitive concepts, but with the addition of the means to link the primitives together. As with the existing NdbApi, the usage pattern is along these lines:

  • Define operation(s)
  • Define further linked operation(s)
  • Execute() // One round trip to the data nodes
  • Examine results

In terms of batching, a tree of linked operations, with one const-parameterised root operation, and one or more child operations, is considered to be a single operation. Multiple SPJ operations, each actually a tree of primitive operations, can be executed simultaneously in a batch, along with other 'basic' NdbApi operations.

Where a scan is included, the scan can be advanced using the normal nextResult() mechanism, which also advances the results returned by any cardinality (0|1) child operations.
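
As a rough illustration of the define / execute / iterate pattern, here is a Python sketch of a root scan linked to one cardinality (0|1) child lookup. The LinkedScan class and its methods are hypothetical and only mimic the shape of the usage pattern; the real extensions are C++ and deliver scan results in batches, which this sketch ignores. Returning None for a missing child is also an assumption made for the example.

```python
class LinkedScan:
    """Hypothetical linked operation: a root scan with one (0|1) child lookup."""

    def __init__(self, parent_rows, child_table, link_column):
        # "Define" phase: nothing is executed yet.
        self.parent_rows = parent_rows    # rows the scan would return, in order
        self.child_table = child_table    # {pk: row}
        self.link_column = link_column
        self._results = []
        self._pos = -1

    def execute(self):
        # Stands in for the request to the data nodes, which resolve each
        # child lookup from the corresponding scanned row.
        self._results = [
            (row, self.child_table.get(row[self.link_column]))
            for row in self.parent_rows
        ]
        self._pos = -1

    def next_result(self):
        """Advance the scan; the child row advances with it. False at end."""
        self._pos += 1
        return self._pos < len(self._results)

    def current(self):
        """Return the (parent_row, child_row_or_None) pair at the cursor."""
        return self._results[self._pos]


orders = [{"id": 1, "cust": 7}, {"id": 2, "cust": 9}, {"id": 3, "cust": 8}]
customers = {7: {"name": "Ada"}, 9: {"name": "Bo"}}

query = LinkedScan(orders, customers, "cust")   # define operations
query.execute()                                 # execute once
seen = []
while query.next_result():                      # examine results
    seen.append(query.current())
print(seen)
```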

NoJoins - Not only Joins

While the SPJ extensions are described here in terms of joins, at the NdbApi level they are really 'linked operations'. One design goal which is not completely aligned with the join concept was to allow scans of multiple different tables to be parameterised on a single root operation. For example :

  • read @eid:= entity_id from map_table where username="jan";
  • scan blog_titles from blog_posts where entity_id=@eid;
  • scan latest_tweets from twitter_feed where entity_id = @eid;
  • scan share_prices from stock_feed where entity_id = @eid;
  • ....

Here there is a data dependency between the first lookup and n peer child scans. I want to read all of this data in one round trip, but I don't necessarily want to have to express this in a single 'join' query. If we had a more relational/SQL oriented Api we might have had to create some unholy union of the different results, with masses of repeated values or nulls, or repeat the first lookup for each of n two-way joins.

With the linked operation concept, we can clearly state that the child scans are parameterised by the first lookup, without having to introduce some further unnatural coupling between the rows returned by each scan, which are otherwise independent.
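
A small Python sketch of the "one lookup, n peer scans" shape follows. The function execute_linked and the toy tables are hypothetical; the point is only that each child scan keeps its own independent result set rather than being forced into one joined row shape.

```python
def execute_linked(root_table, root_key, link_column, child_tables):
    """Simulate one round trip: a root lookup plus independent child scans.

    Returns the root row and a dict of per-table result lists. The child
    results are kept separate; no union or cross product is built.
    """
    root = root_table.get(root_key)
    if root is None:
        return None, {}
    eid = root[link_column]
    scans = {
        name: [row for row in rows if row["entity_id"] == eid]
        for name, rows in child_tables.items()
    }
    return root, scans


map_table = {"jan": {"entity_id": 42}}
child_tables = {
    "blog_posts": [
        {"entity_id": 42, "title": "SPJ"},
        {"entity_id": 7, "title": "other"},
    ],
    "twitter_feed": [{"entity_id": 42, "tweet": "hello"}],
    "stock_feed": [],
}

root, scans = execute_linked(map_table, "jan", "entity_id", child_tables)
print(root)
for name, rows in scans.items():
    print(name, rows)
```

Expressed as a single SQL join, the same data would come back as rows combining blog posts with tweets with share prices, even though those rows have nothing to do with each other.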

So although SPJ is named after and described as supporting joins, it doesn't mean that you have to be 'join-oriented' or a SQL Samurai to benefit from it. It may be quite useful for efficiently traversing graphs, hierarchies and other links between rows where the concept of a 'join' is quite alien.

Check it out

A mysql-5.1-cluster-7.1 source tree with the SPJ enhancements can be downloaded from here. You can see the NdbApi extensions in the storage/ndb/include/ndbapi directory of the source tree. This source also includes extensions to the MySQL Ndb handler to make use of the new SPJ Api for SQL queries, which I hope to describe a little next time. If you want to download and try out SPJ then see some of the other blog posts about how to get started with it.

5 comments:

Unknown said...

Hi,

"Where the dependent data happens to be on the same data node, the dependency can be resolved with no inter-process communication at all.", however this is often not the case, I mean, the data is not co-located. Then the performance will be bad?

I have a few questions. In the era of web app, it is quite common to have pattern like:
select top (n) x, ... from tbl where search_predicate and x > bound1 order by x,
where x can be from a small set, e.g. {[UserID], [city, age], ...}. This query will provide the base data for paged search result sorted by some field chosen ad hoc by users from a small set.

This will in turn require a schema like:
a: create table tbl { userid, city, age, ... primary key (city, age, userid), unique key (userid) } engine ndb,
Now since this is on NDB engine, the above is actually equivalent to:
b: create table tbl { userid, city, age, ... primary key (city, age, userid)}
create table tbl_index { userid, city, age, primary key (userid)}
(see: http://dev.mysql.com/doc/refman/5.7/en/mysql-cluster-ndbd-definition.html, unique constraint is implemented as separate table)

Now the query will have two forms correspondingly:

a1:
select top (n) userid, ... from tbl where search_predicate and userid > bound1 order by userid,

b1:
select top (n) userid, ... from tbl_index, tbl on tbl_index.userid = tbl.user
where search_predicate and userid > bound1 order by userid

So my question is:
1. How do the two different forms of queries perform on a MySql Cluster? and which one will win or will they tie? Why?
2. As seen from schema a and b, clearly the related data are not co-located, since, one partitioned by userid, the other by (cityid,age,userid), so, How will spj or AQL try to optimize for performance?
3. For the above problem, I think batching is the way to go for optimization, so I imagine a algorithm somewhat like this:
------
declare tmp_ret_table {userid primary key, city, age, ...}
declare tmp_index_table { tbl_nodeid, city, age, userid }
declare fetchfactor: some constant
declare batched_ids: varchar

while total-rows in tmp_ret_table < n
select top (n* fetchfactor) nodeid("tbl", city, age, userid), city, age, userid
into tmp_index_table
from tbl_index where userid > bound1 order by userid -- nodeid is a sql function

for each tbl_nodeid in tmp_index_table, generate the batched_ids, formatted as (pk1, pk1...)
insert into tmp_ret_table
select userid, ... from node(nodeid).tbl where pk in batched_ids and search_predicate
delete * from tmp_ret_table

if no more rows for all nodeid break
while-end

trim extra rows (if > n) in tmp_ret_table

return tmp_ret_table
-----
The 3rd question is, how do you think about the outlined algorithm, how does mysql cluster's optimization implement it?

Thanks, JX

Frazer Clement said...

Hi JX,


> "Where the dependent data happens to be on the same data node, the dependency can be resolved
> with no inter-process communication at all.", however this is often not the case, I mean, the
> data is not co-located. Then the performance will be bad?

Well, inter-process communication will be required, which will increase latency, response time etc. However, throughput need not be linearly affected if there are parallel queries etc.


> I have a few questions. In the era of web app, it is quite common to have pattern like:
> select top (n) x, ... from tbl where search_predicate and x > bound1 order by x,
> where x can be from a small set, e.g. {[UserID], [city, age], ...}. This query will provide
> the base data for paged search result sorted by some field chosen ad hoc by users from a
> small set.
>
> This will in turn require a schema like:
> a: create table tbl { userid, city, age, ... primary key (city, age, userid), unique key
> (userid) } engine ndb,
> Now since this is on NDB engine, the above is actually equivalent to:
> b: create table tbl { userid, city, age, ... primary key (city, age, userid)}
> create table tbl_index { userid, city, age, primary key (userid)}
> (see: http://dev.mysql.com/doc/refman/5.7/en/mysql-cluster-ndbd-definition.html, unique
> constraint is implemented as separate table)

Yes


> Now the query will have two forms correspondingly:
>
> a1:
> select top (n) userid, ... from tbl where search_predicate and userid > bound1 order by
> userid,
>
> b1:
> select top (n) userid, ... from tbl_index, tbl on tbl_index.userid = tbl.user
> where search_predicate and userid > bound1 order by userid
>

We really only 'join' with the tbl_index when we are using it to access the table (tbl), and we only do that when we have a single unique index value (or a small set of values) to look up. In this case we have an inequality on the userid (userid > bound1), so we would rather use an *Ordered Index*. Ordered indices are always defined on the main table (tbl) - never on a secondary unique index table.
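
The difference between the two access paths can be sketched in Python. This is an analogy only: a dict stands in for the unique index (point lookups), and a sorted list searched with bisect stands in for the ordered index (range scans). The names tbl, unique_idx, ordered_idx and the helper functions are invented for the example and say nothing about how Ndb stores either structure internally.

```python
import bisect

# Base table keyed by primary key (city, age, userid).
tbl = {
    ("oslo", 31, 3): "row-3",
    ("riga", 25, 1): "row-1",
    ("oslo", 40, 8): "row-8",
    ("riga", 52, 5): "row-5",
}

# Unique index stand-in: userid -> primary key. Good for exact values only.
unique_idx = {pk[2]: pk for pk in tbl}

# Ordered index stand-in: entries sorted by userid. Good for ranges.
ordered_idx = sorted((pk[2], pk) for pk in tbl)
ordered_keys = [userid for userid, _ in ordered_idx]


def lookup_userid(userid):
    """Point lookup: userid = value."""
    pk = unique_idx.get(userid)
    return None if pk is None else tbl[pk]


def range_userid_gt(bound):
    """Range scan: userid > bound, returned in userid order."""
    start = bisect.bisect_right(ordered_keys, bound)
    return [tbl[pk] for _, pk in ordered_idx[start:]]


print(lookup_userid(3))
print(range_userid_gt(3))
```

A predicate like userid > bound1 has no useful entry point into the dict, which is why the range form of the query goes to the ordered index.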

So I don't think b1 would occur using MySQLD.


To be continued...

Frazer Clement said...

> So my question is:
> 1. How do the two different forms of queries perform on a MySql Cluster? and which one will
> win or will they tie? Why?

b1 not really used, as mentioned above.

Perhaps your question is rather about how MySQL Cluster would handle it if you made the unique index table explicit like you have done above, and added ordered indexes to e.g. the UserId column etc...

I guess I would first try to determine what the gain of b1 is over a1. Given that the secondary 'index' contains no data that is not already in the base table, all it provides are more efficient access methods for individual key values. As that is not required here, it seems like extra work for no gain, in which case I would expect it to be slower.


> 2. As seen from schema a and b, clearly the related data are not co-located, since, one
> partitioned by userid, the other by (cityid,age,userid), so, How will spj or AQL try to
> optimize for performance?

The ordered indices in the schema would probably be more relevant to discuss here. I would expect that query a1 would be executed without using SPJ as it may just be a range scan on an ordered index on UserId with a pushed predicate. e.g. Scan the UserId ordered index on table tbl starting at bound1 and returning rows which pass [partial] predicate function x.

The scan will probably run in parallel across all LDM instances holding an index fragment, as it will not be prunable. No joins are required. Each fragment scan will return a batch of rows. Perhaps more than top(n) will be returned to the MySQLD, but it can discard those it does not need. Only m * batchsize more rows than needed will be scanned where m = parallelism.
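
Here is a Python sketch of that scan pattern, assuming the fragment results are merged in key order on the consuming side. The names fragment_scan and top_n, the modulo partitioning and the rows_scanned counter are all hypothetical. Each fragment hands over rows one batch at a time, and the consumer stops once it has n rows.

```python
import heapq
from itertools import islice


def fragment_scan(sorted_keys, bound, batch_size, stats):
    """Yield keys > bound from one index fragment, fetching a batch at a time."""
    candidates = [k for k in sorted_keys if k > bound]
    for start in range(0, len(candidates), batch_size):
        batch = candidates[start:start + batch_size]
        stats["rows_scanned"] += len(batch)   # counted only when fetched
        yield from batch


def top_n(fragments, bound, n, batch_size):
    """Merge the per-fragment ordered scans and keep the first n keys."""
    stats = {"rows_scanned": 0}
    scans = [fragment_scan(keys, bound, batch_size, stats) for keys in fragments]
    merged = heapq.merge(*scans)              # lazy ordered merge
    return list(islice(merged, n)), stats["rows_scanned"]


# Three fragments; userids 1..30 spread across them by userid % 3.
fragments = [[k for k in range(1, 31) if k % 3 == f] for f in range(3)]
batch_size = 3
rows, scanned = top_n(fragments, bound=5, n=4, batch_size=batch_size)
print(rows, scanned)
```

Because the generators are lazy, no fragment fetches a further batch until the merge actually needs a row from it, so the surplus stays within one batch per fragment.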

More generally, SPJ/AQL will :
1. Consume the output of query planning (Ordered NLJ of tables/indices) with limited feedback into the process
2. Identify query fragments that can be pushed to the data nodes (perhaps the whole query)
3. Prepare these fragments
4. Execute these fragments as the standard NLJ executor iterates the NLJ.

From the MySQLD point of view, it is standard NLJ (with MySQL enhancements for batched key access).

On the Ndb side of things we will of course use batching, so when we are asked for the first row from a query fragment we will execute the pushed portion to produce *at least* 1 row. The extra rows are buffered in NdbApi until the MySQLD iterates onto them. It may never do this.

So we will do batching, pushed, parallel joining etc when necessary. However there is currently not a lot of feedback from our SPJ to the MySQLD Query optimizer/planner about more or less efficient plans.

And down in the data nodes, data locality is just a matter of latency. Both local and remote lookups are performed using asynchronous message passing, so there are no different code paths etc. Of course local lookups are more efficient and lower latency, but the code is agnostic.

Hope this answers the question.

Frazer Clement said...

> 3. For the above problem, I think batching is the way to go for optimization, so I imagine a
> algorithm somewhat like this:
> ------
> declare tmp_ret_table {userid primary key, city, age, ...}
> declare tmp_index_table { tbl_nodeid, city, age, userid }
> declare fetchfactor: some constant
> declare batched_ids: varchar
>
> while total-rows in tmp_ret_table < n
> select top (n* fetchfactor) nodeid("tbl", city, age, userid), city, age, userid
> into tmp_index_table
> from tbl_index where userid > bound1 order by userid -- nodeid is a sql function
>
> for each tbl_nodeid in tmp_index_table, generate the batched_ids, formatted as (pk1, pk1...)
> insert into tmp_ret_table
> select userid, ... from node(nodeid).tbl where pk in batched_ids and search_predicate
> delete * from tmp_ret_table
>
> if no more rows for all nodeid break
> while-end
>
> trim extra rows (if > n) in tmp_ret_table
>
> return tmp_ret_table
> -----

Right, well that is kind of what happens, but the responsibility is split across layers, and there's no join required.

1. Use an ordered index to find *batches of* candidate rows by predicate > bound1
Fetch factor above is effectively the ndb_batch_size [session] variable.

2. Use an always-local read to get the indexed data from the table tbl
(Ordered indexes are always co-located with the indexed rows - each table fragment has a colocated index fragment indexing only the rows it contains)

3. Use the data node filter executor to filter out non predicate matching rows in the data node, as far as possible

4. Return candidate rows to MySQLD

5. MySQLD applies 'full' filtering [which may reduce the candidates further]

6. MySQLD outputs 1 row at a time until LIMIT hit. Once a batch is exhausted, another batch is requested from the data nodes.
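
The six steps can be sketched as two cooperating Python generators, one per layer. This is a simplified single-fragment model: data_node_batches and mysqld_limit are hypothetical names, the two filter functions are arbitrary examples, and the batch_size argument plays the role that ndb_batch_size plays in the description above.

```python
def data_node_batches(index_rows, bound, batch_size, pushed_filter):
    """Steps 1-3: ordered range scan in batches, with a pushed (partial) filter."""
    batch = []
    for row in index_rows:                 # index_rows is sorted by userid
        if row["userid"] <= bound:
            continue
        if pushed_filter(row):             # filtering done at the data node
            batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def mysqld_limit(batches, full_filter, limit):
    """Steps 4-6: receive batches, apply the full filter, stop at LIMIT."""
    out = []
    for batch in batches:                  # next batch requested only on demand
        for row in batch:
            if full_filter(row):
                out.append(row)
                if len(out) == limit:
                    return out
    return out


rows = [
    {"userid": i, "city": "oslo" if i % 2 == 0 else "riga", "age": 20 + i}
    for i in range(1, 21)
]
batches = data_node_batches(
    rows, bound=3, batch_size=2, pushed_filter=lambda r: r["city"] == "oslo"
)
result = mysqld_limit(batches, full_filter=lambda r: r["age"] >= 28, limit=2)
print([r["userid"] for r in result])
```

Once the limit is reached the batch generator is simply never resumed, which mirrors the point that further batches are only requested when the current one is exhausted.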

BTW, just to confuse things further, MySQLD supports a separate 'Batched Key Access' mechanism (on MyISAM, InnoDB, *) which works something like you describe, getting a batch of keys from table 1 to lookup in table 2 as an optimisation. This can also be used with Ndb, under the control of the Optimizer. IIRC, InnoDB will attempt to e.g. sort the batched keys to get a better disk/index access pattern. For Ndb, we will send the keys in a large batch, saving latency. Batched Key Access is totally separate to AQL / SPJ which is Ndb only.


> The 3rd question is, how do you think about the outlined algorithm, how does mysql cluster's
> optimization implement it?

Hopefully answered above!

Thanks for the questions, nice to hear that you are looking at MySQL Cluster, Ndb + SPJ!

Suggest reading about our ordered indexes, as they would be more relevant in executing queries of the type you mention.

Also, the EXPLAIN, EXPLAIN EXTENDED and SHOW WARNINGS commands should give a first step towards seeing how MySQL Cluster executes particular queries.

Frazer

Frazer Clement said...

Here are some ageing slides describing some of the concepts above (batching, data locality, pruning, use of EXPLAIN etc).