【RSocket】使用 RSocket(三)——服務端主動調用客戶端方法
1. 編寫客戶端接收請求的邏輯
我們可以在初始化 Rsocket 實例的時候指定客戶端可以被調用的方法,使用 acceptor()
指定可被調用的方法和方法使用的通信模型類型:
- 通信類型為
RequestResponse
時:.acceptor(SocketAcceptor.forRequestResponse(payload -> {}))
- 通信類型為
FireAndForget
時.acceptor(SocketAcceptor.forFireAndForget(payload -> {}))
- 通信類型為
RequestStream
時.acceptor(SocketAcceptor.forRequestStream(payload -> {}))
- 通信類型為
RequestStream
時.acceptor(SocketAcceptor.forRequestChannel( payloads -> Flux.from(payloads)...));
接下來編寫客戶端方法的處理邏輯,以 RequestResponse
為例
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(RSocketClientRaw.class);
// 隨機生成 UUID 標識客戶端
UUID uuid = UUID.randomUUID();
logger.info("My UUID is {}", uuid);
// 生成 SETUP 階段(建立連接時) Payload 使用的 route 信息
ByteBuf setupRouteMetadata = encodeRoute("connect.setup");
RSocket socket = RSocketConnector.create()
// 設置 metadata MIME Type,方便服務端根據 MIME 類型確定 metadata 內容
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())
// SETUP 階段的 Payload,data 里面存放 UUID
.setupPayload(ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, uuid.toString()),
setupRouteMetadata))
// 編寫 Request&Response Acceptor
.acceptor(SocketAcceptor.forRequestResponse(
payload -> {
String route = decodeRoute(payload.sliceMetadata());
logger.info("[Client Acceptor] Received RequestResponse[route={}]", route);
String metadataUtf8 = payload.getMetadataUtf8();
String dataUtf8 = payload.getDataUtf8();
logger.info("[Client Acceptor] This Req&Resp contains data: {}, metadata: {}", dataUtf8, metadataUtf8);
payload.release();
if ("request.status.callback".equals(route)) {
return Mono.just(ByteBufPayload.create("Thanks for handling my task!"));
} else if ("request.server.call".equals(route)) {
return Mono.just(ByteBufPayload.create("You called my handler actively from server!"));
}
byte[] respBytes = String
.format("Client received your message, but no handler matched. Your meta is %s and data is %s",
metadataUtf8, dataUtf8).getBytes();
return Mono.just(DefaultPayload.create(respBytes));
}
))
// 設置重連策略
.reconnect(Retry.backoff(2, Duration.ofMillis(500)))
.connect(
TcpClientTransport.create(
TcpClient.create()
.host("127.0.0.1")
.port(8099)))
.block();
在這里我們設置客戶端能夠接收 RequestResponse
類型的服務端請求,仔細觀察可以看到,服務端發送的請求也是可以攜帶包含路由信息的 metadata
的,在客戶端,我們也可以根據 Payload 中的路由信息將請求分發到不同方法中處理。
為了方便演示,如果服務端調用時指定的路由信息是 request.status.callback
,那么服務端就是在完成一個由客戶端發起的,異步執行的任務后調用客戶端的回調函數返回任務執行結果。
如果服務端調用時指定的路由信息是 request.server.call
,那么服務端就是在主動調用客戶端以獲取一些狀態信息。
當然,使用上面的代碼設置客戶端可被調用的 RSocket 方法有一個局限性,那就是我們只能設置 RequestResponse
FireAndForget
RequestStream
Channel
這四種通信模式的其中一種。也就是說,用這種方法,服務端無法同時向客戶端發出 RequestResponse
FireAndForget
RequestStream
Channel
請求。本文會在第四部分展示如何讓客戶端支持同時響應這四種通信模式。
2. 場景:客戶端提交一個耗時任務,服務端完成任務后使用回調函數返回結果
如果客戶端提交一個耗時任務,服務端可以接受這個任務然后立刻返回響應:“任務提交成功”,然后執行任務。當任務執行完,服務端再使用回調函數將結果返回給客戶端。
我們不妨將執行任務的模塊封裝成一個 Spring Service:
@Service
public class RequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(RequestProcessor.class);
public void processRequests(RSocketRequester rSocketRequester, UUID uuid) {
logger.info("[RequestProcessor.processRequests]I'm handling this!");
ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList("request.status.callback"));
Mono.just("Your request " + uuid + " is completed")
.delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(10, 15)))
.flatMap(
m -> rSocketRequester.rsocketClient()
.requestResponse(
Mono.just(ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
String.format("[TASK %s]This is a task result from server using spring.", uuid)),
routeMetadata
)))
.doOnSuccess(p -> logger.info("[RequestProcessor.processRequests]Received from client: {}", p.getDataUtf8()))
)
.subscribe();
}
}
這個 Service 中的方法接收一個 RSocketRequester
和一個 任務的 UUID,當任務完成時,這個方法會生成一個 Payload 存放任務結果,指定 metadata 中的路由信息為 request.status.callback
。這樣客戶端在收到這個 RequestResponse 時就能知道這是一個已經提交任務的回調。在這里我們使用 delayElement
模擬處理任務時耗時的操作。
值得注意的是,RSocketRequester
參數的來源,我們在編寫服務端接收任務提交的方法時可以將其作為參數,這是 Spring RSocket 的固定用法,這樣就可以拿到服務端-客戶端連接的 RSocketRequester 實例,然后就可以在 Service 中通過 RSocketRequester 實例調用客戶端的回調函數:
@MessageMapping("handler.task")
public Mono<String> task(String request, RSocketRequester rSocketRequester) {
logger.info("[handler.request]Client request: {}", request);
UUID uuid = UUID.randomUUID();
this.requestProcessor.processRequests(rSocketRequester, uuid);
return Mono.just(uuid.toString());
}
3. 場景:服務端主動調用客戶端獲取信息
我們在【RSocket】使用 RSocket (一)——建立連接一文中已經在連接建立的時刻將客戶端-服務端連接的 RSocketRequester
實例保存在一個 ConcurrentHashMap
中了。我們可以通過一些機制,比如定時任務,或者使用 REST API 向服務端下命令的方式,讓服務端主動調用已經建立連接的客戶端的 RSocket 方法。
在這個示例里,我們編寫兩個 REST API,一個 API 返回所有已連接到服務端的客戶端信息,包括客戶端 UUID、連接建立的時間等:
@ResponseBody
@GetMapping("/client/list")
public List<ConnectedClientDto> clientsInfo() {
List<ConnectedClientDto> info = new ArrayList<>();
RSocketController.clientsManager.clients.forEach((key, value) -> {
info.add(new ConnectedClientDto(key, value.connectedTime));
});
return info;
}
另一個 API 用于觸發服務端向客戶端發送請求:
@GetMapping("/client/call")
public ServerResponse callFromServer(String clientRoute, String clientUUID) {
RSocketRequester requester = RSocketController.clientsManager.getClientRequester(clientUUID);
if (requester == null) {
return new ServerResponse("failed: client rSocket has closed.");
}
ByteBuf routeMetadata = TaggingMetadataCodec
.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList(clientRoute));
Mono.just("Server is calling you.")
// .delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(5, 10)))
.flatMap(m -> requester.rsocketClient().requestResponse(
Mono.just(
ByteBufPayload.create(
ByteBufUtil.writeUtf8(
ByteBufAllocator.DEFAULT,
"This is a message from server using spring-stack."),
routeMetadata)))
.doOnSubscribe(subscription -> logger.info("subscribed."))
.doOnError(throwable -> logger.error("Error when calling client: {}", throwable.toString()))
.doOnSuccess(p -> logger.info("[test.connect.requester]Received from client: {}.", p.getDataUtf8()))
)
.subscribe();
return new ServerResponse(String.format("request from server has sent to the client %s.", clientUUID));
}
我們首先啟動服務端再啟動客戶端,然后測試上述兩個 API:
-
啟動兩個客戶端和服務端后查看連接信息
-
向其中一個客戶端發送一個請求
可以從客戶端的輸出看到客戶端接收到了這次請求
4. 讓客戶端同時接收不同類型的請求
前面我們提到如果使用 .acceptor(SocketAcceptor.for...)
來添加客戶端可以被調用的方法時,只能指定四種通信模式中的一種。
這時候,我們可以實現 io.rsocket.SocketAcceptor
接口,重寫 accept
方法,accept
方法的返回值是 Mono<RSocket>
,我們可以實現 RSocket
接口并重寫其中 fireAndForget
requestResponse
requestStream
requestChannel
四個方法來達到讓客戶端同時接收四種通信模式的目的。
首先實現 RSocket
接口,并重寫其中的方法:
// https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/service/ClientService.java
public class ClientService implements RSocket {
Logger logger = LoggerFactory.getLogger(ClientService.class);
static String decodeRoute(ByteBuf metadata) {
final RoutingMetadata routingMetadata = new RoutingMetadata(metadata);
return routingMetadata.iterator().next();
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
logger.info("Receiving: " + payload.getDataUtf8());
return Mono.empty();
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
logger.info("Receiving: " + payload.getDataUtf8());
return Mono.just(DefaultPayload.create("Client received your RequestResponse"));
}
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.range(-5, 10)
.delayElements(Duration.ofMillis(500))
.map(obj ->
ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.range(-5, 10)
.delayElements(Duration.ofMillis(500))
.map(obj ->
ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));
}
}
這只是一個示例,如果業務需要也可以解析 Payload 中的 metadata 來實現路由。
接下來我們實現 RSocketAcceptor
接口:
// https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/SocketAcceptorImpl.java
public class SocketAcceptorImpl implements SocketAcceptor {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
return Mono.just(new ClientService());
}
}
然后我們在初始化客戶端的時候這樣設定 Acceptor 即可:
RSocket socket = RSocketConnector.create().acceptor(new SocketAcceptorImpl())
下一篇聊聊如何啟用 TLS 或者 wss 來保證連接安全,順便談談部署和 nginx 代理 RSocket over WebSocket。