In the previous topic, we have covered about how to setup kafka cluster on windows, how to create kafka topics and create kafka producer in java. Please check the previous topics of how to setup kafka cluster , how to create kafka topic , create kafka producer in java , Kafka Consumer to understand in details so that you can easily create a Java program for the Kafka Consumer.
In this article you will learn about how to read kafka messages in Java. You will see the below example of Java program to read the Kafka messages in Java.
<
dependency> <
groupId> org.apache.kafka <
/groupId> <
artifactId> kafka-clients <
/artifactId> <
version> 0.9.0.0 <
/version> <
/dependency>
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerExample { static String TOPIC_NAME= "first-topic"; public static void main(String args[]) { try { Properties properties = new Properties(); properties.setProperty( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); properties.setProperty( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName() ); properties.setProperty( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName() ); properties.setProperty( ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group" ); KafkaConsumer<
String, String
> cons = new KafkaConsumer<>(properties); //Subscribe to the topic to consume messages/events. cons.subscribe(Arrays.asList(TOPIC_NAME)); Duration duration = Duration.ofSeconds(1); System.out.println("*Started Receiving Messages*"); while (true) { ConsumerRecords<
String, String
> records = cons.poll(duration.toMillis()); for (ConsumerRecord<
String, String
> record : records) { System.out.println("Received event: " + record.value()); } } }catch(Exception e){ e.printStackTrace(); } } } Output: *Started Receiving Messages* Received event: I am Good! Received event: What about You? Received event: Hope you are also well?
zookeeper-server-stop.bat
In the next topic, you will understand about Stream API In Java 8
Stream API In Java 8