<xmp id="63nn9"><video id="63nn9"></video></xmp>

<xmp id="63nn9"></xmp>

<wbr id="63nn9"><ins id="63nn9"></ins></wbr>

<wbr id="63nn9"></wbr><video id="63nn9"><ins id="63nn9"><table id="63nn9"></table></ins></video>

【RSocket】使用 RSocket (一)——建立連接

0. RSocket 簡介

采用二進制點對點數據傳輸,主要應用于分布式架構之中,是一種基于Reactive Stream規范標準實現的新的通信協議。

參考阿里云開發者社區的介紹

相關文檔和資料:

RSocket By Example

rsocket-java 原生庫例子

Spring RSocket 支持文檔

在這里我們在客戶端使用 rsocket-java 原生庫,在服務端使用 spring-boot-starter-rsocket。

1. 服務端

1.1 SETUP階段 - 處理客戶端發起的連接請求

點擊查看源代碼

新建一個 RSocketController 類來處理 RSocket 相關的請求。

@Controller
public class RSocketController {

    private static Logger logger = LoggerFactory.getLogger(RSocketController.class);

    // 對到來的連接做一些處理
    @ConnectMapping("connect.setup")
    public Mono<Void> setup(String data, RSocketRequester rSocketRequester) {
        logger.info("[connect.setup]Client connection: {}\n", data);
        return Mono.empty();
    }
}

RSocket 的 metadata 中可以包含路由(Routing)信息,這和 一般 WEB 框架通過解析 URL 將請求導向不同的處理函數是一樣的。在連接建立時,客戶端會發送一個 SETUP Payload,@ConnectMapping 可以通過解析 SETUP Payload 的 metadata 中的路由信息來使用不同的連接建立階段的處理函數。在這里,只要 SETUP Payload 的 metadata 中的路由信息是 connect.setup ,該函數就會處理建立連接后客戶端發送的 SETUP Payload。

1.2 保存客戶端的 Requester

RSocket 協議支持雙方主動調用對方的函數。如果服務端想要主動向客戶端發送請求,他就可以在連接建立時保存 RSocketRequester 對象以便服務端在需要時向客戶端發起請求。

首先在這里我們假設客戶端建立連接時會將 UUID 放在 SETUP Payload 的 data 中。然后我們聲明一個類來保存 RSocketRequester,代碼如下:

public class ConnectedClient {
    public RSocketRequester requester;
    public Date connectedTime;

    ConnectedClient(RSocketRequester requester) {
        this.requester = requester;
        this.connectedTime = new Date();
    }
}

然后我們建立一個 Service 來管理客戶端的 RSocketRequester。在這里使用 ConcurrentHashMap 來存儲 Requester,鍵是客戶端的 UUID,值是 ConnectedClient 對象。

@Service
public class ConnectedClientsManager {
    private static Logger logger = LoggerFactory.getLogger(ConnectedClientsManager.class);
    public final ConcurrentHashMap<String, ConnectedClient> clients;

    public ConnectedClientsManager() {
        this.clients = new ConcurrentHashMap<>();
    }

    public Set<String> getAllClientIdentifier() {
        return this.clients.keySet();
    }

    public RSocketRequester getClientRequester(String clientIdentifier) {
        return this.clients.get(clientIdentifier).requester;
    }

    public void putClientRequester(String clientIdentifier, RSocketRequester requester) {
        requester.rsocket()
                .onClose()
                .doFirst(() -> this.clients.put(clientIdentifier, new ConnectedClient(requester)))
                .doFinally(sig -> {
                    logger.info("Client closed, uuid is {}. signal is {}.", clientIdentifier, sig.toString());
                    this.clients.remove(clientIdentifier);
                }).subscribe();
    }

    public void removeClientRequester(String clientIdentifier) {
        this.clients.remove(clientIdentifier);
    }
}

然后我們就可以在 RSocketController 中引入 ConnectedClientsManager 了。

@Controller
public class RSocketController {

    private static Logger logger = LoggerFactory.getLogger(RSocketController.class);

    public static ConnectedClientsManager clientsManager;

    @Autowired
    private void initializeClientsManager() {
        clientsManager = new ConnectedClientsManager();
    }
...

最后我們編寫連接處理函數,將 Requester 保存起來:

@ConnectMapping("connect.setup")
    public Mono<Void> setup(String data, RSocketRequester rSocketRequester) {
        logger.info("[connect.setup]Client connection: {}\n", data);
        clientsManager.putClientRequester(data, rSocketRequester);
        return Mono.empty();
    }

下面是 spring application 配置 application.yaml

spring:
  rsocket:
    server:
      port: 8099
      transport: tcp

2. 客戶端

點擊查看源代碼

  • 第一步:隨機生成標識客戶端身份的 UUID
public class ConnectionSetup {

    public static void main(String[] args) {
        final Logger logger = LoggerFactory.getLogger(RSocketClientRaw.class);
        UUID uuid = UUID.randomUUID();
......
  • 第二步:生成 SETUP Payload 使用的 routing 信息
ByteBuf setupRouteMetadata = TaggingMetadataCodec.createTaggingContent(
                ByteBufAllocator.DEFAULT,
                Collections.singletonList("connect.setup"));
  • 第三步:使用 RSocketConnector 建立 RSocket:
    • 在這里首先需要設置元數據的 MIME 類型,方便服務端根據 MIME 類型確定 metadata 的內容
    • 然后生成 SETUP Payload,data 中存放 UUID 字符串,metadata 中存放路由信息
    • 設置重連策略
    • 最后指定 ClientTransport 和服務端建立連接
    • 使用 block() 在連接建立真正之前阻塞進程
RSocket socket = RSocketConnector.create()
                // 設置 metadata MIME Type,方便服務端根據 MIME 類型確定 metadata 內容
                .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())
                // SETUP 階段的 Payload,data 里面存放 UUID
                .setupPayload(ByteBufPayload.create(
                        ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, uuid.toString()),
                        setupRouteMetadata))
                // 設置重連策略
                .reconnect(Retry.backoff(2, Duration.ofMillis(500)))
                .connect(
                        TcpClientTransport.create(
                                TcpClient.create()
                                        .host("127.0.0.1")
                                        .port(8099)))
                .block();

然后可以使用 socket.onClose().block(); 保持連接。此時如果我們運行客戶端,然后再關閉客戶端的話,會在服務端看到輸出:

image

表明客戶端和服務端建立了連接之后又關閉了連接。

posted @ 2023-03-05 23:54  joexu01  閱讀(364)  評論(0編輯  收藏  舉報
人碰人摸人爱免费视频播放

<xmp id="63nn9"><video id="63nn9"></video></xmp>

<xmp id="63nn9"></xmp>

<wbr id="63nn9"><ins id="63nn9"></ins></wbr>

<wbr id="63nn9"></wbr><video id="63nn9"><ins id="63nn9"><table id="63nn9"></table></ins></video>