[SPRING] ActiveMQ 대기열에서 스프링 반응기 유출을 만드는 방법?
SPRINGActiveMQ 대기열에서 스프링 반응기 유출을 만드는 방법?
JMS 큐에서 반응성 스트림 (Flux)을 생성하기 위해 Spring Reactor 3 구성 요소와 Spring Integration을 실험하고 있습니다.
클라이언트가 JMS 메시지를 비동기 적으로 가져 오기 위해 JMS 큐 (Spring Integration을 사용하는 ActiveMQ)에서 반응성 스트림 (Spring Reactor 3 Flux)을 생성하려고합니다. 나는 모든 것이 올바르게 연결되어 있다고 믿지만 서버가 멈출 때까지 클라이언트는 JMS 메시지를받지 못한다. 그런 다음 모든 메시지가 클라이언트에게 한 번 "푸시 (push)"됩니다.
어떤 도움을 주시면 감사하겠습니다.
다음은 JMS, 통합 구성 요소 및 리 액티브 게시자를 구성하기 위해 사용하고있는 구성 파일입니다.
@Configuration
@EnableJms
@EnableIntegration
public class JmsConfiguration {
@Value("${spring.activemq.broker-url:tcp://localhost:61616}")
private String defaultBrokerUrl;
@Value("${queues.patient:patient}")
private String patientQueue;
@Autowired
MessageListenerAdapter messageListenerAdapter;
@Bean
public DefaultJmsListenerContainerFactory myFactory(
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
configurer.configure(factory, jmsConnectionFactory());
return factory;
}
@Bean
public Queue patientQueue() {
return new ActiveMQQueue(patientQueue);
}
@Bean
public ActiveMQConnectionFactory jmsConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(defaultBrokerUrl);
connectionFactory.setTrustedPackages(Arrays.asList("com.sapinero"));
return connectionFactory;
}
// Set the jackson message converter
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(jmsConnectionFactory());
template.setDefaultDestinationName(patientQueue);
template.setMessageConverter(jacksonJmsMessageConverter());
return template;
}
@Bean
public MessageListenerAdapter messageListenerAdapter() {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
messageListenerAdapter.setMessageConverter(jacksonJmsMessageConverter());
return messageListenerAdapter;
}
@Bean
public AbstractMessageListenerContainer messageListenerContainer() {
DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setMessageConverter(jacksonJmsMessageConverter());
defaultMessageListenerContainer.setConnectionFactory(jmsConnectionFactory());
defaultMessageListenerContainer.setDestinationName(patientQueue);
defaultMessageListenerContainer.setMessageListener(messageListenerAdapter());
defaultMessageListenerContainer.setCacheLevel(100);
defaultMessageListenerContainer.setErrorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
t.printStackTrace();
}
});
return defaultMessageListenerContainer;
}
@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public MessageChannel jmsOutboundInboundReplyChannel() {
return MessageChannels.queue().get();
}
@Bean
public Publisher<Message<String>> pollableReactiveFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(messageListenerContainer()).get())
.channel(MessageChannels.queue())
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher();
}
@Bean
public MessageChannel jmsChannel() {
return new DirectChannel();
}
Flux를 생성하는 컨트롤러는 다음과 같습니다.
@RestController
@RequestMapping("patients")
public class PatientChangePushController {
private LocalDateTime lastTimePatientDataRetrieved = LocalDateTime.now();
private int durationInSeconds = 30;
private Patient patient;
AtomicReference<SignalType> checkFinally = new AtomicReference<>();
@Autowired
PatientService patientService;
@Autowired
@Qualifier("pollableReactiveFlow")
private
Publisher<Message<String>> pollableReactiveFlow;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Queue patientQueue;
/**
* Subscribe to a Flux of a patient that has been updated.
*
* @param id
* @return
*/
@GetMapping(value = "/{id}/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Message<String>> getPatientAlerts(@PathVariable Long id) {
Flux<Message<String>> messageFlux = Flux.from(pollableReactiveFlow);
return messageFlux;
}
@GetMapping(value = "/generate")
public void generateJmsMessage() {
for (long i = 0L; i < 100; i++) {
Patient patient = new Patient();
patient.setId(i);
send(patient);
System.out.println("Message was sent to the Queue");
}
}
void send(Patient patient) {
this.jmsTemplate.convertAndSend(this.patientQueue, patient);
}
}
서버가 종료 될 때까지 메시지가 클라이언트로 전송되지 않는 이유는 누구에게 말해 줄 수 있습니다. 감사하게 생각합니다.
해결법
-
==============================
1.나를 위해 잘 작동합니다.
나를 위해 잘 작동합니다.
@SpringBootApplication @RestController public class SpringIntegrationSseDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringIntegrationSseDemoApplication.class, args); } @Autowired private ConnectionFactory connectionFactory; @Autowired private JmsTemplate jmsTemplate; @Bean public Publisher<Message<String>> jmsReactiveSource() { return IntegrationFlows .from(Jms.messageDrivenChannelAdapter(this.connectionFactory) .destination("testQueue")) .channel(MessageChannels.queue()) .log(LoggingHandler.Level.DEBUG) .log() .toReactivePublisher(); } @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> getPatientAlerts() { return Flux.from(jmsReactiveSource()) .map(Message::getPayload); } @GetMapping(value = "/generate") public void generateJmsMessage() { for (int i = 0; i < 100; i++) { this.jmsTemplate.convertAndSend("testQueue", "testMessage #" + (i + 1)); } } }
하나의 터미널에서 나는 Flux로부터 SSE를 기다리는 http : // localhost : 8080 / events를 컬한다.
다른 말단에서는 컬 (curl) http : // localhost : 8080 / generate를 생성하고 첫 번째 것을 봅니다.
data:testMessage #1 data:testMessage #2 data:testMessage #3 data:testMessage #4
나는 Spring Boot 2.0.0을 사용한다. BUILD-SNAPSHOT.
또한 여기를 참조하십시오 : https://spring.io/blog/2017/03/08/spring-tips-server-sent-events-sese
from https://stackoverflow.com/questions/43126775/how-to-create-a-spring-reactor-flux-from-a-activemq-queue by cc-by-sa and MIT license
'SPRING' 카테고리의 다른 글
[SPRING] Spring Data Elasticsearch는 Amazon Elasticsearch를 지원합니까? (0) | 2019.04.18 |
---|---|
[SPRING] 요람 | 스프링 부트 의존성은 제외되지 않는다. (0) | 2019.04.18 |
[SPRING] 스프링 다이나믹 모듈 - 살아있는 프로젝트입니까? (0) | 2019.04.18 |
[SPRING] 하나의 속성에 매핑되지 않은 최대 절전 모드의 referencedColumnNames (0) | 2019.04.18 |
[SPRING] 스프링 부트 yml ResourceBundle 파일 (0) | 2019.04.18 |