[SPRING] Http 통합 흐름에서 Spring Reactor Flux를 만드는 방법은 무엇입니까?
SPRINGHttp 통합 흐름에서 Spring Reactor Flux를 만드는 방법은 무엇입니까?
나는이 질문과 매우 흡사하다. ActiveMQ 큐에서 Spring Reactor Flux를 만드는 방법은?
한 가지 차이점은 메시지가 JMS 큐가 아닌 HTTP 엔드 포인트에서 온다는 것입니다. 문제는 메시지 채널이 어떤 이유로 든 채워지지 않거나 Flux.from ()에 의해 선택되지 않는다는 것입니다. GenericMessage는 경로 변수로 페이로드가 포함 된 Http Integration 플로우에서 생성되지만 채널에 대기열에 포함되거나 게시되지 않습니다. 나는 .channel (MessageChannels.queue ()) 및 .channel (MessageChannels.publishSubscribe ())을 시도했다. 아무런 차이가 없으면 이벤트 스트림이 비어 있습니다. 다음은 코드입니다.
@Bean
public Publisher<Message<String>> httpReactiveSource() {
return IntegrationFlows.
from(Http.inboundChannelAdapter("/eventmessage/{id}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.payloadExpression("#pathVariables.id")
)
.channel(MessageChannels.queue())
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher();
}
@GetMapping(value="eventmessagechannel/{id}", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages(@PathVariable String id){
return Flux.from(httpReactiveSource())
.map(Message::getPayload);
}
UPDATE1 :
build.gradle
buildscript {
ext {
springBootVersion = '2.0.0.M2'
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-freemarker')
compile('org.springframework.boot:spring-boot-starter-integration')
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.boot:spring-boot-starter-webflux')
compile('org.springframework.integration:spring-integration-http')
testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile('io.projectreactor:reactor-test')
}
UPDATE2
@SpringBootApplication 및 @RestController가 하나의 파일에 정의되어있을 때 작동하지만 @SpringBootApplication 및 @RestController가 별도의 파일에있을 때 작동하지 않습니다.
TestApp.java
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TestApp {
public static void main(String[] args) {
SpringApplication.run(TestApp.class, args);
}
}
TestController.java
package com.example.controller;
import org.springframework.context.annotation.Bean;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Flux;
@RestController
public class TestController {
@Bean
public Publisher<Message<String>> httpReactiveSource() {
return IntegrationFlows.
from(Http.inboundChannelAdapter("/message/{id}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.payloadExpression("#pathVariables.id")
)
.channel(MessageChannels.queue())
.toReactivePublisher();
}
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages() {
return Flux.from(httpReactiveSource())
.map(Message::getPayload);
}
}
해결법
-
==============================
1.이것은 나를 위해 잘 작동합니다.
이것은 나를 위해 잘 작동합니다.
@SpringBootApplication @RestController public class SpringIntegrationSseDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringIntegrationSseDemoApplication.class, args); } @Bean public Publisher<Message<String>> httpReactiveSource() { return IntegrationFlows. from(Http.inboundChannelAdapter("/message/{id}") .requestMapping(r -> r .methods(HttpMethod.POST) ) .payloadExpression("#pathVariables.id") ) .channel(MessageChannels.queue()) .toReactivePublisher(); } @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> eventMessages() { return Flux.from(httpReactiveSource()) .map(Message::getPayload); } }
POM에 다음과 같은 종속성이 있습니다.
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.BUILD-SNAPSHOT</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-http</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
나는 응용 프로그램을 실행하고 두 개의 터미널이 있습니다 :
curl http://localhost:8080/events
SSE 듣기.
그리고 두 번째로 나는 이것을 수행합니다 :
curl -X POST http://localhost:8080/message/foo curl -X POST http://localhost:8080/message/bar curl -X POST http://localhost:8080/message/666
그래서 첫 번째 터미널은 다음과 같이 응답합니다.
data:foo data:bar data:666
Spring-boot-starter-webflux 의존성은 필요 없다. Flux to SSE는 Servlet Container의 일반 MVC와 잘 작동합니다.
Spring Integration은 곧 WebFlux도 지원할 것입니다 : https://jira.spring.io/browse/INT-4300. 그래서 다음과 같은 것을 설정할 수 있습니다 :
IntegrationFlows .from(Http.inboundReactiveGateway("/sse") .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
Servlet Container 종속성없이 WebFlux 만 사용할 수 있습니다.
from https://stackoverflow.com/questions/45076028/how-to-create-a-spring-reactor-flux-from-http-integration-flow by cc-by-sa and MIT license