Mycat接收请求(1) - 处理连接

NIOAcceptor接收请求

连接请求首先会通过NIOAccpetor,通过accept方法,accept方法得到一个SocketChannel后,会调用FrontendConnection c = factory.make(channel);的到一个FrontendConnection, 这里的factory实际上是ServerConnectionFactory。随后,还会给connetion设置一个NIOprocessor属性NIOProcessor processor = (NIOProcessor) MycatServer.getInstance().nextProcessor();
接着,在ReactorPool中选择一个Reactor,然后调用postRegister方法。每个Reactor都拥有一个ConcurrentLinkedQueue register做为等待注册的队列,postRegister方法首先把Connection添加到queue中,然后调用selector.wakeup唤醒阻塞中的Selector,在NIOReactor线程中调用register方法进行真正的注册,避免加锁。

FrontendConnection

FrontendConnection封装了Mycat接收客户端请求的连接,有两个子类,分别是ServerConnectionManagerConnection,我们着重需要看的是ServerConnection

NIOReactor

wakeup以后就进入register方法,register方法会从队列中取出Connection,然后在Selector注册,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
try {
selector.select(500L);
register(selector);//wakeup以后就进入register方法
keys = selector.selectedKeys();
for (SelectionKey key : keys) {
AbstractConnection con = null;
......
//
private void register(Selector selector) {
AbstractConnection c = null;
if (registerQueue.isEmpty()) {
return;
}
while ((c = registerQueue.poll()) != null) {
try {
((NIOSocketWR) c.getSocketWR()).register(selector);//向Selector注册channle和SelectionKey.OP_READ
c.register();//如果是FrontConnection,会发送认证数据,并且读取response
} catch (Exception e) {
LOGGER.warn("register error ", e);
c.close("register err");
}
}
}

在NIOReactor中等待并响应read事件,利用con.asynRead()处理。con.asynRead()最终还是调用socketWR的asynRead方法处理,asynRead有aio和nio两种实现,目前我们只看nio的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Object att = key.attachment();
if (att != null && key.isValid()) {
con = (AbstractConnection) att;
if (key.isReadable()) {
try {
//connction异步读取数据
con.asynRead();
} catch (IOException e) {
LOGGER.warn("caught err:", e);
con.close("program err:" + e.toString());
} catch (Exception e) {
con.close("program err:" + e.toString());
}
}
if (key.isWritable()) {
con.doNextWriteCheck();
}
} else {
key.cancel();
}

NIOSocketWR

首先获取到一个theBuffer, 把con.readBuffertheBuffer指向同一个变量,con.readBuffer = theBuffer, 然后从channel中读取数据放到con.readBuffer中,然后,调用AbstarctConnection的onReadData方法。疑问:1. 为什么需要theBuffer,而不是直接向con.readBuffer赋值?

1
2
3
4
5
6
7
8
9
10
@Override
public void asynRead() throws IOException {
ByteBuffer theBuffer = con.readBuffer;
if (theBuffer == null) {
theBuffer = con.processor.getBufferPool().allocate();
con.readBuffer = theBuffer;
}
int got = channel.read(theBuffer);
con.onReadData(got);
}

AbstractConnection

onReadData方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public void onReadData(int got) throws IOException {
if (isClosed.get()) {
return;
}
ByteBuffer buffer = this.readBuffer;
lastReadTime = TimeUtil.currentTimeMillis();
if (got < 0) { //数据读取到end-of-stream
this.close("stream closed");
} else if (got == 0) {
if (!this.channel.isOpen()) {
this.close("socket closed");
return;
}
}
netInBytes += got;
processor.addNetInBytes(got);
// 循环处理字节信息
// position保存了当前的位置,后面有可能会修改position,所以先保存然后用buffer.position(position)重置position,才能够整体读取
int offset = readBufferOffset, length = 0, position = buffer.position();
for (;;) {
//获取到消息的整体长度,前4个字节是可以看作消息头信息,头信息是消息体的长度
length = getPacketLength(buffer, offset);
if (length == -1) {//header没有完全读入buffer,继续读
if (!buffer.hasRemaining()) {
buffer = checkReadBuffer(buffer, offset, position);
}
break;
}
//消息已经完全读入buffer,可以进行处理
if (position >= offset + length) {
buffer.position(offset);
byte[] data = new byte[length];
//获取消息
buffer.get(data, 0, length);
//处理消息
handle(data);
offset += length;
if (position == offset) { //读取到信息末尾
if (readBufferOffset != 0) {
readBufferOffset = 0;
}
buffer.clear();
break;
} else { //继续读取
readBufferOffset = offset;
buffer.position(position);
continue;
}
} else { //消息没有完全读入buffer,继续读
if (!buffer.hasRemaining()) {
buffer = checkReadBuffer(buffer, offset, position);
}
break;
}
}
}
```
当消息没有完整读入的时候,都是采用`checkReadBuffer(ByteBuffer buffer, int offset int posotion)`做进一步读取处理的,下面是该方法的代码:
``` java
private ByteBuffer checkReadBuffer(ByteBuffer buffer, int offset,int position) {
//如果buffer当前信息全是未读取的,则直接扩容;否则对buffer进行compact
if (offset == 0) {
if (buffer.capacity() >= maxPacketSize) { //这里为什么是>=??,异常信息的说明没有问题?packet size大与buffer的capacity的话可以扩容吧
throw new IllegalArgumentException(
"Packet size over the limit.");
}
//扩容为maxPacketSize或者当前大小的两倍
int size = buffer.capacity() << 1;
size = (size > maxPacketSize) ? maxPacketSize : size;
ByteBuffer newBuffer = processor.getBufferPool().allocate(size); //扩容以后是一个临时buffer,不会被重用
buffer.position(offset);
newBuffer.put(buffer);
readBuffer = newBuffer;
recycle(buffer);
return newBuffer;
} else {
buffer.position(offset);
buffer.compact();
readBufferOffset = 0;
return buffer;
}
}

最终,读取到的消息会放在byte[] data中,然后调用handle(data)方法进行处理,handle(data)会调用NIOHandler的handle(data)方法处理。NIOHandler的实现类有很多,分别用来处理不同类型的请求。
1