Spring Web Reactive | 3. WebSocket | 3.2. WebSocket API

서블릿 스택과 같은

Spring Framework는 WebSocket 메시지를 처리하는 클라이언트 측 및 서버 사이드 응용 프로그램을 만드는데 사용할 수 있는 WebSocket API를 제공한다.

3.2.1. 서버

서블릿 스택과 같은

WebSocket 서버를 만들려면, 먼저 WebSocketHandler를 만들 수 있다. 다음의 예는 그 방법을 보여준다.

Java

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // ...
    }
}

Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        // ...
    }
}

다음은 그것을 URL로 매핑 할 수 있다.

Java

@Configuration
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/path", new MyWebSocketHandler());
        int order = -1; // before annotated controllers

        return new SimpleUrlHandlerMapping(map, order);
    }
}

Kotlin

@Configuration
class WebConfig {

    @Bean
    fun handlerMapping(): HandlerMapping {
        val map = mapOf("/path" to MyWebSocketHandler())
        val order = -1 // before annotated controllers

        return SimpleUrlHandlerMapping(map, order)
    }
}

WebFlux 구성을 사용하는 경우, 그 이상으로 아무것도 할 필요가 없다. 그렇지 않은 경우는 WebFlux 구성을 사용하지 않는 경우는 다음과 같이 WebSocketHandlerAdapter 선언해야 한다.

Java

@Configuration
class WebConfig {

    // ...

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

Kotlin

@Configuration
class WebConfig {

    // ...

    @Bean
    fun handlerAdapter() =  WebSocketHandlerAdapter()
}

3.2.2. WebSocketHandler

WebSocketHandlerhandle 메소드는 WebSocketSession을 받아 Mono<Void>를 반환하고 세션의 응용 프로그램 처리가 완료되었음을 나타낸다. 세션은 받은 메시지용과 보내는 메시지용의 2개의 스트림을 통해 처리된다. 다음 표에서는 스트림을 처리하는 두 가지 방법을 보여준다.

WebSocketSession 멧소드 설명
Flux<WebSocketMessage> receive() 인바운드 메시지 스트림에 대한 액세스를 제공하고 연결이 닫힐 때 완료된다.
Mono<Void> send(Publisher<WebSocketMessage>) 아웃바운드 메시지의 소스를 검색하고 메시지를 작성하고 소스가 완료되어 쓰기가 완료되면 완료되는 Mono<Void> 를 반환한다.

WebSocketHandler는 수신 스트림 전송 스트림을 통합 흐름에 구성하고, 그 흐름의 보완을 반영하기 Mono<Void>를 반환해야 한다. 응용 프로그램 요구 사항에 따라 통합 흐름은 다음의 경우에 완료된다.

  • 수신 또는 송신 메시지 스트림이 완료되었을 때.
  • 전송 스트림은 무한하고, 아웃바운드 스트림이 완료되었을 때(즉, 연결이 닫혔을 때).
  • WebSocketSessionclose 메소드를 호출했을 때.

인바운드, 아웃바운드 메시지 스트림이 함께 구성된 경우, Reactive Streams는 종료 활동을 알리기 위해 연결이 열려 있는지 여부를 확인할 필요는 없다. 인바운드 스트림은 완료 또는 오류 신호를 수신하고, 아웃바운드 스트림은 취소 신호를 수신한다.

핸들러의 가장 기본적인 구현은 인바운드 스트림을 처리하는 것이다. 다음의 예는 이러한 구현을 보여준다.

Java

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()              // (1)
                .doOnNext(message -> {
                    // ...                    // (2)
                })
                .concatMap(message -> {
                    // ...                    // (3)
                })
                .then();                      // (4)
    }
}

Kotlin

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        return session.receive()              // (1)
                .doOnNext {
                    // ...                    // (2)
                }
                .concatMap {
                    // ...                    // (3)
                }
                .then()                       // (4)
    }
}
  • (1) 인바운드 메시지 스트림에 액세스한다.
  • (2) 각 메시지에 뭔가를 한다.
  • (3) 메시지 내용을 사용하는 중첩된 비동기 작업을 수행한다.
  • (4) 수신이 종료되면 완료되는 Mono<Void> 를 반환한다.

중첩된 비동기 연산의 경우, 풀링된 데이터 버퍼를 사용하는 기본 서버(예를 들어, Netty)에서 message.retain() 호출해야 한다. 그렇지 않으면 데이터를 읽기 전에 데이터 버퍼가 해제될 수 있다. 배경에 대해서는 데이터 버퍼 및 코덱을 참조해라.

다음의 구현은 수신 스트림와 송신 스트림을 결합한다.

Java

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Flux<WebSocketMessage> output = session.receive()                 // (1)
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .map(value -> session.textMessage("Echo " + value));      // (2)

        return session.send(output);                                      // (3)
    }
}

Kotlin

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val output = session.receive()                       // (1)
                .doOnNext {
                    // ...
                }
                .concatMap {
                    // ...
                }
                .map { session.textMessage("Echo $it") }     // (2)

        return session.send(output)                          // (3)
    }
}
  • (1) 받은 메시지 스트림을 처리한다.
  • (2) 보내는 메시지를 작성하고, 결합된 흐름을 만든다.
  • (3) 메세지를 받는 동안은 완료하지 않는 Mono<Void>를 반환한다.

다음의 예와 같이, 수신 스트림와 전송 스트림은 독립적으로 처리하고 완료돼었을 때만 결합 할 수 있다.

Java

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Mono<Void> input = session.receive()                                 // (1)
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .then();

        Flux<String> source = ... ;
        Mono<Void> output = session.send(source.map(session::textMessage));  // (2)

        return Mono.zip(input, output).then();                               // (3)
    }
}

Kotlin

class ExampleHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()                                     // (1)
                .doOnNext {
                    // ...
                }
                .concatMap {
                    // ...
                }
                .then()

        val source: Flux<String> = ...
        val output = session.send(source.map(session::textMessage))       // (2)

        return Mono.zip(input, output).then()                            //  (3)
    }
}
  • (1) 수신 메시지 스트림을 처리한다.
  • (2) 보내는 메시지를 보냅니다.
  • (3) 스트림을 합쳐서 어느 하나의 스트림이 종료하면 완료되는 Mono<Void> 를 반환한다.

3.2.3. DataBuffer

DataBuffer는 WebFlux 바이트 버퍼 표현이다. Spring 코어 부분은 데이터 버퍼 및 코덱 섹션에서 자세히 설명하고 있다. 이해해야 할 중요한 점은 Netty와 같은 일부 서버에서는 바이트 버퍼가 풀링 된 참조 계산 메모리 누수를 피하기 위해 소비가 된 이후에는 반드시 릴리즈해야 한다.

Netty에서 실행하는 경우 응용 프로그램이 해제되지 않도록 입력 데이터 버퍼를 유지하는 경우는 DataBufferUtils.retain(dataBuffer)을 사용하고 버퍼가 소비 될 때에 DataBufferUtils.release(dataBuffer)을 사용해야 한다.

3.2.4. 핸드셰이크(Handshake)

서블릿 스택과 같은

WebSocketHandlerAdapterWebSocketService에 위임한다. 기본적으로 이는 HandshakeWebSocketService의 인스턴스이며, WebSocket 요청에 대해 기본적인 검사를 수행하고 사용중인 서버에 RequestUpgradeStrategy을 사용한다. 현재 Reactor Netty, Tomcat, Jetty, Undertow의 지원이 포함되어 있다.

HandshakeWebSocketServicePredicate<String>를 설정하고 WebSession 에서 속성을 추출하여 WebSocketSession의 속성에 삽입할 수 있는 sessionAttributePredicate 속성을 제공한다.

3.2.5. 서버 구성

서블릿 스택과 같은

각 서버의 RequestUpgradeStrategy는 기본이 되는 WebSocket 서버 엔진에 특정 설정을 제공한다. WebFlux Java 구성을 사용하는 경우는 WebFlux 구성의 해당 섹션에 나와있는 같은 속성을 지정할 수 있다. 그렇지 않은 경우는 WebFlux 구성을 사용하지 않으려면 다음과 같이 사용한다.

Java

@Configuration
class WebConfig {

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(webSocketService());
    }

    @Bean
    public WebSocketService webSocketService() {
        TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
        strategy.setMaxSessionIdleTimeout(0L);
        return new HandshakeWebSocketService(strategy);
    }
}

Kotlin

@Configuration
class WebConfig {

    @Bean
    fun handlerAdapter() =
            WebSocketHandlerAdapter(webSocketService())

    @Bean
    fun webSocketService(): WebSocketService {
        val strategy = TomcatRequestUpgradeStrategy().apply {
            setMaxSessionIdleTimeout(0L)
        }
        return HandshakeWebSocketService(strategy)
    }
}

서버 업그레이드 전략을 확인하여 사용 가능한 옵션을 확인한다. 현재 Tomcat과 Jetty만 이러한 옵션을 공개하고 있다.

3.2.6. CORS

서블릿 스택과 같은

CORS를 설정하고 WebSocket 엔드포인트에 대한 액세스를 제한하는 가장 쉬운 방법은 WebSocketHandlerCorsConfigurationSource를 구현하고 허용된 발신원, 헤더, 그외 세부 사항을 포함하는 CorsConfiguration을 반환한다. 그럴 수 없는 경우는 SimpleUrlHandlercorsConfigurations 속성을 설정하여 URL 패턴에 CORS 설정을 지정할 수도 있다. 모두가 지정되어 있는 경우는 CorsConfigurationcombine 메소드를 사용하여 결합된다.

3.2.7. 클라이언트

Spring WebFlux은 Reactor Netty, Tomcat, Jetty, Undertow, 표준 Java (즉, JSR-356)의 구현을 제공하는 WebSocketClient 추상화를 제공한다.

Tomcat 클라이언트는 사실상, WebSocketSession 처리에 몇 가지 추가 기능을 갖춘 표준 Java의 확장이며, Tomcat 고유의 API를 이용하여 역 배압 메시지의 수신을 일시 중지한다.

WebSocket 세션을 시작하려면 클라이언트의 인스턴스를 만들어서 execute 메소드를 사용할 수 있다.

Java

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
        session.receive()
                .doOnNext(System.out::println)
                .then());

Kotlin

val client = ReactorNettyWebSocketClient()

        val url = URI("ws://localhost:8080/path")
        client.execute(url) { session ->
            session.receive()
                    .doOnNext(::println)
            .then()
        }

Jetty와 같은 일부 클라이언트는 Lifecycle 를 구현하고, 사용하기 전에 중지 및 시작해야 한다. 모든 클라이언트에는 기본되는 WebSocket 클라이언트 구성에 대한 생성자 옵션이 있다.




최종 수정 : 2021-04-12