[SPRING] Spring 통합 - 대기열 / 폴러는 아무런 조치없이 스레드 풀을 다 소모하는 것 같습니다.
SPRINGSpring 통합 - 대기열 / 폴러는 아무런 조치없이 스레드 풀을 다 소모하는 것 같습니다.
AMQP 중개인에 첨부 된 Spring 통합 응용 프로그램이 있습니다.
amqp-queue에서 메시지를 받고 DB 레코드를 업데이트하려고합니다.
성능을 향상시키기 위해 여러 업데이트가 동시에 발생할 수있는 작업자 풀이 있습니다.
나는 다음과 같은 구성을 가지고있다 :
<int-amqp:inbound-channel-adapter queue-names="pricehub.fixtures.priceUpdates.queue"
channel="pricehub.fixtures.priceUpdates.channel"
message-converter="jsonMessageConverter"/>
<int:channel id="pricehub.fixtures.priceUpdates.channel">
<int:queue />
</int:channel>
<int:service-activator ref="updatePriceAction"
method="updatePrices"
input-channel="pricehub.instruments.priceUpdates.channel">
<int:poller fixed-delay="50" time-unit="MILLISECONDS" task-executor="taskExecutor" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="5-50" keep-alive="120" queue-capacity="500"/>
AMQP 채널에서 처리 할 인바운드 메시지가없는이 실행을 시작할 경우, 나는 신속하게 쓰레드 풀이 고갈 된 것을보고 거부를 시작합니다.
여기에 로그가 있습니다.
[Thu Apr 2013 23:41:51.153] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w4qPp60jVEQOIEovR4cERv], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.160] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-Q3Lq4R9g9E8WBNVLYzaFmq], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.166] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w8bg7ltEV2mot8QXDPCmfK], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.170] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-A-0KdqhFjpc-Hvjmv7aZAc], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.199] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.200] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.220] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
꽤 빨리 스레드 풀이 실행을 거부하기 시작합니다.
[Thu Apr 2013 23:47:15.363] ERROR [] (org.springframework.integration.handler.LoggingHandler:126) - org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6ff3cb0e] did not accept task: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@78615c8b
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:244)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:231)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)
Caused by: java.util.concurrent.RejectedExecutionException
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
at org.springframework.sched
uling.concurrent.ThreadPoolTaskExecutor.execute (ThreadPoolTaskExecutor.java:241) ... 12 자세히
나는 그 범인이 여기에 있다고 의심한다 : BlockingQueueConsumer - 메시지가 도착할 때까지 메시지에 대한 각 폴링이 스레드를 차단한다는 것을 나타내는 ... 스레드 풀이 빨리 고갈된다.
이것을 구성하는 올바른 방법은 무엇입니까?
해결법
-
==============================
1.QueueChannel 및 폴러를 사용하는 대신 인바운드 어댑터의 concurrent-consumers 특성을 단순히 늘리지 않는 이유는 무엇입니까?
QueueChannel 및 폴러를 사용하는 대신 인바운드 어댑터의 concurrent-consumers 특성을 단순히 늘리지 않는 이유는 무엇입니까?
<xsd:attribute name="concurrent-consumers" type="xsd:string"> <xsd:annotation> <xsd:documentation> Specify the number of concurrent consumers to create. Default is 1. Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues. </xsd:documentation> </xsd:annotation> </xsd:attribute>
그리고
및 를 제거하십시오. 또한 항상 로그에 스레드 이름을 포함하는 것을 권장합니다 (log4J의 경우 % t). 스레딩 문제를 쉽게 디버그 할 수 있습니다.
편집하다:
폴러에서 스레드가 부족한 이유는 폴러의 기본 수신 시간 제한이 1 초이기 때문입니다. 스레드를 50ms마다 스케줄링하지만, 각 스레드는 QueueChannel에서 1 초 동안 대기합니다. 결국 귀하의 작업 대기열이 가득 찼습니다.
이를 방지하려면이 기술을 계속 사용하려면
에서 receive-timeout을 0으로 설정하십시오. 그러나 다른 스레드에 대한 폴링이나 핸드 오버가 없기 때문에 어댑터에서 더 높은 동시성을 사용하는 것이 더 효율적입니다. -
==============================
2.amqp-inbound 대기열 (pub / sub 스타일 대기열)과 queue-channel 사이를 연결하는 브리지가 필요합니다.
amqp-inbound 대기열 (pub / sub 스타일 대기열)과 queue-channel 사이를 연결하는 브리지가 필요합니다.
<int-amqp:inbound-channel-adapter queue-names="pricehub.fixtures.priceUpdates.queue" channel="pricehub.fixtures.priceUpdates.subpub" message-converter="jsonMessageConverter"/> <int:publish-subscribe-channel id="pricehub.fixtures.priceUpdates.subpub" /> <int:bridge input-channel="pricehub.fixtures.priceUpdates.subpub" output-channel="pricehub.fixtures.priceUpdates.channel" /> <int:channel id="pricehub.fixtures.priceUpdates.channel"> <int:queue /> </int:channel> <int:service-activator ref="updatePriceAction" method="updatePrices" input-channel="pricehub.instruments.priceUpdates.channel"> <int:poller fixed-delay="50" time-unit="MILLISECONDS" task-executor="taskExecutor" /> </int:service-activator> <task:executor id="taskExecutor" pool-size="5-50" keep-alive="120" queue-capacity="500"/>
상당히 간단한 작업을 수행하는 데 많은 코드가 필요한 것 같습니다. 개선 된 솔루션이나 제안이 있다면 누구나 볼 수 있습니다.
from https://stackoverflow.com/questions/15811831/spring-integration-queue-poller-seems-to-exhaust-threadpool-without-any-action by cc-by-sa and MIT license
'SPRING' 카테고리의 다른 글
[SPRING] JSTL 및 Spring : 인수를 사용하여 메소드 액세스 (0) | 2019.02.24 |
---|---|
[SPRING] 경로의 시작 부분과 일치하는 antMatcher (0) | 2019.02.24 |
[SPRING] ajax spring mvc에서 ModelAndView 리턴하기 (0) | 2019.02.24 |
[SPRING] hibernate-validator의 예외. 상자의 예외는 java.lang.NoClassDefFoundError입니다 : ConfigurationImpl (0) | 2019.02.24 |
[SPRING] JSON 시리얼 라이저에서 지연로드 오류 (0) | 2019.02.24 |