本文共 18240 字,大约阅读时间需要 60 分钟。
上一篇文章()中阐述了conn.start()方法完成之后客户端就已经和broker建立了正常的连接,而这个Connection的关键就在于这个start()方法之内,下面我们来慢慢分析。
首先来看看start()方法的源码,这个方法有点长,这里拆开来一一分析,首先是注释:
/** * Start up the connection, including the MainLoop thread. * Sends the protocol * version negotiation header, and runs through * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then * calls Connection.Open and waits for the OpenOk. Sets heart-beat * and frame max values after tuning has taken place. * @throws IOException if an error is encountered * either before, or during, protocol negotiation; * sub-classes {@link ProtocolVersionMismatchException} and * {@link PossibleAuthenticationFailureException} will be thrown in the * corresponding circumstances. {@link AuthenticationFailureException} * will be thrown if the broker closes the connection with ACCESS_REFUSED. * If an exception is thrown, connection resources allocated can all be * garbage collected when the connection object is no longer referenced. */
首先来看看方法上的注释说了什么:
public void start() throws IOException, TimeoutException { initializeConsumerWorkService(); initializeHeartbeatSender(); this._running = true; // Make sure that the first thing we do is to send the header, // which should cause any socket errors to show up for us, rather // than risking them pop out in the MainLoop AMQChannel.SimpleBlockingRpcContinuation connStartBlocker = new AMQChannel.SimpleBlockingRpcContinuation(); // We enqueue an RPC continuation here without sending an RPC // request, since the protocol specifies that after sending // the version negotiation header, the client (connection // initiator) is to wait for a connection.start method to // arrive. _channel0.enqueueRpc(connStartBlocker);
首先是初始化工作线程池(initializeConsumerWorkService)和初始化心跳线程(initializeHeartbeatSender)并设置运行状态为true(this._isrunning=true,这个值会在MainLoop线程中有用,控制MainLoop线程是否继续运行)。
“AMQChannel.SimpleBlockingRpcContinuation connStartBlocker = new AMQChannel.SimpleBlockingRpcContinuation();”这句代码,从命名上来说像是rpc, 其实这么理解也没错。RabbitMQ-Client这个版本(3.5.3)的客户端与broker端的通信是采用java原生socket.当然后面也改成了NIO,这个自然是后话。RabbitMQ-Client程序中会对各种帧进行处理,处理的方式也不是单一化的,这里举Connection.Start这个类型的报文做分析。当broker发送Connection.Start至client端,client收到之后进行处理(MainLoop线程中),然后将此报文存入SimpleBlockingRpcContinuation中,照着SimpleBlockingRpcContinuation深究下去,其就是一个容量为1的BlockingQueue,也就是当MainLoop主导的线程将收到的Connection.Start存入其中,然后AMQConnction类的start()线程在等待(start()方法下面的代码):
connStart = (AMQP.Connection.Start) connStartBlocker.getReply(HANDSHAKE_TIMEOUT/2).getMethod();
然后继续处理。这看上去也算是个rpc,等待别的线程(这个线程同样在等待broker的返回)处理完毕。
AMQCommand(这个之后会讲到), 下面的“_channel0.enqueueRpc(connStartBlocker)”将这个rpc任务放入Channel中,如果深入代码看的话,channel中当前至多只能enqueue一个rpc,如果当前的rpc没有处理完再enqueue的话会被阻塞(wait())直到处理完当前的rpc才能enqueue下一个rpc。
try { // The following two lines are akin to AMQChannel's // transmit() method for this pseudo-RPC. _frameHandler.setTimeout(HANDSHAKE_TIMEOUT); _frameHandler.sendHeader(); } catch (IOException ioe) { _frameHandler.close(); throw ioe; }
接下来“_frameHandler.sendHeader()”主要是发送Protocol-Header 0-9-1帧(可参考下图),这个客户端与broker建立连接的AMQP协议的第一帧,帧中的内容包括AMQP的版本号。这里发_frameHandler就是前面Connection提到的SocketFrameHandler对象,我们来看看sendHeader()做了什么:
//本段代码在SocketFrameHandler类中public void sendHeader(int major, int minor, int revision) throws IOException { synchronized (_outputStream) { _outputStream.write("AMQP".getBytes("US-ASCII")); _outputStream.write(0); _outputStream.write(major); _outputStream.write(minor); _outputStream.write(revision); _outputStream.flush(); } } public void sendHeader() throws IOException { sendHeader(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR, AMQP.PROTOCOL.REVISION); }
上面这段对照着下图一目了然:
// start the main loop going MainLoop loop = new MainLoop(); final String name = "AMQP Connection " + getHostAddress() + ":" + getPort(); mainLoopThread = Environment.newThread(threadFactory, loop, name); mainLoopThread.start(); // after this point clear-up of MainLoop is triggered by closing the frameHandler.
下面就是最重要的MainLoop线程了。这里先跳过,接下去看看start()方法,之后就是Connection.Start/.StartOk, Connection.Tune/.TuneOk, Connection.Open/.OpenOk的来回negotiation,以及设置channelMax, frameMax和heartbeat的参数值。当然在设置frameMax之前还初始化了ChannelManager,至于ChannelManager可以简单的理解为管理Channel的一个类,具体实现细节可以参考()
AMQP.Connection.Start connStart = null; AMQP.Connection.Tune connTune = null; try { connStart = (AMQP.Connection.Start) connStartBlocker.getReply(HANDSHAKE_TIMEOUT/2).getMethod(); _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties()); Version serverVersion = new Version(connStart.getVersionMajor(), connStart.getVersionMinor()); if (!Version.checkVersion(clientVersion, serverVersion)) { throw new ProtocolVersionMismatchException(clientVersion, serverVersion); } String[] mechanisms = connStart.getMechanisms().toString().split(" "); SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms); if (sm == null) { throw new IOException("No compatible authentication mechanism found - " + "server offered [" + connStart.getMechanisms() + "]"); } LongString challenge = null; LongString response = sm.handleChallenge(null, this.username, this.password); do { Method method = (challenge == null) ? new AMQP.Connection.StartOk.Builder() .clientProperties(_clientProperties) .mechanism(sm.getName()) .response(response) .build() : new AMQP.Connection.SecureOk.Builder().response(response).build(); try { Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod(); if (serverResponse instanceof AMQP.Connection.Tune) { connTune = (AMQP.Connection.Tune) serverResponse; } else { challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge(); response = sm.handleChallenge(challenge, this.username, this.password); } } catch (ShutdownSignalException e) { Method shutdownMethod = e.getReason(); if (shutdownMethod instanceof AMQP.Connection.Close) { AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod; if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) { throw new AuthenticationFailureException(shutdownClose.getReplyText()); } } throw new PossibleAuthenticationFailureException(e); } } while (connTune == null); } catch (TimeoutException te) { _frameHandler.close(); throw te; } catch (ShutdownSignalException sse) { _frameHandler.close(); throw AMQChannel.wrap(sse); } catch(IOException ioe) { _frameHandler.close(); throw ioe; } try { int channelMax = negotiateChannelMax(this.requestedChannelMax, connTune.getChannelMax()); _channelManager = instantiateChannelManager(channelMax, threadFactory); int frameMax = negotiatedMaxValue(this.requestedFrameMax, connTune.getFrameMax()); this._frameMax = frameMax; int heartbeat = negotiatedMaxValue(this.requestedHeartbeat, connTune.getHeartbeat()); setHeartbeat(heartbeat); _channel0.transmit(new AMQP.Connection.TuneOk.Builder() .channelMax(channelMax) .frameMax(frameMax) .heartbeat(heartbeat) .build()); _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder() .virtualHost(_virtualHost) .build()); } catch (IOException ioe) { _heartbeatSender.shutdown(); _frameHandler.close(); throw ioe; } catch (ShutdownSignalException sse) { _heartbeatSender.shutdown(); _frameHandler.close(); throw AMQChannel.wrap(sse); } // We can now respond to errors having finished tailoring the connection this._inConnectionNegotiation = false; return;}
接着回顾MainLoop, 在start()方法中关于MainLoop的代码主要有:
// start the main loop going MainLoop loop = new MainLoop(); final String name = "AMQP Connection " + getHostAddress() + ":" + getPort(); mainLoopThread = Environment.newThread(threadFactory, loop, name); mainLoopThread.start(); // after this point clear-up of MainLoop is triggered by closing the frameHandler.
这段代码主要是初始化MainLoop线程对象,然后让其运行。没有什么特别之处,而特别之处在于MainLoop本身。
MainLoop类是AMQConnection类的私有内部类:
private class MainLoop implements Runnable { /** * Channel reader thread main loop. Reads a frame, and if it is * not a heartbeat frame, dispatches it to the channel it refers to. * Continues running until the "running" flag is set false by * shutdown(). */ public void run() { try { while (_running) { Frame frame = _frameHandler.readFrame(); if (frame != null) { _missedHeartbeats = 0; if (frame.type == AMQP.FRAME_HEARTBEAT) { // Ignore it: we've already just reset the heartbeat counter. } else { if (frame.channel == 0) { // the special channel _channel0.handleFrame(frame); } else { if (isOpen()) { // If we're still _running, but not isOpen(), then we // must be quiescing, which means any inbound frames // for non-zero channels (and any inbound commands on // channel zero that aren't Connection.CloseOk) must // be discarded. ChannelManager cm = _channelManager; if (cm != null) { cm.getChannel(frame.channel).handleFrame(frame); } } } } } else { // Socket timeout waiting for a frame. // Maybe missed heartbeat. handleSocketTimeout(); } } } catch (EOFException ex) { if (!_brokerInitiatedShutdown) shutdown(null, false, ex, true); } catch (Throwable ex) { _exceptionHandler.handleUnexpectedConnectionDriverException(AMQConnection.this, ex); shutdown(null, false, ex, true); } finally { // Finally, shut down our underlying data connection. _frameHandler.close(); _appContinuation.set(null); notifyListeners(); } }}
MainLoop线程主要用来处理通信帧(Frame,有关Frame的细节将会在()中陈述)的,可以看到当AMQConnection调用start()方法后,_isrunning就设置为true,那么线程一直在运行(while(true))。
MainLoop线程当读取到通信帧之后,判断是否是心跳帧,如果是则忽略继续监听。如果是其他帧,则判断其frame.channel值是否为0,frame.channel值为0代表的是特殊帧,这些特殊帧是和Connection有关的,而不是和Channel有关的(上面代码里的frame.channel就是Channel里的channel number, 一般Connection类型的帧的channel number为0,而其余Channel类别帧的channel number大于0。)
这里就分channel_number=0和channel_number !=0分别进行处理。
当channel_number=0即frame.channel=0则直接调用_channel0的handleFrame方法。 这个_channel0是在AMQConnection类中创建的私有变量:private final AMQChannel _channel0 = new AMQChannel(this, 0) { @Override public boolean processAsync(Command c) throws IOException { return getConnection().processControlCommand(c); }};
调用AMQChannel的handleFrame方法(有关AMQChannel的更多实现细节可以参考:()):
public void handleFrame(Frame frame) throws IOException { AMQCommand command = _command; if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line _command = new AMQCommand(); // prepare for the next one handleCompleteInboundCommand(command); }}
对于channel number为0的帧,AMQCommand的handleFrame方法都是返回true.(有关AMQCommand的实现细节可以参考:())
进而调用AMQChannel的handleCompleteInboundCommand(command)方法:public void handleCompleteInboundCommand(AMQCommand command) throws IOException { // First, offer the command to the asynchronous-command // handling mechanism, which gets to act as a filter on the // incoming command stream. If processAsync() returns true, // the command has been dealt with by the filter and so should // not be processed further. It will return true for // asynchronous commands (deliveries/returns/other events), // and false for commands that should be passed on to some // waiting RPC continuation. if (!processAsync(command)) { // The filter decided not to handle/consume the command, // so it must be some reply to an earlier RPC. nextOutstandingRpc().handleCommand(command); markRpcFinished(); }}
进而调用AMQChannel的processAsync方法。这个方法在AMQChannel类中是一个抽象方法,而观察AMQConnection中的AMQChannel _channel0私有变量其正好实现了这个方法:
/** The special channel 0 (not managed by the_channelManager) */private final AMQChannel _channel0 = new AMQChannel(this, 0) { @Override public boolean processAsync(Command c) throws IOException { return getConnection().processControlCommand(c); }};
ChannelN中也实现了processAsync方法。(有关ChannelN的实现细节可以参考:())
进而调用了AMQConnection的processControlCommand方法:
/** * Handles incoming control commands on channel zero. * @see ChannelN#processAsync */@SuppressWarnings("unused")public boolean processControlCommand(Command c) throws IOException{ // Similar trick to ChannelN.processAsync used here, except // we're interested in whole-connection quiescing. // See the detailed comments in ChannelN.processAsync. Method method = c.getMethod(); if (isOpen()) { if (method instanceof AMQP.Connection.Close) { handleConnectionClose(c); return true; } else if (method instanceof AMQP.Connection.Blocked) { AMQP.Connection.Blocked blocked = (AMQP.Connection.Blocked) method; try { for (BlockedListener l : this.blockedListeners) { l.handleBlocked(blocked.getReason()); } } catch (Throwable ex) { getExceptionHandler().handleBlockedListenerException(this, ex); } return true; } else if (method instanceof AMQP.Connection.Unblocked) { try { for (BlockedListener l : this.blockedListeners) { l.handleUnblocked(); } } catch (Throwable ex) { getExceptionHandler().handleBlockedListenerException(this, ex); } return true; } else { return false; } } else { if (method instanceof AMQP.Connection.Close) { // Already shutting down, so just send back a CloseOk. try { _channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build()); } catch (IOException _e) { } // ignore return true; } else if (method instanceof AMQP.Connection.CloseOk) { // It's our final "RPC". Time to shut down. _running = false; // If Close was sent from within the MainLoop we // will not have a continuation to return to, so // we treat this as processed in that case. return !_channel0.isOutstandingRpc(); } else { // Ignore all others. return true; } }}
这个方法是用来处理AMQP控制命令的:Connection.Close/CloseOk, Connection.Blocked/.Unblocked。正常情况下(比如Connection.Start/.StartOk)直接返回false。
这样就会运行到 nextOutstandingRpc().handleCommand(command);这句代码,意思就是将从broker接受到的AMQCommand对象存入RpcContinuation对象,确切的来说是SimpleBlockingRpcContinuation这个对象中,更确切的来说是存放到容量为1的BlockingQueue中,等待其余的线程来“take()”。有关RpcContinuation或者SimpleBlockingRpcContinuation细节可以参考:。
我们假设有一个可靠的面向流的网络传输层(TCP/IP或相当)。在单个套接字连接中,可以存在多个独立控制线程,这些称之为通道。每个帧都使用通道编号来编号。通过交织他们的帧,不同的通道共享连接。对于给定的通道,帧运行在一个严格的序列,这样可以用来驱动一个协议解析器(通常是一个状态机)。
当channel_number!=0则需要从ChannelManager中根据channel number找出相应的AMQChannel再调用handleFrame方法处理。
这里的ChannelManager从何而来? 这里就还是要到AMQChannel的start()方法来看,有这么一句:_channelManager = instantiateChannelManager(channelMax, threadFactory);//AMQConnection类中protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) { return new ChannelManager(this._workService, channelMax, threadFactory);}//ChannelManager的构造方法public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) { if (channelMax == 0) { // The framing encoding only allows for unsigned 16-bit integers // for the channel number channelMax = (1 << 16) - 1; } _channelMax = channelMax; channelNumberAllocator = new IntAllocator(1, channelMax); this.workService = workService; this.threadFactory = threadFactory;}
ChannelManager构造方法中的ConsumerWorkService参数就是AMQConnection中start()方法第一行代码初始化的ConsumerWorkService对象。
有关ChannelManager的实现细节可以参考:()
当channel number等于0的时候是调用AMQChannel,也可以说是AQMConnection的内部成员变量AMQChannel _channel0来处理。
当channel number不等于0时,这个接下去的处理就要涉及到整个RabbitMQ-Client代码最核心的类——ChannelN。可以类别上上面channel number为0的情况,具体可以参考:。