복붙노트

[SPRING] ActiveMQ 대기열에서 스프링 반응기 유출을 만드는 방법?

SPRING

ActiveMQ 대기열에서 스프링 반응기 유출을 만드는 방법?

JMS 큐에서 반응성 스트림 (Flux)을 생성하기 위해 Spring Reactor 3 구성 요소와 Spring Integration을 실험하고 있습니다.

클라이언트가 JMS 메시지를 비동기 적으로 가져 오기 위해 JMS 큐 (Spring Integration을 사용하는 ActiveMQ)에서 반응성 스트림 (Spring Reactor 3 Flux)을 생성하려고합니다. 나는 모든 것이 올바르게 연결되어 있다고 믿지만 서버가 멈출 때까지 클라이언트는 JMS 메시지를받지 못한다. 그런 다음 모든 메시지가 클라이언트에게 한 번 "푸시 (push)"됩니다.

어떤 도움을 주시면 감사하겠습니다.

다음은 JMS, 통합 구성 요소 및 리 액티브 게시자를 구성하기 위해 사용하고있는 구성 파일입니다.

@Configuration
@EnableJms
@EnableIntegration
public class JmsConfiguration {

    @Value("${spring.activemq.broker-url:tcp://localhost:61616}")
    private String defaultBrokerUrl;

    @Value("${queues.patient:patient}")
    private String patientQueue;

    @Autowired
    MessageListenerAdapter messageListenerAdapter;

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, jmsConnectionFactory());
        return factory;
    }

    @Bean
    public Queue patientQueue() {
        return new ActiveMQQueue(patientQueue);

    }

    @Bean
    public ActiveMQConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(defaultBrokerUrl);
        connectionFactory.setTrustedPackages(Arrays.asList("com.sapinero"));
        return connectionFactory;
    }

    // Set the jackson message converter
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(jmsConnectionFactory());
        template.setDefaultDestinationName(patientQueue);
        template.setMessageConverter(jacksonJmsMessageConverter());
        return template;
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        messageListenerAdapter.setMessageConverter(jacksonJmsMessageConverter());
        return messageListenerAdapter;
    }

    @Bean
    public AbstractMessageListenerContainer messageListenerContainer() {
        DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setMessageConverter(jacksonJmsMessageConverter());
        defaultMessageListenerContainer.setConnectionFactory(jmsConnectionFactory());
        defaultMessageListenerContainer.setDestinationName(patientQueue);
        defaultMessageListenerContainer.setMessageListener(messageListenerAdapter());
        defaultMessageListenerContainer.setCacheLevel(100);
        defaultMessageListenerContainer.setErrorHandler(new ErrorHandler() {
            @Override
            public void handleError(Throwable t) {
                t.printStackTrace();
            }
        });

        return defaultMessageListenerContainer;
    }

    @Bean // Serialize message content to json using TextMessage
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }


    @Bean
    public MessageChannel jmsOutboundInboundReplyChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    public Publisher<Message<String>> pollableReactiveFlow() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(messageListenerContainer()).get())
                .channel(MessageChannels.queue())
                .log(LoggingHandler.Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

    @Bean
    public MessageChannel jmsChannel() {
        return new DirectChannel();
    }

Flux를 생성하는 컨트롤러는 다음과 같습니다.

@RestController
@RequestMapping("patients")
public class PatientChangePushController {
    private LocalDateTime lastTimePatientDataRetrieved = LocalDateTime.now();
    private int durationInSeconds = 30;
    private Patient patient;
    AtomicReference<SignalType> checkFinally = new AtomicReference<>();

    @Autowired
    PatientService patientService;

    @Autowired
    @Qualifier("pollableReactiveFlow")
    private
    Publisher<Message<String>> pollableReactiveFlow;

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Queue patientQueue;

    /**
     * Subscribe to a Flux of a patient that has been updated.
     *
     * @param id
     * @return
     */
    @GetMapping(value = "/{id}/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Message<String>> getPatientAlerts(@PathVariable Long id) {

        Flux<Message<String>> messageFlux = Flux.from(pollableReactiveFlow);
        return messageFlux;
    }

    @GetMapping(value = "/generate")
    public void generateJmsMessage() {
        for (long i = 0L; i < 100; i++) {
            Patient patient = new Patient();
            patient.setId(i);
            send(patient);
            System.out.println("Message was sent to the Queue");
        }

    }

    void send(Patient patient) {
        this.jmsTemplate.convertAndSend(this.patientQueue, patient);
    }

}

서버가 종료 될 때까지 메시지가 클라이언트로 전송되지 않는 이유는 누구에게 말해 줄 수 있습니다. 감사하게 생각합니다.

해결법

  1. ==============================

    1.나를 위해 잘 작동합니다.

    나를 위해 잘 작동합니다.

    @SpringBootApplication
    @RestController
    public class SpringIntegrationSseDemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
        }
    
        @Autowired
        private ConnectionFactory connectionFactory;
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @Bean
        public Publisher<Message<String>> jmsReactiveSource() {
            return IntegrationFlows
                    .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                            .destination("testQueue"))
                    .channel(MessageChannels.queue())
                    .log(LoggingHandler.Level.DEBUG)
                    .log()
                    .toReactivePublisher();
        }
    
        @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> getPatientAlerts() {
            return Flux.from(jmsReactiveSource())
                    .map(Message::getPayload);
        }
    
        @GetMapping(value = "/generate")
        public void generateJmsMessage() {
            for (int i = 0; i < 100; i++) {
                this.jmsTemplate.convertAndSend("testQueue", "testMessage #" + (i + 1));
            }
        }
    
    }
    

    하나의 터미널에서 나는 Flux로부터 SSE를 기다리는 http : // localhost : 8080 / events를 컬한다.

    다른 말단에서는 컬 (curl) http : // localhost : 8080 / generate를 생성하고 첫 번째 것을 봅니다.

    data:testMessage #1
    
    data:testMessage #2
    
    data:testMessage #3
    
    data:testMessage #4
    

    나는 Spring Boot 2.0.0을 사용한다. BUILD-SNAPSHOT.

    또한 여기를 참조하십시오 : https://spring.io/blog/2017/03/08/spring-tips-server-sent-events-sese

  2. from https://stackoverflow.com/questions/43126775/how-to-create-a-spring-reactor-flux-from-a-activemq-queue by cc-by-sa and MIT license