반응형
파티션 키/값을 역직렬화하는 동안 Kafka 오류가 발생했습니다.
키 없이 카프카 주제로 보낼 때 통과하는 통합 테스트가 있습니다.그러나 키를 추가하면 직렬화 오류가 발생합니다.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-1 at offset 0
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
보낸 사람 클래스입니다.
public class Sender {
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send() {
String topic = "topic";
String data = "data";
String key = "key";
LOG.info("sending to topic: '{}', key: '{}', data: '{}'", topic, key, data);
// does not work
kafkaTemplate.send(topic, key, data);
// works
// kafkaTemplate.send(topic, data);
}
}
키에 StringSerializer를 지정하는 구성입니다.
@Configuration
@EnableKafka
public class Config {
@Bean
public Sender sender() {
return new Sender();
}
@Bean
public Properties properties() {
return new Properties();
}
@Bean
public Map<String, Object> producerConfigs(Properties properties) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory(Properties properties) {
return new DefaultKafkaProducerFactory<>(producerConfigs(properties));
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(Properties properties) {
return new KafkaTemplate<>(producerFactory(properties));
}
}
이 문제는 내 테스트에서 메시지 수신기와 관련이 있을 수 있지만, 그것도 전체적으로 문자열을 사용하는 것입니다.
@RunWith(SpringRunner.class)
@SpringBootTest()
@DirtiesContext
public class SenderIT {
public static final Logger LOG = LoggerFactory.getLogger(SenderIT.class);
private static String SENDER_TOPIC = "topic";
@Autowired
private Sender sender;
private KafkaMessageListenerContainer<String, String> container;
private BlockingQueue<ConsumerRecord<String, String>> records;
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Before
public void setUp() throws Exception {
// set up the Kafka consumer properties
Map<String, Object> consumerProperties =
KafkaTestUtils.consumerProps("sender", "false", embeddedKafka);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// create a Kafka consumer factory
DefaultKafkaConsumerFactory<String, String> consumerFactory =
new DefaultKafkaConsumerFactory<String, String>(consumerProperties);
// set the topic that needs to be consumed
ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);
// create a Kafka MessageListenerContainer
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
// create a thread safe queue to store the received message
records = new LinkedBlockingQueue<>();
// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
LOG.debug("test-listener received message='{}'", record.toString());
records.add(record);
}
});
// start the container and underlying message listener
container.start();
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
}
@After
public void tearDown() {
// stop the container
container.stop();
}
@Test
public void test() throws InterruptedException {
sender.send();
// check that the message was received in Kafka
ConsumerRecord<String, String> kafkaTopicMsg = records.poll(10, TimeUnit.SECONDS);
LOG.debug("kafka recieved = {}", kafkaTopicMsg);
assertThat(kafkaTopicMsg).isNotNull();
}
}
언제나처럼, 어떤 도움이라도 주시면 감사하겠습니다.
재생할 모든 코드는 https://github.com/LewisWatson/kafka-embedded-test/tree/8322621ad4e302d982e5ecd28af9fd314696d850 에서 확인할 수 있습니다.
전체 스택 추적은 https://travis-ci.org/LewisWatson/kafka-embedded-test/builds/273227986 에서 사용할 수 있습니다.
로그를 자세히 조사한 후 문제를 테스트 메시지 수신기로 좁힐 수 있었습니다.
2017-09-08 09:30:06.845 ERROR 2550 --- [ -C-1] essageListenerContainer$ListenerConsumer : Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-1 at offset 0
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
https://travis-ci.org/LewisWatson/kafka-embedded-test/builds/273227986#L2961
왠지 키가 정수가 될 것으로 예상하는 것 같습니다.
소비자 공장에 대해 문자열 역직렬화기를 명시적으로 설정하여 문제를 해결했습니다.
// create a Kafka consumer factory
DefaultKafkaConsumerFactory<String, String> consumerFactory =
new DefaultKafkaConsumerFactory<String, String>(consumerProperties,
new StringDeserializer(), new StringDeserializer());
언급URL : https://stackoverflow.com/questions/46113928/kafka-error-deserializing-key-value-for-partition
반응형
'sourcecode' 카테고리의 다른 글
NLTK python 오류: "TypeError: 'dict_keys' 개체를 구독할 수 없습니다." (0) | 2023.06.22 |
---|---|
사용자 지정 ApplicationContextInitializer를 스프링 부팅 응용 프로그램에 추가하는 방법은 무엇입니까? (0) | 2023.06.22 |
스프링 프로파일, 다양한 Log4j2 구성 (0) | 2023.06.22 |
.NET을 사용하여 Oracle에 대량 삽입 (0) | 2023.06.22 |
패키지의 기존 상태가 삭제되었습니다. (0) | 2023.06.22 |