Mycat 启动流程

MycatStartUp main()

  1. String home = SystemConfig.getHomePath(); //获取MYCAT_HOME
  2. MycatServer server = MycatServer.getInstance(); //获取MycatServer实例
  3. server.beforeStart(); //在启动前,先设置Log4j的watchDog
  4. server.startup(); //server启动
  5. 死循环阻塞进程,不退出
1
2
3
while (true) {
Thread.sleep(300 * 1000);
}

MycatServer start()

获取项目基础配置项

SystemConfig system = config.getSystem(); //获取项目基础配置项

- SystemConfig中的一些属性说明
    + processors: 系统的cpu的核心数量 
    + processorExecutor: accessor的数量 ,默认=processors * 2 或者等于4
    + processorBufferChunk: Buffer Chunk size, 默认为4096
    + processorBufferPool: processorBufferPool = DEFAULT_BUFFER_CHUNK_SIZE * processors * 1000; buffer pool大小

新建ConnectionFactory

新建ManagerConnectionFactory mfServerConnectionFactory sf, 新建NIOAcceptor的时候,factory会做为参数传递进去,用来新建Connection。

新建BufferPool

为了性能的考虑,Mycat使用了DirectByteBuffer,Direct适合要求高吞吐、低延迟的请求处理,但是DirectByteBuffer的新建和销毁的开销都比HeapByteBuffer大,所以一般都会尽量的做池化处理\

1
2
bufferPool = new BufferPool(processBuferPool, processBufferChunk,
socketBufferLocalPercent / processorCount);

processBuferPool默认的大小是4096processorCount1000, processBufferChunk的大小是4096,socketBufferLocalPercent 默认是100%,processorCount是CPU核心数量。BufferPool的构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public BufferPool(int bufferSize, int chunkSize, int threadLocalPercent) {
//每个buffer的大小
this.chunkSize = chunkSize;
int size = bufferSize / chunkSize;
size = (bufferSize % chunkSize == 0) ? size : size + 1;、
//普通bufferPool的大小
this.capactiy = size;
//threadlocal buffer queue的大小
threadLocalCount = threadLocalPercent * capactiy / 100;
//新建DirectByteBuffer,放在queue中
for (int i = 0; i < capactiy; i++) {
items.offer(createDirectBuffer(chunkSize));
}
//新建ThreadLocalBufferPool,并没有初始化内部的queue
localBufferPool = new ThreadLocalBufferPool(
threadLocalCount);
}

BufferPool生成了两个Queue,一个是private final ConcurrentLinkedQueue<ByteBuffer> items = new ConcurrentLinkedQueue<ByteBuffer>();, 另一个是private final ThreadLocalBufferPool localBufferPool;, ThreadLocalBufferPool继承了ThreadLocal<BufferQueue>, 所以也可以看作一个buffer queue。在使用ByteBuffer的地方,只需要bufferpool.allocate(), 具体的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static final boolean isLocalCacheThread() {
final String thname = Thread.currentThread().getName();
return (thname.length() < LOCAL_BUF_THREAD_PREX.length()) ? false
: (thname.charAt(0) == '$' && thname.charAt(1) == '_');
}
public ByteBuffer allocate() {
ByteBuffer node = null;
if (isLocalCacheThread()) {
// allocate from threadlocal
node = localBufferPool.get().poll();
if (node != null) {
return node;
}
}
node = items.poll();
if (node == null) {
newCreated++;
node = this.createDirectBuffer(chunkSize);
}
return node;
}

可以看到,allocate的时候,如果发现是前面提到的Acceptor、Connector,或者Reactor的时候,都会先从ThreadLocal,如果没有,再从items中获取。(为什么会先从取ThreadLocal,应该是尽量避免锁)

初始化线程执行器

businessExecutor主要用来执行一些比较耗时的操作

1
2
3
businessExecutor = ExecutorUtil.create("BusinessExecutor",
threadPoolSize);
timerExecutor = ExecutorUtil.create("Timer",system.getTimerExecutor());

新建NIOProcessor

1
2
processors[i] = new NIOProcessor("Processor" + i, bufferPool,
businessExecutor);

新建ReactorPool

如果是NIO,则分别新建了一个NIOReactorPool、一个NIOConnector和两个NIOAccpetor,并且每个线程都名字,以$_开头。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
LOGGER.info("using nio network handler ");
//新建NIOReactorPool
NIOReactorPool reactorPool = new NIOReactorPool(
BufferPool.LOCAL_BUF_THREAD_PREX + "NIOREACTOR",processors.length);
//新建NIOConnector
connector = new NIOConnector(BufferPool.LOCAL_BUF_THREAD_PREX+ "NIOConnector", reactorPool);
((NIOConnector) connector).start();
//接收管理请求的NIOAcceptor
manager = new NIOAcceptor(BufferPool.LOCAL_BUF_THREAD_PREX + NAME
+ "Manager", system.getBindIp(), system.getManagerPort(),
mf, reactorPool);
//接收客户访问请求的NIOAcceptor
server = new NIOAcceptor(BufferPool.LOCAL_BUF_THREAD_PREX + NAME
+ "Server", system.getBindIp(), system.getServerPort(), sf,
reactorPool);

其中, reactorPool是大家公共的reactor连接池;connector用于从Mycat向mysql发起请求,acceptor用于接收请求,manager用于接收管理端请求,server用户接收客户程序的连接请求。
ReactorPool的实现比较简单,就是用一个数组保存了多个Reactor,在使用构造函数新建Pool的时候就初始化每个Reactor,并且调用start方法启动。ReactorPool还提供了一个getNextReactor()方法来选取一个Reactor,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class NIOReactorPool {
private final NIOReactor[] reactors;
private volatile int nextReactor;
public NIOReactorPool(String name, int poolSize) throws IOException {
reactors = new NIOReactor[poolSize];
for (int i = 0; i < poolSize; i++) {
NIOReactor reactor = new NIOReactor(name + "-" + i);
reactors[i] = reactor;
reactor.startup();
}
}
public NIOReactor getNextReactor() {
if (++nextReactor == reactors.length) {
nextReactor = 0;
}
return reactors[nextReactor];
}
}

然后分别调用manager和server的start方法。manager.start()server.start()
manager是处理管理端的连接请求的,我们暂且搁置一下,来看看server.start()
当使用NIO时,start方法启动了NIOAcceptor线程,NIOAcceptor的主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override
public void run() {
final Selector tSelector = this.selector;
for (;;) {
++acceptCount;
try {
tSelector.select(1000L);
Set<SelectionKey> keys = tSelector.selectedKeys();
try {
for (SelectionKey key : keys) {
if (key.isValid() && key.isAcceptable()) {
//当有Channel的accept就绪的时候,处理accept
accept();
} else {
key.cancel();//这里为什么要cancel?
}
}
} finally {
keys.clear();
}
} catch (Exception e) {
LOGGER.warn(getName(), e);
}
}
}
private void accept() {
SocketChannel channel = null;
try {
//通过抓包可以看出,在accept调用之前,其实TCP建立连接的三次握手就已经完成了
channel = serverChannel.accept();
channel.configureBlocking(false);
//FrontendConnection处理客户端连接Mycat的请求, Connection封装了Channel,用来完成整个操作,实际是一个ServerCOnnectionFactory
FrontendConnection c = factory.make(channel);
c.setAccepted(true);
c.setId(ID_GENERATOR.getId());
NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
.nextProcessor();
c.setProcessor(processor);
//从ReactorPool中选取一个Reactor,把Connection异步注册到Reactor中
NIOReactor reactor = reactorPool.getNextReactor();
//postRegister会先把Connection添加到registerQueue中,然后wakeup Selector,在NIOReactor的主循环中注册,这样省去了加锁同步的操作。否则,因为Selector的select()方法和Channel的register方法都会操作SelectionKey的集合,需要加锁同步避免相互阻塞。
reactor.postRegister(c);
} catch (Exception e) {
LOGGER.warn(getName(), e);
closeChannel(channel);
}
}

初始化DataHost

1
2
3
4
5
6
7
8
9
10
11
12
// init datahost
Map<String, PhysicalDBPool> dataHosts = config.getDataHosts();
LOGGER.info("Initialize dataHost ...");
for (PhysicalDBPool node : dataHosts.values()) {
String index = dnIndexProperties.getProperty(node.getHostName(),
"0");
if (!"0".equals(index)) {
LOGGER.info("init datahost: " + node.getHostName()
+ " to use datasource index:" + index);
}
node.init(Integer.valueOf(index));
}

PhysicalDBPool是一个连接一个数据库实例的一个连接池,其中有两个非常重要的属性,分别是一个写数据源PhysicalDatasource[] writeSources;和一个或多个读数据源protected Map<Integer, PhysicalDatasource[]> readSources;, 这两个属性都是在构造函数中通过参数传递进来的。真正的初始化是在ConfigInitializer类的getPhysicalDBPool(DataHostConfig conf, ConfigLoader configLoader)方法中。关于PhysicalDBPool更具体的实现就不在这里详述了。