Mycat接收请求(2) - 握手认证

建立连接,三次握手

这里其实是tcp协议的内容,在这里我们用wireshark抓到的数据包来回忆一下.

SYN

从Flag可以看出,这里发送的是一个SYN数据包,传输的序列号是3970234579
1

SYN,ACK

从Flag可以看出,这是由服务器端返回给客户端的SYN/ACK包,其中ACK号是3970234580(3970234579+1); 并且发送了序列号:3742881048
2

ACK

从Flag可以看出,这是有客户端发送给服务端的ACK包,其中序列号是3970234580(SYN/ACK发送的ACK号),ACK号是3742881049(3742881048+1)
3

握手认证阶段

mycat的握手认证与mysql一致,握手认证阶段为客户端与服务器建立连接后进行,交互过程如下:
服务器 -> 客户端:握手初始化消息
客户端 -> 服务器:登陆认证消息
服务器 -> 客户端:认证结果消息
4

从下图抓到数据包的顺序可以看出,在握手之后(前三个包),服务器端先发给客户端了一个握手初始化报文(第四个数据包),为什么要服务端先发送一个初始化,有什么作用?
5

我们查看Mycat的代码可以找到部分答案。在“处理连接”的章节中,我们看到accept的过程是异步的,先放到一个queue中,然后在wakeup Selector,在主循环中调用register方法,避免加锁。register的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void register(Selector selector) {
AbstractConnection c = null;
if (registerQueue.isEmpty()) {
return;
}
while ((c = registerQueue.poll()) != null) {
try {
//注册读
((NIOSocketWR) c.getSocketWR()).register(selector);
//向客户端发送握手初始化
c.register();
} catch (Exception e) {
LOGGER.warn("register error ", e);
c.close("register err");
}
}
}

FrontendConnection的register方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void register() throws IOException {
if (!isClosed.get()) {
// 生成认证数据
byte[] rand1 = RandomUtil.randomBytes(8);
byte[] rand2 = RandomUtil.randomBytes(12);
// 保存认证数据
byte[] seed = new byte[rand1.length + rand2.length];
System.arraycopy(rand1, 0, seed, 0, rand1.length);
System.arraycopy(rand2, 0, seed, rand1.length, rand2.length);
this.seed = seed;
// 发送握手数据包
HandshakePacket hs = new HandshakePacket();
hs.packetId = 0;
hs.protocolVersion = Versions.PROTOCOL_VERSION;
hs.serverVersion = Versions.SERVER_VERSION;
hs.threadId = id;
hs.seed = rand1;
hs.serverCapabilities = getServerCapabilities();
hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
hs.serverStatus = 2;
hs.restOfScrambleBuff = rand2;
//把构造好的握手初始化包发送给客户端
hs.write(this);
// asynread response, 如何保证这里一定可以读到?为什么不等这Reactor循环中的asynRead处理?注释掉可以吗?
//经过实验,注释后完全不影响,那为什么要读一次呢?
this.asynRead();
}
}

构造好HandShakePacket以后,通过hs.write(this)发送给客户端。发送数据的方法是AbstractConnection的write方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public final void write(ByteBuffer buffer) {
if(isSupportCompress())
{
ByteBuffer newBuffer= CompressUtil.compressMysqlPacket(buffer,this,compressUnfinishedDataQueue);
writeQueue.offer(newBuffer);
} else
{
//把buffer放进写队列
writeQueue.offer(buffer);
}
// if ansyn write finish event got lock before me ,then writing
// flag is set false but not start a write request
// so we check again
try {
this.socketWR.doNextWriteCheck();
} catch (Exception e) {
LOGGER.warn("write err:", e);
this.close("write err:" + e);
}
}
public void doNextWriteCheck() {
if (!writing.compareAndSet(false, true)) {
return;
}
try {
//尝试写操作
boolean noMoreData = write0();
writing.set(false);
//如果写成完了,则取消关注OP_WRITE
if (noMoreData && con.writeQueue.isEmpty()) {
if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) != 0)) {
disableWrite();
}
} else {
//如果没写完,则注册OP_WRITE,等主线程循环时再写
if ((processKey.isValid() && (processKey.interestOps() & SelectionKey.OP_WRITE) == 0)) {
enableWrite(false);
}
}
} catch (IOException e) {
if (AbstractConnection.LOGGER.isDebugEnabled()) {
AbstractConnection.LOGGER.debug("caught err:", e);
}
con.close("err:" + e);
}
}
private boolean write0() throws IOException {
int written = 0;
ByteBuffer buffer = con.writeBuffer;
//什么情况下buffer!=null?con.writeBuffer何时赋值?
//应该是written = channel.write(buffer)写,written <= 0时,赋值给writeBuffer,下次有机会再写
if (buffer != null) {
while (buffer.hasRemaining()) {
written = channel.write(buffer);
if (written > 0) {
con.netOutBytes += written;
con.processor.addNetOutBytes(written);
con.lastWriteTime = TimeUtil.currentTimeMillis();
} else {
break;
}
}
//之前的buffer还是没有写完,退出
if (buffer.hasRemaining()) {
con.writeAttempts++;
return false;
} else {
con.writeBuffer = null;
con.recycle(buffer);
}
}
//writeBuffer为null,所以从writeQueue获取一个buffer
while ((buffer = con.writeQueue.poll()) != null) {
if (buffer.limit() == 0) {
con.recycle(buffer);
con.close("quit send");
return true;
}
//开始写
buffer.flip();
while (buffer.hasRemaining()) {
written = channel.write(buffer);
if (written > 0) {
con.lastWriteTime = TimeUtil.currentTimeMillis();
con.netOutBytes += written;
con.processor.addNetOutBytes(written);
con.lastWriteTime = TimeUtil.currentTimeMillis();
} else {
break;
}
}
//可能因为网络繁忙等原因,没有写成功,把buffer赋值给writeBuffer
if (buffer.hasRemaining()) {
con.writeBuffer = buffer;
con.writeAttempts++;
return false;
} else {
con.recycle(buffer);
}
}
return true;
}

  • OP_WRITE事件的注册放在NIOSocketWR.doNextWriteCheck()函数中,doNextWriteCheck既被selector线程调用,也会被其它的业务线程调用,此时就会存在lock竞争的问题,所以对于 OP_WRITE事件也建议用队列缓存的方式,不过对于MyCAT的流量场景,大部分写操作是由业务线程直接写入,只有在网络繁忙时,业务线程不能一次全部写完,才会通过OP_WRITE注册方式进行候补写。哪样性能更好?为什么?可否统一使用异步写?
  • 先判断是否正在写,如果正在写,退出(之前已经把写内容放到缓冲队列,那么此处是否可以优化呢,即当发送缓冲队列为 空的时候,可以直接往channel写数据,不能写再放缓冲队列,理论上可以优化,但是写代码时要注意,因为必需要保证协议 包的顺序,还要考虑到前一次写时,是否有buffer没有写完,若前一次写入时,最后一个buffer没有写完,记得退回缓冲队 列;MyCAT当前的实现方式是增加了一个变量专门存放上次未写完的buffer)
  • write0()方法是只要buffer中还有,就不停写入;直到写完所有buffer,或者写入时,返回写入字节为零,表示网络繁忙,就 回临时退出写操作。
  • 没有完全写入并且缓冲队列为空,取消注册写事件
  • 没有完全写入或者缓冲队列有代写对象,继续注册写时间
  • 特别说明,writing.set(false)必须要在boolean noMoreData = write0()之后和if (noMoreData &&
    con.writeQueue.isEmpty())之前,否则会导致当网络流量较低时,消息包缓存在内存中迟迟发不出去的现象。

登陆认证交互报文

握手初始化报文(服务器 -> 客户端)

6

服务协议版本号:该值由 PROTOCOL_VERSION 宏定义决定(参考MySQL源代码/include/mysql_version.h头文件定义)

服务版本信息:该值为字符串,由 MYSQL_SERVER_VERSION 宏定义决定(参考MySQL源代码/include/mysql_version.h头文件定义)

服务器线程ID:服务器为当前连接所创建的线程ID。

挑战随机数:MySQL数据库用户认证采用的是挑战/应答的方式,服务器生成该挑战数并发送给客户端,由客户端进行处理并返回相应结果,然后服务器检查是否与预期的结果相同,从而完成用户认证的过程。

服务器权能标志:用于与客户端协商通讯方式,各标志位含义如下(参考MySQL源代码/include/mysql_com.h中的宏定义):
7

字符编码:标识服务器所使用的字符集。

服务器状态:状态值定义如下(参考MySQL源代码/include/mysql_com.h中的宏定义):
8

登陆认证报文(客户端 -> 服务器)

9
10

客户端在接收了服务器发送的握手初始化请求以后,向服务器端发送认证报文。前面接收消息的过程都一样,在FrontendConnection的rawHandle方法中,调用handler.handle(data)处理,而handlerFrontendAuthenticator.
FrontendAuthenticator的handle方法代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void handle(byte[] data) {
// check quit packet
if (data.length == QuitPacket.QUIT.length && data[4] == MySQLPacket.COM_QUIT) {
source.close("quit packet");
return;
}
//根据客户端发送过来的数据生成AuthPacket对象,包含了认证信息
AuthPacket auth = new AuthPacket();
auth.read(data);
// check user,检查User是否存在
if (!checkUser(auth.user, source.getHost())) {
failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "'");
return;
}
// check password,检查密码是否正确
if (!checkPassword(auth.password, auth.user)) {
failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "'");
return;
}
// check schema,查看思念是否有权限
switch (checkSchema(auth.database, auth.user)) {
case ErrorCode.ER_BAD_DB_ERROR:
failure(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + auth.database + "'");
break;
case ErrorCode.ER_DBACCESS_DENIED_ERROR:
String s = "Access denied for user '" + auth.user + "' to database '" + auth.database + "'";
failure(ErrorCode.ER_DBACCESS_DENIED_ERROR, s);
break;
default:
//
success(auth);
}
}

认证结果

通过source.write(source.writeToBuffer(AUTH_OK, buffer));把AUTH_OK发送回了客户端,AUTH_OK是private static final byte[] AUTH_OK = new byte[] { 7, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0 };