@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> streamFlux() { return Flux.interval(Duration.ofSeconds(1)) .map(sequence -> "Flux - " + LocalTime.now().toString()); }
@GetMapping("/stream-sse") public Flux<ServerSentEvent<String>> streamEvents() { return Flux.interval(Duration.ofSeconds(1)) .map(sequence -> ServerSentEvent.<String> builder() .id(String.valueOf(sequence)) .event("periodic-event") .data("SSE - " + LocalTime.now().toString()) .build()); }
public void consumeServerSentEvent() { WebClient client = WebClient.create("http://localhost:8080/sse-server"); ParameterizedTypeReference<ServerSentEvent<String>> type = new ParameterizedTypeReference<ServerSentEvent<String>>() {}; Flux<ServerSentEvent<String>> eventStream = client.get() .uri("/stream-sse") .retrieve() .bodyToFlux(type); eventStream.subscribe( content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error), () -> logger.info("Completed!!!")); }
@GetMapping("/stream-sse-mvc") public SseEmitter streamSseMvc() { SseEmitter emitter = new SseEmitter(); ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor(); sseMvcExecutor.execute(() -> { try { for (int i = 0; true; i++) { SseEventBuilder event = SseEmitter.event() .data("SSE MVC - " + LocalTime.now().toString()) .id(String.valueOf(i)) .name("sse event - mvc"); emitter.send(event); Thread.sleep(1000); } } catch (Exception ex) { emitter.completeWithError(ex); } }); return emitter; }