Java로 간단한 Apache Kafka Producer/Consumer 클라이언트 만들기
여기서는 앞에서 만든 카프카 서버를 활용해서 Java로 간단한 카프카 프로듀서와 컨슈머 클라이언트를 만들어 보도록 하겠다.
Kafka Producer 프로젝트 생성
카프카 프로젝트를 IDE를 통해서 프로듀셔 프로젝트 생성한다.
Kafka Producer 빌드 스크립트
빌드 스크립트는 아래와 같이 설정한다. build.gradle
plugins {
id 'java'
}
group 'com.devkuma'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.8.1' // (1)
implementation 'org.slf4j:slf4j-simple:1.7.36' // (2)
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}
test {
useJUnitPlatform()
}
(1) 카프카 클라이언트 라이브러리를 사용하기 위한 의존성 설정. 여기서는 2.8.1 버전의 클라이언트를 설정하였다. (2) 카프카 클라이언트의 로그를 확인하기 위해 slf4j 구현체 라이브러리를 추가하였다. slf4j의 api는 이미 카프카 라이브러리에 추가되어 있다.
Kafka Producer 프로젝트 코드
이제 실제 프로듀서 코드를 작성해 보자. src/main/java/com/devkuma/kafka/SimpleKafkaProducer.java
package com.devkuma.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;
public class SimpleKafkaProducer {
private final static String TOPIC_NAME = "devkuma-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(properties);
while (true) {
Scanner sc = new Scanner(System.in);
System.out.print("> ");
String message = sc.nextLine();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
try {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
}
});
} finally {
producer.flush();
}
if (message.equals("/quit")) {
producer.close();
break;
}
}
}
}
Properties 클래스를 사용해서 옵셥값을 설정하여 Kafka Producer 인스턴스를 초기화시키고 있다. 관련된 설정 key값은 ProducerConfig 클래스에서 제공되니 필요한 옵션값은 이 통해서 설정할 수 있다.
코드에서 간단히 설정한 값에 설명을 아래와 같다.
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
: Kafka 클러스터 IP 목록을 입력해 준다.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
: 레코드의 메시지 키를 직렬화하는 클래스를 지정한다.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
: 레코드의 메시지 값를 직렬화하는 클래스를 지정한다.
카프카의 모든 메시지는 직렬화된 상태로 전송되는데 이때 사용할 시리얼라이저를 지정해 줘야 하는데, 여기서는 StringSerializer
문자열 시리얼라이즈 클래스를 사용하였다. 그 밖에 JSON, Apache Abro 등 사용할 수도 있다.
당연한 말이지만, 프로듀셔, 컨슈머의 시리얼라이저가 같지않으면 메시지 직렬화 또는 역직렬화가 불가능할 수 있으니 주의해야 한다.
Kafka Consumer 프로젝트 생성
카프카 프로젝트를 IDE를 통해서 컨슈머 프로젝트 생성한다.
Kafka Consumer 빌드 스크립트
빌드 스크립트는 프로듀셔와 동일하게 아래와 같이 설정한다. build.gradle
plugins {
id 'java'
}
group 'org.example'
version '1.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.apache.kafka:kafka-clients:2.8.1' // (1)
implementation 'org.slf4j:slf4j-simple:1.7.36' // (2)
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}
test {
useJUnitPlatform()
}
(1) 카프카 클라이언트 라이브러리를 사용하기 위한 의존성 설정. 여기서는 2.8.1 버전의 클라이언트를 설정하였다. (2) 카프카 클라이언트의 로그를 확인하기 위해 slf4j 구현체 라이브러리를 추가하였다. slf4j의 api는 이미 카프카 라이브러리에 추가되어 있다.
Kafka Consumer 프로젝트 코드
이제 실제 컨슈머 코드를 작성해 보자. src/main/java/com/devkuma/kafka/SimpleKafkaConsumer.java
package com.devkuma.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleKafkaConsumer {
private final static String TOPIC_NAME = "devkuma-topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
String message = null;
try {
do {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));
for (ConsumerRecord<String, String> record : records) {
message = record.value();
System.out.println(message);
}
} while (!message.equals("/quit"));
} finally {
consumer.close();
}
}
}
컨슈머도 프로듀셔와 동일하게 Properties 클래스를 사용해서 옵셥값을 설정하여 Kafka Producer 인스턴스를 초기화를 하면 된다. 관련된 설정 key값도 프로듀서와 동일하게 ProducerConfig 클래스에서 제공되니 필요한 옵션값은 이 통해서 설정할 수 있다.
ProducerConfig에 옵션에 대해서는 프로듀서를 확인하도록 하자.
옵션 설명
프로듀셔, 컨슈머에 대한 주요 옵션은 아래와 같다.
ProducerConfig 클라이언트 옵션 | 옵션값 | 설명 |
---|---|---|
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG |
bootstrap.server | Kafka 클러스터 IP 목록을 입력해 준다. |
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG |
key.serializer | 레코드의 메시지 키를 직렬화하는 클래스를 지정한다. |
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG |
value.serializer | 레코드의 메시지 값를 직렬화하는 클래스를 지정한다. |
마무리
위에 예제 코드는 GitHub에서 확인해 볼 수 있다.