Install Kafka
Visit the following link and download a Kafka binary release: https://kafka.apache.org/downloads
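For example, you can download a release archive from the Apache archive (the exact URL below is an assumption; adjust it to the version you picked on the downloads page), then extract it and change into the resulting directory:
sh
wget https://archive.apache.org/dist/kafka/3.2.1/kafka_2.13-3.2.1.tgz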
sh
tar -xzf kafka_2.13-3.2.1.tgz
sh
cd kafka_2.13-3.2.1
To start the ZooKeeper server bundled with Kafka:
sh
bin/zookeeper-server-start.sh config/zookeeper.properties
To start the Apache Kafka broker:
sh
bin/kafka-server-start.sh config/server.properties
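Both start scripts can also be run in the background by passing the -daemon flag supported by the bundled scripts:
sh
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties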
Creating a Topic
using --zookeeper (older Kafka versions only; this flag was removed from kafka-topics.sh in Kafka 3.x):
sh
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <nama_topic>
using --bootstrap-server (connecting to the broker directly):
sh
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic <nama_topic>
Creating a Topic with a Specific Partition Count
with 1 partition:
sh
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic <nama_topic>
with 3 partitions:
sh
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic <nama_topic>
Listing Topics
using --bootstrap-server:
sh
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
using --zookeeper (older versions):
sh
./bin/kafka-topics.sh --list --zookeeper localhost:2181
Describing Topics
describe all topics:
sh
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe
describe a specific topic:
sh
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic <nama_topic>
Sending Data to a Topic
sh
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <nama_topic>
Sending Data to a Topic with a Key
sh
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <nama_topic> --property "key.separator=-" --property "parse.key=true"
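With parse.key=true and key.separator=-, each line typed at the producer prompt is split into a key and a value, for example:
sh
>user1-hello
>user2-world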
Receiving Data from a Topic
consume only new data (starting from the end of the topic):
sh
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <nama_topic>
consume from the beginning of the topic:
sh
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <nama_topic> --from-beginning
consume a fixed number of messages:
sh
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <nama_topic> --from-beginning --max-messages 10
consume as part of a consumer group:
sh
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <nama_topic> --group belajar
consume and print the message keys:
sh
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <nama_topic> --from-beginning --property "key.separator=-" --property "print.key=true"
consume from a specific partition:
sh
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <nama_topic> --partition 0
Listing Consumer Groups
sh
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
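To inspect a specific group's offsets and lag per partition, describe it (here using the belajar group from the examples above):
sh
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group belajar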
Consumer Group
sh
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <nama_topic> --group belajar
Viewing the Commit Log
sh
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files /tmp/kafka-logs/test-topic-0/00000000000000000000.log
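To also dump record keys and payloads rather than only offsets and metadata, DumpLogSegments accepts a --print-data-log flag (verify against the tool's usage output on your Kafka version):
sh
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/kafka-logs/test-topic-0/00000000000000000000.log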
Deleting a Topic
using --zookeeper (older versions):
sh
./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic <nama_topic>
using --bootstrap-server:
sh
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic <nama_topic>
Log Compaction
Create a topic with log compaction enabled (if you use ZooKeeper-based tooling, replace --bootstrap-server localhost:9092 with --zookeeper localhost:2181):
sh
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic <nama_topic> --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config min.cleanable.dirty.ratio=0.001 --config segment.ms=5000
describe the topic to confirm its configuration:
sh
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic <nama_topic>
run a Kafka console consumer:
sh
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <nama_topic> --from-beginning --property print.key=true --property key.separator=,
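To see compaction in action, produce several records that reuse the same key (the separator here matches the consumer command above); after compaction only the latest value per key remains:
sh
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <nama_topic> --property parse.key=true --property key.separator=,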
Kafka Producer
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Class name is arbitrary for this example
public class SimpleProducer {
    public static void main(String[] args) {
        // Broker address and String serializers for key and value
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Send 10 records (value only, no key) to the topic-java topic
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic-java", "Data ke " + i);
            producer.send(record);
        }
        producer.close();
    }
}
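A rough way to run this class, assuming it lives in a Maven project with the kafka-clients dependency and the example class name used above (both assumptions about your project setup):
sh
mvn compile exec:java -Dexec.mainClass=SimpleProducer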
To verify the output, consume the topic with a console consumer:
sh
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-java
Kafka Consumer
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Class name is arbitrary for this example
public class SimpleConsumer {
    public static void main(String[] args) {
        // Broker address, consumer group, auto-commit, and String deserializers
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic-java"));

        // Poll forever and print each record's value
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Receive Data : " + record.value());
            }
        }
    }
}
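To feed this consumer some data, either run the producer class above or type messages into a console producer on the same topic-java topic:
sh
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-java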
Kafka in systemd
Create a new file at /etc/systemd/system/kafka.service with the following contents:
[Unit]
After=network.target
Requires=zk.service
After=zk.service

[Service]
Type=simple
User=kafka
Restart=on-abnormal
ExecStart=/bin/sh -c '/home/hard/apache/kafka/bin/kafka-server-start.sh /home/hard/apache/kafka/config/server.properties > /home/hard/apache/kafka/kafka.log 2>&1'
ExecStop=/home/hard/apache/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target
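After saving the unit file, reload systemd and start the service (this assumes the zk.service unit referenced above also exists):
sh
sudo systemctl daemon-reload
sudo systemctl enable --now kafka
sudo systemctl status kafka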