Comments on message passing: Low latency distributed parallel joins<br />Frazer Clement (http://www.blogger.com/profile/05435364450772586515)<br />Updated 2023-12-23T16:47:55.839+00:00<br /><br />2016-10-25T10:26:35.501+01:00<br />Here are some ageing slides describing some of the concepts above (batching, data locality, pruning, use of EXPLAIN etc.).<br /><br />Frazer Clement (https://www.blogger.com/profile/05435364450772586515)<br /><br />2016-10-25T10:23:34.479+01:00<br />> 3. For the above problem, I think batching is the way to go for optimization, so I imagine an<br />> algorithm somewhat like this:<br />> ------<br />> declare tmp_ret_table {userid primary key, city, age, ...}<br />> declare tmp_index_table { tbl_nodeid, city, age, userid }<br />> declare fetchfactor: some constant<br />> declare batched_ids: varchar<br />><br />> while total-rows in tmp_ret_table < n<br />> select top (n * fetchfactor) nodeid("tbl", city, age, userid), city, age, userid<br />> into tmp_index_table<br />> from tbl_index where userid > bound1 order by userid -- nodeid is a sql function<br />><br />> for each tbl_nodeid in tmp_index_table, generate the batched_ids, formatted as (pk1, pk2, ...)<br />> insert into tmp_ret_table<br />> select userid, ... 
from node(nodeid).tbl where pk in batched_ids and search_predicate<br />> delete * from tmp_index_table<br />><br />> if no more rows for all nodeid break<br />> while-end<br />><br />> trim extra rows (if > n) in tmp_ret_table<br />><br />> return tmp_ret_table<br />> -----<br /><br />Right, well that is kind of what happens, but the responsibility is split across layers, and there's no join required.<br /><br />1. Use an ordered index to find *batches of* candidate rows by predicate > bound1<br /> Fetch factor above is effectively the ndb_batch_size [session] variable.<br /><br />2. Use an always-local read to get the indexed data from the table tbl<br /> (Ordered indexes are always co-located with the indexed rows - each table fragment has a co-located index fragment indexing only the rows it contains)<br /><br />3. Use the data node filter executor to filter out rows that do not match the predicate in the data node, as far as possible<br /><br />4. Return candidate rows to MySQLD<br /><br />5. MySQLD applies 'full' filtering [which may reduce the candidates further]<br /><br />6. MySQLD outputs 1 row at a time until the LIMIT is hit. Once a batch is exhausted, another batch is requested from the data nodes.<br /><br />BTW, just to confuse things further, MySQLD supports a separate 'Batched Key Access' mechanism (on MyISAM, InnoDB, *) which works something like you describe, getting a batch of keys from table 1 to look up in table 2 as an optimisation. This can also be used with Ndb, under the control of the Optimizer. IIRC, InnoDB will attempt to e.g. sort the batched keys to get a better disk/index access pattern. For Ndb, we will send the keys in a large batch, saving latency. 
Batched Key Access is totally separate from AQL / SPJ, which is Ndb only.<br /><br /><br />> The 3rd question is, what do you think of the outlined algorithm, and how does MySQL Cluster's<br />> optimization implement it?<br /><br />Hopefully answered above!<br /><br />Thanks for the questions, nice to hear that you are looking at MySQL Cluster, Ndb + SPJ!<br /><br />I suggest reading about our ordered indexes, as they would be more relevant in executing queries of the type you mention.<br /><br />Also, the EXPLAIN, EXPLAIN EXTENDED and SHOW WARNINGS commands should give a first step into looking at how MySQL Cluster executes particular queries.<br /><br />Frazer<br /><br />Frazer Clement (https://www.blogger.com/profile/05435364450772586515)<br /><br />2016-10-25T10:22:49.689+01:00<br />
> So my question is:<br />> 1. How do the two different forms of queries perform on MySQL Cluster? Which one will<br />> win, or will they tie? Why?<br /><br />b1 is not really used, as mentioned in my earlier reply.<br /><br />Perhaps your question is rather about how MySQL Cluster would handle it if you made the unique index table explicit like you have done above, and added ordered indexes to e.g. the UserId column etc...<br /><br />I guess I would first try to determine what the gain of b1 is over a1. Given that the secondary 'index' contains no data that is not already in the base table, all it provides is a more efficient access method for individual key values. As that is not required here, it seems like extra work for no gain, in which case I would expect it to be slower.<br /><br /><br />> 2. As seen from schema a and b, clearly the related data are not co-located, since one is<br />> partitioned by userid, the other by (city, age, userid), so how will SPJ or AQL try to<br />> optimize for performance?<br /><br />The ordered indices in the schema would probably be more relevant to discuss here. I would expect that query a1 would be executed without using SPJ, as it may just be a range scan on an ordered index on UserId with a pushed predicate, e.g. scan the UserId ordered index on table tbl starting at bound1 and returning rows which pass [partial] predicate function x.<br /><br />The scan will probably run in parallel across all LDM instances holding an index fragment, as it will not be prunable. No joins are required. Each fragment scan will return a batch of rows. Perhaps more than top(n) will be returned to the MySQLD, but it can discard those it does not need. Only m * batchsize more rows than needed will be scanned, where m = parallelism.<br /><br />More generally, SPJ/AQL will:<br /> 1. Consume the output of query planning (ordered nested-loop join (NLJ) of tables/indices) with limited feedback into the process<br /> 2. 
Identify query fragments that can be pushed to the data nodes (perhaps the whole query)<br /> 3. Prepare these fragments<br /> 4. Execute these fragments as the standard NLJ executor iterates the NLJ.<br /><br />From the MySQLD point of view, it is standard NLJ (with MySQL enhancements for batched key access).<br /><br />On the Ndb side of things we will of course use batching, so when we are asked for the first row from a query fragment we will execute the pushed portion to produce *at least* 1 row. The extra rows are buffered in NdbApi until the MySQLD iterates onto them. It may never do this.<br /><br />So we will do batching, pushed parallel joins etc. when necessary. However, there is currently not a lot of feedback from our SPJ to the MySQLD query optimizer/planner about more or less efficient plans.<br /><br />And down in the data nodes, data locality is just a matter of latency. Both local and remote lookups are performed using asynchronous message passing, so there are no different code paths etc. Of course local lookups are more efficient and lower latency, but the code is agnostic.<br /><br />Hope this answers the question.<br /><br />Frazer Clement (https://www.blogger.com/profile/05435364450772586515)<br /><br />2016-10-25T10:21:17.675+01:00<br />
Hi JX,<br /><br /><br />> “Where the dependent data happens to be on the same data node, the dependency can be resolved<br />> with no inter-process communication at all.”, however this is often not the case, I mean, the<br />> data is not co-located. Then the performance will be bad?<br /><br />Well, inter-process communication will be required, which will increase latency, response time etc. However, throughput need not be linearly affected if there are parallel queries etc.<br /><br /><br />> I have a few questions. In the era of web apps, it is quite common to have a pattern like:<br />> select top (n) x, ... from tbl where search_predicate and x > bound1 order by x,<br />> where x can be from a small set, e.g. {[UserID], [city, age], ...}. This query will provide<br />> the base data for a paged search result sorted by some field chosen ad hoc by users from a<br />> small set.<br />><br />> This will in turn require a schema like:<br />> a: create table tbl { userid, city, age, ... primary key (city, age, userid), unique key<br />> (userid) } engine ndb,<br />> Now since this is on the NDB engine, the above is actually equivalent to:<br />> b: create table tbl { userid, city, age, ... primary key (city, age, userid)}<br />> create table tbl_index { userid, city, age, primary key (userid)}<br />> (see: http://dev.mysql.com/doc/refman/5.7/en/mysql-cluster-ndbd-definition.html, unique<br />> constraint is implemented as separate table)<br /><br />Yes.<br /><br /><br />> Now the query will have two forms correspondingly:<br />><br />> a1:<br />> select top (n) userid, ... from tbl where search_predicate and userid > bound1 order by<br />> userid,<br />><br />> b1:<br />> select top (n) userid, ... from tbl_index, tbl on tbl_index.userid = tbl.userid<br />> where search_predicate and userid > bound1 order by userid<br />><br /><br />We really only 'join' with the tbl_index when we are using it to access the table (tbl). 
We only use the tbl_index to access the table (tbl) when we have a single unique index value (or a small set of them) to look up. In this case we have an inequality on the userid (userid > bound1), so we would rather use an *Ordered Index*. Ordered indices are always defined on the main table (tbl) - never on a secondary unique index table.<br /><br />So I don't think b1 would occur using MySQLD.<br /><br /><br />To be continued...<br /><br />Frazer Clement (https://www.blogger.com/profile/05435364450772586515)<br /><br />2016-10-18T15:05:28.831+01:00<br />
Hi,<br /><br />“Where the dependent data happens to be on the same data node, the dependency can be resolved with no inter-process communication at all.”, however this is often not the case, I mean, the data is not co-located. Then the performance will be bad?<br /><br />I have a few questions. In the era of web apps, it is quite common to have a pattern like:<br />select top (n) x, ... from tbl where search_predicate and x > bound1 order by x,<br />where x can be from a small set, e.g. {[UserID], [city, age], ...}. This query will provide the base data for a paged search result sorted by some field chosen ad hoc by users from a small set.<br /><br />This will in turn require a schema like: <br />a: create table tbl { userid, city, age, ... primary key (city, age, userid), unique key (userid) } engine ndb,<br />Now since this is on the NDB engine, the above is actually equivalent to:<br />b: create table tbl { userid, city, age, ... primary key (city, age, userid)}<br /> create table tbl_index { userid, city, age, primary key (userid)}<br />(see: http://dev.mysql.com/doc/refman/5.7/en/mysql-cluster-ndbd-definition.html, unique constraint is implemented as separate table)<br /><br />Now the query will have two forms correspondingly:<br /><br />a1:<br />select top (n) userid, ... from tbl where search_predicate and userid > bound1 order by userid,<br /><br />b1:<br />select top (n) userid, ... from tbl_index, tbl on tbl_index.userid = tbl.userid <br /> where search_predicate and userid > bound1 order by userid<br /><br />So my question is:<br />1. How do the two different forms of queries perform on MySQL Cluster? Which one will win, or will they tie? Why?<br />2. As seen from schema a and b, clearly the related data are not co-located, since one is partitioned by userid, the other by (city, age, userid), so how will SPJ or AQL try to optimize for performance?<br />3. 
For the above problem, I think batching is the way to go for optimization, so I imagine an algorithm somewhat like this:<br />------<br /> declare tmp_ret_table {userid primary key, city, age, ...}<br /> declare tmp_index_table { tbl_nodeid, city, age, userid }<br /> declare fetchfactor: some constant<br /> declare batched_ids: varchar<br /><br /> while total-rows in tmp_ret_table < n <br /> select top (n * fetchfactor) nodeid("tbl", city, age, userid), city, age, userid<br /> into tmp_index_table <br /> from tbl_index where userid > bound1 order by userid -- nodeid is a sql function<br /><br /> for each tbl_nodeid in tmp_index_table, generate the batched_ids, formatted as (pk1, pk2, ...)<br /> insert into tmp_ret_table<br /> select userid, ... from node(nodeid).tbl where pk in batched_ids and search_predicate<br /> delete * from tmp_index_table<br /><br /> if no more rows for all nodeid break<br /> while-end<br /><br /> trim extra rows (if > n) in tmp_ret_table<br /><br /> return tmp_ret_table<br />-----<br />The 3rd question is, what do you think of the outlined algorithm, and how does MySQL Cluster's optimization implement it?<br /><br />Thanks, JX<br />Anonymous (https://www.blogger.com/profile/06166539402064187325)
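For readers comparing the loop in this question with the replies earlier in the thread, here is a rough toy model of the flow those replies describe: each index fragment is scanned in userid order from bound1, the predicate is applied on the fragment side, rows come back in batches, and the SQL node merges the streams and stops once it has n rows. It is only a sketch in plain Python, not NDB or NdbApi code. The names `fragment_scan`, `top_n`, `stats` and the sample data are all made up for illustration, and applying the whole predicate in the fragment is a simplification of the partial/full filtering split.

```python
import heapq
from itertools import islice


def fragment_scan(rows, bound, predicate, batch_size, stats):
    """Toy stand-in for one index fragment's scan (hypothetical helper).

    Walks the fragment's rows in userid order starting after `bound`,
    filters them "on the data node", and hands matches back batch by batch.
    """
    ordered = sorted((r for r in rows if r["userid"] > bound),
                     key=lambda r: r["userid"])
    matching = (r for r in ordered if predicate(r))
    while True:
        batch = list(islice(matching, batch_size))
        if not batch:
            return
        stats["batches"] += 1  # one more batch requested from this fragment
        yield from batch


def top_n(fragments, bound, predicate, n, batch_size):
    """Toy stand-in for the SQL node: merge fragment streams, stop at n rows."""
    stats = {"batches": 0}
    scans = [fragment_scan(f, bound, predicate, batch_size, stats)
             for f in fragments]
    # Each scan is already sorted by userid, so a lazy k-way merge is enough.
    merged = heapq.merge(*scans, key=lambda r: r["userid"])
    return list(islice(merged, n)), stats


# Made-up data: 20 users split across two fragments by userid parity.
users = [{"userid": i, "city": "Oslo" if i % 3 == 0 else "Rome", "age": 20 + i}
         for i in range(1, 21)]
fragments = [[u for u in users if u["userid"] % 2 == 0],
             [u for u in users if u["userid"] % 2 == 1]]

rows, stats = top_n(fragments, bound=4,
                    predicate=lambda r: r["city"] == "Rome",
                    n=3, batch_size=2)
print([r["userid"] for r in rows], stats)
```

Because the merge is lazy, further batches are only requested from a fragment when the consumer runs out of its rows, so a small n stops the scans early instead of draining every fragment. No temporary index table or join is involved, which is the main difference from the loop outlined in the question.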