Tạo Consumer trong Kafka
Tạo Consumer trong Kafka
/**
 * Entry point of a minimal Kafka consumer demo.
 *
 * <p>Connects to the broker at {@code 192.168.0.9:9092}, subscribes to the topic
 * {@code my-hello-world-topic}, prints each received record (key, value, offset,
 * partition) to standard output and commits offsets manually after every non-empty
 * batch. The polling loop has no exit condition of its own: it ends only when an
 * exception is thrown, at which point the consumer is closed.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    System.out.println("Apache Kafka Consummer");
    String kafkaServer = "192.168.0.9:9092";
    String topicName = "my-hello-world-topic";

    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    // The consumer group id deliberately reuses the topic name.
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, topicName);
    // Auto-commit is disabled; offsets are committed explicitly after processing.
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // try-with-resources closes the consumer on every exit path from the loop.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
        consumer.subscribe(Collections.singletonList(topicName));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) {
                // Nothing was consumed, so there is no new offset to commit;
                // skip the blocking commit round trip.
                continue;
            }
            for (ConsumerRecord<String, String> record : records) {
                // The period now ends the line instead of starting the next one.
                System.out.printf("Consummer: Key: %s, Value: %s, Offset: %d, Partition: %d.%n",
                        record.key(), record.value(), record.offset(), record.partition());
            }
            // Commit only after the whole batch has been printed.
            consumer.commitSync();
        }
    } catch (Exception e) {
        // Report the exception itself (type + message); getMessage() alone may be null.
        System.err.println("Loi: " + e);
    }
}