系列文章目錄和關(guān)于我
參考:[nio.pdf (oswego.edu)](https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)
一丶什么是ReactorThe reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.
Reactor模式是一種用于處理高并發(fā)的設(shè)計模式,也被稱為事件驅(qū)動模式。在這種模式中,應(yīng)用程序會將輸入事件交給一個事件處理器,稱為Reactor,Reactor會監(jiān)聽所有輸入事件,并將它們分發(fā)
給相應(yīng)的處理程序進行處理。這種模式可以大大提高應(yīng)用程序的性能和可擴展性,因為它使用了非阻塞I/O和異步處理技術(shù),使得一個進程可以同時處理多個事件,而不會因為某個事件的處理時間過長而影響其他事件的處理。Reactor模式被廣泛應(yīng)用于網(wǎng)絡(luò)編程和操作系統(tǒng)級別的事件驅(qū)動程序。
(資料圖)
二丶為什么需要Reactor1.傳統(tǒng)BIO在傳統(tǒng)BIO模式中有多少個客戶端請求,就需要多少個對于的線程進行一對一的處理。
這種模型有如下缺點:
同步阻塞IO,讀寫阻塞,大量線程掛起指定線程數(shù)的時候,只能依據(jù)系統(tǒng)的cpu核心數(shù),無法根據(jù)并發(fā)請求數(shù)來指定。大量線程導(dǎo)致上下文切換開銷大,線程占用內(nèi)存大。2.NIOJava NIO 帶來非阻塞IO,和IO多路復(fù)用。
得益于非阻塞IO和IO多路復(fù)用,讓服務(wù)可以處理更多的并發(fā)請,不再受限于一個客戶端一個線程來處理,而是一個線程可以維護多個客戶端。
可以看到j(luò)ava 中NIO有點reactor的意思:
Selector多路復(fù)用器監(jiān)聽IO事件進行分發(fā),針對連接事件,讀寫事件進行不同的處理。
Reactor核心是Reactor加上對應(yīng)的處理器Handler,Reactor在一個單獨的線程中運行,負責(zé)監(jiān)聽和分發(fā)事件,將接收到的事件交給不同的Handler來處理,Handler是處理程序執(zhí)行I/O事件的實際操作。
高并發(fā):Reactor模式可以在同一時間內(nèi)處理大量的客戶端請求,提高了系統(tǒng)的并發(fā)處理能力。得益于Java NIO 非阻塞IO 于 IO多路復(fù)用
??蓴U展性:Reactor模式可以很容易地擴展到更多的處理器,以滿足更高的并發(fā)量。編碼簡單:Reactor模式可以使編碼更加簡單明了,因為它將不同的事件分離開來處理,降低了代碼的復(fù)雜度。例如Netty就使用了Reactor模式,程序員只需要寫如何處理事件
效率高:Reactor模式采用非阻塞I/O和異步處理技術(shù),可以使得一個進程可以同時處理多個事件,而不會因為某個事件的處理時間過長而影響其他事件的處理,從而提高了系統(tǒng)的效率??梢浦残院茫篟eactor模式可以很方便地移植到不同的平臺上,因為它遵循了標準的Java NIO接口,可以在不同的操作系統(tǒng)上實現(xiàn)。三丶Reactor模型于簡單代碼實現(xiàn)1.單Reactor單線程模型這個模型詮釋了Reactor模式的組成部分:
Reactor 負責(zé)分離套接字,對于觸發(fā)connect的io事件交給Acceptor處理,對于IO讀寫事件交給Handler處理Acceptor負責(zé)創(chuàng)建Handler,將Handler和socketChannel進行綁定,當socketChannel讀事件觸發(fā)后,Reactor進行分發(fā)給對應(yīng)Handler處理。public class Reactor implements Runnable { //多路復(fù)用器 final Selector selector;//服務(wù)端Channel final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind( new InetSocketAddress(port)); serverSocket.configureBlocking(false); // 注冊io多路復(fù)用器連接事件 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); // 將服務(wù)端Channel 關(guān)聯(lián)一個Acceptor sk.attach(new Acceptor()); } @Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) //分發(fā) dispatch(it.next()); selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { // 拿到關(guān)聯(lián)的acceptor 或者handler Runnable r = (Runnable) (k.attachment()); if (r != null) r.run(); } //內(nèi)部類 負責(zé)處理連接事件 class Acceptor implements Runnable { public void run() { try { // 拿到Channel SocketChannel c = serverSocket.accept(); if (c != null) // 創(chuàng)建handler new Handler(selector, c); } catch (IOException ex) { /* ... */ } } } final class Handler implements Runnable { final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(1024); ByteBuffer output = ByteBuffer.allocate(1024); static final int READING = 0, SENDING = 1; int state = READING; //設(shè)置非阻塞 //監(jiān)聽可讀事件 Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); sk = socket.register(sel, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); sel.wakeup(); } boolean inputIsComplete() { return false; } boolean outputIsComplete() { return false; } void process() { } public void run() { try { //如果可讀 if (state == READING) read();//如果可寫 else if (state == SENDING) send(); } catch (IOException ex) { /* ... */ } } void read() throws IOException { socket.read(input); if (inputIsComplete()) { process(); state = SENDING; sk.interestOps(SelectionKey.OP_WRITE); } } void send() throws IOException { socket.write(output); if (outputIsComplete()) sk.cancel(); } }}
可以看到Reactor模式將Channel和Acceptor,Handler進行綁定依賴于SelectionKey#attach
方法,通過此方法在不同的事件發(fā)生時調(diào)用SelectionKey#attachment
方法,獲取到對應(yīng)的處理程序進行處理。
Reactor由單線程運行,通過IO多路復(fù)用Selector監(jiān)聽多個事件是否就緒,得益于Channel提供的非阻塞IO能力,當IO沒有就緒的時候,單線程不會阻塞而是繼續(xù)處理下一個。
由于其單線程的原因,無法利用計算機多核心資源,并且如果讀取請求內(nèi)容處理的過程存在耗時操作(比如數(shù)據(jù)庫,rpc等)那么回導(dǎo)致下一個事件得不到快速的響應(yīng)。
2.單Reactor多線程模型引入多線程解決單線程Reactor的不足
可以看到多線程模型引入了線程池,對于就緒的可讀,可寫IO事件交給線程池進行處理。
主要是對單線程模型中的Handler進行改造,將處理邏輯提交到線程池中。
多線程模型涉及到共享資源的使用,不如讀寫Channel依賴的Buffer如何分配。
可以看到多線程模型的缺點:線程通信和同步邏輯復(fù)雜,需要處理多線程安全問題。
3.多Reactor多線程模型在這種模型中,mainReactor負責(zé)處理連接建立事件,只需要一個線程即可。subReactor負責(zé)和建立連接的socket進行數(shù)據(jù)交互并處理業(yè)務(wù)邏輯,并且每一個subReactor可持有一個獨立的Selector進行IO多路復(fù)用事件監(jiān)聽。
// SubReactor 池子,負責(zé)負載均衡的選擇SubReactorpublic class SubReactorPool { final static SubReactor[] subReactors; static final AtomicInteger count = new AtomicInteger(); static { int availableProcessors = Runtime.getRuntime().availableProcessors(); subReactors = new SubReactor[availableProcessors]; for (int i = 0; i < subReactors.length; i++) { subReactors[i] = new SubReactor(); } } static class SubReactor implements Runnable{ // 業(yè)務(wù)處理線程池 final static Executor poolExecutor = Executors.newCachedThreadPool();// io多路復(fù)用 Selector selector; SubReactor() { try { selector = Selector.open(); } catch (IOException e) { throw new RuntimeException(e); } } public void registry(SocketChannel socketChannel) throws ClosedChannelException { socketChannel.register(selector,SelectionKey.OP_READ); } @Override public void run() { while (true){ try { selector.select(); } catch (IOException e) { throw new RuntimeException(e); } Set selectionKeys = selector.selectedKeys(); Iterator iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey sk = iterator.next(); if (sk.isReadable()) { poolExecutor.execute(()->new Handler(sk)); } // 可寫,。。。。 iterator.remove(); } } } } //選擇合適的SubReactor static SubReactor loadBalanceChoose(SocketChannel socketChannel){ int countInt = count.getAndAdd(1); return subReactors[countInt % subReactors.length]; }}
多Reactor解決了單個Selector注冊連接,讀寫事件,導(dǎo)致內(nèi)核輪詢的時候需要判斷太多fd而效率緩慢的問題。
四丶Tomcat中Reactor在Tomcat請求處理流程與源碼淺析 - Cuzzz - 博客園 (cnblogs.com)中,說到Tomcat Connector的設(shè)計
其中
Endpoint:tomcat中沒有這個接口,只有AbstractEndpoint,它負責(zé)啟動線程來監(jiān)聽服務(wù)器端口,并且在接受到數(shù)據(jù)后交給Processor處理Processor:Processor讀取到客戶端請求后按照請求地址映射到具體的容器進行處理,這個過程請求映射,Processor實現(xiàn)請求映射依賴于Mapper對象,在容器發(fā)生注冊和注銷的時候,MapperListener會監(jiān)聽到對應(yīng)的事件,從而來變更Mapper中維護的請求映射信息。ProtocolHandler:協(xié)議處理器,針對不同的IO方式(NIO,BIO等)和不同的協(xié)議(Http,AJP)具備不同的實現(xiàn),ProtocolHandler包含一個Endpoint來開啟端口監(jiān)聽,并且包含一個Processor用于按照協(xié)議讀取數(shù)據(jù)并將請求交給容器處理。Acceptor:Acceptor實現(xiàn)了Runnable接口,可以作為一個線程啟動,使用Socket API監(jiān)聽指定端口,用于接收用戶請求。Poller:主要用于監(jiān)測注冊在原始 scoket 上的事件是否發(fā)生,Acceptor接受到請求后,會注冊到Poller的隊列中。下圖展示了Acceptor 和 Poller的協(xié)作
1.Acceptor 等待客戶端連接這一步借助ServerSocketChannel#accept方法,進行等待客戶端連接,Acceptor單線程進行監(jiān)聽。
2.Acceptor選擇Poller進行注冊這一步設(shè)置非阻塞,并且使用計數(shù)取模的方式實現(xiàn)多個Poller的負載均衡
然后將事件保證為PollerEvent 提交到Poller的阻塞隊列中
3.Poller 輪詢阻塞隊列中的PollerEvent并注冊到Selector上輪詢阻塞隊列中的PollerEvent,并且調(diào)用run方法,run方法會把事件注冊到Poller的Selector上,注意下面的注冊將NioSocketWrapper作為attachment進行了綁定
4.Poller中Selector IO多路復(fù)用處理事件,并處理事件tomcat處理事件的時候,會創(chuàng)建出SocketProcessor進行處理,SocketProcessor是一個Runnable,最后會提交到線程池。
關(guān)鍵詞:
Copyright@ 2015-2022 南非包裝網(wǎng)版權(quán)所有 備案號: 滬ICP備2022005074號-13 聯(lián)系郵箱:58 55 97 3@qq.com