A First Look at rsocket-java
Preface
This article takes a brief look at rsocket-java.
RSocket
rsocket-core-0.12.1-sources.jar!/io/rsocket/RSocket.java
public interface RSocket extends Availability, Closeable {

  /**
   * Fire and Forget interaction model of {@code RSocket}.
   *
   * @param payload Request payload.
   * @return {@code Publisher} that completes when the passed {@code payload} is successfully
   *     handled, otherwise errors.
   */
  Mono<Void> fireAndForget(Payload payload);

  /**
   * Request-Response interaction model of {@code RSocket}.
   *
   * @param payload Request payload.
   * @return {@code Publisher} containing at most a single {@code Payload} representing the
   *     response.
   */
  Mono<Payload> requestResponse(Payload payload);

  /**
   * Request-Stream interaction model of {@code RSocket}.
   *
   * @param payload Request payload.
   * @return {@code Publisher} containing the stream of {@code Payload}s representing the response.
   */
  Flux<Payload> requestStream(Payload payload);

  /**
   * Request-Channel interaction model of {@code RSocket}.
   *
   * @param payloads Stream of request payloads.
   * @return Stream of response payloads.
   */
  Flux<Payload> requestChannel(Publisher<Payload> payloads);

  /**
   * Metadata-Push interaction model of {@code RSocket}.
   *
   * @param payload Request payloads.
   * @return {@code Publisher} that completes when the passed {@code payload} is successfully
   *     handled, otherwise errors.
   */
  Mono<Void> metadataPush(Payload payload);

  @Override
  default double availability() {
    return isDisposed() ? 0.0 : 1.0;
  }
}
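- The RSocket interface extends Availability (which defines the double availability() method) and Closeable (which defines the Mono<Void> onClose() method).
- RSocket defines the fireAndForget, requestResponse, requestStream, and requestChannel methods, one for each of the four interaction models.
- An RSocket frame carries metadata and a data payload. The metadata is optional and can be used to describe the data payload, so RSocket also defines a metadataPush method for pushing metadata.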
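To make the "metadata is optional" point concrete, here is a minimal sketch of how a frame header can signal the interaction model and whether metadata is present. The frame type codes, the 0x100 metadata flag, and the header layout (a 31-bit stream ID followed by a 6-bit frame type and 10 flag bits) are taken from the RSocket protocol specification as I understand it. The class FrameHeaderSketch and its methods are hypothetical names used for illustration and are not part of rsocket-java.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration class, not an rsocket-java API.
final class FrameHeaderSketch {
    // Frame type codes as listed in the RSocket protocol specification.
    static final int REQUEST_RESPONSE = 0x04;
    static final int REQUEST_FNF = 0x05;
    static final int REQUEST_STREAM = 0x06;
    static final int REQUEST_CHANNEL = 0x07;
    static final int METADATA_PUSH = 0x0C;

    // The (M)etadata flag: set when the frame carries metadata.
    static final int FLAG_METADATA = 0x100;

    /** Encodes the 6-byte header: 4 bytes of stream ID, then 6 bits of type and 10 bits of flags. */
    static byte[] encodeHeader(int streamId, int frameType, boolean hasMetadata) {
        int flags = hasMetadata ? FLAG_METADATA : 0;
        ByteBuffer buf = ByteBuffer.allocate(6); // big-endian by default
        buf.putInt(streamId & 0x7FFFFFFF);       // the top bit of the stream ID is always 0
        buf.putShort((short) ((frameType << 10) | flags));
        return buf.array();
    }

    /** Reads the frame type back from the upper 6 bits of the last two header bytes. */
    static int frameType(byte[] header) {
        int typeAndFlags = ByteBuffer.wrap(header).getShort(4) & 0xFFFF;
        return typeAndFlags >>> 10;
    }

    /** Reports whether the metadata flag is set in the header. */
    static boolean hasMetadata(byte[] header) {
        int typeAndFlags = ByteBuffer.wrap(header).getShort(4) & 0xFFFF;
        return (typeAndFlags & FLAG_METADATA) != 0;
    }

    public static void main(String[] args) {
        byte[] header = encodeHeader(1, REQUEST_RESPONSE, true);
        System.out.printf("type=0x%02X metadata=%b%n", frameType(header), hasMetadata(header));
    }
}
```

A receiver only needs the flag bit to know whether it should expect a metadata section, which is why a payload without metadata costs nothing extra in the header.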
Interaction Model
fireAndForget
@Test
public void testFireAndForget() throws InterruptedException {
  // SERVER
  RSocketFactory.receive()
      .acceptor(
          (setupPayload, reactiveSocket) ->
              Mono.just(
                  new AbstractRSocket() {
                    @Override
                    public Mono<Void> fireAndForget(Payload payload) {
                      System.out.printf("fire-forget: %s%n", payload.getDataUtf8());
                      return Mono.empty();
                    }
                  }))
      .transport(TcpServerTransport.create("localhost", 8080))
      .start()
      .subscribe();

  // CLIENT
  RSocket socket =
      RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", 8080))
          .start()
          .block();

  socket
      .fireAndForget(DefaultPayload.create("Hello"))
      .block();

  socket.dispose();

  // Give the server time to print the message before the test exits
  TimeUnit.SECONDS.sleep(5);
}
Similar to UDP: no acknowledgement is required, which makes it a good fit for reporting metrics, access logs, and the like.
requestResponse
@Test
public void testRequestResponse() {
  // SERVER
  RSocketFactory.receive()
      .acceptor(
          (setupPayload, reactiveSocket) ->
              Mono.just(
                  new AbstractRSocket() {
                    @Override
                    public Mono<Payload> requestResponse(Payload p) {
                      // Echo the request payload back to the caller
                      return Mono.just(p);
                    }
                  }))
      .transport(TcpServerTransport.create("localhost", 8080))
      .start()
      .subscribe();

  // CLIENT
  RSocket socket =
      RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", 8080))
          .start()
          .block();

  socket
      .requestResponse(DefaultPayload.create("Hello"))
      .map(Payload::getDataUtf8)
      .onErrorReturn("error")
      .doOnNext(System.out::println)
      .block();

  socket.dispose();
}
Similar to HTTP, but with an advantage over it: the interaction is asynchronous and multiplexed.
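The following sketch illustrates what "multiplexed" means here: several requests share one connection and are told apart by a stream ID, so responses may arrive in any order. It is a simplified model, not how rsocket-java is implemented. MultiplexSketch and its methods are hypothetical names, and the only protocol fact it relies on is that client-initiated streams use odd stream IDs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not an rsocket-java API.
final class MultiplexSketch {
    // Client-initiated streams use odd IDs: 1, 3, 5, ...
    private final AtomicInteger nextStreamId = new AtomicInteger(1);
    // Requests that are still waiting for a response, keyed by stream ID.
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Allocates the next odd stream ID and registers a future for its response. */
    int openStream() {
        int id = nextStreamId.getAndAdd(2);
        pending.put(id, new CompletableFuture<>());
        return id;
    }

    /** Returns the future for a stream that is still pending, or null if there is none. */
    CompletableFuture<String> response(int streamId) {
        return pending.get(streamId);
    }

    /** Routes an incoming response to the request that owns the stream ID. */
    void onResponseFrame(int streamId, String data) {
        CompletableFuture<String> future = pending.remove(streamId);
        if (future != null) {
            future.complete(data);
        }
    }

    public static void main(String[] args) {
        MultiplexSketch connection = new MultiplexSketch();
        int first = connection.openStream();
        int second = connection.openStream();
        CompletableFuture<String> firstReply = connection.response(first);
        CompletableFuture<String> secondReply = connection.response(second);

        // Responses arrive out of order, yet each reaches the right caller.
        connection.onResponseFrame(second, "reply to second");
        connection.onResponseFrame(first, "reply to first");

        System.out.println(first + " -> " + firstReply.join());
        System.out.println(second + " -> " + secondReply.join());
    }
}
```

Because neither caller blocks the connection while waiting, a slow response on one stream does not hold up the others.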
requestStream
@Test
public void testRequestStream() {
  // SERVER
  RSocketFactory.receive()
      .acceptor(
          new SocketAcceptor() {
            @Override
            public Mono<RSocket> accept(
                ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
              return Mono.just(
                  new AbstractRSocket() {
                    @Override
                    public Flux<Payload> requestStream(Payload payload) {
                      // Emit one payload every 100 ms, without end
                      return Flux.interval(Duration.ofMillis(100))
                          .map(aLong -> DefaultPayload.create("Interval: " + aLong));
                    }
                  });
            }
          })
      .transport(TcpServerTransport.create("localhost", 7000))
      .start()
      .subscribe();

  // CLIENT
  RSocket socket =
      RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", 7000))
          .start()
          .block();

  socket
      .requestStream(DefaultPayload.create("Hello"))
      .map(Payload::getDataUtf8)
      .doOnNext(System.out::println)
      .take(10)
      .then()
      .doFinally(signalType -> socket.dispose())
      .then()
      .block();
}
Similar to Request-Response (which returns a Mono), except that it returns a Flux.
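The server above emits an endless stream, and the client stops it with take(10). The sketch below models that idea with the JDK's java.util.concurrent.Flow interfaces only (Java 9 or later): the subscriber asks for a limited number of items and cancels once it has them. It is a synchronous toy, assuming a well-behaved subscriber, and does not reproduce Reactor's or rsocket-java's actual operators. StreamDemandSketch, counter, and takeFirst are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration class, not a Reactor or rsocket-java API.
final class StreamDemandSketch {

    /** An endless source of "Interval: n" strings that emits only what was requested. */
    static Flow.Publisher<String> counter() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private long next = 0;
            private boolean cancelled = false;

            @Override
            public void request(long n) {
                // Emit at most n items, and stop early if the subscriber cancels.
                for (long i = 0; i < n && !cancelled; i++) {
                    subscriber.onNext("Interval: " + next++);
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    /** Collects the first limit items from the source, then cancels the subscription. */
    static List<String> takeFirst(Flow.Publisher<String> source, int limit) {
        List<String> received = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(limit); // signal demand for exactly limit items
            }

            @Override
            public void onNext(String item) {
                received.add(item);
                if (received.size() == limit) {
                    subscription.cancel(); // no more items are wanted
                }
            }

            @Override
            public void onError(Throwable t) { }

            @Override
            public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        takeFirst(counter(), 10).forEach(System.out::println);
    }
}
```

In RSocket the same demand and cancellation signals travel over the network, which is what lets a client consume a bounded prefix of an unbounded server-side stream.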
requestChannel
@Test
public void testRequestChannel() {
  // SERVER
  RSocketFactory.receive()
      .acceptor(
          new SocketAcceptor() {
            @Override
            public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
              return Mono.just(
                  new AbstractRSocket() {
                    @Override
                    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                      // Echo every incoming payload back with a prefix
                      return Flux.from(payloads)
                          .map(Payload::getDataUtf8)
                          .map(s -> "Echo: " + s)
                          .map(DefaultPayload::create);
                    }
                  });
            }
          })
      .transport(TcpServerTransport.create("localhost", 8080))
      .start()
      .subscribe();

  // CLIENT
  RSocket socket =
      RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", 8080))
          .start()
          .block();

  socket
      .requestChannel(
          Flux.interval(Duration.ofMillis(1000)).map(i -> DefaultPayload.create("Hello")))
      .map(Payload::getDataUtf8)
      .doOnNext(System.out::println)
      .take(10)
      .doFinally(signalType -> socket.dispose())
      .then()
      .block();
}
Similar to WebSocket: both sides can send messages to each other.
MetadataPush
@Test
public void testMetadataPush() throws InterruptedException {
  // SERVER
  RSocketFactory.receive()
      .acceptor(
          (setupPayload, reactiveSocket) ->
              Mono.just(
                  new AbstractRSocket() {
                    @Override
                    public Mono<Void> metadataPush(Payload payload) {
                      // A METADATA_PUSH frame carries only metadata, so read the
                      // metadata here; the data part arrives empty
                      System.out.printf("metadataPush: %s%n", payload.getMetadataUtf8());
                      return Mono.empty();
                    }
                  }))
      .transport(TcpServerTransport.create("localhost", 8080))
      .start()
      .subscribe();

  // CLIENT
  RSocket socket =
      RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", 8080))
          .start()
          .block();

  // DefaultPayload.create(data, metadata): only the metadata is pushed
  socket
      .metadataPush(DefaultPayload.create("hello", "version=1.0.0+"))
      .block();

  socket.dispose();

  // Give the server time to print the message before the test exits
  TimeUnit.SECONDS.sleep(5);
}
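- RSocket additionally defines metadataPush, which pushes only the metadata part of a payload. Like fireAndForget, it returns a Mono<Void>; according to its Javadoc, that Mono completes once the payload has been handled successfully and errors otherwise.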
Summary
- RSocket is a bi-directional, multiplexed, message-based binary protocol.
- RSocket has four interaction models: Request-Response, Fire-and-Forget, Request-Stream, and Channel.
- An RSocket frame carries metadata and a data payload. The metadata is optional and can be used to describe the data payload, so RSocket also defines a metadataPush method for pushing metadata.