<xmp id="63nn9"><video id="63nn9"></video></xmp>

<xmp id="63nn9"></xmp>

<wbr id="63nn9"><ins id="63nn9"></ins></wbr>

<wbr id="63nn9"></wbr><video id="63nn9"><ins id="63nn9"><table id="63nn9"></table></ins></video>

【RSocket】使用 RSocket(三)——服務端主動調用客戶端方法

1. 編寫客戶端接收請求的邏輯

我們可以在初始化 Rsocket 實例的時候指定客戶端可以被調用的方法,使用 acceptor() 指定可被調用的方法和方法使用的通信模型類型:

  • 通信類型為 RequestResponse 時:
    .acceptor(SocketAcceptor.forRequestResponse(payload -> {}))
    
  • 通信類型為 FireAndForget
    .acceptor(SocketAcceptor.forFireAndForget(payload -> {}))
    
  • 通信類型為 RequestStream
    .acceptor(SocketAcceptor.forRequestStream(payload -> {}))
    
  • 通信類型為 RequestStream
    .acceptor(SocketAcceptor.forRequestChannel(
              payloads ->
                  Flux.from(payloads)...));
    

接下來編寫客戶端方法的處理邏輯,以 RequestResponse 為例

https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/CallingTheClientSide.java

public static void main(String[] args) {
    final Logger logger = LoggerFactory.getLogger(RSocketClientRaw.class);

    // 隨機生成 UUID 標識客戶端
    UUID uuid = UUID.randomUUID();
    logger.info("My UUID is {}", uuid);
    // 生成 SETUP 階段(建立連接時) Payload 使用的 route 信息
    ByteBuf setupRouteMetadata = encodeRoute("connect.setup");

    RSocket socket = RSocketConnector.create()
            // 設置 metadata MIME Type,方便服務端根據 MIME 類型確定 metadata 內容
            .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())
            // SETUP 階段的 Payload,data 里面存放 UUID
            .setupPayload(ByteBufPayload.create(
                    ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, uuid.toString()),
                    setupRouteMetadata))

            // 編寫 Request&Response Acceptor
            .acceptor(SocketAcceptor.forRequestResponse(
                    payload -> {
                        String route = decodeRoute(payload.sliceMetadata());
                        logger.info("[Client Acceptor] Received RequestResponse[route={}]", route);

                        String metadataUtf8 = payload.getMetadataUtf8();
                        String dataUtf8 = payload.getDataUtf8();
                        logger.info("[Client Acceptor] This Req&Resp contains data: {}, metadata: {}", dataUtf8, metadataUtf8);

                        payload.release();

                        if ("request.status.callback".equals(route)) {
                            return Mono.just(ByteBufPayload.create("Thanks for handling my task!"));
                        } else if ("request.server.call".equals(route)) {
                            return Mono.just(ByteBufPayload.create("You called my handler actively from server!"));
                        }

                        byte[] respBytes = String
                                .format("Client received your message, but no handler matched. Your meta is %s and data is %s",
                                        metadataUtf8, dataUtf8).getBytes();
                        return Mono.just(DefaultPayload.create(respBytes));
                    }
            ))

            // 設置重連策略
            .reconnect(Retry.backoff(2, Duration.ofMillis(500)))
            .connect(
                    TcpClientTransport.create(
                            TcpClient.create()
                                    .host("127.0.0.1")
                                    .port(8099)))
            .block();

在這里我們設置客戶端能夠接收 RequestResponse 類型的服務端請求,仔細觀察可以看到,服務端發送的請求也是可以攜帶包含路由信息的 metadata 的,在客戶端,我們也可以根據 Payload 中的路由信息將請求分發到不同方法中處理。

為了方便演示,如果服務端調用時指定的路由信息是 request.status.callback,那么服務端就是在完成一個由客戶端發起的,異步執行的任務后調用客戶端的回調函數返回任務執行結果。

如果服務端調用時指定的路由信息是 request.server.call,那么服務端就是在主動調用客戶端以獲取一些狀態信息。

當然,使用上面的代碼設置客戶端可被調用的 RSocket 方法有一個局限性,那就是我們只能設置 RequestResponse FireAndForget RequestStream Channel 這四種通信模式的其中一種。也就是說,用這種方法,服務端無法同時向客戶端發出 RequestResponse FireAndForget RequestStream Channel 請求。本文會在第四部分展示如何讓客戶端支持同時響應這四種通信模式。

2. 場景:客戶端提交一個耗時任務,服務端完成任務后使用回調函數返回結果

image

如果客戶端提交一個耗時任務,服務端可以接受這個任務然后立刻返回響應:“任務提交成功”,然后執行任務。當任務執行完,服務端再使用回調函數將結果返回給客戶端。

我們不妨將執行任務的模塊封裝成一個 Spring Service:

@Service
public class RequestProcessor {

    private static final Logger logger = LoggerFactory.getLogger(RequestProcessor.class);

    public void processRequests(RSocketRequester rSocketRequester, UUID uuid) {
        logger.info("[RequestProcessor.processRequests]I'm handling this!");
        ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList("request.status.callback"));

        Mono.just("Your request " + uuid + "  is completed")
                .delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(10, 15)))
                .flatMap(
                        m -> rSocketRequester.rsocketClient()
                                .requestResponse(
                                        Mono.just(ByteBufPayload.create(
                                                ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
                                                        String.format("[TASK %s]This is a task result from server using spring.", uuid)),
                                                routeMetadata
                                        )))
                                .doOnSuccess(p -> logger.info("[RequestProcessor.processRequests]Received from client: {}", p.getDataUtf8()))
                )
                .subscribe();
    }
}

這個 Service 中的方法接收一個 RSocketRequester 和一個 任務的 UUID,當任務完成時,這個方法會生成一個 Payload 存放任務結果,指定 metadata 中的路由信息為 request.status.callback。這樣客戶端在收到這個 RequestResponse 時就能知道這是一個已經提交任務的回調。在這里我們使用 delayElement 模擬處理任務時耗時的操作。

值得注意的是,RSocketRequester 參數的來源,我們在編寫服務端接收任務提交的方法時可以將其作為參數,這是 Spring RSocket 的固定用法,這樣就可以拿到服務端-客戶端連接的 RSocketRequester 實例,然后就可以在 Service 中通過 RSocketRequester 實例調用客戶端的回調函數:

@MessageMapping("handler.task")
public Mono<String> task(String request, RSocketRequester rSocketRequester) {
   logger.info("[handler.request]Client request: {}", request);
    UUID uuid = UUID.randomUUID();
    this.requestProcessor.processRequests(rSocketRequester, uuid);
    return Mono.just(uuid.toString());
}

3. 場景:服務端主動調用客戶端獲取信息

我們在【RSocket】使用 RSocket (一)——建立連接一文中已經在連接建立的時刻將客戶端-服務端連接的 RSocketRequester 實例保存在一個 ConcurrentHashMap 中了。我們可以通過一些機制,比如定時任務,或者使用 REST API 向服務端下命令的方式,讓服務端主動調用已經建立連接的客戶端的 RSocket 方法。

在這個示例里,我們編寫兩個 REST API,一個 API 返回所有已連接到服務端的客戶端信息,包括客戶端 UUID、連接建立的時間等:

@ResponseBody
@GetMapping("/client/list")
public List<ConnectedClientDto> clientsInfo() {
    List<ConnectedClientDto> info = new ArrayList<>();
    RSocketController.clientsManager.clients.forEach((key, value) -> {
        info.add(new ConnectedClientDto(key, value.connectedTime));
    });

    return info;
}

另一個 API 用于觸發服務端向客戶端發送請求:

@GetMapping("/client/call")
public ServerResponse callFromServer(String clientRoute, String clientUUID) {
    RSocketRequester requester = RSocketController.clientsManager.getClientRequester(clientUUID);
    if (requester == null) {
        return new ServerResponse("failed: client rSocket has closed.");
    }
    ByteBuf routeMetadata = TaggingMetadataCodec
            .createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList(clientRoute));

    Mono.just("Server is calling you.")
//                .delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(5, 10)))
            .flatMap(m -> requester.rsocketClient().requestResponse(
                            Mono.just(
                                    ByteBufPayload.create(
                                            ByteBufUtil.writeUtf8(
                                                    ByteBufAllocator.DEFAULT,
                                                    "This is a message from server using spring-stack."),
                                            routeMetadata)))
                    .doOnSubscribe(subscription -> logger.info("subscribed."))
                    .doOnError(throwable -> logger.error("Error when calling client: {}", throwable.toString()))
                    .doOnSuccess(p -> logger.info("[test.connect.requester]Received from client: {}.", p.getDataUtf8()))
            )
            .subscribe();
    return new ServerResponse(String.format("request from server has sent to the client %s.", clientUUID));
}

我們首先啟動服務端再啟動客戶端,然后測試上述兩個 API:

  • 啟動兩個客戶端和服務端后查看連接信息

    image

  • 向其中一個客戶端發送一個請求

    image

    可以從客戶端的輸出看到客戶端接收到了這次請求

    image

4. 讓客戶端同時接收不同類型的請求

前面我們提到如果使用 .acceptor(SocketAcceptor.for...) 來添加客戶端可以被調用的方法時,只能指定四種通信模式中的一種。

這時候,我們可以實現 io.rsocket.SocketAcceptor 接口,重寫 accept 方法,accept 方法的返回值是 Mono<RSocket> ,我們可以實現 RSocket 接口并重寫其中 fireAndForget requestResponse requestStream requestChannel 四個方法來達到讓客戶端同時接收四種通信模式的目的。

首先實現 RSocket 接口,并重寫其中的方法:

// https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/service/ClientService.java
public class ClientService implements RSocket {

    Logger logger = LoggerFactory.getLogger(ClientService.class);

    static String decodeRoute(ByteBuf metadata) {
        final RoutingMetadata routingMetadata = new RoutingMetadata(metadata);
        return routingMetadata.iterator().next();
    }

    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        logger.info("Receiving: " + payload.getDataUtf8());
        return Mono.empty();
    }

    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        logger.info("Receiving: " + payload.getDataUtf8());
        return Mono.just(DefaultPayload.create("Client received your RequestResponse"));
    }

    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return Flux.range(-5, 10)
                .delayElements(Duration.ofMillis(500))
                .map(obj ->
                        ByteBufPayload.create(
                                ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));
    }

    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return Flux.range(-5, 10)
                .delayElements(Duration.ofMillis(500))
                .map(obj ->
                        ByteBufPayload.create(
                                ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));
    }
}

這只是一個示例,如果業務需要也可以解析 Payload 中的 metadata 來實現路由。

接下來我們實現 RSocketAcceptor 接口:

// https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/SocketAcceptorImpl.java
public class SocketAcceptorImpl implements SocketAcceptor {
    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        return Mono.just(new ClientService());
    }
}

然后我們在初始化客戶端的時候這樣設定 Acceptor 即可:

RSocket socket = RSocketConnector.create().acceptor(new SocketAcceptorImpl())

下一篇聊聊如何啟用 TLS 或者 wss 來保證連接安全,順便談談部署和 nginx 代理 RSocket over WebSocket。

posted @ 2023-03-18 20:24  joexu01  閱讀(192)  評論(0編輯  收藏  舉報
人碰人摸人爱免费视频播放

<xmp id="63nn9"><video id="63nn9"></video></xmp>

<xmp id="63nn9"></xmp>

<wbr id="63nn9"><ins id="63nn9"></ins></wbr>

<wbr id="63nn9"></wbr><video id="63nn9"><ins id="63nn9"><table id="63nn9"></table></ins></video>