스프링 부츠를 사용한 심플한 임베디드 Kafka 테스트 예시
인터넷을 검색했지만 내장된 Kafka 테스트의 기능적이고 간단한 예를 찾을 수 없었습니다.
설정은 다음과 같습니다.
- 스프링 부츠
- 하나의 클래스에서 다른 토픽을 가진 여러 @KafkaListener
- 테스트용 임베디드 카프카(시작이 양호함)
- 토픽으로 전송되는 Kafkatemplate를 사용하여 테스트합니다만, @KafkaListener 메서드는 sleep 시간이 많이 지났는데도 아무것도 수신하지 않습니다.
- 경고나 오류는 표시되지 않으며 로그에는 Kafka로부터의 스팸 정보만 표시됩니다.
제발 도와주세요.대부분 과잉 설정 또는 과잉 행의 예가 있습니다.나는 그것이 간단하다고 확신한다.고마워요, 여러분!
@Controller
public class KafkaController {
private static final Logger LOG = getLogger(KafkaController.class);
@KafkaListener(topics = "test.kafka.topic")
public void receiveDunningHead(final String payload) {
LOG.debug("Receiving event with payload [{}]", payload);
//I will do database stuff here which i could check in db for testing
}
}
프라이빗 스태틱 문자열 SENSER_토픽 = "test.kafka.http";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Test
public void testSend() throws InterruptedException, ExecutionException {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
Thread.sleep(10000);
}
임베디드형 Kafka 테스트는 아래 구성에서 작동합니다.
테스트 클래스에 대한 주석
@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config
@EmbeddedKafka(
partitions = 1,
controlledShutdown = false,
brokerProperties = {
"listeners=PLAINTEXT://localhost:3333",
"port=3333"
})
public class KafkaConsumerTest {
@Autowired
KafkaEmbedded kafkaEmbeded;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
설정 방법에 대한 주석 전
@Before
public void setUp() throws Exception {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
kafkaEmbeded.getPartitionsPerTopic());
}
}
주의: 사용하고 있지 않습니다.@ClassRule자동 배선이 아닌 임베디드형 Kafka를 만듭니다.
@Autowired embeddedKafka
@Test
public void testReceive() throws Exception {
kafkaTemplate.send(topic, data);
}
이게 도움이 됐으면 좋겠네요!
Edit: 테스트컨피규레이션클래스에 마크가 붙어 있습니다.@TestConfiguration
@TestConfiguration
public class TestConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(topic);
return kafkaTemplate;
}
지금이다@Test메서드는 KafkaTemplate를 자동 배선하고 메시지를 보내는 데 사용됩니다.
kafkaTemplate.send(topic, data);
위 행으로 응답 코드 블록 업데이트
받아들여진 답변이 컴파일되지 않거나 저에게 효과가 없기 때문입니다.https://blog.mimacom.com/testing-apache-kafka-with-spring-boot/을 기반으로 다른 솔루션을 찾아 공유하고자 합니다.
종속성은 '스프링-카프카-테스트' 버전 '2.2.7'입니다.해방'
@RunWith(SpringRunner.class)
@EmbeddedKafka(partitions = 1, topics = { "testTopic" })
@SpringBootTest
public class SimpleKafkaTest {
private static final String TEST_TOPIC = "testTopic";
@Autowired
EmbeddedKafkaBroker embeddedKafkaBroker;
@Test
public void testReceivingKafkaEvents() {
Consumer<Integer, String> consumer = configureConsumer();
Producer<Integer, String> producer = configureProducer();
producer.send(new ProducerRecord<>(TEST_TOPIC, 123, "my-test-value"));
ConsumerRecord<Integer, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
assertThat(singleRecord).isNotNull();
assertThat(singleRecord.key()).isEqualTo(123);
assertThat(singleRecord.value()).isEqualTo("my-test-value");
consumer.close();
producer.close();
}
private Consumer<Integer, String> configureConsumer() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<Integer, String> consumer = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps)
.createConsumer();
consumer.subscribe(Collections.singleton(TEST_TOPIC));
return consumer;
}
private Producer<Integer, String> configureProducer() {
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
}
}
나는 지금 문제를 해결했다.
@BeforeClass
public static void setUpBeforeClass() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}
디버깅을 하다가 임베디드 kaka 서버가 랜덤 포트를 사용하고 있는 것을 보았습니다.
설정을 찾을 수 없어서 서버와 동일하게 kafka 설정을 하고 있습니다.아직 좀 못생긴 것 같은데
@Mayur가 언급한 라인만 있으면 좋겠습니다.
@EmbeddedKafka(partitions = 1, controlledShutdown = false, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
인터넷에서 올바른 의존관계를 찾을 수 없습니다.
통합 테스트에서는 9092와 같은 고정 포트를 사용하는 것은 권장되지 않습니다.여러 테스트에서는 임베디드 인스턴스에서 자체 포트를 열 수 있는 유연성이 필요하기 때문입니다.구현은 다음과 같습니다.
NB: 이 실장은 junit5(Jupiter: 5.7.0) 및 spring-boot 2.3.4를 기반으로 합니다.풀어주다
테스트 클래스:
@EnableKafka
@SpringBootTest(classes = {ConsumerTest.Config.class, Consumer.class})
@EmbeddedKafka(
partitions = 1,
controlledShutdown = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ConsumerTest {
@Autowired
private EmbeddedKafkaBroker kafkaEmbedded;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@BeforeAll
public void setUp() throws Exception {
for (final MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
kafkaEmbedded.getPartitionsPerTopic());
}
}
@Value("${topic.name}")
private String topicName;
@Autowired
private KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate;
@Test
public void consume_success() {
requestKafkaTemplate.send(topicName, load);
}
@Configuration
@Import({
KafkaListenerConfig.class,
TopicConfig.class
})
public static class Config {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestProducerFactory() {
final Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate() {
return new KafkaTemplate<>(requestProducerFactory());
}
}
}
수신기 클래스:
@Component
public class Consumer {
@KafkaListener(
topics = "${topic.name}",
containerFactory = "listenerContainerFactory"
)
@Override
public void listener(
final ConsumerRecord<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> consumerRecord,
final @Payload Optional<Map<String, List<ImmutablePair<String, String>>>> payload
) {
}
}
리스트너 설정:
@Configuration
public class KafkaListenerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${topic.name}")
private String resolvedTreeQueueName;
@Bean
public ConsumerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeConsumerFactory() {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, resolvedTreeQueueName);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new CustomDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(resolvedTreeConsumerFactory());
return factory;
}
}
토픽 설정:
@Configuration
public class TopicConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${topic.name}")
private String requestQueue;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic requestTopic() {
return new NewTopic(requestQueue, 1, (short) 1);
}
}
application.properties:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
이 할당은 임베디드 인스턴스 포트를 KafkaTemplate 및 KafkaListners에 바인드하는 가장 중요한 할당입니다.
위의 구현 후에는 테스트클래스별로 다이내믹포트를 열 수 있어 편리합니다.
언급URL : https://stackoverflow.com/questions/48753051/simple-embedded-kafka-test-example-with-spring-boot
'sourcecode' 카테고리의 다른 글
| 하위 테마에서 Wordpress 테마 옵션을 제거하는 방법 (0) | 2023.02.18 |
|---|---|
| '가 Angular 컴포넌트인 경우 이 모듈의 일부인지 확인합니다. (0) | 2023.02.15 |
| 스프링: @ResponseBody "ResponseEntity"를 반환합니다. (0) | 2023.02.15 |
| TypeScript 정적 클래스 (0) | 2023.02.15 |
| 2개의 JSON 객체 비교 (0) | 2023.02.15 |