복붙노트

[SPRING] Java 및 Spring 3.0을 사용하여 JMS 항목 (대기열 아님)에서 동시에 여러 메시지를 처리 ​​할 수 ​​있습니까?

SPRING

Java 및 Spring 3.0을 사용하여 JMS 항목 (대기열 아님)에서 동시에 여러 메시지를 처리 ​​할 수 ​​있습니까?

여러 메시지 청취자가 주제의 연속 메시지를 동시에 처리하기를 바랍니다. 또한 각 메시지 수신기가 트랜잭션 방식으로 작동하도록하여 주어진 메시지 수신기에서 처리 오류가 발생하면 해당 수신기의 메시지가 해당 항목에 남아있게됩니다.

Spring DefaultMessageListenerContainer는 JMS 대기열에 대해서만 동시성을 지원하는 것으로 보인다.

여러 개의 DefaultMessageListenerContainers를 인스턴스화해야합니까?

시간이 수직축 아래로 흐르면 :

ListenerA reads msg 1        ListenerB reads msg 2        ListenerC reads msg 3
ListenerA reads msg 4        ListenerB reads msg 5        ListenerC reads msg 6
ListenerA reads msg 7        ListenerB reads msg 8        ListenerC reads msg 9
ListenerA reads msg 10       ListenerB reads msg 11       ListenerC reads msg 12
...

최신 정보: @ T.Rob과 @skaffman님께 의견을 보내 주셔서 감사합니다.

내가 한 일은 Concurrency = 1로 여러 DefaultMessageListenerContainers를 생성 한 다음 하나의 스레드 만 주어진 메시지 ID를 처리 할 수 ​​있도록 로직을 메시지 수신기에 넣는 것입니다.

해결법

  1. ==============================

    1.여러 개의 DefaultMessageListenerContainer 인스턴스 (no)는 원하지 않지만 concurrentConsumers 등록 정보를 사용하여 DefaultMessageListenerContainer를 동시 구성으로 구성해야합니다.

    여러 개의 DefaultMessageListenerContainer 인스턴스 (no)는 원하지 않지만 concurrentConsumers 등록 정보를 사용하여 DefaultMessageListenerContainer를 동시 구성으로 구성해야합니다.

    그러나 하단에는 큰 경고가 있습니다.

    이것은 흥미롭고, 당신이 그것에 대해 생각할 때 의미가 있습니다. DefaultMessageListenerContainer 인스턴스가 여러 개있는 경우에도 마찬가지입니다.

    나는 내가 당신에게 무엇을 제안 할 지 모르겠지만 아마도 당신이 당신의 디자인을 다시 생각할 필요가 있다고 생각합니다. pub / sub 메시지의 동시 소비는 완벽하게 합당한 것처럼 보이지만 동일한 메시지가 동시에 모든 소비자에게 전달되는 것을 피하는 방법은 무엇입니까?

  2. ==============================

    2.최소한 ActiveMQ에서 원하는 것은 전적으로 지원되며, 그의 이름은 VirtualTopic입니다.

    최소한 ActiveMQ에서 원하는 것은 전적으로 지원되며, 그의 이름은 VirtualTopic입니다.

    개념은 다음과 같습니다.

    여기에 코드

    @Component
    public class ColorReceiver {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        // simply generating data to the topic
        long id=0;
        @Scheduled(fixedDelay = 500)
        public void postMail() throws JMSException, IOException {
    
            final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)];
            final Color color = new Color(++id, colorName.getName());
            final ActiveMQObjectMessage message = new ActiveMQObjectMessage();
            message.setObject(color);
            message.setProperty("color", color.getName());
            LOGGER.info("status=color-post, color={}", color);
            jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message);
        }
    
        /**
         * Listen all colors messages
         */
        @JmsListener(
            destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer"
            selector = "color <> 'RED'"
        )
        public void genericReceiveMessage(Color color) throws InterruptedException {
            LOGGER.info("status=GEN-color-receiver, color={}", color);
        }
    
        /**
         * Listen only red colors messages
         *
         * the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that
         * the containers clientId need to be different between each other
         */
        @JmsListener(
    //      destination = "Consumer.redColorContainer.VirtualTopic.color",
            destination = "Consumer.client1.VirtualTopic.color",
            containerFactory = "redColorContainer", selector = "color='RED'"
        )
        public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException {
            LOGGER.info("status=RED-color-receiver, color={}", message.getObject());
        }
    
        /**
         * Listen all colors messages
         */
        @JmsListener(
            destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer"
        )
        public void genericReceiveMessage2(Color color) throws InterruptedException {
            LOGGER.info("status=GEN-color-receiver-2, color={}", color);
        }
    
    }
    
    @SpringBootApplication
    @EnableJms
    @EnableScheduling
    @Configuration
    public class Config {
    
        /**
         * Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different
         * clientIds per consumer pool (as two @JmsListener above, or two application instances)
         * 
         */
        @Bean
        public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory, 
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
    
            final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setConcurrency("1-5");
            configurer.configure(factory, connectionFactory);
            // container.setClientId("aId..."); lets spring generate a random ID
            return factory;
        }
    
        @Bean
        public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
    
            // necessary when post serializable objects (you can set it at application.properties)
            connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName()));
    
            final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setConcurrency("1-2");
            configurer.configure(factory, connectionFactory);
            return factory;
        }
    
    }
    
    public class Color implements Serializable {
    
        public static final Color WHITE = new Color("WHITE");
        public static final Color BLUE = new Color("BLUE");
        public static final Color RED = new Color("RED");
    
        private String name;
        private long id;
    
        // CONSTRUCTORS, GETTERS AND SETTERS
    }
    
  3. ==============================

    3.이것은 JMS의 추상화를 통해 전송 공급자의 차이점을 포착하는 경우 중 하나입니다. JMS는 주제에 대한 각 가입자의 메시지 사본을 제공하려고합니다. 그러나 원하는 동작은 실제로 대기열의 동작입니다. 나는 거기에 설명되지 않은 술집 / 하위 솔루션을 운전하는 다른 요구 사항이 있다고 의심합니다. 예를 들어 다른 것들은 앱과 독립적으로 동일한 주제를 구독해야합니다.

    이것은 JMS의 추상화를 통해 전송 공급자의 차이점을 포착하는 경우 중 하나입니다. JMS는 주제에 대한 각 가입자의 메시지 사본을 제공하려고합니다. 그러나 원하는 동작은 실제로 대기열의 동작입니다. 나는 거기에 설명되지 않은 술집 / 하위 솔루션을 운전하는 다른 요구 사항이 있다고 의심합니다. 예를 들어 다른 것들은 앱과 독립적으로 동일한 주제를 구독해야합니다.

    WebSphere MQ에서이 작업을 수행하려는 경우, 주어진 주제에 대한 각 메시지의 단일 사본이 대기열에 배치되도록하는 관리 서브 스크립 션을 작성하는 것이 해결책입니다. 그런 다음 여러 구독자가 해당 큐의 메시지를 놓고 경쟁 할 수 있습니다. 이 방법으로 앱은 메시지가 배포되는 여러 스레드를 가질 수 있으며 동시에이 애플리케이션과 독립적 인 다른 구독자가 동일한 주제를 동적으로 구독 할 수 있습니다.

    유감 스럽지만, JMS를 수행 할 수있는 일반적인 방법은 없습니다. 전송 제공자의 구현에 크게 의존합니다. 내가 말할 수있는 유일한 것 중 하나는 WebSphere MQ이지만, 다른 전송 장치가 한 방향으로 또는 다른 방향으로 그리고 당신이 창조적이라면 다양한 각도로 지원한다는 것을 확신합니다.

  4. ==============================

    4.가능성은 다음과 같습니다.

    가능성은 다음과 같습니다.

    1) 들어오는 메시지를 처리 ​​할 Bean 및 메소드로 구성된 하나의 DMLC 만 작성하십시오. 동시성을 1로 설정하십시오.

    2) 태스크 실행자를 #threads가 원하는 동시성과 같게 구성하십시오. 실제로 메시지를 처리 ​​할 것으로 예상되는 개체에 대한 개체 풀을 만듭니다. # 1에서 구성한 bean에 태스크 실행자 및 오브젝트 풀에 대한 참조를 제공하십시오. 실제 메시지 처리 bean이 스레드로부터 안전하지 않으면 오브젝트 풀이 유용합니다.

    3) 들어오는 메시지의 경우 DMLC의 Bean은 사용자 정의 Runnable을 작성하고 메시지 및 오브젝트 풀을 가리키며이를 태스크 실행자에게 제공합니다.

    4) Runnable의 run 메소드는 객체 풀에서 빈을 가져오고 주어진 메시지와 함께 'process'메소드를 호출한다.

    # 4는 프록시와 객체 풀을 사용하여 쉽게 관리 할 수 ​​있습니다.

    아직이 솔루션을 시도하지는 않았지만 청구서에 맞는 것으로 보입니다. 이 솔루션은 EJB MDB만큼 강력하지는 않습니다. 예 : 봄 RuntimeException을 throw하는 경우 풀에서 객체를 버리지 않습니다.

  5. ==============================

    5.나는 같은 문제를 겪었다. 저는 현재 RabbitMQ를 조사 중입니다. RabbitMQ는 "작업 대기열"이라고 부르는 디자인 패턴에서 완벽한 솔루션을 제공하는 것으로 보입니다. 자세한 정보는 http://www.rabbitmq.com/tutorials/tutorial-two-java.html을 참조하십시오.

    나는 같은 문제를 겪었다. 저는 현재 RabbitMQ를 조사 중입니다. RabbitMQ는 "작업 대기열"이라고 부르는 디자인 패턴에서 완벽한 솔루션을 제공하는 것으로 보입니다. 자세한 정보는 http://www.rabbitmq.com/tutorials/tutorial-two-java.html을 참조하십시오.

    JMS에 완전히 묶여 있지 않다면이 문제를 조사 할 수 있습니다. JMS에서 AMQP 로의 브릿지도있을 수 있지만, 해커처럼 보일 수도 있습니다.

    RabbitMQ를 설치하고 Mac에서 실행하는 재미 (읽는 데 어려움)가 있습니다. 그러나 제대로 작동하고 있다고 생각합니다. 문제를 해결할 수 있으면 다시 게시 해 드리겠습니다.

  6. ==============================

    6.커스텀 작업 실행자를 생성하는 것은 겉으로보기에는 나에게 중복 처리를 사용하지 않고이 문제를 해결했다.

    커스텀 작업 실행자를 생성하는 것은 겉으로보기에는 나에게 중복 처리를 사용하지 않고이 문제를 해결했다.

    @Configuration
    class BeanConfig {
        @Bean(destroyMethod = "shutdown")
        public ThreadPoolTaskExecutor topicExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setAllowCoreThreadTimeOut(true);
            executor.setKeepAliveSeconds(300);
            executor.setCorePoolSize(4);
            executor.setQueueCapacity(0);
            executor.setThreadNamePrefix("TOPIC-");
            return executor;
        }
    
        @Bean
        JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer, @Qualifier("topicExecutor") Executor topicExecutor) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setPubSubDomain(true);
            configurer.configure(factory, connectionFactory);
            factory.setPubSubDomain(true);
            factory.setSessionTransacted(false);
            factory.setSubscriptionDurable(false);
            factory.setTaskExecutor(topicExecutor);
            return factory;
        }
    
    }
    
    class MyBean {
        @JmsListener(destination = "MYTOPIC", containerFactory = "topicListenerFactory", concurrency = "1")
        public void receiveTopicMessage(SomeTopicMessage message) {}
    }
    
  7. ==============================

    7.이 질문을 가로 질러 왔어. 내 구성은 다음과 같습니다.

    이 질문을 가로 질러 왔어. 내 구성은 다음과 같습니다.

    id = "DefaultListenerContainer"로 bean을 작성하고, 특성 이름 = "concurrentConsumers"value = "10"및 특성 이름 = "maxConcurrentConsumers"value = "50"을 추가하십시오.

    지금까지 잘 작동합니다. 스레드 ID를 인쇄하고 여러 스레드가 만들어지고 다시 사용되는지 확인했습니다.

  8. from https://stackoverflow.com/questions/3088814/how-can-i-handle-multiple-messages-concurrently-from-a-jms-topic-not-queue-wit by cc-by-sa and MIT license