91久久精品id_国产成人Av片无码免费_电影资源免费观看网站_久久夜色精品国产AV麻豆精国品_亚洲AV无码乱码在线观看烟雨楼

Reactor 模式與Tomcat中的Reactor

  • 2023-06-23 17:14:01
  • 來源:博客園

系列文章目錄和關(guān)于我

參考:[nio.pdf (oswego.edu)](https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)
一丶什么是Reactor

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

Reactor模式是一種用于處理高并發(fā)的設(shè)計模式,也被稱為事件驅(qū)動模式。在這種模式中,應(yīng)用程序會將輸入事件交給一個事件處理器,稱為Reactor,Reactor會監(jiān)聽所有輸入事件,并將它們分發(fā)給相應(yīng)的處理程序進行處理。這種模式可以大大提高應(yīng)用程序的性能和可擴展性,因為它使用了非阻塞I/O和異步處理技術(shù),使得一個進程可以同時處理多個事件,而不會因為某個事件的處理時間過長而影響其他事件的處理。Reactor模式被廣泛應(yīng)用于網(wǎng)絡(luò)編程和操作系統(tǒng)級別的事件驅(qū)動程序。


(資料圖)

二丶為什么需要Reactor1.傳統(tǒng)BIO

在傳統(tǒng)BIO模式中有多少個客戶端請求,就需要多少個對于的線程進行一對一的處理。

這種模型有如下缺點:

同步阻塞IO,讀寫阻塞,大量線程掛起指定線程數(shù)的時候,只能依據(jù)系統(tǒng)的cpu核心數(shù),無法根據(jù)并發(fā)請求數(shù)來指定。大量線程導(dǎo)致上下文切換開銷大,線程占用內(nèi)存大。2.NIO

Java NIO 帶來非阻塞IO,和IO多路復(fù)用。

得益于非阻塞IO和IO多路復(fù)用,讓服務(wù)可以處理更多的并發(fā)請,不再受限于一個客戶端一個線程來處理,而是一個線程可以維護多個客戶端。

可以看到j(luò)ava 中NIO有點reactor的意思:

Selector多路復(fù)用器監(jiān)聽IO事件進行分發(fā),針對連接事件,讀寫事件進行不同的處理。

Reactor核心是Reactor加上對應(yīng)的處理器Handler,Reactor在一個單獨的線程中運行,負責(zé)監(jiān)聽和分發(fā)事件,將接收到的事件交給不同的Handler來處理,Handler是處理程序執(zhí)行I/O事件的實際操作。

高并發(fā):Reactor模式可以在同一時間內(nèi)處理大量的客戶端請求,提高了系統(tǒng)的并發(fā)處理能力。得益于Java NIO 非阻塞IO 于 IO多路復(fù)用??蓴U展性:Reactor模式可以很容易地擴展到更多的處理器,以滿足更高的并發(fā)量。編碼簡單:Reactor模式可以使編碼更加簡單明了,因為它將不同的事件分離開來處理,降低了代碼的復(fù)雜度。例如Netty就使用了Reactor模式,程序員只需要寫如何處理事件效率高:Reactor模式采用非阻塞I/O和異步處理技術(shù),可以使得一個進程可以同時處理多個事件,而不會因為某個事件的處理時間過長而影響其他事件的處理,從而提高了系統(tǒng)的效率??梢浦残院茫篟eactor模式可以很方便地移植到不同的平臺上,因為它遵循了標準的Java NIO接口,可以在不同的操作系統(tǒng)上實現(xiàn)。三丶Reactor模型于簡單代碼實現(xiàn)1.單Reactor單線程模型

這個模型詮釋了Reactor模式的組成部分:

Reactor 負責(zé)分離套接字,對于觸發(fā)connect的io事件交給Acceptor處理,對于IO讀寫事件交給Handler處理Acceptor負責(zé)創(chuàng)建Handler,將Handler和socketChannel進行綁定,當socketChannel讀事件觸發(fā)后,Reactor進行分發(fā)給對應(yīng)Handler處理。
public class Reactor implements Runnable {        //多路復(fù)用器    final Selector selector;//服務(wù)端Channel    final ServerSocketChannel serverSocket;    Reactor(int port) throws IOException {        selector = Selector.open();        serverSocket = ServerSocketChannel.open();        serverSocket.socket().bind(                new InetSocketAddress(port));        serverSocket.configureBlocking(false);        // 注冊io多路復(fù)用器連接事件        SelectionKey sk =                serverSocket.register(selector,                        SelectionKey.OP_ACCEPT);        // 將服務(wù)端Channel 關(guān)聯(lián)一個Acceptor        sk.attach(new Acceptor());    }    @Override    public void run() {        try {            while (!Thread.interrupted()) {                selector.select();                Set selected = selector.selectedKeys();                Iterator it = selected.iterator();                while (it.hasNext())                    //分發(fā)                    dispatch(it.next());                selected.clear();            }        } catch (IOException ex) { /* ... */ }    }    void dispatch(SelectionKey k) {        // 拿到關(guān)聯(lián)的acceptor 或者handler        Runnable r = (Runnable) (k.attachment());        if (r != null)            r.run();    }    //內(nèi)部類 負責(zé)處理連接事件    class Acceptor implements Runnable {        public void run() {            try {                // 拿到Channel                SocketChannel c = serverSocket.accept();                if (c != null)                    // 創(chuàng)建handler                    new Handler(selector, c);            } catch (IOException ex) { /* ... */ }        }    }    final class Handler implements Runnable {        final SocketChannel socket;        final SelectionKey sk;        ByteBuffer input = ByteBuffer.allocate(1024);        ByteBuffer output = ByteBuffer.allocate(1024);        static final int READING = 0, SENDING = 1;        int state = READING;        //設(shè)置非阻塞        //監(jiān)聽可讀事件        Handler(Selector sel, SocketChannel c)                throws IOException {            socket = c;            c.configureBlocking(false);            sk = socket.register(sel, 0);            sk.attach(this);            sk.interestOps(SelectionKey.OP_READ);            sel.wakeup();        }        boolean inputIsComplete() {            return false;        }        boolean outputIsComplete() {            return false;        }        void process() {        }        public void run() {            try {                //如果可讀                if (state == READING) read();//如果可寫                else if (state == SENDING) send();            } catch (IOException ex) { /* ... */ }        }        void read() throws IOException {            socket.read(input);            if (inputIsComplete()) {                process();                state = SENDING;                sk.interestOps(SelectionKey.OP_WRITE);            }        }        void send() throws IOException {            socket.write(output);            if (outputIsComplete()) sk.cancel();        }    }}

可以看到Reactor模式將Channel和Acceptor,Handler進行綁定依賴于SelectionKey#attach方法,通過此方法在不同的事件發(fā)生時調(diào)用SelectionKey#attachment方法,獲取到對應(yīng)的處理程序進行處理。

Reactor由單線程運行,通過IO多路復(fù)用Selector監(jiān)聽多個事件是否就緒,得益于Channel提供的非阻塞IO能力,當IO沒有就緒的時候,單線程不會阻塞而是繼續(xù)處理下一個。

由于其單線程的原因,無法利用計算機多核心資源,并且如果讀取請求內(nèi)容處理的過程存在耗時操作(比如數(shù)據(jù)庫,rpc等)那么回導(dǎo)致下一個事件得不到快速的響應(yīng)。

2.單Reactor多線程模型

引入多線程解決單線程Reactor的不足

可以看到多線程模型引入了線程池,對于就緒的可讀,可寫IO事件交給線程池進行處理。

主要是對單線程模型中的Handler進行改造,將處理邏輯提交到線程池中。

多線程模型涉及到共享資源的使用,不如讀寫Channel依賴的Buffer如何分配。

可以看到多線程模型的缺點:線程通信和同步邏輯復(fù)雜,需要處理多線程安全問題。

3.多Reactor多線程模型

在這種模型中,mainReactor負責(zé)處理連接建立事件,只需要一個線程即可。subReactor負責(zé)和建立連接的socket進行數(shù)據(jù)交互并處理業(yè)務(wù)邏輯,并且每一個subReactor可持有一個獨立的Selector進行IO多路復(fù)用事件監(jiān)聽。

// SubReactor 池子,負責(zé)負載均衡的選擇SubReactorpublic class SubReactorPool {    final static SubReactor[] subReactors;    static final AtomicInteger count = new AtomicInteger();    static {        int availableProcessors = Runtime.getRuntime().availableProcessors();        subReactors = new SubReactor[availableProcessors];        for (int i = 0; i < subReactors.length; i++) {            subReactors[i] = new SubReactor();        }    }    static class SubReactor implements Runnable{        // 業(yè)務(wù)處理線程池        final static Executor poolExecutor = Executors.newCachedThreadPool();// io多路復(fù)用        Selector selector;        SubReactor()  {            try {                selector = Selector.open();            } catch (IOException e) {                throw new RuntimeException(e);            }        }        public void registry(SocketChannel socketChannel) throws ClosedChannelException {            socketChannel.register(selector,SelectionKey.OP_READ);        }        @Override        public void run() {            while (true){                try {                    selector.select();                } catch (IOException e) {                    throw new RuntimeException(e);                }                Set selectionKeys = selector.selectedKeys();                Iterator iterator = selectionKeys.iterator();                while (iterator.hasNext()){                    SelectionKey sk = iterator.next();                    if (sk.isReadable()) {                        poolExecutor.execute(()->new Handler(sk));                    }                    // 可寫,。。。。                    iterator.remove();                }            }        }    }        //選擇合適的SubReactor    static SubReactor loadBalanceChoose(SocketChannel socketChannel){        int countInt = count.getAndAdd(1);        return subReactors[countInt % subReactors.length];    }}

多Reactor解決了單個Selector注冊連接,讀寫事件,導(dǎo)致內(nèi)核輪詢的時候需要判斷太多fd而效率緩慢的問題。

四丶Tomcat中Reactor

在Tomcat請求處理流程與源碼淺析 - Cuzzz - 博客園 (cnblogs.com)中,說到Tomcat Connector的設(shè)計

其中

Endpoint:tomcat中沒有這個接口,只有AbstractEndpoint,它負責(zé)啟動線程來監(jiān)聽服務(wù)器端口,并且在接受到數(shù)據(jù)后交給Processor處理Processor:Processor讀取到客戶端請求后按照請求地址映射到具體的容器進行處理,這個過程請求映射,Processor實現(xiàn)請求映射依賴于Mapper對象,在容器發(fā)生注冊和注銷的時候,MapperListener會監(jiān)聽到對應(yīng)的事件,從而來變更Mapper中維護的請求映射信息。ProtocolHandler:協(xié)議處理器,針對不同的IO方式(NIO,BIO等)和不同的協(xié)議(Http,AJP)具備不同的實現(xiàn),ProtocolHandler包含一個Endpoint來開啟端口監(jiān)聽,并且包含一個Processor用于按照協(xié)議讀取數(shù)據(jù)并將請求交給容器處理。Acceptor:Acceptor實現(xiàn)了Runnable接口,可以作為一個線程啟動,使用Socket API監(jiān)聽指定端口,用于接收用戶請求。Poller:主要用于監(jiān)測注冊在原始 scoket 上的事件是否發(fā)生,Acceptor接受到請求后,會注冊到Poller的隊列中。

下圖展示了Acceptor 和 Poller的協(xié)作

1.Acceptor 等待客戶端連接

這一步借助ServerSocketChannel#accept方法,進行等待客戶端連接,Acceptor單線程進行監(jiān)聽。

2.Acceptor選擇Poller進行注冊

這一步設(shè)置非阻塞,并且使用計數(shù)取模的方式實現(xiàn)多個Poller的負載均衡

然后將事件保證為PollerEvent 提交到Poller的阻塞隊列中

3.Poller 輪詢阻塞隊列中的PollerEvent并注冊到Selector上

輪詢阻塞隊列中的PollerEvent,并且調(diào)用run方法,run方法會把事件注冊到Poller的Selector上,注意下面的注冊將NioSocketWrapper作為attachment進行了綁定

4.Poller中Selector IO多路復(fù)用處理事件,并處理事件

tomcat處理事件的時候,會創(chuàng)建出SocketProcessor進行處理,SocketProcessor是一個Runnable,最后會提交到線程池。

關(guān)鍵詞:

Copyright@  2015-2022 南非包裝網(wǎng)版權(quán)所有  備案號: 滬ICP備2022005074號-13   聯(lián)系郵箱:58 55 97 3@qq.com