Tạo Producer trong Kafka
Tạo Producer trong Kafka
public static void main(String[] args) {
// TODO Auto-generated method stub
System.out.println( "Apache Kafka Producer" );
String kafkaServer = "192.168.0.9:9092";
String topicName = "my-hello-world-topic";
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Tao Kafka Producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
//for (int i = 1; i <= 10; i++) {
try {
while(true) {
String key = "Key-" + LocalDateTime.now().toString();
String value = "Hello Kafka " + LocalDateTime.now().toString();
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
// gui tin nhan bat dong bo
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Sent message: (" + key + ", " + value + ") at offset " + metadata.offset());
} else {
exception.printStackTrace();
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}finally {
// Dong producer
producer.close();
}
}