[SPRING] Spring Cloud Dataflow type conversion not working in processor component?

I have a processor that transforms a byte[] payload into a MyClass payload:

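import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.ServiceActivator;

@Slf4j
@EnableBinding(Processor.class)
public class MyDecoder {

    // Receives raw bytes on the input channel and sends the decoded object to the output channel.
    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public MyClass decode(final byte[] payload) {
        MyClass decoded = doStuff(payload); // decoding logic omitted in the original post
        if (decoded != null) {
            log.info("Successfully decoded!");
        }

        return decoded;
    }
}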

I tried to create the following DSL: some-source | my-decoder | some-sink. The some-sink app reports an error because the MyClass class is not on its classloader, which is the expected behavior.

I then tried to apply a type conversion in my-decoder: some-source | my-decoder --spring.cloud.stream.bindings.output.contentType=application/json | some-sink. The following error appears in the my-decoder logs:

2017-01-20 21:45:17.278  INFO 9408 --- [afka-listener-2] com.example.MyDecoder  : Successfully decoded!
2017-01-20 21:45:18.441  INFO 9408 --- [afka-listener-2] com.example.MyDecoder  : Successfully decoded!
2017-01-20 21:45:20.512  INFO 9408 --- [afka-listener-2] com.example.MyDecoder  : Successfully decoded!
2017-01-20 21:45:20.515 ERROR 9408 --- [afka-listener-2] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = example.some-source, partition = 0, offset = 1, key = null, value = [B@7dfad000)

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.4.RELEASE.jar!/:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.4.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:276) ~[spring-retry-1.1.5.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:172) ~[spring-retry-1.1.5.RELEASE.jar!/:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.4.RELEASE.jar!/:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:597) [spring-kafka-1.0.4.RELEASE.jar!/:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1800(KafkaMessageListenerContainer.java:222) [spring-kafka-1.0.4.RELEASE.jar!/:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:778) [spring-kafka-1.0.4.RELEASE.jar!/:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Caused by: java.lang.IllegalArgumentException: payload must not be null
    at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
    at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:57) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:53) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.support.MutableMessageBuilder.withPayload(MutableMessageBuilder.java:86) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:35) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:26) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:194) ~[spring-cloud-stream-1.1.0.RELEASE.jar!/:1.1.0.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
    ... 42 common frames omitted

You can see that the message is transformed from byte[] to MyClass and is not null. I don't understand why the message is logged 3 times before failing, since the Kafka property 'retries' is 0 at startup in the my-decoder logs:

2017-01-20 21:44:32.080  INFO 9408 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [localhost:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = 
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLS
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    acks = 1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 0

I tried to write integration tests:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext
public abstract class MyDecoderTests {

    @Autowired
    protected Processor channels;

    @Autowired
    protected MessageCollector collector;

    public static class UsingNothingIntegrationTests extends MyDecoderTests {

        @Test
        public void test() throws Exception {
            channels.input().send(new GenericMessage<Object>(Hex.decodeHex("ff".toCharArray())));
            assertThat(collector.forChannel(channels.output()), receivesPayloadThat(instanceOf(MyClass.class)));
        }
    }

    @SpringBootTest("spring.cloud.stream.bindings.output.contentType=application/json")
    public static class UsingOutputConverterIntegrationTests extends MyDecoderTests {

        @Test
        public void test() throws Exception {
            channels.input().send(new GenericMessage<Object>(Hex.decodeHex("ff".toCharArray())));
            assertThat(collector.forChannel(channels.output()), receivesPayloadThat(is("{\"example\": true}")));
        }
    }

    @Configuration
    @EnableAutoConfiguration
    @Import(MyDecoderConfiguration.class)
    public static class MyDecoderTestApplication {

    }
}

The tests pass, so the conversion is performed.

I then thought that my DSL was incorrect, so I wrote a new source to test it:

@Bean
@InboundChannelAdapter(Source.OUTPUT)
public MessageSource<MyClass> exampleSource() {
    return () -> new GenericMessage<>(getMyClassObject());
}

The following DSL converts MyClass to JSON as expected: my-source --spring.cloud.stream.bindings.output.contentType=application/json | some-sink

So why is the decoding message logged 3 times, and why do I get the 'payload must not be null' message? Is something wrong in my processor?

Solution

  1. ==============================

    You see 3 attempts because that is the default retry configuration for the binder's input channel.
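To illustrate why the log line appears three times, here is a minimal plain-Java sketch of a consumer-side retry loop. It is a simplified model, not the binder's code: `ConsumerRetrySketch`, `deliver`, and `countDecodeCalls` are hypothetical names, and the value 3 stands in for the binder's default number of attempts (in Spring Cloud Stream this is the consumer binding property `maxAttempts`, for example `spring.cloud.stream.bindings.input.consumer.maxAttempts`). The Kafka producer `retries` setting shown in the question governs resends from the producer to the broker, so it has no effect on this loop.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Simplified model of consumer-side retry; not the binder's actual implementation. */
class ConsumerRetrySketch {

    /** Stand-in for the binder's default number of delivery attempts. */
    static final int DEFAULT_MAX_ATTEMPTS = 3;

    /** Calls the handler until it succeeds or maxAttempts is reached, then rethrows the last failure. */
    static <T> T deliver(Supplier<T> handler, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return handler.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and redeliver the same record
            }
        }
        throw last;
    }

    /** Counts how often the decode step runs when the send that follows it always fails. */
    static int countDecodeCalls(int maxAttempts) {
        AtomicInteger decodeCalls = new AtomicInteger();
        Supplier<Object> handler = () -> {
            decodeCalls.incrementAndGet(); // one "Successfully decoded!" log line per call
            throw new IllegalStateException("send failed: payload must not be null");
        };
        try {
            deliver(handler, maxAttempts);
        } catch (IllegalStateException exhausted) {
            // all attempts failed; the error handler would log the record here
        }
        return decodeCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("decode calls: " + countDecodeCalls(DEFAULT_MAX_ATTEMPTS));
    }
}
```

In this model, passing 1 as `maxAttempts` yields a single decode call, which mirrors disabling retry on the input binding.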

    There is a bug in the binder: when the converter cannot convert the message, the binder tries to build a message with a null payload.

    The reason it can't convert the payload is that it sees the inbound content type (probably application/octet-stream) and can't convert from that to JSON.
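The two points above can be modelled in a few lines of plain Java. This is a rough sketch under stated assumptions, not the real Spring classes: `NullPayloadSketch`, `Message`, `toJson`, and `convertForOutput` are hypothetical names, and the converter's rule (give up when the message already declares a non-JSON content type) is a simplification of the behavior described in the answer.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of content-type-driven conversion; not the real Spring classes. */
class NullPayloadSketch {

    static final String CONTENT_TYPE = "contentType";

    /** Minimal message whose constructor rejects null payloads, like the assertion in the stack trace. */
    static final class Message {
        final Object payload;
        final Map<String, Object> headers;

        Message(Object payload, Map<String, Object> headers) {
            if (payload == null) {
                throw new IllegalArgumentException("payload must not be null");
            }
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Hypothetical JSON converter: returns null when the headers declare a different content type. */
    static String toJson(Object payload, Map<String, Object> headers) {
        Object declared = headers.get(CONTENT_TYPE);
        if (declared != null && !"application/json".equals(declared)) {
            return null; // the converter does not consider itself applicable
        }
        return "{\"value\":\"" + payload + "\"}"; // stand-in for real JSON serialization
    }

    /** Models the buggy step: the converter result is wrapped without a null check. */
    static Message convertForOutput(Object payload, Map<String, Object> headers) {
        return new Message(toJson(payload, headers), headers);
    }

    public static void main(String[] args) {
        Map<String, Object> inherited = new HashMap<>();
        inherited.put(CONTENT_TYPE, "application/octet-stream"); // copied from the inbound message
        try {
            convertForOutput("decoded", inherited);
        } catch (IllegalArgumentException e) {
            System.out.println("with inherited header: " + e.getMessage());
        }
        Message ok = convertForOutput("decoded", new HashMap<>());
        System.out.println("without header: " + ok.payload);
    }
}
```

The second call in `main` succeeds because no content type header is present, which is the situation the workaround aims to create on the outbound side.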

    A workaround is to add the following file to the classpath:

    META-INF/spring.integration.properties

    with this line in it:

    spring.integration.readOnly.headers=contentType

    This prevents the inbound content type header from being propagated to the outbound message.
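As a rough illustration of what marking a header read-only means, the sketch below copies inbound headers to an outbound header map while skipping the read-only names. It is a plain-Java model, not Spring Integration's implementation; `ReadOnlyHeadersSketch` and `copyHeaders` are hypothetical names, and `sourceApp` is a made-up header used only to show that other headers still propagate.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Simplified model of header propagation with read-only headers; not Spring Integration's code. */
class ReadOnlyHeadersSketch {

    /** Copies every inbound header except those listed as read-only. */
    static Map<String, Object> copyHeaders(Map<String, Object> inbound, Set<String> readOnly) {
        Map<String, Object> outbound = new HashMap<>();
        for (Map.Entry<String, Object> header : inbound.entrySet()) {
            if (!readOnly.contains(header.getKey())) {
                outbound.put(header.getKey(), header.getValue());
            }
        }
        return outbound;
    }

    public static void main(String[] args) {
        Map<String, Object> inbound = new HashMap<>();
        inbound.put("contentType", "application/octet-stream");
        inbound.put("sourceApp", "some-source"); // made-up header for illustration

        // Without the setting, the inbound content type leaks into the reply.
        System.out.println(copyHeaders(inbound, Collections.<String>emptySet()));

        // With contentType marked read-only, the reply carries no inherited content type.
        System.out.println(copyHeaders(inbound, Collections.singleton("contentType")));
    }
}
```

With the inherited header gone, the output conversion is free to apply the content type configured on the output binding.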

    It requires Spring Integration 4.3.2 or later.

    In a future release of SCSt (Spring Cloud Stream), this will be set by default.

  2. from https://stackoverflow.com/questions/41781351/spring-cloud-dataflow-type-conversion-not-working-in-processor-component by cc-by-sa and MIT license