/**
 * Registers the URL-to-handler routing for WebSocket traffic.
 *
 * <p>The single route {@code /event-emitter} is bound to the
 * {@code webSocketHandler} held by the enclosing class, and the mapping is
 * given order {@code 1}.
 *
 * @return the configured {@link SimpleUrlHandlerMapping}
 */
@Bean
public HandlerMapping webSocketHandlerMapping() {
    SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();

    // Path -> handler table; only one endpoint is exposed.
    Map<String, WebSocketHandler> routes = new HashMap<>();
    routes.put("/event-emitter", webSocketHandler);

    mapping.setUrlMap(routes);
    mapping.setOrder(1);
    return mapping;
}
/**
 * WebSocket handler that pushes the elements of {@code intervalFlux} to the
 * connected peer as text messages while logging whatever the peer sends back.
 */
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
    // private fields ...

    /**
     * Wires the outbound and inbound sides of one WebSocket session.
     *
     * @param webSocketSession the session to write to and read from
     * @return a {@code Mono} that completes once both the outbound send and
     *         the inbound receive pipeline have completed
     */
    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        // Outbound: wrap every emitted element in a text frame and send it.
        Mono<Void> outbound = webSocketSession.send(
            intervalFlux.map(payload -> webSocketSession.textMessage(payload)));

        // Inbound: decode each incoming frame to text and log the signals;
        // joined with the outbound side so both run for the session.
        return outbound.and(
            webSocketSession.receive()
                .map(message -> message.getPayloadAsText())
                .log());
    }
}
/**
 * Stand-alone client that connects to the {@code /event-emitter} WebSocket
 * endpoint on {@code localhost:8080}, sends one greeting message and then
 * logs every message it receives.
 */
public class ReactiveJavaClientWebSocket {

    /**
     * Runs the client and blocks the calling thread for at most ten seconds.
     *
     * <p>NOTE(review): per Reactor's {@code Mono.block(Duration)} contract, a
     * runtime exception is raised if the exchange has not completed when the
     * timeout expires. Confirm whether the server side ever completes the
     * stream.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException declared by the signature; nothing in the
     *         body throws it directly
     */
    public static void main(String[] args) throws InterruptedException {
        WebSocketClient client = new ReactorNettyWebSocketClient();
        URI endpoint = URI.create("ws://localhost:8080/event-emitter");
        Duration timeout = Duration.ofSeconds(10L);

        Mono<Void> exchange = client.execute(endpoint, session -> {
            WebSocketMessage greeting =
                session.textMessage("event-spring-reactive-client-websocket");

            // Send the greeting first, then switch to draining and logging
            // the inbound stream; then() drops the elements and keeps only
            // the completion signal.
            return session.send(Mono.just(greeting))
                .thenMany(session.receive()
                    .map(message -> message.getPayloadAsText())
                    .log())
                .then();
        });

        exchange.block(timeout);
    }
}