博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[二]RabbitMQ-客户端源码之AMQConnection
阅读量:6986 次
发布时间:2019-06-27

本文共 18240 字,大约阅读时间需要 60 分钟。

上一篇文章()中阐述了conn.start()方法完成之后客户端就已经和broker建立了正常的连接,而这个Connection的关键就在于这个start()方法之内,下面我们来慢慢分析。

首先来看看start()方法的源码,这个方法有点长,这里拆开来一一分析,首先是注释:

/** * Start up the connection, including the MainLoop thread. * Sends the protocol * version negotiation header, and runs through * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then * calls Connection.Open and waits for the OpenOk. Sets heart-beat * and frame max values after tuning has taken place. * @throws IOException if an error is encountered * either before, or during, protocol negotiation; * sub-classes {@link ProtocolVersionMismatchException} and * {@link PossibleAuthenticationFailureException} will be thrown in the * corresponding circumstances. {@link AuthenticationFailureException} * will be thrown if the broker closes the connection with ACCESS_REFUSED. * If an exception is thrown, connection resources allocated can all be * garbage collected when the connection object is no longer referenced. */

首先来看看方法上的注释说了什么:

  • 方法的作用是启动连接(start up the connection), 包括启动MainLoop线程,这个MainLoop线程主要是和broker进行通信交互处理通信帧(Frame)的一个线程(非常的重要!!!)。
  • 这个方法会在建立连接的初始化阶段(negotiation)会进行Connection.Start/.StartOk, Connection.Tune/.TuneOk, 调用Connection.Open之后再等待Conenction.OpenOk(这里的都是指AMQP协议层面的),这个可以参考本文中第一张使用wireshark抓包的网络截图,一一对应的关系。
  • 通过broker回复的Connection.Tune帧(帧中包含Channel-Max, Frame-Max, Heartbeat三个参数)设置channelMax, frameMax以及Heartbeat的参数值。
  • 一些异常情况。

public void start()        throws IOException, TimeoutException {    initializeConsumerWorkService();    initializeHeartbeatSender();    this._running = true;    // Make sure that the first thing we do is to send the header,    // which should cause any socket errors to show up for us, rather    // than risking them pop out in the MainLoop    AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =        new AMQChannel.SimpleBlockingRpcContinuation();    // We enqueue an RPC continuation here without sending an RPC    // request, since the protocol specifies that after sending    // the version negotiation header, the client (connection    // initiator) is to wait for a connection.start method to    // arrive.    _channel0.enqueueRpc(connStartBlocker);

首先是初始化工作线程池(initializeConsumerWorkService)和初始化心跳线程(initializeHeartbeatSender)并设置运行状态为true(this._isrunning=true,这个值会在MainLoop线程中有用,控制MainLoop线程是否继续运行)。

“AMQChannel.SimpleBlockingRpcContinuation connStartBlocker = new AMQChannel.SimpleBlockingRpcContinuation();”这句代码,从命名上来说像是rpc, 其实这么理解也没错。RabbitMQ-Client这个版本(3.5.3)的客户端与broker端的通信是采用java原生socket.当然后面也改成了NIO,这个自然是后话。RabbitMQ-Client程序中会对各种帧进行处理,处理的方式也不是单一化的,这里举Connection.Start这个类型的报文做分析。当broker发送Connection.Start至client端,client收到之后进行处理(MainLoop线程中),然后将此报文存入SimpleBlockingRpcContinuation中,照着SimpleBlockingRpcContinuation深究下去,其就是一个容量为1的BlockingQueue,也就是当MainLoop主导的线程将收到的Connection.Start存入其中,然后AMQConnction类的start()线程在等待(start()方法下面的代码):

connStart = (AMQP.Connection.Start) connStartBlocker.getReply(HANDSHAKE_TIMEOUT/2).getMethod();

然后继续处理。这看上去也算是个rpc,等待别的线程(这个线程同样在等待broker的返回)处理完毕。

AMQCommand(这个之后会讲到), 下面的“_channel0.enqueueRpc(connStartBlocker)”将这个rpc任务放入Channel中,如果深入代码看的话,channel中当前至多只能enqueue一个rpc,如果当前的rpc没有处理完再enqueue的话会被阻塞(wait())直到处理完当前的rpc才能enqueue下一个rpc。


try {        // The following two lines are akin to AMQChannel's        // transmit() method for this pseudo-RPC.        _frameHandler.setTimeout(HANDSHAKE_TIMEOUT);        _frameHandler.sendHeader();    } catch (IOException ioe) {        _frameHandler.close();        throw ioe;    }

接下来“_frameHandler.sendHeader()”主要是发送Protocol-Header 0-9-1帧(可参考下图),这个客户端与broker建立连接的AMQP协议的第一帧,帧中的内容包括AMQP的版本号。这里发_frameHandler就是前面Connection提到的SocketFrameHandler对象,我们来看看sendHeader()做了什么:

//本段代码在SocketFrameHandler类中public void sendHeader(int major, int minor, int revision) throws IOException {      synchronized (_outputStream) {          _outputStream.write("AMQP".getBytes("US-ASCII"));          _outputStream.write(0);          _outputStream.write(major);          _outputStream.write(minor);          _outputStream.write(revision);          _outputStream.flush();      }  }  public void sendHeader() throws IOException {      sendHeader(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR, AMQP.PROTOCOL.REVISION);  }

上面这段对照着下图一目了然:

这里写图片描述


// start the main loop going    MainLoop loop = new MainLoop();    final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();    mainLoopThread = Environment.newThread(threadFactory, loop, name);    mainLoopThread.start();    // after this point clear-up of MainLoop is triggered by closing the frameHandler.

下面就是最重要的MainLoop线程了。这里先跳过,接下去看看start()方法,之后就是Connection.Start/.StartOk, Connection.Tune/.TuneOk, Connection.Open/.OpenOk的来回negotiation,以及设置channelMax, frameMax和heartbeat的参数值。当然在设置frameMax之前还初始化了ChannelManager,至于ChannelManager可以简单的理解为管理Channel的一个类,具体实现细节可以参考()

AMQP.Connection.Start connStart = null;    AMQP.Connection.Tune connTune = null;    try {        connStart =                (AMQP.Connection.Start) connStartBlocker.getReply(HANDSHAKE_TIMEOUT/2).getMethod();        _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());        Version serverVersion =                new Version(connStart.getVersionMajor(),                                   connStart.getVersionMinor());        if (!Version.checkVersion(clientVersion, serverVersion)) {            throw new ProtocolVersionMismatchException(clientVersion,                                                              serverVersion);        }        String[] mechanisms = connStart.getMechanisms().toString().split(" ");        SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);        if (sm == null) {            throw new IOException("No compatible authentication mechanism found - " +                                          "server offered [" + connStart.getMechanisms() + "]");        }        LongString challenge = null;        LongString response = sm.handleChallenge(null, this.username, this.password);        do {            Method method = (challenge == null)                                    ? new AMQP.Connection.StartOk.Builder()                                              .clientProperties(_clientProperties)                                              .mechanism(sm.getName())                                              .response(response)                                              .build()                                    : new AMQP.Connection.SecureOk.Builder().response(response).build();            try {                Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod();                if (serverResponse instanceof AMQP.Connection.Tune) {                    connTune = (AMQP.Connection.Tune) serverResponse;                } else {                    challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();                    response = sm.handleChallenge(challenge, this.username, this.password);                }            } catch (ShutdownSignalException e) {                Method shutdownMethod = e.getReason();                if (shutdownMethod instanceof AMQP.Connection.Close) {                    AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;                    if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {                        throw new AuthenticationFailureException(shutdownClose.getReplyText());                    }                }                throw new PossibleAuthenticationFailureException(e);            }        } while (connTune == null);    } catch (TimeoutException te) {        _frameHandler.close();        throw te;    } catch (ShutdownSignalException sse) {        _frameHandler.close();        throw AMQChannel.wrap(sse);    } catch(IOException ioe) {        _frameHandler.close();        throw ioe;    }    try {        int channelMax =            negotiateChannelMax(this.requestedChannelMax,                                connTune.getChannelMax());        _channelManager = instantiateChannelManager(channelMax, threadFactory);        int frameMax =            negotiatedMaxValue(this.requestedFrameMax,                               connTune.getFrameMax());        this._frameMax = frameMax;        int heartbeat =            negotiatedMaxValue(this.requestedHeartbeat,                               connTune.getHeartbeat());        setHeartbeat(heartbeat);        _channel0.transmit(new AMQP.Connection.TuneOk.Builder()                            .channelMax(channelMax)                            .frameMax(frameMax)                            .heartbeat(heartbeat)                          .build());        _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()                                  .virtualHost(_virtualHost)                                .build());    } catch (IOException ioe) {        _heartbeatSender.shutdown();        _frameHandler.close();        throw ioe;    } catch (ShutdownSignalException sse) {        _heartbeatSender.shutdown();        _frameHandler.close();        throw AMQChannel.wrap(sse);    }    // We can now respond to errors having finished tailoring the connection    this._inConnectionNegotiation = false;    return;}

接着回顾MainLoop, 在start()方法中关于MainLoop的代码主要有:

// start the main loop going    MainLoop loop = new MainLoop();    final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();    mainLoopThread = Environment.newThread(threadFactory, loop, name);    mainLoopThread.start();    // after this point clear-up of MainLoop is triggered by closing the frameHandler.

这段代码主要是初始化MainLoop线程对象,然后让其运行。没有什么特别之处,而特别之处在于MainLoop本身。

MainLoop类是AMQConnection类的私有内部类:

private class MainLoop implements Runnable {
/** * Channel reader thread main loop. Reads a frame, and if it is * not a heartbeat frame, dispatches it to the channel it refers to. * Continues running until the "running" flag is set false by * shutdown(). */ public void run() { try { while (_running) { Frame frame = _frameHandler.readFrame(); if (frame != null) { _missedHeartbeats = 0; if (frame.type == AMQP.FRAME_HEARTBEAT) { // Ignore it: we've already just reset the heartbeat counter. } else { if (frame.channel == 0) { // the special channel _channel0.handleFrame(frame); } else { if (isOpen()) { // If we're still _running, but not isOpen(), then we // must be quiescing, which means any inbound frames // for non-zero channels (and any inbound commands on // channel zero that aren't Connection.CloseOk) must // be discarded. ChannelManager cm = _channelManager; if (cm != null) { cm.getChannel(frame.channel).handleFrame(frame); } } } } } else { // Socket timeout waiting for a frame. // Maybe missed heartbeat. handleSocketTimeout(); } } } catch (EOFException ex) { if (!_brokerInitiatedShutdown) shutdown(null, false, ex, true); } catch (Throwable ex) { _exceptionHandler.handleUnexpectedConnectionDriverException(AMQConnection.this, ex); shutdown(null, false, ex, true); } finally { // Finally, shut down our underlying data connection. _frameHandler.close(); _appContinuation.set(null); notifyListeners(); } }}

MainLoop线程主要用来处理通信帧(Frame,有关Frame的细节将会在()中陈述)的,可以看到当AMQConnection调用start()方法后,_isrunning就设置为true,那么线程一直在运行(while(true))。

MainLoop线程当读取到通信帧之后,判断是否是心跳帧,如果是则忽略继续监听。如果是其他帧,则判断其frame.channel值是否为0,frame.channel值为0代表的是特殊帧,这些特殊帧是和Connection有关的,而不是和Channel有关的(上面代码里的frame.channel就是Channel里的channel number, 一般Connection类型的帧的channel number为0,而其余Channel类别帧的channel number大于0。)

这里就分channel_number=0和channel_number !=0分别进行处理。

当channel_number=0即frame.channel=0则直接调用_channel0的handleFrame方法。
这个_channel0是在AMQConnection类中创建的私有变量:

private final AMQChannel _channel0 = new AMQChannel(this, 0) {    @Override public boolean processAsync(Command c) throws IOException {        return getConnection().processControlCommand(c);    }};

调用AMQChannel的handleFrame方法(有关AMQChannel的更多实现细节可以参考:()):

public void handleFrame(Frame frame) throws IOException {    AMQCommand command = _command;    if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line        _command = new AMQCommand(); // prepare for the next one        handleCompleteInboundCommand(command);    }}

对于channel number为0的帧,AMQCommand的handleFrame方法都是返回true.(有关AMQCommand的实现细节可以参考:())

进而调用AMQChannel的handleCompleteInboundCommand(command)方法:

public void handleCompleteInboundCommand(AMQCommand command) throws IOException {    // First, offer the command to the asynchronous-command    // handling mechanism, which gets to act as a filter on the    // incoming command stream.  If processAsync() returns true,    // the command has been dealt with by the filter and so should    // not be processed further.  It will return true for    // asynchronous commands (deliveries/returns/other events),    // and false for commands that should be passed on to some    // waiting RPC continuation.    if (!processAsync(command)) {        // The filter decided not to handle/consume the command,        // so it must be some reply to an earlier RPC.        nextOutstandingRpc().handleCommand(command);        markRpcFinished();    }}

进而调用AMQChannel的processAsync方法。这个方法在AMQChannel类中是一个抽象方法,而观察AMQConnection中的AMQChannel _channel0私有变量其正好实现了这个方法:

/** The special channel 0 (not managed by the_channelManager) */private final AMQChannel _channel0 = new AMQChannel(this, 0) {    @Override public boolean processAsync(Command c) throws IOException {        return getConnection().processControlCommand(c);    }};

ChannelN中也实现了processAsync方法。(有关ChannelN的实现细节可以参考:())

进而调用了AMQConnection的processControlCommand方法:

/** * Handles incoming control commands on channel zero. * @see ChannelN#processAsync */@SuppressWarnings("unused")public boolean processControlCommand(Command c) throws IOException{    // Similar trick to ChannelN.processAsync used here, except    // we're interested in whole-connection quiescing.    // See the detailed comments in ChannelN.processAsync.    Method method = c.getMethod();    if (isOpen()) {        if (method instanceof AMQP.Connection.Close) {            handleConnectionClose(c);            return true;        } else if (method instanceof AMQP.Connection.Blocked) {            AMQP.Connection.Blocked blocked = (AMQP.Connection.Blocked) method;            try {                for (BlockedListener l : this.blockedListeners) {                    l.handleBlocked(blocked.getReason());                }            } catch (Throwable ex) {                getExceptionHandler().handleBlockedListenerException(this, ex);            }            return true;        } else if (method instanceof AMQP.Connection.Unblocked) {            try {                for (BlockedListener l : this.blockedListeners) {                    l.handleUnblocked();                }            } catch (Throwable ex) {                getExceptionHandler().handleBlockedListenerException(this, ex);            }            return true;        } else {            return false;        }    } else {        if (method instanceof AMQP.Connection.Close) {            // Already shutting down, so just send back a CloseOk.            try {                _channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());            } catch (IOException _e) { } // ignore            return true;        } else if (method instanceof AMQP.Connection.CloseOk) {            // It's our final "RPC". Time to shut down.            _running = false;            // If Close was sent from within the MainLoop we            // will not have a continuation to return to, so            // we treat this as processed in that case.            return !_channel0.isOutstandingRpc();        } else { // Ignore all others.            return true;        }    }}

这个方法是用来处理AMQP控制命令的:Connection.Close/CloseOk, Connection.Blocked/.Unblocked。正常情况下(比如Connection.Start/.StartOk)直接返回false。

这样就会运行到 nextOutstandingRpc().handleCommand(command);这句代码,意思就是将从broker接受到的AMQCommand对象存入RpcContinuation对象,确切的来说是SimpleBlockingRpcContinuation这个对象中,更确切的来说是存放到容量为1的BlockingQueue中,等待其余的线程来“take()”。有关RpcContinuation或者SimpleBlockingRpcContinuation细节可以参考:。

我们假设有一个可靠的面向流的网络传输层(TCP/IP或相当)。在单个套接字连接中,可以存在多个独立控制线程,这些称之为通道。每个帧都使用通道编号来编号。通过交织他们的帧,不同的通道共享连接。对于给定的通道,帧运行在一个严格的序列,这样可以用来驱动一个协议解析器(通常是一个状态机)。

当channel_number!=0则需要从ChannelManager中根据channel number找出相应的AMQChannel再调用handleFrame方法处理。

这里的ChannelManager从何而来?
这里就还是要到AMQChannel的start()方法来看,有这么一句:_channelManager = instantiateChannelManager(channelMax, threadFactory);

//AMQConnection类中protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {    return new ChannelManager(this._workService, channelMax, threadFactory);}//ChannelManager的构造方法public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {    if (channelMax == 0) {        // The framing encoding only allows for unsigned 16-bit integers        // for the channel number        channelMax = (1 << 16) - 1;    }    _channelMax = channelMax;    channelNumberAllocator = new IntAllocator(1, channelMax);    this.workService = workService;    this.threadFactory = threadFactory;}

ChannelManager构造方法中的ConsumerWorkService参数就是AMQConnection中start()方法第一行代码初始化的ConsumerWorkService对象。

有关ChannelManager的实现细节可以参考:()

当channel number等于0的时候是调用AMQChannel,也可以说是AQMConnection的内部成员变量AMQChannel _channel0来处理。

当channel number不等于0时,这个接下去的处理就要涉及到整个RabbitMQ-Client代码最核心的类——ChannelN。可以类别上上面channel number为0的情况,具体可以参考:。


附:本系列全集

你可能感兴趣的文章
BZOJ 1061: [Noi2008]志愿者招募【单纯形裸题】
查看>>
12月25日云栖精选夜读:阿里安全潘多拉实验室龙磊:越狱 iOS 11.2,我选了一条最难走的路...
查看>>
小菜一步一步学数据结构之(五)顺序栈
查看>>
2017年浙江省大学生高等数学 (微积分) 竞赛试题 (数学类)
查看>>
JVM上的随机数与熵池策略
查看>>
Java8并发教程:Threads和Executors
查看>>
v8世界探险(3) - v8的抽象语法树结构
查看>>
《C语言及程序设计》实践项目——用if语句实现分支结构
查看>>
“AI +跨界+技术” 看2018中国会展创新者大会的新观点
查看>>
JavaScript——数据类型转换(显式和隐式)
查看>>
【半月刊 4】前端高频面试题及答案汇总
查看>>
lc686. Repeated String Match
查看>>
MarkDown插入数学公式
查看>>
利用Windows Azure Pack创建虚拟化网络
查看>>
DBA的40条军规
查看>>
Election方法2
查看>>
@@identity , SCOPE_IDENTITY,IDENT_CURRENT 三者的异同
查看>>
RHEL 7.1操作系统安装过程说明
查看>>
基于Python的性能自动化测试框架设计思路和实现
查看>>
Spark里几个重要的概念及架构
查看>>