kafka java 예제

따라서 kafka 브로커가 이를 결정할 수 있으므로 java 생산자 코드를 변경할 필요가 없습니다. 문자열 직렬화기와 마찬가지로 Kafka는 int 및 long과 같은 다른 프리미티브에 대한 serializers를 제공합니다. 키 나 값에 대 한 사용자 지정 개체를 사용 하려면 org.apache.kafka.common.serialization.Serialization를 구현 하는 클래스를 만들어야 합니다. 그런 다음 논리를 추가하여 클래스를 바이트[]로 직렬화할 수 있습니다. 또한 소비자 코드에서 해당 직렬화기를 사용해야 합니다. 5. .binwindowskafka-server-start.bat.configserver.properties를 실행하여 카프카를 시작합니다. 이후, 우리는 기본 구성에 어떤 변경 하지 않은, Kafka 실행 해야 하 고 실행 http://localhost:9092 여기 키/값 쌍으로 순차적 숫자를 포함 하는 문자열을 사용 하 여 레코드를 보내는 생산자를 사용 하 여 간단한 예입니다. 배달 콜백을 사용하여 C / C ++와 Python에서 비슷한 기능을 얻을 수 있지만 조금 더 많은 작업이 필요합니다. 전체 예제는 여기에서 찾을 수 있습니다. 파이썬 클라이언트에는 동일한 효과가있는 플러시 () 메서드도 포함되어 있습니다: PARTITIONER_CLASS_CONFIG : 레코드가 갈 파티션을 결정하는 데 사용되는 클래스입니다. 데모 항목에서는 파티션이 하나뿐이므로 이 속성을 주석으로 언급했습니다.

사용자 지정 파티셔인터페이스를 구현하여 사용자 지정 파티셔를 만들 수 있습니다. 예를 들어 librdkafka를 사용하면 먼저 작성하려는 주제에 대한 rd_kafka_topic_t 핸들을 만들어야 합니다. 그런 다음 rd_kafka_produce를 사용하여 메시지를 보낼 수 있습니다. 예: 부트스트랩_SERVERS_CONFIG: 카프카 브로커의 주소입니다. Kafka가 클러스터에서 실행되는 경우 쉼표(,) 분리된 주소를 제공할 수 있습니다. 예를 들어:localhost:9091,localhost:9092 KEY_SERIALIZER_CLASS_CONFIG: 키 개체를 직렬화하는 데 사용할 클래스입니다. 이 예제에서는 키가 길기 때문에 Long Serializer 클래스를 사용하여 키를 직렬화할 수 있습니다. 사용 사례에서 다른 개체를 키로 사용하는 경우 Kafka의 serializer 인터페이스를 구현하고 serialize 메서드를 재정의하여 사용자 지정 serializer 클래스를 만들 수 있습니다. 명령: Kafka에서 bin 폴더 내부의 설치 디렉터리는 스크립트(kafka-topics.sh)이며, 이를 사용하여 토픽을 만들고 삭제하고 주제 목록을 확인할 수 있습니다.

카프카 홈 디렉토리로 이동합니다. 응용 프로그램을 만들기 전에 먼저 ZooKeeper 및 Kafka 브로커를 시작한 다음 주제 만들기 명령을 사용하여 Kafka 브로커에서 고유한 주제를 만듭니다. 그런 다음 Sim-pleProducer.java라는 자바 클래스를 만들고 다음 코딩을 입력합니다. 이 예제에서는 한 Kafka 토픽에서 사용하고 다른 Kafka 토픽으로 프로듀싱하는 방법을 보여 주며, 위의 내용은 for 루프를 반복하여 예제 메시지(“Hello Mom ” + index)를 레코드 값으로 보내고 for 루프 인덱스를 레코드 키로 만듭니다. 각 반복에 대해 runProducer는 생산자의 송신 메서드를 호출합니다(레코드 메타데이터 메타데이터 = producer.send(record.send).get())). send 메서드는 Java Future를 반환합니다. 예제를 시작하기 전에 먼저 Kafka에서 사용되는 일반적인 용어와 몇 가지 명령에 대해 살펴보겠습니다. 기본적으로 버퍼에 사용되지 않는 추가 공간이 있더라도 버퍼를 즉시 보낼 수 있습니다.

그러나 요청 수를 줄이려면 linger.ms 0보다 큰 것으로 설정할 수 있습니다. 이렇게 하면 생산자가 동일한 일괄 처리를 채우기 위해 더 많은 레코드가 도착할 수 있도록 요청을 보내기 전에 해당 밀리초까지 기다려야 합니다. 이는 TCP의 Nagle 알고리즘과 유사합니다. 예를 들어 위의 코드 조각에서 느린 시간을 1밀리초로 설정했기 때문에 단일 요청으로 100개의 레코드가 모두 전송될 수 있습니다. 그러나 이 설정은 버퍼를 채우지 않은 경우 더 많은 레코드가 도착할 때까지 요청에 1밀리초의 대기 시간을 추가합니다. 시간에 가깝게 도착하는 레코드는 일반적으로 linger.ms=0과 함께 일괄 처리되므로 느린 구성에 관계없이 무거운 부하 일괄 처리가 수행됩니다. 그러나 이 것을 0보다 큰 것으로 설정하면 적은 양의 대기 시간으로 최대 부하가 되지 않을 때 보다 적게, 더 효율적인 요청으로 이어질 수 있습니다.

¡Contactanos!