Spring Web Reactive | 5. RSocket | 5.2. RSocketRequester

RSocketRequester는 RSocket 요청을 실행하기 위한 능숙한 API를 제공하고, 낮은 레벨의 데이터 버퍼가 아닌 데이터와 메타 데이터 객체를 받아들이고 반환한다. 대칭적으로 사용하여 클라이언트에서 요청을 만들거나 서버에서 요청을 만들 수 있다.

5.2.1. 클라이언트 요청자(Client Requester)

클라이언트에서 RSocketRequester을 받으려면 서버에 연결한다. 여기에는 연결 설정을 포함한 RSocket SETUP 프레임의 전송이 포함된다. RSocketRequester는 SETUP 프레임의 연결 설정을 포함한 io.rsocket.core.RSocketConnector의 준비를 위한 빌더를 제공한다.

이는 디폴트로 연결하는 가장 기본적인 방법이다.

Java

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);

Kotlin

val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

위는 즉시 연결되지 않는다. 요청이 이루어지면 공유 연결이 투명하게 확립되어 사용된다.

연결 설정

RSocketRequester.Builder 는 초기 SETUP 프레임을 정의하기 위해 다음을 제공한다.

  • dataMimeType(MimeType) - 연결 상의 데이터의 MIME 타입을 설정한다.
  • metadataMimeType(MimeType) - 연결의 메타 데이터의 MIME 타입을 설정한다.
  • setupData(Object) - SETUP에 포함할 데이터.
  • setupRoute(String, Object...) - SETUP에 포함할 메타 데이터의 루트.
  • setupMetadata(Object, MimeType) - SETUP에 포함하는 다른 메타 데이터.

데이터의 경우는 기본 MIME 타입은 처음 구성된 Decoder에서 파생한다. 메타 데이터의 경우 기본 MIME 유형은 복합 메타 데이터(composite metadata)이며, 요청마다 여러 메타 데이터 값과 MIME 타입의 페어를 허가한다. 일반적으로 모두를 변경할 필요가 없다.

SETUP 프레임의 데이터와 메타 데이터는 옵션이다. 서버 사이드에서 @ConnectMapping 메소드를 사용하여 연결 시작과 SETUP 프레임의 콘텐츠를 처리 할 수 있다. 메타 데이터는 연결 수준의 보안에 사용할 수 있다.

전략(Strategies)

RSocketRequester.BuilderRSocketStrategies를 수락하여 요청자를 구성한다. 이를 사용하여 데이터와 메타 데이터 값 (de) -serialization 용 인코더 및 디코더를 제공해야 한다. 기본적으로 String, byte[], ByteBufferspring-core의 기본 코덱만 등록된다. spring-web를 추가하면 다음과 같이 등록 할 수있는 기타 기능에 액세스 할 수 있다.

Java

RSocketStrategies strategies = RSocketStrategies.builder()
    .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
    .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
    .build();

RSocketRequester requester = RSocketRequester.builder()
    .rsocketStrategies(strategies)
    .tcp("localhost", 7000);

Kotlin

val strategies = RSocketStrategies.builder()
        .encoders { it.add(Jackson2CborEncoder()) }
        .decoders { it.add(Jackson2CborDecoder()) }
        .build()

val requester = RSocketRequester.builder()
        .rsocketStrategies(strategies)
        .tcp("localhost", 7000)

RSocketStrategies는 재사용을 위해 설계되어 있다. 일부 시나리오에서는 예를 들어 동일한 응용 프로그램의 클라이언트와 서버의 경우, Spring 설정으로 선언하는 것이 좋다.

클라이언트 응답자(Client Responders)

RSocketRequester.Builder를 사용하여 서버에서 요청에 대한 응답자를 구성 할 수 있다.

서버에서 사용되는 것과 동일한 인프라를 기반으로 클라이언트의 응답과 어노테이션 처리기를 사용할 수 있지만, 다음과 같이 프로그램에 등록한다.

Java

RSocketStrategies strategies = RSocketStrategies.builder()
    .routeMatcher(new PathPatternRouteMatcher())    // (1)
    .build();

SocketAcceptor responder =
    RSocketMessageHandler.responder(strategies, new ClientHandler());   // (2)

RSocketRequester requester = RSocketRequester.builder()
    .rsocketConnector(connector -> connector.acceptor(responder))   // (3)
    .tcp("localhost", 7000);

Kotlin

val strategies = RSocketStrategies.builder()
        .routeMatcher(PathPatternRouteMatcher())    // (1)
        .build()

val responder =
    RSocketMessageHandler.responder(strategies, new ClientHandler());   // (2)

val requester = RSocketRequester.builder()
        .rsocketConnector { it.acceptor(responder) }   // (3)
        .tcp("localhost", 7000)
  • (1) spring-web 가 존재하는 경우, 효율적인 경로 매칭을 위해 PathPatternRouteMatcher를 사용한다.
  • (2) @MessageMaping 그리고/또는 @ConnectMapping 메소드를 사용하여 클래스에서 응답자을 만든다.
  • (3) 응답자을 등록한다.

상기 클라이언트 응답자의 프로그램에 의한 등록을 위해 설계된 도구에 지나지 않는다는 점에 유의해라. 클라이언트 응답자가 Spring 설정의 대체 시나리오의 경우 RSocketMessageHandler를 Spring Bean으로 선언하고 다음과 같이 적용 할 수 있다.

Java

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
    .rsocketConnector(connector -> connector.acceptor(handler.responder()))
    .tcp("localhost", 7000);

Kotlin

import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
        .rsocketConnector { it.acceptor(handler.responder()) }
        .tcp("localhost", 7000)

위의 경우 RSocketMessageHandler에서 setHandlerPredicate를 사용하여 클라이언트 응답자를 검출하기 위한 다른 전략으로 전환해야 하는 경우도 있다. @RSocketClientResponder 등의 커스텀 어노테이션과 기본의 @Controller을 기반으로한다. 이것은 클라이언트와 서버 또는 동일한 응용 프로그램에서 여러 클라이언트를 사용하는 시나리오에 필요한다.

프로그래밍 모델에 대한 자세한 내용은 어노테이션이 선언된 응답자를 참조해라.

확장(Advanced)

RSocketRequesterBuilder는 기본이 되는io.rsocket.core.RSocketConnector를 공개하는 콜백을 제공하고, keep-alive 간격(intervals), 세션 재개, 인터셉터 등의 고급 옵션을 제공한다. 다음과 같이 그 레벨에 옵션을 구성 할 수 있다.

Java

RSocketRequester requester = RSocketRequester.builder()
    .rsocketConnector(connector -> {
        // ...
    })
    .tcp("localhost", 7000);

Kotlin

val requester = RSocketRequester.builder()
        .rsocketConnector {
            //...
        }
        .tcp("localhost", 7000)

5.2.2 서버 요청자

서버에서 연결된 클라이언트에 요청을 할 서버에서 연결된 클라이언트 요청자를 얻을 수 있다.

어노테이션이 선언된 응답자@ConnectMapping@MessageMapping 메소드는 RSocketRequester 인수를 지원한다. 이것을 사용하여 연결 요청자에 액세스한다. @ConnectMapping 메서드는 본질적으로 SETUP 프레임 핸들러이며 요청을 시작하기 전에 처리해야 하는 점에 유의해라. 따라서 첫번째 요청은 처리에서 분리해야 한다. 예를 들면 아래와 같다.

Java

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
    requester.route("status").data("5")
        .retrieveFlux(StatusReport.class)
        .subscribe(bar -> {   // (1)
            // ...
        });
    return ...   // (2)
}
  • (1) 처리와 관계없이 요청을 비동기적으로 시작한다.
  • (2) 처리를 실행하고, 완료되면 Mono<Void>를 반환한다.

Kotlin

@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
    GlobalScope.launch {
        requester.route("status").data("5").retrieveFlow<StatusReport>().collect {   // (1)
            // ...
        }
    }
    /// ...   // (2)
}
  • (1) 처리와 관계없이 요청을 비동기적으로 시작한다.
  • (2) 일시 중지 기능으로 처리한다.

5.2.3. Requests

클라이언트 또는 서버의 요청자를 얻으면 다음과 같이 요청을 만들 수 있다.

Java

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within")   // (1)
        .data(viewBox)   // (2)
        .retrieveFlux(AirportLocation.class);   // (3)

Kotlin

val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within")   // (1)
        .data(viewBox)   // (2)
        .retrieveFlow<AirportLocation>()   // (3)
  • (1) 요청 메시지의 메타 데이터에 포함 경로를 지정한다.
  • (2) 요청 메시지의 데이터를 제공한다.
  • (3) 예상되는 응답을 선언한다.

상호 작용 유형은 입력과 출력의 카디널리티에서 암묵적으로 결정된다. 위의 예는 하나의 값이 전송된 값의 스트림이 수신되기 때문에 Request-Stream 이다. 대부분의 경우, 입력과 출력의 선택이 RSocket 상호 작용의 유형과 응답자가 기대하는 입력과 출력의 종류와 일치하는 한 이에 대하여 생각할 필요가 없다. 잘못된 조합의 유일한 예는 “다대일"이다.

data(Object) 메소드는 FluxMono 포함하는 Reactive Streams PublisherReactiveAdapterRegistry에 등록되어 있는 값의 다른 프로듀서도 받아 들이다. 동일한 유형의 값을 생성하는 Flux 등의 여러 값 Publisher의 경우 오버로드된 data 메서드 중 하나를 사용하여 모든 요소 유형 검사와 Encoder 조회를 회피하는 것을 고려해라.

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object) 단계는 선택 사항이다. 데이터를 전송하지 요청의 경우는 생략한다.

Java

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
    .retrieveMono(AirportLocation.class);

Kotlin

import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
    .retrieveAndAwait<AirportLocation>()

복합 메타 데이터(composite metadata) (디폴트)을 사용하여 값이 등록된 Encoder으로 지원되는 경우, 추가로 메타 데이터 값을 추가 할 수 있다. 예를 들면 다음과 같다.

Java

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
        .metadata(securityToken, mimeType)
        .data(viewBox)
        .retrieveFlux(AirportLocation.class);

Kotlin

import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
        .metadata(securityToken, mimeType)
        .data(viewBox)
        .retrieveFlow<AirportLocation>()

Fire-and-Forget의 경우 Mono<Void>를 반환하는 send() 메소드를 사용한다. Mono는 메시지가 성공적으로 전송된 것만을 보여주고 처리 된 것을 나타내는 것은 아니다는 것에 유의해라.

Metadata-Push의 경우 Mono<Void>을 반환 값으로 지정하고 sendMetadata() 메소드를 사용한다.




최종 수정 : 2021-04-12