복붙노트

[SPRING] 봄 AMQP v1.4.2 - 네트워크 실패시 토끼 재접속 문제

SPRING

봄 AMQP v1.4.2 - 네트워크 실패시 토끼 재접속 문제

Spring AMQP v1.4.2에서 다음 시나리오를 테스트 중이며 네트워크 중단 후 다시 연결하지 못합니다.

또한 iptables 드롭 대신 VM 네트워크 어댑터 연결 끊기를 사용하여 동일한 시나리오를 테스트했으며 똑같은 일이 발생했습니다. 즉 자동 재 연결이 필요하지 않습니다. 흥미롭게도, 내가 iptables REJECT 대신 DROP를 시도하면 예상대로 작동하고 거부 규칙을 제거하자 마자 응용 프로그램이 다시 시작되지만 거부는 네트워크 오류보다 서버 오류와 비슷하다고 생각합니다.

참고 문헌에 따르면 :

연결 해제 후 약 1 분 정도 걸리는 로그입니다.

    2015-01-16 14:00:42,433 WARN  [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na]
    ... 1 common frames omitted

재 연결 후 몇 초 후에이 로그 메시지가 나타납니다.

2015-01-16 14:18:14,551 WARN  [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out

업데이트 : 꽤 이상하게도 org.springframework.amqp 패키지에 DEBUG 로깅을 활성화하면 재 연결이 성공적으로 수행되어 더 이상 문제를 재현 할 수 없습니다!

디버그 로깅을 사용하지 않으면 스프링 AMQP 코드를 디버깅하려고했습니다. 나는 iptables drop이 제거 된 직후 SimpleMessageListenerContainer.doStop () 메서드가 호출되어 shutdown ()을 호출하고 모든 채널을 취소한다는 것을 관찰했다. 원인에 관련된 것으로 보이는 doStop ()에 중단 점을 넣으면이 로그 메시지가 나타납니다.

2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:49,283 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    ... 2 common frames omitted
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer

업데이트 2 : 응답에 제안 된대로 요청 된 하트 비트를 30 초로 설정하면 재 연결이 대부분의 시간 동안 작동하고 팬 아웃 교환에 바인딩 된 전용 임시 큐를 다시 정의하는 데 성공했지만 가끔씩 다시 연결하지 못합니다.

드문 경우지만, 테스트 도중 RabbitMQ 관리 콘솔을 모니터링하고 새로운 연결이 설정되었지만 (이전 연결이 시간 초과에 의해 제거 된 후) 재 연결 후 독점적 인 임시 대기열이 다시 정의되지 않는 것을 관찰했습니다. 또한 클라이언트가 메시지를받지 못했습니다. 덜 자주 발생하므로 문제를 신뢰성있게 재현하는 것이 현재 매우 어렵습니다. 아래의 전체 구성을 제공했으며 이제는 대기열 선언을 포함합니다.

UPDATE 3 : 독점적 인 임시 대기열을 이름이 지정된 자동 삭제 대기열로 바꾼 후에도 가끔씩 동일한 문제가 발생합니다. 즉, 재 연결 후 명명 된 대기열 자동 삭제가 다시 정의되지 않고 응용 프로그램이 다시 시작될 때까지 메시지가 수신되지 않습니다.

누군가가 나를 도울 수 있으면 정말 고맙습니다.

다음은 제가 의존하고있는 AMQP 설정입니다 :

<!-- Create a temporary exclusive queue to subscribe to the control exchange -->
<rabbit:queue id="control-queue"/>

<!-- Bind the temporary queue to the control exchange -->
<rabbit:fanout-exchange name="control">
    <rabbit:bindings>
        <rabbit:binding queue="control-queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- Subscribe to the temporary queue -->
<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="none"
                           concurrency="1"
                           prefetch="1">
    <rabbit:listener queues="control-queue" ref="controlQueueConsumer"/>

</rabbit:listener-container>

<rabbit:connection-factory id="connection-factory"
                           username="${rabbit.username}"
                           password="${rabbit.password}"
                           host="${rabbit.host}"
                           virtual-host="${rabbit.virtualhost}"
                           publisher-confirms="true" 
                           channel-cache-size="100"
                           requested-heartbeat="30" />

<rabbit:admin id="admin" connection-factory="connection-factory"/>

<rabbit:queue id="qu0-id" name="qu0">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="dead-letter"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin">
    <rabbit:bindings>
        <rabbit:binding queue="qu0" pattern="p.0"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="manual"
                           concurrency="4"
                           prefetch="30">
    <rabbit:listener queues="qu0" ref="queueConsumerComponent"/>
</rabbit:listener-container>

해결법

  1. ==============================

    1.난 그냥 (패킷을 드롭 iptables를 사용하여 리눅스에 토끼) 설명한대로 테스트를 실행.

    난 그냥 (패킷을 드롭 iptables를 사용하여 리눅스에 토끼) 설명한대로 테스트를 실행.

    연결이 다시 설정되면 로그가 표시되지 않습니다 (아마도해야합니다).

    재 연결을 보려면 디버그 로깅을 설정하는 것이 좋습니다.

    편집하다:

    rabbitmq 문서에서 :

    귀하의 예외 사항 :

    그래서 문제는 브로커가 여전히 다른 연결이 있다고 생각한다는 것입니다.

  2. ==============================

    2.우리는 프로덕션 환경에서도이 문제에 직면 해 있습니다. 다른 ESX 랙에서 VM 노드로 실행되는 Rabbit 노드 때문일 수 있습니다. 우리가 발견 한 해결 방법은 클러스터에서 연결이 끊어지면 클라이언트 응용 프로그램이 계속 다시 연결하려고하는 것입니다 . 아래는 우리가 적용한 설정이며 작동했습니다 :

    우리는 프로덕션 환경에서도이 문제에 직면 해 있습니다. 다른 ESX 랙에서 VM 노드로 실행되는 Rabbit 노드 때문일 수 있습니다. 우리가 발견 한 해결 방법은 클러스터에서 연결이 끊어지면 클라이언트 응용 프로그램이 계속 다시 연결하려고하는 것입니다 . 아래는 우리가 적용한 설정이며 작동했습니다 :

    <util:properties id="spring.amqp.global.properties">
      <prop key="smlc.missing.queues.fatal">false</prop>
    </util:properties>
    

    이 속성은 치명적인 오류 (브로커를 사용할 수없는 경우 등)에 대한 대기열 선언이 실패 할 때 Spring AMQP의 전역 동작을 변경합니다. 기본적으로 컨테이너는 3 번만 시도합니다 ( "retries left = 0"을 나타내는 로그 메시지 참조).

    참조 : http://docs.spring.io/spring-amqp/reference/htmlsingle/#containerAttributes

    또한 컨테이너가 치명적이지 않은 오류에서 복구되도록 복구 간격을 추가했습니다. 그러나 글로벌 동작이 치명적인 오류 (예 : 누락 된 큐)를 다시 시도하는 경우에도 동일한 구성이 사용됩니다.

    <rabbit:listener-container recovery-interval="15000" connection-factory="consumerConnectionFactory">
    ....
    </rabbit:listener-container>
    
  3. ==============================

    3.setRequestedHeartBeat를 ConnectionFactory로 설정하고 setMissingQueuesFatal (false)를 SimpleMessageListenerContainer로 설정하여 무기한으로 연결을 재 시도하십시오. 기본적으로 SimpleMessageListenerContainer setMissingQueuesFatal은 true로 설정되며 3 회만 수행됩니다.

    setRequestedHeartBeat를 ConnectionFactory로 설정하고 setMissingQueuesFatal (false)를 SimpleMessageListenerContainer로 설정하여 무기한으로 연결을 재 시도하십시오. 기본적으로 SimpleMessageListenerContainer setMissingQueuesFatal은 true로 설정되며 3 회만 수행됩니다.

      @Bean
      public ConnectionFactory connectionFactory() {
        final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHost(), getPort());
        connectionFactory.setUsername(getUsername());
        connectionFactory.setPassword(getPassword());
        connectionFactory.setVirtualHost(getVirtualHost());
        connectionFactory.setRequestedHeartBeat(30);
        return connectionFactory;
      }
    
      @Bean
      public SimpleMessageListenerContainer listenerContainerCopernicusErrorQueue() {
        final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(myQueue().getName());
        container.setMessageListener(messageListenerAdapterQueue());
        container.setDefaultRequeueRejected(false);
        container.setMissingQueuesFatal(false);
        return container;
      }
    
  4. from https://stackoverflow.com/questions/27977008/spring-amqp-v1-4-2-rabbit-reconnection-issue-on-network-failure by cc-by-sa and MIT license