복붙노트

[SPRING] DefaultMessageListenerContainer가 크기 조절되지 않음

SPRING

DefaultMessageListenerContainer가 크기 조절되지 않음

나는 (내 의견으로는) 스케일 업하지 않는 DefaultMessageListenerContainer를 가지고있다. 컨테이너는 100 개의 메시지가있는 대기열에서 수신 대기하도록 정의됩니다.

Container는 어떤 길이로도 갈 것이며, 가능한 한 빨리 메시지가 소비 될 것이라고 기대할 것입니다 (maxConcurrentConsumers 구성을 관찰함으로써). 그래서 나는 7 명의 동시 소비자가 있다고 가정 할 것이다. (컨테이너 시동시 동시 concurrentConsumers로 시작) 일부 로깅 정보 :

activeConsumerCount: 5
concurrentConsumers: 2
scheduledConsumerCount: 5
idleConsumerLimit: 1
idleTaskExecLimit: 1
maxConcurrentConsumers: 7

내 Spring-config (일부) :

<bean id="abstractMessageListenerContainer" class="my.package.structure.LoggingListenerContainer" abstract="true">
    <property name="connectionFactory" ref="jmscfCee" />
    <property name="maxConcurrentConsumers" value="7"/>
    <property name="receiveTimeout" value="100000" />
    <property name="concurrentConsumers" value="2" />
</bean>

<bean class="my.package.structure.LoggingListenerContainer" parent="abstractMessageListenerContainer">
    <property name="destinationName" value="MY.QUEUE" />
    <property name="messageListener" ref="myMessageListener" />
</bean>

<bean id="myMessageListener" class="my.package.structure.ListenerClass"></bean>

내 로깅 컨테이너

public class LoggingListenerContainer extends DefaultMessageListenerContainer{

private static final Logger logger = Logger
        .getLogger(LoggingListenerContainer.class);
@Override
protected void doInvokeListener(MessageListener listener, Message message)
        throws JMSException {

    logger.info("activeConsumerCount: " + this.getActiveConsumerCount());
    logger.info("concurrentConsumers: " +  this.getConcurrentConsumers());
    logger.info("scheduledConsumerCount: " + this.getScheduledConsumerCount());
    logger.info("idleConsumerLimit: " + this.getIdleConsumerLimit());
    logger.info("idleTaskExecLimit: " + this.getIdleTaskExecutionLimit());
    logger.info("maxConcurrentConsumers: " + this.getMaxConcurrentConsumers());
    super.doInvokeListener(listener, message);
}

내 리스너 클래스 :

public class ListenerClass implements MessageListener {


    public void onMessage(Message msg) {
           //Do some business function
    }

}

누군가 내 구성을 수정하거나 내 구성과 관련하여 몇 가지 팁을 주거나 Container의 접근 방식을 설명 할 수 있습니까? (내가 뭔가 잘못 이해했다면)

필자는 확장 성 주제와 관련된 ActiveMQ (WebSphere MQ 프로덕션)를 로컬로 테스트하고 있습니다.

편집하다:

<bean id="jmscfCee" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>${jmscfCee.hostName}</value>
        </property>
</bean>

<bean id="jmscfCeeCachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory ">
    <constructor-arg ref="jmscfCee" />
    <property name="sessionCacheSize" value="10" />
</bean>

해결법

  1. ==============================

    1.그것은 달려있다. 나는 몇 년 전 ActiveMQ에서 비슷한 문제를 겪었습니다. 즉, 기본 동작이 작은 볼륨의 많은 볼륨 (수천 가지)을 대상으로 크게 악화되었습니다. 기본적으로 각 소비자는 1000 개의 일괄 처리로 메시지를 미리 가져 오므로 소량의 메시지가있는 경우 한 소비자의 프리 페치 버퍼에서 모든 메시지가 종료되어 다른 소비자를 유휴 상태로 유지하게됩니다.

    그것은 달려있다. 나는 몇 년 전 ActiveMQ에서 비슷한 문제를 겪었습니다. 즉, 기본 동작이 작은 볼륨의 많은 볼륨 (수천 가지)을 대상으로 크게 악화되었습니다. 기본적으로 각 소비자는 1000 개의 일괄 처리로 메시지를 미리 가져 오므로 소량의 메시지가있는 경우 한 소비자의 프리 페치 버퍼에서 모든 메시지가 종료되어 다른 소비자를 유휴 상태로 유지하게됩니다.

    프리 페치 정책을 사용하여이 동작을 조정할 수 있습니다. 연결 페치 (connection factory)를 작성하는 방법 인 경우 연결 URI 또는 ​​Spring 구성에서 프리 페치 정책을 사용하십시오.

    <amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost">
      <property name="prefetchPolicy">
        <amq:prefetchPolicy all="1" />
      </property>
    </amq:connectionFactory>
    

    당시 사용하고 있던 ActiveMQ의 버전은 프리 페치 제한 인 0을 지원하지 않았습니다 (예 : 프리 페치를하지 않고 매번 브로커로 이동). 그러나 이제는 허용 된 문서가 제시됩니다.

  2. from https://stackoverflow.com/questions/12583161/defaultmessagelistenercontainer-not-scaling by cc-by-sa and MIT license