복붙노트

[SPRING] Http 통합 흐름에서 Spring Reactor Flux를 만드는 방법은 무엇입니까?

SPRING

Http 통합 흐름에서 Spring Reactor Flux를 만드는 방법은 무엇입니까?

나는이 질문과 매우 흡사하다. ActiveMQ 큐에서 Spring Reactor Flux를 만드는 방법은?

한 가지 차이점은 메시지가 JMS 큐가 아닌 HTTP 엔드 포인트에서 온다는 것입니다. 문제는 메시지 채널이 어떤 이유로 든 채워지지 않거나 Flux.from ()에 의해 선택되지 않는다는 것입니다. GenericMessage는 경로 변수로 페이로드가 포함 된 Http Integration 플로우에서 생성되지만 채널에 대기열에 포함되거나 게시되지 않습니다. 나는 .channel (MessageChannels.queue ()) 및 .channel (MessageChannels.publishSubscribe ())을 시도했다. 아무런 차이가 없으면 이벤트 스트림이 비어 있습니다. 다음은 코드입니다.

@Bean
public Publisher<Message<String>> httpReactiveSource() {
        return IntegrationFlows.
                from(Http.inboundChannelAdapter("/eventmessage/{id}")
                        .requestMapping(r -> r
                        .methods(HttpMethod.POST)                                                                                   
                        )
                        .payloadExpression("#pathVariables.id")
                        )                           
                        .channel(MessageChannels.queue())
                        .log(LoggingHandler.Level.DEBUG)                            
                        .log()  
                        .toReactivePublisher();
    }


@GetMapping(value="eventmessagechannel/{id}", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages(@PathVariable String id){     
    return Flux.from(httpReactiveSource())              
            .map(Message::getPayload);

}

UPDATE1 :

build.gradle

buildscript {
    ext {
        springBootVersion = '2.0.0.M2'
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-freemarker')
    compile('org.springframework.boot:spring-boot-starter-integration') 
    compile('org.springframework.boot:spring-boot-starter-web')
    compile('org.springframework.boot:spring-boot-starter-webflux') 
    compile('org.springframework.integration:spring-integration-http')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('io.projectreactor:reactor-test')

}

UPDATE2

@SpringBootApplication 및 @RestController가 하나의 파일에 정의되어있을 때 작동하지만 @SpringBootApplication 및 @RestController가 별도의 파일에있을 때 작동하지 않습니다.

TestApp.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class TestApp {
     public static void main(String[] args) {
            SpringApplication.run(TestApp.class, args);
    }
}

TestController.java

package com.example.controller;


import org.springframework.context.annotation.Bean;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;



import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Flux;



@RestController
public class TestController {
     @Bean
        public Publisher<Message<String>> httpReactiveSource() {
            return IntegrationFlows.
                    from(Http.inboundChannelAdapter("/message/{id}")
                            .requestMapping(r -> r
                                    .methods(HttpMethod.POST)
                            )
                            .payloadExpression("#pathVariables.id")
                    )
                    .channel(MessageChannels.queue())
                    .toReactivePublisher();
        }

        @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> eventMessages() {
            return Flux.from(httpReactiveSource())
                    .map(Message::getPayload);
        }

}

해결법

  1. ==============================

    1.이것은 나를 위해 잘 작동합니다.

    이것은 나를 위해 잘 작동합니다.

    @SpringBootApplication
    @RestController
    public class SpringIntegrationSseDemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
        }
    
        @Bean
        public Publisher<Message<String>> httpReactiveSource() {
            return IntegrationFlows.
                    from(Http.inboundChannelAdapter("/message/{id}")
                            .requestMapping(r -> r
                                    .methods(HttpMethod.POST)
                            )
                            .payloadExpression("#pathVariables.id")
                    )
                    .channel(MessageChannels.queue())
                    .toReactivePublisher();
        }
    
        @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> eventMessages() {
            return Flux.from(httpReactiveSource())
                    .map(Message::getPayload);
        }
    
    }
    

    POM에 다음과 같은 종속성이 있습니다.

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    

    나는 응용 프로그램을 실행하고 두 개의 터미널이 있습니다 :

    curl http://localhost:8080/events
    

    SSE 듣기.

    그리고 두 번째로 나는 이것을 수행합니다 :

    curl -X POST http://localhost:8080/message/foo
    
    curl -X POST http://localhost:8080/message/bar
    
    curl -X POST http://localhost:8080/message/666
    

    그래서 첫 번째 터미널은 다음과 같이 응답합니다.

    data:foo
    
    data:bar
    
    data:666
    

    Spring-boot-starter-webflux 의존성은 필요 없다. Flux to SSE는 Servlet Container의 일반 MVC와 잘 작동합니다.

    Spring Integration은 곧 WebFlux도 지원할 것입니다 : https://jira.spring.io/browse/INT-4300. 그래서 다음과 같은 것을 설정할 수 있습니다 :

       IntegrationFlows
        .from(Http.inboundReactiveGateway("/sse")
                            .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
    

    Servlet Container 종속성없이 WebFlux 만 사용할 수 있습니다.

  2. from https://stackoverflow.com/questions/45076028/how-to-create-a-spring-reactor-flux-from-http-integration-flow by cc-by-sa and MIT license