Spring Web Reactive | 5. RSocket | 5.2. RSocketRequester
RSocketRequester
는 RSocket 요청을 실행하기 위한 능숙한 API를 제공하고, 낮은 레벨의 데이터 버퍼가 아닌 데이터와 메타 데이터 객체를 받아들이고 반환한다. 대칭적으로 사용하여 클라이언트에서 요청을 만들거나 서버에서 요청을 만들 수 있다.
5.2.1. 클라이언트 요청자(Client Requester)
클라이언트에서 RSocketRequester
을 받으려면 서버에 연결한다. 여기에는 연결 설정을 포함한 RSocket SETUP
프레임의 전송이 포함된다. RSocketRequester
는 SETUP 프레임의 연결 설정을 포함한 io.rsocket.core.RSocketConnector
의 준비를 위한 빌더를 제공한다.
이는 디폴트로 연결하는 가장 기본적인 방법이다.
Java
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
Kotlin
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
위는 즉시 연결되지 않는다. 요청이 이루어지면 공유 연결이 투명하게 확립되어 사용된다.
연결 설정
RSocketRequester.Builder
는 초기 SETUP
프레임을 정의하기 위해 다음을 제공한다.
dataMimeType(MimeType)
- 연결 상의 데이터의 MIME 타입을 설정한다.metadataMimeType(MimeType)
- 연결의 메타 데이터의 MIME 타입을 설정한다.setupData(Object)
-SETUP
에 포함할 데이터.setupRoute(String, Object...)
-SETUP
에 포함할 메타 데이터의 루트.setupMetadata(Object, MimeType)
-SETUP
에 포함하는 다른 메타 데이터.
데이터의 경우는 기본 MIME 타입은 처음 구성된 Decoder에서 파생한다. 메타 데이터의 경우 기본 MIME 유형은 복합 메타 데이터(composite metadata)이며, 요청마다 여러 메타 데이터 값과 MIME 타입의 페어를 허가한다. 일반적으로 모두를 변경할 필요가 없다.
SETUP 프레임의 데이터와 메타 데이터는 옵션이다. 서버 사이드에서 @ConnectMapping
메소드를 사용하여 연결 시작과 SETUP
프레임의 콘텐츠를 처리 할 수 있다. 메타 데이터는 연결 수준의 보안에 사용할 수 있다.
전략(Strategies)
RSocketRequester.Builder
는 RSocketStrategies
를 수락하여 요청자를 구성한다. 이를 사용하여 데이터와 메타 데이터 값 (de) -serialization 용 인코더 및 디코더를 제공해야 한다. 기본적으로 String
, byte[]
, ByteBuffer
용 spring-core
의 기본 코덱만 등록된다. spring-web
를 추가하면 다음과 같이 등록 할 수있는 기타 기능에 액세스 할 수 있다.
Java
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
Kotlin
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
는 재사용을 위해 설계되어 있다. 일부 시나리오에서는 예를 들어 동일한 응용 프로그램의 클라이언트와 서버의 경우, Spring 설정으로 선언하는 것이 좋다.
클라이언트 응답자(Client Responders)
RSocketRequester.Builder
를 사용하여 서버에서 요청에 대한 응답자를 구성 할 수 있다.
서버에서 사용되는 것과 동일한 인프라를 기반으로 클라이언트의 응답과 어노테이션 처리기를 사용할 수 있지만, 다음과 같이 프로그램에 등록한다.
Java
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) // (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); // (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) // (3)
.tcp("localhost", 7000);
Kotlin
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) // (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); // (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } // (3)
.tcp("localhost", 7000)
- (1)
spring-web
가 존재하는 경우, 효율적인 경로 매칭을 위해PathPatternRouteMatcher
를 사용한다. - (2)
@MessageMaping
그리고/또는@ConnectMapping
메소드를 사용하여 클래스에서 응답자을 만든다. - (3) 응답자을 등록한다.
상기 클라이언트 응답자의 프로그램에 의한 등록을 위해 설계된 도구에 지나지 않는다는 점에 유의해라. 클라이언트 응답자가 Spring 설정의 대체 시나리오의 경우 RSocketMessageHandler
를 Spring Bean으로 선언하고 다음과 같이 적용 할 수 있다.
Java
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
Kotlin
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
위의 경우 RSocketMessageHandler
에서 setHandlerPredicate
를 사용하여 클라이언트 응답자를 검출하기 위한 다른 전략으로 전환해야 하는 경우도 있다. @RSocketClientResponder
등의 커스텀 어노테이션과 기본의 @Controller
을 기반으로한다. 이것은 클라이언트와 서버 또는 동일한 응용 프로그램에서 여러 클라이언트를 사용하는 시나리오에 필요한다.
프로그래밍 모델에 대한 자세한 내용은 어노테이션이 선언된 응답자를 참조해라.
확장(Advanced)
RSocketRequesterBuilder
는 기본이 되는io.rsocket.core.RSocketConnector
를 공개하는 콜백을 제공하고, keep-alive 간격(intervals), 세션 재개, 인터셉터 등의 고급 옵션을 제공한다. 다음과 같이 그 레벨에 옵션을 구성 할 수 있다.
Java
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
Kotlin
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
5.2.2 서버 요청자
서버에서 연결된 클라이언트에 요청을 할 서버에서 연결된 클라이언트 요청자를 얻을 수 있다.
어노테이션이 선언된 응답자는 @ConnectMapping
와 @MessageMapping
메소드는 RSocketRequester
인수를 지원한다. 이것을 사용하여 연결 요청자에 액세스한다. @ConnectMapping
메서드는 본질적으로 SETUP
프레임 핸들러이며 요청을 시작하기 전에 처리해야 하는 점에 유의해라. 따라서 첫번째 요청은 처리에서 분리해야 한다. 예를 들면 아래와 같다.
Java
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { // (1)
// ...
});
return ... // (2)
}
- (1) 처리와 관계없이 요청을 비동기적으로 시작한다.
- (2) 처리를 실행하고, 완료되면
Mono<Void>
를 반환한다.
Kotlin
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { // (1)
// ...
}
}
/// ... // (2)
}
- (1) 처리와 관계없이 요청을 비동기적으로 시작한다.
- (2) 일시 중지 기능으로 처리한다.
5.2.3. Requests
클라이언트 또는 서버의 요청자를 얻으면 다음과 같이 요청을 만들 수 있다.
Java
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") // (1)
.data(viewBox) // (2)
.retrieveFlux(AirportLocation.class); // (3)
Kotlin
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") // (1)
.data(viewBox) // (2)
.retrieveFlow<AirportLocation>() // (3)
- (1) 요청 메시지의 메타 데이터에 포함 경로를 지정한다.
- (2) 요청 메시지의 데이터를 제공한다.
- (3) 예상되는 응답을 선언한다.
상호 작용 유형은 입력과 출력의 카디널리티에서 암묵적으로 결정된다. 위의 예는 하나의 값이 전송된 값의 스트림이 수신되기 때문에 Request-Stream
이다. 대부분의 경우, 입력과 출력의 선택이 RSocket 상호 작용의 유형과 응답자가 기대하는 입력과 출력의 종류와 일치하는 한 이에 대하여 생각할 필요가 없다. 잘못된 조합의 유일한 예는 “다대일"이다.
data(Object)
메소드는 Flux
와 Mono
포함하는 Reactive Streams Publisher
와 ReactiveAdapterRegistry
에 등록되어 있는 값의 다른 프로듀서도 받아 들이다. 동일한 유형의 값을 생성하는 Flux
등의 여러 값 Publisher
의 경우 오버로드된 data 메서드 중 하나를 사용하여 모든 요소 유형 검사와 Encoder
조회를 회피하는 것을 고려해라.
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object)
단계는 선택 사항이다. 데이터를 전송하지 요청의 경우는 생략한다.
Java
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
복합 메타 데이터(composite metadata) (디폴트)을 사용하여 값이 등록된 Encoder
으로 지원되는 경우, 추가로 메타 데이터 값을 추가 할 수 있다. 예를 들면 다음과 같다.
Java
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
Kotlin
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
Fire-and-Forget
의 경우 Mono<Void>
를 반환하는 send()
메소드를 사용한다. Mono
는 메시지가 성공적으로 전송된 것만을 보여주고 처리 된 것을 나타내는 것은 아니다는 것에 유의해라.
Metadata-Push
의 경우 Mono<Void>
을 반환 값으로 지정하고 sendMetadata()
메소드를 사용한다.