免费爱碰视频在线观看,九九精品国产屋,欧美亚洲尤物久久精品,1024在线观看视频亚洲

      圖解 Kafka 網(wǎng)絡(luò)層實(shí)現(xiàn)機(jī)制之上篇

      圖解 Kafka 網(wǎng)絡(luò)層實(shí)現(xiàn)機(jī)制之上篇

      在上一篇中,主要帶大家深度剖析了「 生產(chǎn)者元數(shù)據(jù) 」的拉取、管理全流程,今天我們就來聊聊 Kafka 是如何對(duì) Java NIO 進(jìn)行封裝的 ,本系列總共分為3篇,主要剖析以下幾個(gè)問題:

    1. 針對(duì) Java NIO 的 SocketChannel,kafka 是如何封裝統(tǒng)一的傳輸層來實(shí)現(xiàn)最基礎(chǔ)的網(wǎng)絡(luò)連接以及讀寫操作的?
    2. 剖析 KafkaChannel 是如何對(duì)傳輸層、讀寫 buffer 操作進(jìn)行封裝的?
    3. 剖析工業(yè)級(jí) NIO 實(shí)戰(zhàn):如何基于位運(yùn)算來控制事件的監(jiān)聽以及拆包、粘包是如何實(shí)現(xiàn)的?
    4. 剖析 Kafka 是如何封裝 Selector 多路復(fù)用器的?
    5. 剖析 Kafka 封裝的 Selector 是如何初始化并與 Broker 進(jìn)行連接以及網(wǎng)絡(luò)讀寫的?
    6. 剖析 Kafka 網(wǎng)絡(luò)發(fā)送消息和接收響應(yīng)的整個(gè)過程是怎樣的?
    7. 本篇只討論前3個(gè)問題,剩余的放到后2篇中。

      認(rèn)真讀完這篇文章,我相信你會(huì)對(duì) Kafka 封裝 Java NIO 源碼有更加深刻的理解。

      這篇文章干貨很多,希望你可以耐心讀完。

      01 總體概述

      上篇剖析了「 生產(chǎn)者元數(shù)據(jù)的拉取和管理的全過程 」,此時(shí)發(fā)送消息的時(shí)候就有了元數(shù)據(jù),但是還沒有進(jìn)行網(wǎng)絡(luò)通信,而網(wǎng)絡(luò)通信是一個(gè)相對(duì)復(fù)雜的過程,對(duì)于 Java 系統(tǒng)來說網(wǎng)絡(luò)通信一般會(huì)采用 NIO 庫來實(shí)現(xiàn),所以 Kafka 對(duì) Java NIO 封裝了統(tǒng)一的框架,來實(shí)現(xiàn)多路復(fù)用的網(wǎng)絡(luò) I/O 操作 。

      為了方便大家理解,所有的源碼只保留骨干。

      02 Kafka 對(duì) Java NIO 的封裝

      如果大家對(duì) Java NIO 不了解的話,可以看下這個(gè)文檔,這里就不過多介紹了。

      https://pdai.tech/md/java/io/java-io-nio.html

      我們來看看 Kafka 對(duì) Java NIO 組件做了哪些封裝? 這里先說下結(jié)果,后面會(huì)深度剖析。

    8. TransportLayer:它是一個(gè)接口,封裝了底層 NIO 的 SocketChannel。
    9. NetworkReceive:封裝了 NIO 的 ByteBuffer 中的讀 Buffer, 對(duì)網(wǎng)絡(luò)編程中的粘包、拆包經(jīng)典實(shí)現(xiàn) 。
    10. NetworkSend:封裝了 NIO 的 ByteBuffer 中的寫 Buffer。
    11. KafkaChannel:對(duì) TransportLayer、NetworkReceive、NetworkSend 進(jìn)一步封裝,屏蔽了底層的實(shí)現(xiàn)細(xì)節(jié),對(duì)上層更友好。
    12. KafkaSelector:封裝了 NIO 的 Selector 多路復(fù)用器組件。
    13. 接下來我們挨個(gè)對(duì)上面組件進(jìn)行剖析。

      02 TransportLayer 封裝過程

      TransportLayer 接口是對(duì) NIO 中 「 SocketChannel 」 的封裝。它的實(shí)現(xiàn)類總共有 2 個(gè):

    14. PlaintextTransportLayer:明文網(wǎng)絡(luò)傳輸實(shí)現(xiàn)。
    15. SslTransportLayer:SSL 加密網(wǎng)絡(luò)傳輸實(shí)現(xiàn)。
    16. 本篇只剖析 PlaintextTransportLayer 的實(shí)現(xiàn)。

      github 源碼地址如下:

      https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.javapublic class PlaintextTransportLayer implements TransportLayer { // java nio 中 SelectionKey 事件 private final SelectionKey key; // java nio 中的SocketChannel private final SocketChannel socketChannel; // 安全相關(guān) private final Principal principal = KafkaPrincipal.ANONYMOUS; // 初始化 public PlaintextTransportLayer(SelectionKey key) throws IOException { // 對(duì) NIO 中 SelectionKey 類的對(duì)象引用 this.key = key; // 對(duì) NIO 中 SocketChannel 類的對(duì)象引用 this.socketChannel = (SocketChannel) key.channel(); }}

      從上面代碼可以看出,該類就是 對(duì)底層 NIO 的 socketChannel 封裝引用 。將構(gòu)造函數(shù)的 SelectionKey 類對(duì)象賦值給 key,然后從 key 中取出對(duì)應(yīng)的 SocketChannel 賦值給 socketChannel,這樣就完成了初始化工作。

      接下來,我們看看幾個(gè)重要方法是如何使用這2個(gè) NIO 組件的。

      02.1 finishConnect()

      @Override// 判斷網(wǎng)絡(luò)連接是否完成public boolean finishConnect() throws IOException { // 1. 調(diào)用socketChannel的finishConnect方法,返回該連接是否已經(jīng)連接完成 boolean connected = socketChannel.finishConnect(); // 2. 如果網(wǎng)絡(luò)連接完成以后就刪除對(duì)OP_CONNECT事件的監(jiān)聽,同時(shí)添加對(duì)OP_READ事件的監(jiān)聽 if (connected) // 事件操作 key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); // 3. 最后返回網(wǎng)絡(luò)連接 return connected;}

      該方法主要用來 判斷網(wǎng)絡(luò)連接是否完成 ,如果完成就關(guān)注 「 OP_READ 」 事件,并取消 「 OP_CONNECT 」 事件。

    17. 首先調(diào)用 socketChannel 通道的 finishConnect() 判斷連接是否完成。
    18. 如果網(wǎng)絡(luò)連接完成以后就刪除對(duì) OP_CONNECT 事件的監(jiān)聽,同時(shí)添加對(duì) OP_READ 事件的監(jiān)聽,因?yàn)檫B接完成后就可能接收數(shù)據(jù)了。
    19. 最后返回網(wǎng)絡(luò)連接 connected。
    20. 二進(jìn)制位運(yùn)算事件監(jiān)聽

      這里通過「 二進(jìn)制位運(yùn)算 」巧妙的解決了網(wǎng)絡(luò)事件的監(jiān)聽操作,實(shí)現(xiàn)非常經(jīng)典。

      通過 socketChannel 在 Selector 多路復(fù)用器注冊事件返回 SelectionKey ,SelectionKey 的類型包括:

    21. OP_READ:可讀事件,值為:1<<0 == 1 == 00000001。
    22. OP_WRITE:可寫事件,值為:1<<2 == 4 == 00000100。
    23. OP_CONNECT:客戶端連接服務(wù)端的事件,一般為創(chuàng)建 SocketChannel 客戶端 channel,值為:1<<3 == 8 ==00001000。
    24. OP_ACCEPT:服務(wù)端接收客戶端連接的事件,一般為創(chuàng)建 ServerSocketChannel 服務(wù)端 channel,值為:1<<4 == 16 == 00010000。
    25. key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

      首先” “符號(hào)代表按位取反,”&”代表按位取與,通過 key.interestOps() 獲取當(dāng)前的事件,然后和 OP_CONNECT事件取反「 11110111 」 后按位與操作。

      所以,”& xx” 代表刪除 xx 事件, 有就刪除,沒有就不變 ;而 “| xx” 代表將 xx 事件添加進(jìn)去。

      02.2 read()

      @Overridepublic int read(ByteBuffer dst) throws IOException { // 調(diào)用 NIO 的通道實(shí)現(xiàn)數(shù)據(jù)的讀取 return socketChannel.read(dst);}

      該方法主要用來 把 socketChannel 里面的數(shù)據(jù)讀取緩沖區(qū) ByteBuffer 里 ,通過調(diào)用 socketChannel.read() 實(shí)現(xiàn)。

      02.3 write()

      @Overridepublic int write(ByteBuffer src) throws IOException { return socketChannel.write(src);}

      該方法主要用來 把緩沖區(qū) ByteBuffer 的數(shù)據(jù)寫到 SocketChannel 里 ,通過調(diào)用 socketChannel.write() 實(shí)現(xiàn)。

      大家都知道在網(wǎng)絡(luò)編程中,一次讀寫操作并一定能把數(shù)據(jù)讀寫完,所以就需要判斷是否讀寫完成,勢必會(huì)涉及數(shù)據(jù)的「 拆包 」、「 粘包 」操作。 這些操作比較繁瑣,因此 Kafka 將 ByteBuffer 的讀寫操作進(jìn)行重新封裝,分別對(duì)應(yīng) NetworkReceive 讀操作、NetworkSend 寫操作,對(duì)于上層調(diào)用無需判斷是否讀寫完成,更加友好 。

      接下來我們就來分別剖析下這2個(gè)類的實(shí)現(xiàn)。

      03 NetworkReceive 封裝過程

      public class NetworkReceive implements Receive { …. // 空 ByteBuffer private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); private final String source; // 存儲(chǔ)響應(yīng)消息數(shù)據(jù)長度 private final ByteBuffer size; // 響應(yīng)消息數(shù)據(jù)的最大長度 private final int maxSize; // ByteBuffer 內(nèi)存池 private final MemoryPool memoryPool; // 已讀取字節(jié)大小 private int requestedBufferSize = -1; // 存儲(chǔ)響應(yīng)消息數(shù)據(jù)體 private ByteBuffer buffer; // 初始化構(gòu)造函數(shù) public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) { this.source = source; // 分配4個(gè)字節(jié)大小的數(shù)據(jù)長度 this.size = ByteBuffer.allocate(4); this.buffer = null; // 能接收消息的最大長度 this.maxSize = maxSize; this.memoryPool = memoryPool; }}

    26. EMPTY_BUFFER:空 Buffer,值為 ByteBuffer.allocate(0)。
    27. source:final類型,用來確定對(duì)應(yīng) channel id。
    28. size:final類型,存儲(chǔ)響應(yīng)消息數(shù)據(jù)長度,大小為4字節(jié)。
    29. maxSize:final類型,接收響應(yīng)消息數(shù)據(jù)的最大長度。
    30. memoryPool:final類型,ByteBuffer 內(nèi)存池。
    31. requestedBufferSize:已讀取字節(jié)大小。
    32. buffer:存儲(chǔ)響應(yīng)消息數(shù)據(jù)體。
    33. 從屬性可以看出,包含2個(gè) ByteBuffer,分別是 size 和 buffer。這里重點(diǎn)說下源碼中的 size字段 的初始化。通過長度編碼方式實(shí)現(xiàn),上來就先分配了 4字節(jié) 大小的 ByteBuffer 來存儲(chǔ)響應(yīng)消息數(shù)據(jù)長度,即32位,與 Java int 占用相同的字節(jié)數(shù),完全滿足表示消息長度的值。

      介紹完字段后,我們來深度剖析下該類的幾個(gè)重要的方法。

      03.1 readFrom()

      public long readFrom(ScatteringByteChannel channel) throws IOException { // 讀取數(shù)據(jù)總大小 int read = 0; // 1.判斷響應(yīng)消息數(shù)據(jù)長度的 ByteBuffer 是否讀完 if (size.hasRemaining()) { // 2.還有剩余,直接讀取消息數(shù)據(jù)的長度 int bytesRead = channel.read(size); if (bytesRead < 0) throw new EOFException(); // 3.每次讀取后,累加到總讀取數(shù)據(jù)大小里 read += bytesRead; // 4.判斷響應(yīng)消息數(shù)據(jù)長度的緩存是否讀完了 if (!size.hasRemaining()) { // 5.重置position size.rewind(); // 6.讀取響應(yīng)消息數(shù)據(jù)長度 int receiveSize = size.getInt(); // 7.如果有異常就拋出 if (receiveSize maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); // 8.將讀到數(shù)據(jù)長度賦值已讀取字節(jié)大小,即數(shù)據(jù)體的大小 requestedBufferSize = receiveSize; if (receiveSize == 0) { buffer = EMPTY_BUFFER; } } } // 9.如果數(shù)據(jù)體buffer還沒有分配,且響應(yīng)消息數(shù)據(jù)頭已讀完 if (buffer == null && requestedBufferSize != -1) { // 10.分配requestedBufferSize字節(jié)大小的內(nèi)存空間給數(shù)據(jù)體buffer buffer = memoryPool.tryAllocate(requestedBufferSize); if (buffer == null) log.trace("Broker low on memory – could not allocate buffer of size {} for source {}", requestedBufferSize, source); } // 11.判斷buffer是否分配成功 if (buffer != null) { // 12.把channel里的數(shù)據(jù)讀到buffer中 int bytesRead = channel.read(buffer); if (bytesRead < 0) throw new EOFException(); // 13.累計(jì)讀取數(shù)據(jù)總大小 read += bytesRead; } // 14. 返回總大小 return read;}

      該方法主要用來 把對(duì)應(yīng) channel 中的數(shù)據(jù)讀到 ByteBuffer 中 ,包括響應(yīng)消息數(shù)據(jù)長度的 size 和響應(yīng)消息數(shù)據(jù)體長度的 buffer,可能會(huì)被多次調(diào)用,每次都需要判斷 size 和 buffer 的狀態(tài)并讀取。

      在讀取時(shí),先讀取4字節(jié)到 size 中,再根據(jù) size 的大小為 buffer 分配內(nèi)存,然后讀滿整個(gè) buffer 時(shí)就表示讀取完成了。

      通過短短的30行左右代碼就解決了工業(yè)級(jí)「 拆包 」 、「 粘包 」 問題,相當(dāng)?shù)慕?jīng)典 。

      如果要解決「 粘包 」問題,就是在每個(gè)響應(yīng)數(shù)據(jù)中間插入一個(gè)特殊的字節(jié)大小的「 分隔符 」,這里就在響應(yīng)消息體前面插入4個(gè)字節(jié),代表響應(yīng)消息自己本身的數(shù)據(jù)大小,如下圖所示:

      具體「 拆包 」的操作步驟如下:

    34. 調(diào)用 size.hasRemaining() 返回 position 至 limit 之間的字節(jié)大小 來判斷響應(yīng)消息數(shù)據(jù)長度的 ByteBuffer 是否讀完。
    35. 當(dāng)未讀完則通過調(diào)用 NIO 的方法 channel.read(size), 直接把讀取4字節(jié)的響應(yīng)消息數(shù)據(jù)的長度寫入到 ByteBuffer size 中 ,如果已經(jīng)讀取到了4字節(jié),此時(shí) position=4,與 limit 相同, 表示 ByteBuffer size 已經(jīng)讀滿了 。
    36. 每次讀取后,累加到總讀取數(shù)據(jù)大小里
    37. 再次判斷響應(yīng)消息數(shù)據(jù)長度的緩存是否讀完了。
    38. 如果讀完了,先重置 position 位置為0,此時(shí)就可以從 ByteBuffer 中讀取數(shù)據(jù)了,然后 調(diào)用 size.getInt() 從 ByteBuffer 當(dāng)前 position 位置讀取4個(gè)字節(jié),并轉(zhuǎn)化成int 類型數(shù)值賦給 receiveSize ,即響應(yīng)體的長度。
    39. 如果有異常就拋出,包括響應(yīng)數(shù)據(jù)體的長度無效或者大于最大長度等。
    40. 將讀到響應(yīng)數(shù)據(jù)長度賦值 requestedBufferSize,即數(shù)據(jù)體的大小。
    41. 如果響應(yīng)數(shù)據(jù)體 buffer 還沒有分配,且響應(yīng)數(shù)據(jù)頭已讀完,分配 requestedBufferSize 字節(jié)大小的內(nèi)存空間給數(shù)據(jù)體 buffer。
    42. 如果 buffer 分配成功, 表示 size 已讀完,此時(shí)直接把 channel 里的響應(yīng)數(shù)據(jù)讀到跟它大小一致的 ByteBuffer 中 ,再次累計(jì)讀取數(shù)據(jù)總大小。
    43. 最后返回?cái)?shù)據(jù)總大小。
    44. 03.2 complete()

      @Overridepublic boolean complete() { // 響應(yīng)消息頭已讀完 && 響應(yīng)消息體已讀完 return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();}

      該方法主要用來判斷是否都讀取完成, 即響應(yīng)頭大小和響應(yīng)體大小都讀取完 。

      03.3 size()

      // 返回大小public int size() { return payload().limit() + size.limit();}public ByteBuffer payload() { return this.buffer;}

      該方法主要用來返回 響應(yīng)頭和響應(yīng)體還有多少數(shù)據(jù)需要讀出 。

      此時(shí)已經(jīng)剖析完讀 Buffer 的封裝,接下來我們看看寫 Buffer。

      04 NetworkSend 封裝過程

      github 源碼地址如下:

      https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.javahttps://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.javahttps://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/Send.java

      調(diào)用關(guān)系圖如下:

      04.1 Send 接口

      我們先看一下接口 Send 都定義了哪些方法。

      public interface Send { // 要把數(shù)據(jù)寫入目標(biāo)的 channel id String destination(); // 要發(fā)送的數(shù)據(jù)是否發(fā)送完了 boolean completed(); // 把數(shù)據(jù)寫到對(duì)應(yīng) channel 中 long writeTo(GatheringByteChannel channel) throws IOException; // 發(fā)送數(shù)據(jù)的大小 long size();}

      Send 作為要發(fā)送數(shù)據(jù)的接口, 子類 ByteBufferSend 實(shí)現(xiàn) complete() 方法用于判斷是否已經(jīng)發(fā)送完成,實(shí)現(xiàn) writeTo() 方法來實(shí)現(xiàn)寫入數(shù)據(jù)到Channel中。

      04.2 ByteBufferSend 類

      ByteBufferSend 類實(shí)現(xiàn)了 Send 接口, 即實(shí)現(xiàn)了數(shù)據(jù)從 ByteBuffer 數(shù)組發(fā)送到 channel :

      public class ByteBufferSend implements Send { private final String destination; // 總共要寫多少字節(jié)數(shù)據(jù) private final int size; // 用于寫入channel里的ByteBuffer數(shù)組,說明kafka一次最大傳輸字節(jié)是有限定的 protected final ByteBuffer[] buffers; // 總共還剩多少字節(jié)沒有寫完 private int remaining; private boolean pending = false; public ByteBufferSend(String destination, ByteBuffer… buffers) { this.destination = destination; this.buffers = buffers; for (ByteBuffer buffer : buffers) remaining += buffer.remaining(); // 計(jì)算需要寫入字節(jié)的總和 this.size = remaining; }}

      我們來看下這個(gè)類中的幾個(gè)重要字段:

    45. destination:數(shù)據(jù)寫入的目標(biāo) channel id。
    46. size:總共需要往 channel 里寫多少字節(jié)數(shù)據(jù)。
    47. buffers:ByteBuffer數(shù)組類型,用來存儲(chǔ)要寫入 channel 里的數(shù)據(jù)。
    48. remaining:ByteBuffer數(shù)組所有的ByteBuffer 還剩多少字節(jié)沒有寫完。
    49. 介紹完字段后,我們來深度剖析下該類的幾個(gè)重要的方法。

      04.2.1 writeTo()

      @Override// 將字節(jié)流數(shù)據(jù)寫入到channel中public long writeTo(GatheringByteChannel channel) throws IOException { // 1.調(diào)用nio底層write方法把buffers寫入傳輸層返回寫入的字節(jié)數(shù) long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); // 2.計(jì)算還剩多少字節(jié)沒有寫入傳輸層 remaining -= written; // 每次發(fā)送 都檢查是否 pending = TransportLayers.hasPendingWrites(channel); return written;}

      該方法主要用來 把 buffers 數(shù)組寫入到 SocketChannel里 ,因?yàn)樵诰W(wǎng)絡(luò)編程中,寫一次不一定可以完全把數(shù)據(jù)都寫成功,所以調(diào)用底層 channel.write(buffers) 方法會(huì)返回「 已經(jīng)寫入成功多少字節(jié) 」的返回值,這樣調(diào)用一次后就知道已經(jīng)寫入多少字節(jié)了。

      04.2.2 some other

      @Overridepublic String destination() { // 返回對(duì)應(yīng)的channel id return destination;}@Overridepublic boolean completed() { // 判斷是否完成 即沒有剩余&pending=false return remaining <= 0 && !pending;}/** * always returns false as there will be not be any * pending writes since we directly write to socketChannel. */@Overridepublic boolean hasPendingWrites() { // 在PLAINTEXT下 pending 始終為 false return false;}@Overridepublic long size() { // 返回寫入字節(jié)的總和 return this.size;}

      04.3 NetworkSend 類

      NetworkSend 類繼承了 ByteBufferSend 類,真正用來寫 Buffer。

      public class NetworkSend extends ByteBufferSend { // 實(shí)例化 public NetworkSend(String destination, ByteBuffer buffer) { // 調(diào)用父類的方法初始化 super(destination, sizeBuffer(buffer.remaining()), buffer); } // 用來構(gòu)造4個(gè)字節(jié)的 sizeBuffer private static ByteBuffer sizeBuffer(int size) { // 先分配一個(gè)4個(gè)字節(jié)的ByteBuffer ByteBuffer sizeBuffer = ByteBuffer.allocate(4); // 寫入size長度值 sizeBuffer.putInt(size); // 重置 position sizeBuffer.rewind(); // 返回 sizeBuffer return sizeBuffer; }}

      該類相對(duì)簡單些,就是構(gòu)建一個(gè)發(fā)往 channel 對(duì)應(yīng)的節(jié)點(diǎn) id 的消息數(shù)據(jù),它的實(shí)例化過程如下:

    50. 先分配一個(gè)4個(gè)字節(jié)的 ByteBuffer 的變量 sizeBuffer,再把要發(fā)送的數(shù)據(jù)長度賦值給 sizeBuffer。
    51. 此時(shí) sizeBuffer 的響應(yīng)頭字節(jié)數(shù)和 sizeBuffer 的響應(yīng)數(shù)據(jù)就都有了。
    52. 然后調(diào)用父類 ByteBufferSend 的方法進(jìn)行初始化。
    53. 另外 ByteBuffer[] 為兩個(gè) buffer,可以理解為一個(gè)消息頭 buffer 即 size,一個(gè)消息體 buffer。消息頭 buffer 的長度為4byte,存放的是消息體 buffer 的長度。而消息體 buffer 是上層傳入的業(yè)務(wù)數(shù)據(jù),所以 send 就是持有一個(gè)待發(fā)送的 ByteBuffer 。

      接下來我們來看看 KafkaChannel 是如何對(duì)上面幾個(gè)類進(jìn)行封裝的。

      05 KafkaChannel 封裝過程

      github 源碼地址如下:

      https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.javapublic class KafkaChannel implements AutoCloseable { …. // 節(jié)點(diǎn) id private final String id; // 傳輸層對(duì)象 private final TransportLayer transportLayer; …. // 最大能接收請求的字節(jié)數(shù) private final int maxReceiveSize; // 內(nèi)存池,用來分配指定大小的 ByteBuffer private final MemoryPool memoryPool; // NetworkReceive 類的實(shí)例 private NetworkReceive receive; // NetworkSend 類的實(shí)例 private Send send; // 是否關(guān)閉連接 private boolean disconnected; …. // 連接狀態(tài) private ChannelState state; // 需要連接的遠(yuǎn)端地址 private SocketAddress remoteAddress; // 初始化 public KafkaChannel(String id, TransportLayer transportLayer, Supplier authenticatorCreator,int maxReceiveSize, MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) { this.id = id; this.transportLayer = transportLayer; this.authenticatorCreator = authenticatorCreator; this.authenticator = authenticatorCreator.get(); this.networkThreadTimeNanos = 0L; this.maxReceiveSize = maxReceiveSize; this.memoryPool = memoryPool; this.metadataRegistry = metadataRegistry; this.disconnected = false; this.muteState = ChannelMuteState.NOT_MUTED; this.state = ChannelState.NOT_CONNECTED; }}

      我們來看下這個(gè)類中的幾個(gè)重要字段:

    54. id:channel 對(duì)應(yīng)的節(jié)點(diǎn) id。
    55. transportLayer:傳輸層對(duì)象。
    56. maxReceiveSize:最大能接收請求的字節(jié)數(shù)。
    57. memoryPool:內(nèi)存池,用來分配指定大小的 ByteBuffer。
    58. receive:NetworkReceive 類的實(shí)例。
    59. send:NetworkSend 類的實(shí)例。
    60. disconnected:是否關(guān)閉連接。
    61. state:KafkaChannel 的狀態(tài)。
    62. remoteAddress:需要連接的遠(yuǎn)端地址。
    63. 從屬性可以看出, 有3個(gè)最重要的成員變量:TransportLayer、NetworkReceive、Send 。KafkaChannel 通過 TransportLayer 進(jìn)行讀寫操作,NetworkReceive 用來讀取,Send 用來寫出。

      為了封裝普通和加密的Channel「 TransportLayer根據(jù)網(wǎng)絡(luò)協(xié)議的不同,提供不同的子類 」而對(duì)于 KafkaChannel 提供統(tǒng)一的接口,「 這是策略模式很好的應(yīng)用 」。

    64. 每個(gè) NetworkReceive 代表一個(gè)單獨(dú)的響應(yīng),KafkaChannel 讀取的數(shù)據(jù)會(huì)存儲(chǔ)到 NetworkReceive 中,當(dāng) NetworkReceive 讀滿,一個(gè)請求就完整讀取了。
    65. 每個(gè) Send 代表一個(gè)單獨(dú)的請求,需要寫出時(shí)只需賦值此變量,之后調(diào)用 write() 方法將其中的數(shù)據(jù)寫出。
    66. 介紹完字段后,我們來深度剖析下其 網(wǎng)絡(luò)讀寫操作 是如何實(shí)現(xiàn)的?

      05.1 setSend()

      public void setSend(Send send) { if (this.send != null) throw new IllegalStateException(“Attempt to begin a send operation with prior send operation still in progress, connection id is ” + id); // 設(shè)置要發(fā)送消息的字段 this.send = send; // 調(diào)用傳輸層增加寫事件 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);}// PlaintextTransportLayer 類方法@Overridepublic void addInterestOps(int ops) { //通過 key.interestOps() | ops 來添加事件 key.interestOps(key.interestOps() | ops);}

      該方法主要用來 預(yù)發(fā)送,即在發(fā)送網(wǎng)絡(luò)請求前,將需要發(fā)送的ByteBuffer 數(shù)據(jù)保存到 KafkaChannel 的 send 中 ,然后調(diào)用傳輸層方法增加對(duì)這個(gè) channel 上「 OP_WRITE 」事件的關(guān)注。當(dāng)真正執(zhí)行發(fā)送的時(shí)候,會(huì)從 send 中讀取數(shù)據(jù)。

      05.2 write()

      public long write() throws IOException { // 判斷 send 是否為空,如果為空表示已經(jīng)發(fā)送完畢了 if (send == null) return 0; midWrite = true; // 調(diào)用ByteBufferSend.writeTo把數(shù)據(jù)真正發(fā)送出去 return send.writeTo(transportLayer);}

      該方法主要用來 把保存在 send 上的數(shù)據(jù)真正發(fā)送出去 。

    67. 首先判斷要發(fā)送的 send 是否為空,如果為空則表示在 KafkaChannel 的 Buffer 的數(shù)據(jù)都發(fā)送完畢了。
    68. 如果不為空就調(diào)用ByteBufferSend.writeTo() 方法通過網(wǎng)絡(luò) I/O 操作將數(shù)據(jù)發(fā)送出去。
    69. 05.3 read()

      public long read() throws IOException { // 如果receive為空表示數(shù)據(jù)已經(jīng)讀完,需要重新實(shí)例化對(duì)象 if (receive == null) { // 確保分配了 NetworkReceive receive = new NetworkReceive(maxReceiveSize, id, memoryPool); } //如果未讀完,嘗試讀取該對(duì)象 long bytesReceived = receive(this.receive); if (this.receive.requiredMemoryAmountKnown() && !this.receive.memoryAllocated() && isInMutableState()) { //pool must be out of memory, mute ourselves. mute(); } return bytesReceived;}

      該方法主要用來 把從網(wǎng)絡(luò)I/O操作中讀出的數(shù)據(jù)保存到 NetworkReceive 中 。

    70. 判斷 receive 是否為空,如果為空 表示上次已讀完 ,需要重新實(shí)例化 NetworkReceive 對(duì)象。
    71. 如果 receive 不為空, 表示未讀完,此時(shí)讀取的還是原先的 NetworkReceive 對(duì)象 ,然后再調(diào)用 receive() 方法嘗試把 channel 的數(shù)據(jù)讀到 NetworkReceive 對(duì)象中。
    72. 最后返回讀到的字節(jié)數(shù)。
    73. 05.4 maybeCompleteReceive()

      public NetworkReceive maybeCompleteReceive() { if (receive != null && receive.complete()) { receive.payload().rewind(); NetworkReceive result = receive; receive = null; return result; } return null;}// NetworkReceivepublic boolean complete() { return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();}

      該方法主要用來 判斷數(shù)據(jù)已經(jīng)讀取完畢了 ,而判斷是否讀完的條件是 NetworkReceive 里的 buffer 是否用完 ,包括上面說過的表示響應(yīng)消息頭 size ByteBuffer 和響應(yīng)消息體本身的 buffer ByteBuffer。這兩個(gè)都讀完才算真正讀完了。

    74. 當(dāng) buffer 讀完后調(diào)用 rewind 重置 position位置。
    75. 將 receive 賦值給結(jié)果集 result
    76. 此時(shí)讀完后將 receive 清空,以便下次讀。
    77. 最后返回結(jié)果集 result,完成一次讀操作。
    78. 05.5 maybeCompleteSend()

      // 可能完成發(fā)送public Send maybeCompleteSend() { if (send != null && send.completed()) { midWrite = false; transportLayer.removeInterestOps(SelectionKey.OP_WRITE); Send result = send; send = null; return result; } return null;}// PlaintextTransportLayer 類方法@Overridepublic void removeInterestOps(int ops) { // 通過 key.interestOps() & ~ops 來刪除事件 key.interestOps(key.interestOps() & ~ops);}// ByteBufferSend@Overridepublic boolean completed() { return remaining <= 0 && !pending;}

      該方法主要用來 是否寫數(shù)據(jù)完畢了 ,而判斷的寫數(shù)據(jù)完畢的條件是 buffer 中沒有剩余且pending為false 。

    79. 當(dāng)寫數(shù)據(jù)完畢后,取消傳輸層對(duì) OP_WRITE 事件的監(jiān)聽,完成一次寫操作。
    80. 將 send 賦值給結(jié)果集 result。
    81. 此時(shí)讀完后將 send 清空,以便下次寫。
    82. 最后返回結(jié)果集 result,完成一次寫操作。
    83. 最后我們來聊聊事件注冊和取消的具體時(shí)機(jī),以便更好的理解網(wǎng)絡(luò) I/O 操作。

      06 事件注冊與取消時(shí)機(jī)

      我們知道 Java NIO 是基于 epoll 模型來實(shí)現(xiàn)的。所有基于 epoll 的框架,都有3個(gè)階段:

    84. 注冊事件(OP_CONNECT, OP_ACCEPT, OP_READ, OP_WRITE)。
    85. 輪詢網(wǎng)絡(luò)I/O是否就緒。
    86. 執(zhí)行實(shí)際網(wǎng)絡(luò)I/O操作。
    87. 這里我們來看下相關(guān)事件是何時(shí)被注冊和取消的。

      06.1 OP_CONNECT 事件

      06.1.1 OP_CONNECT 事件注冊時(shí)機(jī)

      在 Selector 發(fā)起網(wǎng)絡(luò)連接的時(shí)候進(jìn)行「 OP_CONNECT 」事件注冊。

      public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { SocketChannel socketChannel = SocketChannel.open(); SelectionKey key = null; try { // 注冊 OP_CONNECT 到 selector 上 key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT); } catch (IOException | RuntimeException e){}}

      06.1.2 OP_CONNECT 事件取消時(shí)機(jī)

      在 PlainTransportLayer 明文傳輸層完成連接的時(shí)候取消 「 OP_CONNECT 」事件。

      public boolean finishConnect() throws IOException { // 刪除連接事件,添加讀事件 key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);}

      06.2 OP_READ 事件

      06.2.1 OP_READ 事件注冊時(shí)機(jī)

      從上面也可以看出,「 OP_READ 」事件的注冊和「 OP_CONNECT 」事件的取消是同時(shí)進(jìn)行的。

      06.2.2 OP_READ 事件取消時(shí)機(jī)

      由于 「 OP_READ 」事件是要一直監(jiān)聽是否有新數(shù)據(jù)到來,所以不會(huì)取消。并且因?yàn)槭?Java NIO 使用的 「 epoll 的 LT 模式 」,只要「 讀緩沖區(qū) 」有數(shù)據(jù),就會(huì)一直觸發(fā)。

      06.3 OP_WRITE 事件

      06.3.1 OP_WRITE 事件注冊時(shí)機(jī)

      在 KafkaChannel 真正發(fā)送網(wǎng)絡(luò)請求之前注冊「 OP_WRITE 」事件。

      public void setSend(Send send) { // 調(diào)用傳輸層增加寫事件 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);}

      06.3.2 OP_WRITE 事件取消時(shí)機(jī)

      public Send maybeCompleteSend() { if (send != null && send.completed()) { //完成一次發(fā)送后取消 OP_WRITE 事件 transportLayer.removeInterestOps(SelectionKey.OP_WRITE); }}

      06.4 事件總結(jié)

    88. 對(duì)于不同事件類型的「 事件就緒 」:
    89. OP_READ事件就緒:即當(dāng)有新數(shù)據(jù)到來,需要去讀取。由于是基于 LT 模式,只要讀緩沖區(qū)有數(shù)據(jù),會(huì)一直觸發(fā)。
    90. OP_WRITE事件就緒:即本地 socketchannel 緩沖區(qū)有沒有寫滿。如果沒有寫滿的話,就會(huì)一直觸發(fā)寫事件。所以要避免「 寫的死循環(huán) 」問題,寫完就要取消寫事件。
    91. OP_CONNECT事件就緒: 即 connect 連接完成。
    92. OP_ACCEPT事件就緒:即有新的連接進(jìn)來,調(diào)用 accept處理。
    93. 不同類型事件處理方式是不一樣的:
    94. OP_CONNECT事件:注冊1次,連接成功之后,就取消了。有且僅有1次。
    95. OP_READ事件:注冊之后不取消,一直監(jiān)聽。
    96. OP_WRITE事件:每調(diào)用一次send,注冊1次。send成功,取消注冊。
    97. 07 總結(jié)

      這里,我們一起來總結(jié)一下這篇文章的重點(diǎn)。

      1、帶你先整體的梳理了 Kafka 對(duì) Java NIO 封裝的組件以及調(diào)用關(guān)系圖。

      2、分別帶你梳理了傳輸層 TransportLayer 的明文網(wǎng)絡(luò)傳輸層的實(shí)現(xiàn)、網(wǎng)絡(luò)讀操作 NetworkReceive、網(wǎng)絡(luò)寫操作 NetworkSend 的實(shí)現(xiàn)、以及 KafkaChannel 是如何進(jìn)一步對(duì)上面組件進(jìn)行封裝提供更加友好的網(wǎng)絡(luò)連接、讀寫操作的。

      3、最后剖析了網(wǎng)絡(luò) I/O 操作過程中的事件注冊和取消時(shí)機(jī)。

      鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請聯(lián)系管理員(admin#wlmqw.com)刪除。
      用戶投稿
      上一篇 2022年6月23日 06:23
      下一篇 2022年6月23日 06:23

      相關(guān)推薦

      聯(lián)系我們

      聯(lián)系郵箱:admin#wlmqw.com
      工作時(shí)間:周一至周五,10:30-18:30,節(jié)假日休息