MINA and xSocket share the same performance defect and pitfall; Grizzly does better

Editor: Ethan    Published: 2017-10-25 12:33:46

MINA, Grizzly [grizzly-nio-framework], and xSocket are all server frameworks built on Java NIO.
The performance defect discussed here centers on what happens when SelectionKey.OP_READ becomes ready on a channel: (1) the select thread reads the data and only then dispatches it to the application's handler, or (2) the event is dispatched right away and a handler thread is responsible for both reading and handling the data.
MINA and xSocket take approach 1; grizzly-nio-framework takes approach 2.
Reading the bytes already sitting in a channel buffer is fast, but scale up to tens of thousands of connected channels, or more, and the delayed response caused by approach 1 becomes increasingly visible.
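To make the contrast concrete, here is a minimal sketch of the two dispatch strategies written in plain java.nio; it is not taken from any of the three frameworks, and the Handler interface, buffer size, and pool size are arbitrary choices for illustration.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DispatchModels {

    interface Handler { void onData(SocketChannel ch, ByteBuffer data); }

    private final ExecutorService pool = Executors.newFixedThreadPool(8);

    // Approach 1 (MINA, xSocket): the select thread reads, then hands the bytes off.
    void readThenDispatch(Selector selector, Handler handler) throws IOException {
        while (selector.select() > 0) {
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isReadable()) {
                    SocketChannel ch = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(4096);
                    ch.read(buf);                                // read happens on the select thread
                    buf.flip();
                    pool.submit(() -> handler.onData(ch, buf));  // only the handling is pooled
                }
            }
            selector.selectedKeys().clear();
        }
    }

    // Approach 2 (Grizzly): dispatch first; a worker thread reads and handles.
    void dispatchThenRead(Selector selector, Handler handler) throws IOException {
        while (selector.select() > 0) {
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isReadable()) {
                    SocketChannel ch = (SocketChannel) key.channel();
                    // stop selecting OP_READ on this key while a worker owns the channel
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                    pool.submit(() -> {
                        try {
                            ByteBuffer buf = ByteBuffer.allocate(4096);
                            ch.read(buf);                        // read happens on the worker thread
                            buf.flip();
                            handler.onData(ch, buf);
                            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                            key.selector().wakeup();
                        } catch (IOException ignored) {
                        }
                    });
                }
            }
            selector.selectedKeys().clear();
        }
    }
}

With approach 1 every read on the same selector is serialized behind the others; with approach 2 the select thread only flips interest ops and submits, so the reads themselves can run concurrently.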
MINA:
for all selectedKeys
{
    read data, then fireMessageReceived.
}

xSocket:
for all selectedKeys
{
    read data, append it to readQueue, then performOnData.
}

MINA does not use a thread pool when it fires fireMessageReceived, so the application has to do its own dispatching inside handler.messageReceived. xSocket's performOnData, by contrast, dispatches to a thread pool [WorkerPool] by default. WorkerPool does solve the problem of the pool's threads not being able to grow to the maximum [the same approach Tomcat 6 takes], but its scheduling mechanism is still not very flexible.
Grizzly:
for all selectedKeys 
{
   [NIOContext --- filterChain.execute ---> our filter.execute] <------ runs in DefaultThreadPool
}
Grizzly's DefaultThreadPool practically rewrites the java.util.concurrent thread pool and uses its own LinkedTransferQueue, but it likewise lacks a flexible mechanism for scheduling the threads in the pool.

Below are source-code walk-throughs of MINA, xSocket, and Grizzly in turn:
Apache MINA (using the mina-2.0.0-M6 source as the example):
    The most common MINA NIO TCP usage looks like this:
        NioSocketAcceptor acceptor = new NioSocketAcceptor(/*NioProcessorPool's size*/);
        DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();        
        //chain.addLast("codec", new ProtocolCodecFilter(
                //new TextLineCodecFactory()));
        ......
        // Bind
        acceptor.setHandler(/*our IoHandler*/);
        acceptor.bind(new InetSocketAddress(port));
------------------------------------------------------------------------------------
    Start with NioSocketAcceptor (extends AbstractPollingIoAcceptor):
bind(SocketAddress) ---> bindInternal ---> startupAcceptor: starts AbstractPollingIoAcceptor.Acceptor.run on a thread from the executor [Executor], registers [interestOps: SelectionKey.OP_ACCEPT], then wakes up the selector.
As soon as a connection comes in, a NioSocketSession is built, one per channel, and session.getProcessor().add(session) adds the current channel to a NioProcessor's selector [interestOps: SelectionKey.OP_READ]; from then on, requests arriving on each connection are handled by the corresponding NioProcessor.

A few points worth noting:
1. One NioSocketAcceptor maps to several NioProcessors: NioSocketAcceptor uses a SimpleIoProcessorPool with DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1, and this size can be set when constructing the NioSocketAcceptor.
2. One NioSocketAcceptor maps to one java nio selector [OP_ACCEPT], and each NioProcessor also maps to one java nio selector [OP_READ].
3. One NioSocketAcceptor maps to one internal AbstractPollingIoAcceptor.Acceptor thread.
4. Each NioProcessor likewise maps to one internal AbstractPollingIoProcessor.Processor thread.
5. If you do not supply an Executor (thread pool) when constructing the NioSocketAcceptor, Executors.newCachedThreadPool() is used by default.
This Executor is shared by the NioSocketAcceptor and the NioProcessors, i.e. the single Acceptor thread and the multiple Processor threads above all come from this Executor (see the sketch below).
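A minimal sketch of how those defaults can be overridden when building the acceptor; it assumes the MINA 2.0 constructors NioSocketAcceptor(int), NioSocketAcceptor(Executor, IoProcessor) and SimpleIoProcessorPool(Class, int), with the package names of the 2.0 series (treat both as assumptions to check against your MINA version).

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.mina.core.service.SimpleIoProcessorPool;
import org.apache.mina.transport.socket.nio.NioProcessor;
import org.apache.mina.transport.socket.nio.NioSession;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class AcceptorFactory {

    // Simplest form: only choose how many NioProcessors (OP_READ selectors) to create.
    static NioSocketAcceptor withProcessorCount(int processors) {
        return new NioSocketAcceptor(processors);
    }

    // Explicit form: supply the shared Executor and the IoProcessor pool yourself, instead of
    // the default Executors.newCachedThreadPool() + SimpleIoProcessorPool of size cpu + 1.
    static NioSocketAcceptor withExplicitPools(int processors) {
        ExecutorService executor = Executors.newCachedThreadPool();
        SimpleIoProcessorPool<NioSession> pool =
                new SimpleIoProcessorPool<NioSession>(NioProcessor.class, processors);
        return new NioSocketAcceptor(executor, pool);
    }
}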
      Once a connection (a java nio channel, i.e. NioSession) has been added to ProcessorPool[i]'s NioProcessor, control moves into AbstractPollingIoProcessor.Processor.run.
AbstractPollingIoProcessor.Processor.run executes on one thread of the Executor above; the current NioProcessor handles the requests of all the connections registered on its selector [interestOps: SelectionKey.OP_READ].

The main flow of AbstractPollingIoProcessor.Processor.run:
for (;;) {
    ......
    int selected = select(SELECT_TIMEOUT /* = 1000L */);
    ......
    if (selected > 0) {
        process();
    }
    ......
}

process() ---> for all session channels with OP_READ ---> read(session): this read is AbstractPollingIoProcessor's private void read(T session) method.
The main flow of read(session) is: read the channel data into buf; if readBytes > 0 then IoFilterChain.fireMessageReceived(buf) /* our IoHandler.messageReceived is called from inside this */.
    At this point the way MINA NIO handles a request is clear.
    MINA's request-handling thread model is also clear, and so is the performance problem: in AbstractPollingIoProcessor.Processor.run --> process --> read(per session), process() walks over all the selected channels, reading the data and firing fireMessageReceived into our IoHandler.messageReceived one channel after another instead of concurrently, so requests that arrive later are obviously handled with a delay.
Suppose NioProcessorPool's size = 2 and 200 clients connect at the same time, so each NioProcessor has 100 connections registered. Each NioProcessor handles those 100 requests strictly in order, so the 100th request is only processed once the preceding 99 are done.
    It has been suggested that we dispatch again with a thread pool inside our own IoHandler.messageReceived, which is certainly a good idea (see the sketch below).
    But the requests are still delayed, because the time spent reading the data is still serialized: for the 100th request's data to be read, the previous 99 reads must finish first, and enlarging the ProcessorPool does not fix this.
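A minimal sketch of that dispatching idea, using a plain ExecutorService inside the handler (the pool size is arbitrary); MINA's own ExecutorFilter, added to the filter chain, achieves the same effect.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

// Hands each received message off to a worker pool so the NioProcessor thread
// can return to its selector loop as quickly as possible.
public class DispatchingHandler extends IoHandlerAdapter {

    private final ExecutorService workers = Executors.newFixedThreadPool(16);

    @Override
    public void messageReceived(final IoSession session, final Object message) {
        workers.submit(new Runnable() {
            public void run() {
                // business logic runs here, off the NioProcessor thread
                session.write(message);   // echo, as a placeholder
            }
        });
    }
}

As the text notes, this only moves the handling off the NioProcessor thread; the reads themselves are still performed one after another by that thread.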
    MINA's pitfall (to use the fashionable word) also shows up here, in read(session). Before describing it, note that when our client sends a message body to the server it does not necessarily send it whole in a single write; it may send it in several pieces, especially when the client is busy or the message body is long. In that case MINA calls our IoHandler.messageReceived several times, so the message body is split into several fragments and the data we handle in each IoHandler.messageReceived call is incomplete, which leads to lost or invalid data.
Here is the source of read(session):
private void read(T session) {
        IoSessionConfig config = session.getConfig();
        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());

        final boolean hasFragmentation =
            session.getTransportMetadata().hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation/*hasFragmentation is always true; the MINA developers were probably aware of the fragmentation problem, but the handling below is nowhere near enough: as soon as the client pauses between writes, ret can be 0, the while loop exits, and an incomplete readBytes gets fired*/) {
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain(); 
                filterChain.fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                scheduleRemove(session);
            }
            IoFilterChain filterChain = session.getFilterChain(); 
            filterChain.fireExceptionCaught(e);
        }
    }
You can test this pitfall yourself: see whether a complete message ends up being sent in several pieces, and whether your IoHandler.messageReceived gets called more than once for it.
Keeping the application's message bodies intact is also simple: keep an accumulation marker (a "breakpoint") and set it on the current IoSession; once the message body is complete, dispatch it and remove it from the current session (a sketch follows).
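A minimal sketch of that accumulation idea under the MINA 2.0 IoBuffer/IoSession API, stashing the partial bytes in a session attribute until a '\n'-terminated message is complete; the attribute key and the delimiter are arbitrary choices, and for the simple delimited case MINA's ProtocolCodecFilter with TextLineCodecFactory (commented out in the usage example above) or a CumulativeProtocolDecoder does this accumulation for you.

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

// Accumulates fragments per session until a '\n'-terminated message is complete.
public class AccumulatingHandler extends IoHandlerAdapter {

    private static final String ACC = "accumulated.buffer";

    @Override
    public void messageReceived(IoSession session, Object message) {
        IoBuffer in = (IoBuffer) message;

        IoBuffer acc = (IoBuffer) session.getAttribute(ACC);
        if (acc == null) {
            acc = IoBuffer.allocate(256).setAutoExpand(true);
            session.setAttribute(ACC, acc);
        }
        acc.put(in); // append this fragment

        // Scan what we have so far for the end-of-message delimiter.
        acc.flip();
        boolean complete = false;
        while (acc.hasRemaining()) {
            if (acc.get() == '\n') { complete = true; break; }
        }

        if (complete) {
            acc.rewind();
            // platform charset is good enough for this sketch
            String body = new String(acc.array(), 0, acc.limit());
            session.removeAttribute(ACC);      // message is whole: drop the marker...
            dispatch(session, body);           // ...and hand it to the business logic
        } else {
            acc.position(acc.limit());         // not complete yet: keep accumulating
            acc.limit(acc.capacity());
        }
    }

    private void dispatch(IoSession session, String body) {
        session.write(body); // placeholder for real processing
    }
}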
-------------------------------------------------------------------------------------------------- 
Next, using the xSocket v2_8_8 source as the example:
tcp usage e.g:
IServer srv = new Server(8090, new EchoHandler());
srv.start() or run(); 
-----------------------------------------------------------------------
class EchoHandler implements IDataHandler {   
    public boolean onData(INonBlockingConnection nbc) 
             throws IOException, 
             BufferUnderflowException, 
             MaxReadSizeExceededException {
       String data = nbc.readStringByDelimiter("\r\n");
       nbc.write(data + "\r\n");
       return true;
    }
  }
------------------------------------------------------------------------
Note 1. Server : Acceptor : IDataHandler ------ 1 : 1 : 1
Server.run ---> IoAcceptor.accept() blocks on the port; as soon as a channel comes in, an IoSocketDispatcher is taken from the IoSocketDispatcherPool, an IoSocketHandler and a NonBlockingConnection are built, and Server.LifeCycleHandler.onConnectionAccepted(ioHandler) initializes the IoSocketHandler. Note: IoSocketDispatcherPool.size defaults to 2, i.e. there are only 2 threads doing select and the corresponding 2 IoSocketDispatchers; these play the same role as MINA's NioProcessors.
Note 2. IoSocketDispatcher [one java nio Selector each] : IoSocketHandler : NonBlockingConnection ------ 1 : 1 : 1
In IoSocketDispatcher.run ---> IoSocketDispatcher.handleReadWriteKeys:
for all selectedKeys
{
    IoSocketHandler.onReadableEvent/onWriteableEvent.
}

IoSocketHandler.onReadableEvent proceeds as follows:
1. readSocket();
2. NonBlockingConnection.IoHandlerCallback.onData
   NonBlockingConnection.onData ---> appendDataToReadBuffer: the data is appended to the readQueue
3. NonBlockingConnection.IoHandlerCallback.onPostData
   NonBlockingConnection.onPostData ---> HandlerAdapter.onData [our dataHandler]: performOnData runs in the WorkerPool [thread pool].

Because the channel's data is first read into the readQueue, the application's dataHandler.onData will be called repeatedly until the data in the readQueue has been consumed, so a pitfall similar to MINA's still exists. The fix is also similar, and the NonBlockingConnection makes it easy (see the sketch below).
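A minimal sketch of that fix, written only against the API the EchoHandler above already uses; it relies on the assumption (which that example also depends on) that readStringByDelimiter consumes nothing and throws BufferUnderflowException while the terminating "\r\n" has not arrived yet, and that xSocket calls onData again once more bytes reach the readQueue. Package names are those of the xSocket 2.x series.

import java.io.IOException;
import java.nio.BufferUnderflowException;

import org.xsocket.MaxReadSizeExceededException;
import org.xsocket.connection.IDataHandler;
import org.xsocket.connection.INonBlockingConnection;

// Only ever sees complete message bodies: while the delimiter is missing,
// readStringByDelimiter throws BufferUnderflowException, which propagates out of
// onData, and xSocket re-invokes onData when more data has been appended.
class CompleteMessageHandler implements IDataHandler {

    public boolean onData(INonBlockingConnection nbc)
            throws IOException, BufferUnderflowException, MaxReadSizeExceededException {
        String completeMessage = nbc.readStringByDelimiter("\r\n");
        // At this point the whole message is available; dispatch/handle it here.
        nbc.write(completeMessage + "\r\n");
        return true;
    }
}

In other words, letting BufferUnderflowException escape from onData is what keeps the message body intact: only pull data out of the connection once a complete unit is available.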
----------------------------------------------------------------------------------------------
Finally, using the grizzly-nio-framework v1.9.18 source as the example:
tcp usage e.g:
Controller sel = new Controller();
sel.setProtocolChainInstanceHandler(new DefaultProtocolChainInstanceHandler() {
    public ProtocolChain poll() {
        ProtocolChain protocolChain = protocolChains.poll();
        if (protocolChain == null) {
            protocolChain = new DefaultProtocolChain();
            //protocolChain.addFilter(our app's filter /* application processing starts in a filter, similar to mina's IoHandler and xSocket's IDataHandler */);
            //protocolChain.addFilter(new ReadFilter());
        }
        return protocolChain;
    }
});
// If you do not add your own SelectorHandler, the Controller defaults to a TCPSelectorHandler on port 18888.
sel.addSelectorHandler(our app's selectorHandler on special port);
sel.start();
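As a concrete form of the "our app's selectorHandler on special port" placeholder, a hedged sketch that assumes grizzly 1.9's TCPSelectorHandler exposes a setPort(int) setter:

TCPSelectorHandler tcpHandler = new TCPSelectorHandler();
tcpHandler.setPort(9090);             // listen on 9090 instead of the default 18888
sel.addSelectorHandler(tcpHandler);
sel.start();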
------------------------------------------------------------------------------------------------------------
 Note 1. Controller : ProtocolChain : Filter ------ 1 : 1 : n, Controller : SelectorHandler ------ 1 : n,
SelectorHandler [one Selector each] : SelectorHandlerRunner ------ 1 : 1,
Controller.start() ---> for each SelectorHandler, a SelectorHandlerRunner is started and run.
SelectorHandlerRunner.run() ---> selectorHandler.select(), then handleSelectedKeys:
for all selectedKeys
{
   NIOContext.execute: dispatch to the thread pool for ProtocolChain.execute ---> our filter.execute.
}

You will notice there is no "read data from channel" step here, because that is left to your filter, so the pitfall that MINA and xSocket have naturally does not arise: the dispatch happens earlier. Note, however, that SelectorHandler : Selector : SelectorHandlerRunner : Thread [SelectorHandlerRunner.run] are all 1:1:1:1, i.e. only one thread does the doSelect-then-handleSelectedKeys work (a filter sketch follows below).
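A rough sketch of the kind of read filter the application has to supply; the grizzly 1.9 interface names (ProtocolFilter.execute/postExecute, Context.getSelectionKey()) are taken from the walk-through above and should be treated as assumptions, the actual read is plain java.nio, and re-registering OP_READ interest after the read is omitted for brevity.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

import com.sun.grizzly.Context;
import com.sun.grizzly.ProtocolFilter;

// Runs on a worker thread (NIOContext.execute already dispatched to the thread pool),
// so the read itself happens off the select thread.
public class AppReadFilter implements ProtocolFilter {

    public boolean execute(Context ctx) throws IOException {
        SelectionKey key = ctx.getSelectionKey();
        SocketChannel channel = (SocketChannel) key.channel();

        ByteBuffer buf = ByteBuffer.allocate(8192);
        int n = channel.read(buf);      // the filter, not the framework, reads the channel
        if (n < 0) {
            channel.close();            // peer closed; closing the channel cancels its key
            return false;               // stop the chain for this event
        }
        buf.flip();
        handle(buf);                    // application logic, or pass on to the next filter
        return true;                    // continue with the next filter in the chain
    }

    public boolean postExecute(Context ctx) throws IOException {
        return true;
    }

    private void handle(ByteBuffer data) {
        // ... business logic ...
    }
}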

    By comparison, although grizzly is better in concurrent performance, it loses to mina and xsocket in ease of use. For example, where mina and xsocket have IoSession and INonBlockingConnection objects representing the current connection or session, in grizzly that role falls to NIOContext, but NIOContext provides neither session/connection lifecycle events nor the usual read/write operations; for those you have to extend SelectorHandler and ProtocolFilter yourself. Looked at from the other side, that also shows grizzly's extensibility and flexibility are a cut above.