mycat分布式mysql中间件(分片join)

 传统的分片策略都是基于单表,或者分片基于主键进行分配,或者某些场景下需要多个表依赖于一个分片,或者分片的字段并不是主键。

     a.  对于传统的数据库分片方式都是基于单个表格,对于表关联这种操作,则很难处理。为了能够执行t_usert_user_detail的联合查询, MyCAT借鉴了NewSQL领域的新秀Foundation DB的设计思路,Foundation DB创新性的提出了Table Group的概念,其将子表的存储位置依赖于主表,并且物理上紧邻存放,因此彻底解决了JOIN的效率和性能问题,根据这一思路,提出了基于E-R关系的数据分片策略,子表的记录与所关联的父表记录存放在同一个数据分片上。

t_usert_user_detail例子为例,schema.xml中定义如下的分片配置:

<table name="t_user" dataNode="dn$1-32" rule="mod-long">
    
<childTable name="t_user_detail" primaryKey="id" joinKey="user_id" parentKey="user_id" />
</table>

t_user采用mod-long这个分片策略,分片在dn1-dn32上,t_user_detail依赖父表进行分片,两个表的关联关系为t_user_detail.user_id=t_user.id。于是数据分片和存储的示意图如下:

这样一来,分片dn1-32上的t_userhn1-32上的t_user_detail就可以进行局部的JOIN联合,再合并两个节点的数据即可完成整体的JOIN,试想一下,每个分片上t_user_detail表有1000万条,则10个分片就有1个亿,基于E-R映射的数据分片模式,基本上解决了80%以上的企业应用所面临的问题。

 b. 

多对多的表格如何处理?多对多的表格通常情况下,有以下几种:

l  主表+关系表+字典表

l  主表A+关系表+主表B

对于第一种,字典表可以被定义为“全局表”,字典表的记录规模可以在几千到几十万之间,基本是变动比较少的表,由MyCAT自动实时同步到所有分片,这样就可以三个表都做JOIN操作了。

对于第二种,需要从业务角度来看,关系表更偏向哪个表,即“A的关系”还是“B的关系”,来决定关系表跟从那个方向存储。目前还暂时无法很好支持这种模式下的3个表之间的关联。未来版本中将考虑将中间表进行双向复制,以实现从A-关系表 以及B-关系表的双向关联查询。

关于全局表的实现方式,全局表在数据插入或更新的时候,会自动在全局表定义的所有数据节点上执行相同的操作,以保证所有数据节点都一致,由于这个特性,全局表可以跟任何分片或不分片的表格进行JOIN操作。对数据更新不频繁的,规模不是很大的(100万之内)的表都可以定义为MyCAT的全局表,以实现用存储换性能的目标。

配置为:


<table name="t_area" primaryKey="id" type="global" dataNode="dn1,dn2" />


c.  主键分片vs 非主键分片


当你没人任何字段可以作为分片字段的时候,主键分片就是唯一选择,其优点是按照主键的查询最快,当采用自动增长的序列号作为主键时,还能比较均匀的将数据分片在不同的节点上。

若有某个合适的业务字段比较合适作为分片字段,则建议采用此业务字段分片,选择分片字段的条件如下:

  • 尽可能的比较均匀分布数据到各个节点上;
  • 该业务字段是最频繁的或者最重要的查询条件。

常见的除了主键之外的其他可能分片字段有“订单创建时间”、“店铺类别”或“所在省”等。当你找到某个合适的业务字段作为分片字段以后,不必纠结于“牺牲了按主键查询记录的性能”,因为在这种情况下,MyCAT提供了“主键到分片”的内存缓存机制,热点数据按照主键查询,丝毫不损失性能。做法如下:

<table name="t_user" primaryKey="user_id" dataNode="dn$1-32" rule="mod-long">
<childTable name="t_user_detail" primaryKey="id" joinKey="user_id" parentKey="user_id" />
</table>

对于非主键分片的table,填写属性primaryKey,此时MyCAT会将你根据主键查询的SQL语句的第一次执行结果进行分析,确定该Table 的某个主键在什么分片上,并进行主键到分片ID的缓存。

     第二次或后续查询mycat会优先从缓存中查询是否有id-->node  即主键到分片的映射,如果有直接查询,通过此种方法提高了非主键分片的查询性能。

d.分片join


     不管是按照何种规则分片数据的join都是分布式系统难题,mycat提供了几种方式:

     1. 如果是全局表,分片内部的表相关与全局表join,分片内部会使用分片内部全局表join业务表,方式跨分片join

     2. E-R 关系的分片表,同样也只会发生分片内部join 即 父表join子表,也不会跨分片

     3. catlet  既不是全局不又不是E-R关系,mycat提供了人工智能分片join即,通过程序编程的方式,通过拦截sql语句,将多个表的join分拆成多个子select,

        然后再join,这种方式需要开发支持,编写java代码作为插件,此种方法的优点是无需修改应用代码,只需要拦截对应的sql做处理即可。

     4. 目前最新版mycat引入了分片join机制,即通过在查询时刻拉取join表的数据,同步到mycat本地,导入到NoSql数据库中,再做join order limit,

        目前此种方式为开发的最新方式,可以支持2个表夸分片join,无需特殊配置。


join:


1       ShareJoin

   ShareJoin是一个简单的跨分片Join,基于HBT的方式实现。

目前支持2个表的join,原理就是解析SQL语句,拆分成单表的SQL语句执行,然后把各个节点的数据汇集。

支持任意配置的A,B

如:

A,BdataNode相同

<table name="A" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />

<table name="B" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />

A,BdataNode不同

<table name="A" dataNode="dn1,dn2 " rule="auto-sharding-long" />

<table name="B" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />

<table name="A" dataNode="dn1 " rule="auto-sharding-long" />

<table name="B" dataNode=" dn2,dn3" rule="auto-sharding-long" />

1.1     相关类图


JoinParser: SQL语句的解析

TableFilter:存解析后的各个子表

ShareJoin:执行拆分的语句管理控制,和字段,记录的管理

ShareDBJoinHandler:第一个表执行后获取数据的handler

ShareRowOutPutDataHandler:最后一个表执行后获取数据的handler

 

EnginerCtx:执行引擎

SQLJob:SQL语句执行任务

SQLJobHandler:SQL语句执行后获取数据的handler

BatchSQLJob:批量执行任务控制

AllJobFinishedListener:所有任务完成侦听器

 

1.2     测试

默认mycat的环境测试:

 

/*!mycat:catlet=demo.catlets.ShareJoin */ select a.*,b.id, b.name as tit from customer a,company b on a.company_id=b.id;

 

 

/*!mycat:catlet=demo.catlets.ShareJoin */ select a.*,b.id, b.name as name from orders a join customer b where a.customer_id=b.id;

/*!mycat:catlet=demo.catlets.ShareJoin */ select a.*,b.* from orders a join customer b where a.customer_id=b.id;

 

 

/*!mycat:catlet=demo.catlets.ShareJoin */ select a.id,a.user_id,a.traveldate,a.fee,a.days,b.id as nnid, b.title as tit from travelrecord  a  join  hotnews b on b.id=a.days order by a.id ;

 

1.3     升级

未来支持多表的跨分片Join

小表放人缓存或广播方式

 

2       全局表

一个真实的业务系统中,往往存在大量的类似字典表的表格,它们与业务表之间可能有关系,这种关系,可以理解为“标签”,而不应理解为通常的“主从关系”,这些表基本上很少变动,可以根据主键ID进行缓存,下面这张图说明了一个典型的“标签关系”图:




         在分片的情况下,当业务表因为规模而进行分片以后,业务表与这些附属的字典表之间的关联,就成了比较棘手的问题,考虑到字典表具有以下几个特性:

  • 变动不频繁
  • 数据量总体变化不大
  • 数据规模不大,很少有超过数十万条记录。

鉴于此,MyCAT定义了一种特殊的表,称之为“全局表”,全局表具有以下特性:

  • 全局表的插入、更新操作会实时在所有节点上执行,保持各个分片的数据一致性
  • 全局表的查询操作,只从一个节点获取
  • 全局表可以跟任何一个表进行JOIN操作

将字典表或者符合字典表特性的一些表定义为全局表,则从另外一个方面,很好的解决了数据JOIN的难题。通过全局表+基于E-R关系的分片策略,MyCAT可以满足80%以上的企业应用开发。

 

2.1     配置

全局表配置比较简单,不用写Rule规则,如下配置即可:

<table name="company" primaryKey="ID" type="global" dataNode="dn1,dn2,dn3" />

需要注意的是,全局表每个分片节点上都要有运行创建表的DDL语句。

 

3       ER分片

MyCAT借鉴了NewSQL领域的新秀Foundation DB的设计思路,Foundation DB创新性的提出了Table Group的概念,其将子表的存储位置依赖于主表,并且物理上紧邻存放,因此彻底解决了JION的效率和性能问题,根据这一思路,提出了基于E-R关系的数据分片策略,子表的记录与所关联的父表记录存放在同一个数据分片上。

customer采用sharding-by-intfile这个分片策略,分片在dn1,dn2上,orders依赖父表进行分片,两个表的关联关系为orders.customer_id=customer.id。于是数据分片和存储的示意图如下:


这样一来,分片Dn1上的的customerDn1上的orders就可以进行局部的JOIN联合,Dn2上也如此,再合并两个节点的数据即可完成整体的JOIN,试想一下,每个分片上orders表有100万条,则10个分片就有1个亿,基于E-R映射的数据分片模式,基本上解决了80%以上的企业应用所面临的问题。


3.1     配置

以上述例子为例,schema.xml中定义如下的分片配置:

<table name="customer" dataNode="dn1,dn2" rule="sharding-by-intfile">

<childTable name="orders"  joinKey="customer_id" parentKey="id"/>

</table>

 

4       HBT分片

解决跨分片的SQL JOIN的问题,远比想象的复杂,而且往往无法实现高效的处理,既然如此,就依靠人工的智力,去编程解决业务系统中特定几个必须跨分片的SQLJOIN逻辑,MyCAT提供特定的API供程序员调用,这就是MyCAT创新性的思路——人工智能。

以一个跨节点的SQL为例,

Select a.id,a.name,b.title from a,b where a.id=b.id

         其中a在分片123上,b456上,需要把数据全部拉到本地(MyCAT服务器),执行JOIN逻辑,具体过程如下(只是一种可能的执行逻辑):

 

EngineCtx ctx=new EngineCtx();//包含SQLEngine
String sql=,“select a.id ,a.name from a ”;
//在a表所在的所有分片上顺序执行下面的本地SQL
 
ctx.executeNativeSQLSequnceJob(allAnodes,new DirectDBJoinHandler());
DirectDBJoinHandler类是一个回调类,负责处理SQL执行过程中返回的数据包,这里的这个类,主要目的是用a表返回的ID信息,去b表上查询对于的记录,做实时的关联:
 
DirectDBJoinHandler{
  Private HashMap<byte[],byte[]> rows;//Key为id,value为一行记录的Column原始Byte数组,这里是a.id,a.name,b.title这三个要输出的字段
   Public Boolean onHeader(byte[] header)
{ 
//保存Header信息,用于从Row中获取Field字段值
}
   Public Boolean onRowData(byte[] rowData)
{
    String id=getColumnAsString(“id”);
//放入结果集,b.title字段未知,所以先空着
rows.put(getColumnRawBytes(“id”),rowData);
  //满1000条,发送一个查询请求
String sql=”select b.id, b.name  from b where id in (………….)”;
 
//此SQL在B的所有节点上并发执行,返回的结果直接输出到客户端
 ctx.executeNativeSQLParallJob(allBNodes,sql ,new MyRowOutPutDataHandler(rows));
 
}
   Public Boolean onRowFinished()
  {
 }
Public void onJobFinished()
 {
If(ctx.allJobFinished())
     {///used total time ….
 
     }
}
}
/最后,增加一个Job事件监听器,这里是所有Job完成后,往客户端发送RowEnd包,结束整个流程。
ctx.setJobEventListener(new JobEventHandler(){public void onJobFinished(){ client.writeRowEndPackage()}});

 

以上提供一个SQL执行框架,完全是异步的模式执行,并且以后会提供更多高质量的API,简化分布式数据处理,比如内存结合文件的数据JOIN算法,分组算法,排序算法等等,

期待更多的牛人一起来完善。

 

4.1     配置


  1. 全局表为什么有些节点没有数据