Create Kafka Consumer In Java

How to create Kafka Consumer in Java?

In the previous topic, we have covered about how to setup kafka cluster on windows, how to create kafka topics and create kafka producer in java. Please check the previous topics of how to setup kafka cluster , how to create kafka topic , create kafka producer in java , Kafka Consumer to understand in details so that you can easily create a Java program for the Kafka Consumer.

In this article you will learn about how to read kafka messages in Java. You will see the below example of Java program to read the Kafka messages in Java.

  1. After completed the kafka setup, creation of kafka topic and creation of kafka producer the next part is to create a consumer to read the messages/events.
  2. Need to add below dependency to create a kafka consumer in Java.
    <dependency>
        <groupId> org.apache.kafka </groupId>
        <artifactId> kafka-clients </artifactId>
        <version> 0.9.0.0 </version>
    </dependency>
  3. Java program of Kafka Consumer.
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class KafkaConsumerExample {
    
        static String TOPIC_NAME= "first-topic";
        public static void main(String args[]) {
            try {
                Properties properties = new Properties();
                properties.setProperty( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
                properties.setProperty( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName() );
                properties.setProperty( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName() );
                properties.setProperty( ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group" );
    
                KafkaConsumer<String, String> cons = new KafkaConsumer<>(properties);
                //Subscribe to the topic to consume messages/events.
                cons.subscribe(Arrays.asList(TOPIC_NAME));
                Duration duration = Duration.ofSeconds(1);
                System.out.println("*Started Receiving Messages*");
                while (true) {
                    ConsumerRecords<String, String> records =
                            cons.poll(duration.toMillis());
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("Received event: " + record.value());
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    
    Output:
    *Started Receiving Messages*
    Received event: I am Good!
    Received event: What about You?
    Received event: Hope you are also well?
    

    Kafka Consumer In Java Output

  4. Terminate the Kafka Environment
  5. To terminate the Kafka Environment, go to the bin/windows directory of kafka folder on command prompt and write below command.
    zookeeper-server-stop.bat

In the next topic, you will understand about Stream API In Java 8 Stream API In Java 8