【RSocket】使用 RSocket (一)——建立連接
0. RSocket 簡介
采用二進制點對點數據傳輸,主要應用于分布式架構之中,是一種基于Reactive Stream規范標準實現的新的通信協議。
相關文檔和資料:
在這里我們在客戶端使用 rsocket-java 原生庫,在服務端使用 spring-boot-starter-rsocket。
1. 服務端
1.1 SETUP階段 - 處理客戶端發起的連接請求
新建一個 RSocketController
類來處理 RSocket 相關的請求。
@Controller
public class RSocketController {
private static Logger logger = LoggerFactory.getLogger(RSocketController.class);
// 對到來的連接做一些處理
@ConnectMapping("connect.setup")
public Mono<Void> setup(String data, RSocketRequester rSocketRequester) {
logger.info("[connect.setup]Client connection: {}\n", data);
return Mono.empty();
}
}
RSocket 的 metadata
中可以包含路由(Routing)信息,這和 一般 WEB 框架通過解析 URL 將請求導向不同的處理函數是一樣的。在連接建立時,客戶端會發送一個 SETUP Payload,@ConnectMapping
可以通過解析 SETUP Payload 的 metadata
中的路由信息來使用不同的連接建立階段的處理函數。在這里,只要 SETUP Payload 的 metadata
中的路由信息是 connect.setup
,該函數就會處理建立連接后客戶端發送的 SETUP Payload。
1.2 保存客戶端的 Requester
RSocket 協議支持雙方主動調用對方的函數。如果服務端想要主動向客戶端發送請求,他就可以在連接建立時保存 RSocketRequester
對象以便服務端在需要時向客戶端發起請求。
首先在這里我們假設客戶端建立連接時會將 UUID 放在 SETUP Payload 的 data
中。然后我們聲明一個類來保存 RSocketRequester
,代碼如下:
public class ConnectedClient {
public RSocketRequester requester;
public Date connectedTime;
ConnectedClient(RSocketRequester requester) {
this.requester = requester;
this.connectedTime = new Date();
}
}
然后我們建立一個 Service 來管理客戶端的 RSocketRequester
。在這里使用 ConcurrentHashMap
來存儲 Requester,鍵是客戶端的 UUID,值是 ConnectedClient
對象。
@Service
public class ConnectedClientsManager {
private static Logger logger = LoggerFactory.getLogger(ConnectedClientsManager.class);
public final ConcurrentHashMap<String, ConnectedClient> clients;
public ConnectedClientsManager() {
this.clients = new ConcurrentHashMap<>();
}
public Set<String> getAllClientIdentifier() {
return this.clients.keySet();
}
public RSocketRequester getClientRequester(String clientIdentifier) {
return this.clients.get(clientIdentifier).requester;
}
public void putClientRequester(String clientIdentifier, RSocketRequester requester) {
requester.rsocket()
.onClose()
.doFirst(() -> this.clients.put(clientIdentifier, new ConnectedClient(requester)))
.doFinally(sig -> {
logger.info("Client closed, uuid is {}. signal is {}.", clientIdentifier, sig.toString());
this.clients.remove(clientIdentifier);
}).subscribe();
}
public void removeClientRequester(String clientIdentifier) {
this.clients.remove(clientIdentifier);
}
}
然后我們就可以在 RSocketController
中引入 ConnectedClientsManager
了。
@Controller
public class RSocketController {
private static Logger logger = LoggerFactory.getLogger(RSocketController.class);
public static ConnectedClientsManager clientsManager;
@Autowired
private void initializeClientsManager() {
clientsManager = new ConnectedClientsManager();
}
...
最后我們編寫連接處理函數,將 Requester 保存起來:
@ConnectMapping("connect.setup")
public Mono<Void> setup(String data, RSocketRequester rSocketRequester) {
logger.info("[connect.setup]Client connection: {}\n", data);
clientsManager.putClientRequester(data, rSocketRequester);
return Mono.empty();
}
下面是 spring application 配置 application.yaml
:
spring:
rsocket:
server:
port: 8099
transport: tcp
2. 客戶端
- 第一步:隨機生成標識客戶端身份的 UUID
public class ConnectionSetup {
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(RSocketClientRaw.class);
UUID uuid = UUID.randomUUID();
......
- 第二步:生成 SETUP Payload 使用的 routing 信息
ByteBuf setupRouteMetadata = TaggingMetadataCodec.createTaggingContent(
ByteBufAllocator.DEFAULT,
Collections.singletonList("connect.setup"));
- 第三步:使用
RSocketConnector
建立 RSocket:- 在這里首先需要設置元數據的 MIME 類型,方便服務端根據 MIME 類型確定
metadata
的內容 - 然后生成 SETUP Payload,
data
中存放 UUID 字符串,metadata
中存放路由信息 - 設置重連策略
- 最后指定
ClientTransport
和服務端建立連接 - 使用
block()
在連接建立真正之前阻塞進程
- 在這里首先需要設置元數據的 MIME 類型,方便服務端根據 MIME 類型確定
RSocket socket = RSocketConnector.create()
// 設置 metadata MIME Type,方便服務端根據 MIME 類型確定 metadata 內容
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())
// SETUP 階段的 Payload,data 里面存放 UUID
.setupPayload(ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, uuid.toString()),
setupRouteMetadata))
// 設置重連策略
.reconnect(Retry.backoff(2, Duration.ofMillis(500)))
.connect(
TcpClientTransport.create(
TcpClient.create()
.host("127.0.0.1")
.port(8099)))
.block();
然后可以使用 socket.onClose().block();
保持連接。此時如果我們運行客戶端,然后再關閉客戶端的話,會在服務端看到輸出:
表明客戶端和服務端建立了連接之后又關閉了連接。