Mycat Request Handling (3) - Processing Commands

Protocol

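After the client authenticates successfully, the connection enters the command phase. The exchange is:
Client -> Server: command message
Server -> Client: command result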

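Packet structure

A packet consists of a header and a body. The header is always 4 bytes long, and the length of the body is given by the length field in the header.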
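
The header layout can be sketched as follows. In the MySQL client/server protocol the 4-byte header holds a 3-byte little-endian payload length followed by a 1-byte sequence id. The class and method names here (PacketHeaderSketch, payloadLength, sequenceId) are hypothetical and only illustrate the layout; they are not Mycat's API.

```java
// Hypothetical helper for illustration only; not part of Mycat.
final class PacketHeaderSketch {
    static final int HEADER_SIZE = 4;

    // Bytes 0..2: payload length, least significant byte first.
    static int payloadLength(byte[] packet) {
        requireHeader(packet);
        return (packet[0] & 0xff)
                | ((packet[1] & 0xff) << 8)
                | ((packet[2] & 0xff) << 16);
    }

    // Byte 3: sequence id of the packet.
    static int sequenceId(byte[] packet) {
        requireHeader(packet);
        return packet[3] & 0xff;
    }

    private static void requireHeader(byte[] packet) {
        if (packet == null || packet.length < HEADER_SIZE) {
            throw new IllegalArgumentException("packet shorter than the 4-byte header");
        }
    }

    public static void main(String[] args) {
        // A one-byte payload (0x0e) with sequence id 0.
        byte[] packet = {0x01, 0x00, 0x00, 0x00, 0x0e};
        System.out.println("payload length = " + payloadLength(packet));
        System.out.println("sequence id    = " + sequenceId(packet));
    }
}
```

The `& 0xff` masks matter because Java bytes are signed; without them a length byte above 0x7f would be read as a negative number.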

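MySQL commands

Example: the show databases command

The request is sent by the front end to the server, so FrontendConnection receives it. After reading the data, it puts the message into a byte[] data, which is then processed by FrontendConnection's rawHandle method.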

public void rawHandle(final byte[] data) {
    // During LOAD DATA INFILE the client sends an empty packet of length 4
    if (data.length == 4 && data[0] == 0 && data[1] == 0 && data[2] == 0) {
        // Empty packet that ends LOAD DATA INFILE
        loadDataInfileEnd(data[3]);
        return;
    }
    if (data.length > 4 && data[4] == MySQLPacket.COM_QUIT) {
        this.getProcessor().getCommands().doQuit();
        this.close("quit cmd");
        return;
    }
    handler.handle(data);
}

// handler.handle(data) dispatches on the command byte
public void handle(byte[] data) {
    // data[4] is the first byte of the message body
    switch (data[4]) {
        case MySQLPacket.COM_INIT_DB:
            commands.doInitDB();
            source.initDB(data);
            break;
        case MySQLPacket.COM_QUERY:
            commands.doQuery(); // increments the Query counter in CommandCount
            source.query(data);
            break;
        case MySQLPacket.COM_PING:
            commands.doPing();
            source.ping();
            break;
        // .....
    }
}

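The message body consists of two parts, the command and its arguments:
[Figure: layout of the message body]
The command is the first byte of the body. The values of some commands are:
[Figure: table of command values]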
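
As a rough illustration of the command byte, the sketch below maps the four commands that appear in the listing above to their names. The numeric values are the ones defined by the MySQL client/server protocol; the class CommandByteSketch and the method commandName are hypothetical and are not taken from Mycat.

```java
// Hypothetical lookup for illustration only; not part of Mycat.
final class CommandByteSketch {
    // Command values as defined by the MySQL client/server protocol.
    static final byte COM_QUIT = 0x01;
    static final byte COM_INIT_DB = 0x02;
    static final byte COM_QUERY = 0x03;
    static final byte COM_PING = 0x0e;

    // data[0..3] is the header, so the command byte sits at index 4.
    static String commandName(byte[] data) {
        if (data == null || data.length <= 4) {
            return "EMPTY";
        }
        switch (data[4]) {
            case COM_QUIT:
                return "COM_QUIT";
            case COM_INIT_DB:
                return "COM_INIT_DB";
            case COM_QUERY:
                return "COM_QUERY";
            case COM_PING:
                return "COM_PING";
            default:
                return "OTHER";
        }
    }

    public static void main(String[] args) {
        // Header (4 bytes) followed by the COM_PING command byte.
        byte[] ping = {0x01, 0x00, 0x00, 0x00, 0x0e};
        System.out.println(commandName(ping));
    }
}
```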

show databases is a query (COM_QUERY), so source.query(data); is called.

public void query(byte[] data) {
    if (queryHandler != null) {
        // Extract the statement
        MySQLMessage mm = new MySQLMessage(data);
        // The first four bytes are the header and the fifth is the command,
        // so reading starts at the sixth byte
        mm.position(5);
        String sql = null;
        try {
            sql = mm.readString(charset); // read the SQL text
        } catch (UnsupportedEncodingException e) {
            writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET,
                    "Unknown charset '" + charset + "'");
            return;
        }
        if (sql == null || sql.length() == 0) {
            writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty SQL");
            return;
        }
        // remove last ';'
        if (sql.endsWith(";")) {
            sql = sql.substring(0, sql.length() - 1);
        }
        // Execute the query
        // Set whether the user is read-only
        queryHandler.setReadOnly(privileges.isReadOnly(user));
        // Finally, ServerQueryHandler.query(sql) processes the SQL
        queryHandler.query(sql);
    } else {
        writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
                "Query unsupported!");
    }
}
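
The extraction step in query can be tried in isolation. The following is a minimal sketch, assuming the SQL text is simply everything after the 4-byte header and the command byte. It uses java.nio.charset.Charset instead of Mycat's MySQLMessage and charset name, and the names QueryTextSketch and extractSql are hypothetical.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the MySQLMessage-based extraction; not part of Mycat.
final class QueryTextSketch {
    // 4 header bytes plus 1 command byte precede the SQL text.
    static final int SQL_OFFSET = 5;

    static String extractSql(byte[] data, Charset charset) {
        if (data == null || data.length <= SQL_OFFSET) {
            return "";
        }
        String sql = new String(data, SQL_OFFSET, data.length - SQL_OFFSET, charset);
        // Drop a single trailing ';' as the listing does.
        if (sql.endsWith(";")) {
            sql = sql.substring(0, sql.length() - 1);
        }
        return sql;
    }

    public static void main(String[] args) {
        byte[] text = "show databases;".getBytes(StandardCharsets.UTF_8);
        byte[] data = new byte[SQL_OFFSET + text.length];
        data[0] = (byte) (text.length + 1); // payload length: command byte + text
        data[4] = 0x03;                     // COM_QUERY
        System.arraycopy(text, 0, data, SQL_OFFSET, text.length);
        System.out.println(extractSql(data, StandardCharsets.UTF_8));
    }
}
```

Only one trailing semicolon is removed, so a statement ending in ";;" would still carry one; the listing above behaves the same way.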

ServerQueryHandler.query(String sql)

ServerConnection c = this.source;
if (LOGGER.isDebugEnabled()) {
    LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
}
// Determine the SQL type, e.g. show, select, and so on
int rs = ServerParse.parse(sql);
int sqlType = rs & 0xff;
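
The parse result carries two values in one int: the low 8 bits are the SQL type, and the remaining bits are what the dispatch code passes on as rs >>> 8, which ShowHandler.handle receives as its offset parameter. The sketch below shows that packing under stated assumptions. The source only shows the unpacking side, so the pack method, like the class name ParseResultSketch, is a hypothetical illustration of how such a value could be produced.

```java
// Hypothetical illustration of the (offset, type) packing; not part of Mycat.
final class ParseResultSketch {
    static final int SHOW = 9; // same value as SHOW in the type list

    // Assumed encoding: offset in the high bits, type in the low 8 bits.
    static int pack(int offset, int sqlType) {
        return (offset << 8) | (sqlType & 0xff);
    }

    // Mirrors "rs & 0xff" from the listing.
    static int sqlType(int rs) {
        return rs & 0xff;
    }

    // Mirrors "rs >>> 8", the value handed to the type-specific handler.
    static int offset(int rs) {
        return rs >>> 8;
    }

    public static void main(String[] args) {
        int rs = pack(4, SHOW);
        System.out.println("type = " + sqlType(rs) + ", offset = " + offset(rs));
    }
}
```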

ServerParse.parse performs lexical analysis on the SQL and determines its type. The SQL types are:

public static final int OTHER = -1;
public static final int BEGIN = 1;
public static final int COMMIT = 2;
public static final int DELETE = 3;
public static final int INSERT = 4;
public static final int REPLACE = 5;
public static final int ROLLBACK = 6;
public static final int SELECT = 7;
public static final int SET = 8;
public static final int SHOW = 9; // show statements
public static final int START = 10;
public static final int UPDATE = 11;
public static final int KILL = 12;
public static final int SAVEPOINT = 13;
public static final int USE = 14;
public static final int EXPLAIN = 15;
public static final int KILL_QUERY = 16;
public static final int HELP = 17;
public static final int MYSQL_CMD_COMMENT = 18;
public static final int MYSQL_COMMENT = 19;
public static final int CALL = 20;
public static final int DESCRIBE = 21;
public static final int LOAD_DATA_INFILE_SQL = 99;

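Once the SQL type is known, each type is dispatched to its own handler. For a show statement this is ShowHandler.handle(sql, c, rs >>> 8);. ShowHandler then determines which kind of show statement it is, and for show databases it calls ShowDatabases.response(c); to produce the response.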

// ShowHandler.handle
public static void handle(String stmt, ServerConnection c, int offset) {
    // Strip backtick (`) characters
    stmt = StringUtil.replaceChars(stmt, "`", null);
    int type = ServerParseShow.parse(stmt, offset);
    switch (type) {
        case ServerParseShow.DATABASES:
            ShowDatabases.response(c);
            break;
        case ServerParseShow.TABLES:
            ShowTables.response(c, stmt, type);
            break;
        case ServerParseShow.MYCAT_STATUS:
            ShowMyCatStatus.response(c);
            break;
        case ServerParseShow.MYCAT_CLUSTER:
            ShowMyCATCluster.response(c);
            break;
        default:
            c.execute(stmt, ServerParse.SHOW);
    }
}

// ShowDatabases.response
public static void response(ServerConnection c) {
    ByteBuffer buffer = c.allocate();
    // write header
    buffer = header.write(buffer, c, true);
    // write fields
    for (FieldPacket field : fields) {
        buffer = field.write(buffer, c, true);
    }
    // write eof
    buffer = eof.write(buffer, c, true);
    // write rows
    byte packetId = eof.packetId;
    MycatConfig conf = MycatServer.getInstance().getConfig();
    Map<String, UserConfig> users = conf.getUsers();
    UserConfig user = users == null ? null : users.get(c.getUser());
    if (user != null) {
        TreeSet<String> schemaSet = new TreeSet<String>();
        Set<String> schemaList = user.getSchemas();
        if (schemaList == null || schemaList.size() == 0) {
            // No per-user schema list: expose every configured schema
            schemaSet.addAll(conf.getSchemas().keySet());
        } else {
            for (String schema : schemaList) {
                schemaSet.add(schema);
            }
        }
        for (String name : schemaSet) {
            RowDataPacket row = new RowDataPacket(FIELD_COUNT);
            row.add(StringUtil.encode(name, c.getCharset()));
            row.packetId = ++packetId;
            buffer = row.write(buffer, c, true);
        }
    }
    // write last eof
    EOFPacket lastEof = new EOFPacket();
    lastEof.packetId = ++packetId;
    buffer = lastEof.write(buffer, c, true);
    // post write
    c.write(buffer);
}
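
Two details of response can be shown without any Mycat classes: which schemas end up in the result, and how the packet ids continue after the EOF that follows the field definitions. This is a minimal sketch with hypothetical names (VisibleSchemasSketch, visibleSchemas, lastEofPacketId); it uses int packet ids for readability, whereas the listing uses a byte.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of the schema selection in response(); not part of Mycat.
final class VisibleSchemasSketch {
    // A user without an explicit schema list sees every configured schema;
    // otherwise only the listed ones. TreeSet keeps the names sorted.
    static TreeSet<String> visibleSchemas(Set<String> userSchemas, Set<String> allSchemas) {
        TreeSet<String> result = new TreeSet<>();
        if (userSchemas == null || userSchemas.isEmpty()) {
            result.addAll(allSchemas);
        } else {
            result.addAll(userSchemas);
        }
        return result;
    }

    // Each row takes the next packet id after the field EOF,
    // and the final EOF takes the one after the last row.
    static int lastEofPacketId(int fieldEofPacketId, int rowCount) {
        return fieldEofPacketId + rowCount + 1;
    }

    public static void main(String[] args) {
        Set<String> all = new HashSet<>(Arrays.asList("TESTDB", "ADB"));
        TreeSet<String> visible = visibleSchemas(null, all);
        System.out.println(visible);
        System.out.println(lastEofPacketId(3, visible.size()));
    }
}
```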

select * from travelrecord

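The processing up to this point is the same as for show databases;. The difference is that SelectHandler handles the statement. Select statements also have subtypes: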

public static final int OTHER = -1;
public static final int VERSION_COMMENT = 1;
public static final int DATABASE = 2;
public static final int USER = 3;
public static final int LAST_INSERT_ID = 4;
public static final int IDENTITY = 5;
public static final int VERSION = 6;
public static final int SESSION_INCREMENT = 7;
public static final int SESSION_ISOLATION = 8;

select * from travelrecord matches none of these types, so c.execute(stmt, ServerParse.SELECT); is called.
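
The fallback can be sketched as a subtype switch whose default branch forwards the statement. This is only a toy model: the string comparison in parseSubtype stands in for Mycat's real parser, the returned strings stand in for real handler calls, and the names SelectDispatchSketch, parseSubtype and handle are hypothetical. That the special subtypes get their own handling is an assumption; the source only states what happens in the default case.

```java
import java.util.Locale;

// Toy model of subtype dispatch with a fallback; not part of Mycat.
final class SelectDispatchSketch {
    // Same values as in the subtype list.
    static final int OTHER = -1;
    static final int DATABASE = 2;
    static final int USER = 3;
    static final int VERSION = 6;

    // Toy classifier: recognises three exact statements, everything else is OTHER.
    static int parseSubtype(String stmt) {
        String s = stmt.trim().toLowerCase(Locale.ROOT).replace(" ", "");
        switch (s) {
            case "selectdatabase()":
                return DATABASE;
            case "selectuser()":
                return USER;
            case "selectversion()":
                return VERSION;
            default:
                return OTHER;
        }
    }

    // Returns a label describing where the statement would go.
    static String handle(String stmt) {
        switch (parseSubtype(stmt)) {
            case DATABASE:
                return "special:DATABASE";
            case USER:
                return "special:USER";
            case VERSION:
                return "special:VERSION";
            default:
                // Corresponds to c.execute(stmt, ServerParse.SELECT)
                return "execute";
        }
    }

    public static void main(String[] args) {
        System.out.println(handle("select * from travelrecord"));
        System.out.println(handle("SELECT DATABASE()"));
    }
}
```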