복붙노트

[SPRING] Spring 통합 - 대기열 / 폴러는 아무런 조치없이 스레드 풀을 다 소모하는 것 같습니다.

SPRING

Spring 통합 - 대기열 / 폴러는 아무런 조치없이 스레드 풀을 다 소모하는 것 같습니다.

AMQP 중개인에 첨부 된 Spring 통합 응용 프로그램이 있습니다.

amqp-queue에서 메시지를 받고 DB 레코드를 업데이트하려고합니다.

성능을 향상시키기 위해 여러 업데이트가 동시에 발생할 수있는 작업자 풀이 있습니다.

나는 다음과 같은 구성을 가지고있다 :

<int-amqp:inbound-channel-adapter queue-names="pricehub.fixtures.priceUpdates.queue" 
                            channel="pricehub.fixtures.priceUpdates.channel"
                            message-converter="jsonMessageConverter"/>

<int:channel id="pricehub.fixtures.priceUpdates.channel">
    <int:queue  />
</int:channel>

<int:service-activator ref="updatePriceAction" 
     method="updatePrices" 
     input-channel="pricehub.instruments.priceUpdates.channel">
    <int:poller fixed-delay="50" time-unit="MILLISECONDS" task-executor="taskExecutor" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="5-50" keep-alive="120" queue-capacity="500"/>

AMQP 채널에서 처리 할 인바운드 메시지가없는이 실행을 시작할 경우, 나는 신속하게 쓰레드 풀이 고갈 된 것을보고 거부를 시작합니다.

여기에 로그가 있습니다.

[Thu Apr 2013 23:41:51.153] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w4qPp60jVEQOIEovR4cERv], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.160] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-Q3Lq4R9g9E8WBNVLYzaFmq], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.166] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w8bg7ltEV2mot8QXDPCmfK], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.170] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-A-0KdqhFjpc-Hvjmv7aZAc], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.199] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.200] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.220] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'

꽤 빨리 스레드 풀이 실행을 거부하기 시작합니다.

[Thu Apr 2013 23:47:15.363] ERROR [] (org.springframework.integration.handler.LoggingHandler:126) - org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6ff3cb0e] did not accept task: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@78615c8b
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:244)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:231)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:680)
Caused by: java.util.concurrent.RejectedExecutionException
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
    at org.springframework.sched

uling.concurrent.ThreadPoolTaskExecutor.execute (ThreadPoolTaskExecutor.java:241)     ... 12 자세히

나는 그 범인이 여기에 있다고 의심한다 : BlockingQueueConsumer - 메시지가 도착할 때까지 메시지에 대한 각 폴링이 스레드를 차단한다는 것을 나타내는 ... 스레드 풀이 빨리 고갈된다.

이것을 구성하는 올바른 방법은 무엇입니까?

해결법

  1. ==============================

    1.QueueChannel 및 폴러를 사용하는 대신 인바운드 어댑터의 concurrent-consumers 특성을 단순히 늘리지 않는 이유는 무엇입니까?

    QueueChannel 및 폴러를 사용하는 대신 인바운드 어댑터의 concurrent-consumers 특성을 단순히 늘리지 않는 이유는 무엇입니까?

        <xsd:attribute name="concurrent-consumers" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation>
    Specify the number of concurrent consumers to create. Default is 1.
    Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in
    from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In
    general, stick with 1 consumer for low-volume queues.
                </xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
    

    그리고 를 제거하십시오.

    또한 항상 로그에 스레드 이름을 포함하는 것을 권장합니다 (log4J의 경우 % t). 스레딩 문제를 쉽게 디버그 할 수 있습니다.

    편집하다:

    폴러에서 스레드가 부족한 이유는 폴러의 기본 수신 시간 제한이 1 초이기 때문입니다. 스레드를 50ms마다 스케줄링하지만, 각 스레드는 QueueChannel에서 1 초 동안 대기합니다. 결국 귀하의 작업 대기열이 가득 찼습니다.

    이를 방지하려면이 기술을 계속 사용하려면 에서 receive-timeout을 0으로 설정하십시오. 그러나 다른 스레드에 대한 폴링이나 핸드 오버가 없기 때문에 어댑터에서 더 높은 동시성을 사용하는 것이 더 효율적입니다.

  2. ==============================

    2.amqp-inbound 대기열 (pub / sub 스타일 대기열)과 queue-channel 사이를 연결하는 브리지가 필요합니다.

    amqp-inbound 대기열 (pub / sub 스타일 대기열)과 queue-channel 사이를 연결하는 브리지가 필요합니다.

    <int-amqp:inbound-channel-adapter queue-names="pricehub.fixtures.priceUpdates.queue" 
                                channel="pricehub.fixtures.priceUpdates.subpub"
                                message-converter="jsonMessageConverter"/>
    
    <int:publish-subscribe-channel id="pricehub.fixtures.priceUpdates.subpub" />
    <int:bridge input-channel="pricehub.fixtures.priceUpdates.subpub" 
        output-channel="pricehub.fixtures.priceUpdates.channel" />
    
    <int:channel id="pricehub.fixtures.priceUpdates.channel">
        <int:queue  />
    </int:channel>
    
    <int:service-activator ref="updatePriceAction" 
         method="updatePrices" 
         input-channel="pricehub.instruments.priceUpdates.channel">
        <int:poller fixed-delay="50" time-unit="MILLISECONDS" task-executor="taskExecutor" />
    </int:service-activator>
    
    <task:executor id="taskExecutor" pool-size="5-50" keep-alive="120" queue-capacity="500"/>
    

    상당히 간단한 작업을 수행하는 데 많은 코드가 필요한 것 같습니다. 개선 된 솔루션이나 제안이 있다면 누구나 볼 수 있습니다.

  3. from https://stackoverflow.com/questions/15811831/spring-integration-queue-poller-seems-to-exhaust-threadpool-without-any-action by cc-by-sa and MIT license