복붙노트

[SPRING] Spring RabbitMQ로 새로운 큐를 생성하는 방법?

SPRING

Spring RabbitMQ로 새로운 큐를 생성하는 방법?

내 (제한적) rabbit-mq 환경에서 아직 존재하지 않는 큐에 대한 새로운 리스너를 생성하면 큐가 자동으로 생성됩니다. 리스너를 설정하기 위해 rabbit-mq와 함께 Spring AMQP 프로젝트를 사용하려고하는데, 대신 오류가 발생합니다. 이것은 내 XML 구성입니다 :

<rabbit:connection-factory id="rabbitConnectionFactory" host="172.16.45.1" username="test" password="password" />

<rabbit:listener-container connection-factory="rabbitConnectionFactory"  >
    <rabbit:listener ref="testQueueListener" queue-names="test" />
</rabbit:listener-container>

<bean id="testQueueListener" class="com.levelsbeyond.rabbit.TestQueueListener"> 
</bean>

내 RabbitMq 로그에 다음과 같이 표시됩니다.

=ERROR REPORT==== 3-May-2013::23:17:24 ===
connection <0.1652.0>, channel 1 - soft error:
{amqp_error,not_found,"no queue 'test' in vhost '/'",'queue.declare'}

그리고 AMQP와 비슷한 오류 :

2013-05-03 23:17:24,059 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] (SimpleAsyncTaskExecutor-1) - Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.

그것은 "수동"모드에서 대기열이 생성되고 스택 추적에서 보이는 것 - 아무도이 오류가 표시되지 않도록 수동 모드를 사용하지 않는 대기열을 만드는 방법을 지적 할 수 있습니까? 아니면 다른 것을 놓치고 있습니까?

해결법

  1. ==============================

    1.이전 버전의 스레드,하지만 여전히 Google에 꽤 높은 것으로 나타나므로 다음과 같은 새로운 정보가 있습니다.

    이전 버전의 스레드,하지만 여전히 Google에 꽤 높은 것으로 나타나므로 다음과 같은 새로운 정보가 있습니다.

    Spring 4.2.x와 Spring-Amqp 1.4.5.RELEASE 및 Spring-Rabbit 1.4.5.RELEASE 이후, 교환, 대기열 및 바인딩 선언은 @Configuration 클래스를 통해 매우 간단 해졌습니다. 몇 가지 주석 :

    @EnableRabbit
    @Configuration
    @PropertySources({
        @PropertySource("classpath:rabbitMq.properties")
    })
    public class RabbitMqConfig {    
        private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
    
        @Value("${rabbitmq.host}")
        private String host;
    
        @Value("${rabbitmq.port:5672}")
        private int port;
    
        @Value("${rabbitmq.username}")
        private String username;
    
        @Value("${rabbitmq.password}")
        private String password;
    
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
    
            logger.info("Creating connection factory with: " + username + "@" + host + ":" + port);
    
            return connectionFactory;
        }
    
        /**
         * Required for executing adminstration functions against an AMQP Broker
         */
        @Bean
        public AmqpAdmin amqpAdmin() {
            return new RabbitAdmin(connectionFactory());
        }
    
        /**
         * This queue will be declared. This means it will be created if it does not exist. Once declared, you can do something
         * like the following:
         * 
         * @RabbitListener(queues = "#{@myDurableQueue}")
         * @Transactional
         * public void handleMyDurableQueueMessage(CustomDurableDto myMessage) {
         *    // Anything you want! This can also return a non-void which will queue it back in to the queue attached to @RabbitListener
         * }
         */
        @Bean
        public Queue myDurableQueue() {
            // This queue has the following properties:
            // name: my_durable
            // durable: true
            // exclusive: false
            // auto_delete: false
            return new Queue("my_durable", true, false, false);
        }
    
        /**
         * The following is a complete declaration of an exchange, a queue and a exchange-queue binding
         */
        @Bean
        public TopicExchange emailExchange() {
            return new TopicExchange("email", true, false);
        }
    
        @Bean
        public Queue inboundEmailQueue() {
            return new Queue("email_inbound", true, false, false);
        }
    
        @Bean
        public Binding inboundEmailExchangeBinding() {
            // Important part is the routing key -- this is just an example
            return BindingBuilder.bind(inboundEmailQueue()).to(emailExchange()).with("from.*");
        }
    }
    

    도움이 될만한 출처 및 문서 :

    참고 : Spring AMQP 1.5부터는 버전을 놓친 것처럼 보입니다. 리스너에서 전체 바인딩을 선언 할 수 있기 때문에 훨씬 쉽습니다!

  2. ==============================

    2.내 문제를 해결 한 것으로 보이는 것은 관리자를 추가하는 것입니다. 여기 내 XML입니다 :

    내 문제를 해결 한 것으로 보이는 것은 관리자를 추가하는 것입니다. 여기 내 XML입니다 :

    <rabbit:listener-container connection-factory="rabbitConnectionFactory"  >
        <rabbit:listener ref="orderQueueListener" queues="test.order" />
    </rabbit:listener-container>
    
    <rabbit:queue name="test.order"></rabbit:queue>
    
    <rabbit:admin id="amqpAdmin" connection-factory="rabbitConnectionFactory"/>
    
    <bean id="orderQueueListener" class="com.levelsbeyond.rabbit.OrderQueueListener">   
    </bean>
    
  3. ==============================

    3.연결 태그 다음에 수신기를 추가 할 수 있습니다.

    연결 태그 다음에 수신기를 추가 할 수 있습니다.

    <rabbit:queue name="test" auto-delete="true" durable="false" passive="false" />
    

    불행히도 XSD 스키마에 따르면 수동 속성 (위에 나열된)은 유효하지 않습니다. 그러나 필자가 보았던 모든 queue_declare 구현에서 passive는 유효한 queue_declare 매개 변수였습니다. 그게 효과가 있을지 또는 미래에 그것을 지원할 계획인지 여부에 대해 궁금합니다.

    다음은 대기열 선언에 대한 전체 옵션 목록입니다. http://www.rabbitmq.com/amqp-0-9-1-reference.html#class.queue

    다음은 스프링 토끼 스키마의 전체 XSD입니다 (주석 포함). http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd

  4. from https://stackoverflow.com/questions/16370911/how-to-get-spring-rabbitmq-to-create-a-new-queue by cc-by-sa and MIT license