Kafka ve RabbitMQ Mülakat Soruları

100 adet ileri seviye soru ve cevapla hazırlanmış kapsamlı arşiv

Apache Kafka

70 adet ileri seviye Kafka mülakat sorusu ve cevabı

RabbitMQ

30 adet ileri seviye RabbitMQ mülakat sorusu ve cevabı

Apache Kafka Mülakat Soruları

Bu dokümanda Apache Kafka mülakatlarında en çok sorulan konular soru-cevap mantığıyla açıklanmış ve örnek kodlar eklenmiştir.

🔹 Temel Kafka Kavramları

❓ Soru 1: Apache Kafka nedir? Ne işe yarar?

Cevap: Apache Kafka, açık kaynaklı bir dağıtık olay akışı platformudur.

  • Yüksek verimli, gerçek zamanlı veri akışları için kullanılır.
  • Pub-sub modeli temelinde çalışır.
  • Veri kalıcılığı sağlar (persistent messaging).
  • Büyük ölçekli veri işleme sistemlerinde kullanılır.

Kafka Sunucusunu Başlatma Komutu:

bin/kafka-server-start.sh config/server.properties

❓ Soru 2: Kafka'da Topic nedir?

Cevap: Topic, Kafka'da mesajların kategorize edildiği mantıksal birimdir.

  • Mesajlar topic'ler aracılığıyla yayınlanır ve tüketilir.
  • Her topic birden fazla partition içerebilir.

Topic Oluşturma Komutu:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic my-topic

❓ Soru 3: Partition nedir? Neden önemlidir?

Cevap: Partition, bir topic'in bölümleridir ve paralel işlemeye olanak tanır.

  • Her partition, bir broker üzerinde fiziksel olarak depolanır.
  • Partition sayısı arttıkça okuma ve yazma işlemi paralel olarak yapılabilir.
  • Veri dağıtımı ve yük dengeleme için önemlidir.

❓ Soru 4: Producer ve Consumer arasındaki fark nedir?

Cevap:

  • Producer: Kafka'ya mesaj gönderen uygulamadır.
  • Consumer: Kafka'dan mesaj alan uygulamadır.

Java Producer Örneği:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
producer.close();

❓ Soru 5: Consumer Group nedir? Ne işe yarar?

Cevap: Consumer Group, aynı topic'i tüketen consumer'ların koleksiyonudur.

  • Bir topic'teki partition'lar consumer group içindeki consumer'lara dağıtılır.
  • Her partition sadece bir consumer tarafından tüketilebilir.
  • Load balancing ve fault tolerance sağlar.

Consumer Group ile Consumer Başlatma:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));

❓ Soru 6: Broker nedir?

Cevap: Broker, Kafka kümesindeki bir sunucudur.

  • Her broker, cluster içinde benzersiz bir ID'ye sahiptir.
  • Partition'ları barındırır ve client isteklerini yönetir.

❓ Soru 7: Zookeeper Kafka'da ne işe yarar?

Cevap: Zookeeper, Kafka kümesi için koordinasyon hizmeti sağlar.

  • Broker'ların, topic'lerin ve partition'ların metadata'sını saklar.
  • Leader seçimi ve cluster yönetimi için kullanılır.
  • Kafka 3.0 ve sonrası için KIP-500 ile Zookeepersiz mod de desteklenmektedir.

❓ Soru 8: Offset nedir?

Cevap: Offset, bir partition içindeki her mesajın benzersiz konumunu belirten bir sayıdır.

  • Her partition kendi offset'lerini yönetir.
  • Consumer'lar hangi mesaja kadar okuduklarını offset'ler aracılığıyla takip ederler.
  • Offset'ler __consumer_offsets topic'inde saklanır.

❓ Soru 9: Replication nedir? Neden önemlidir?

Cevap: Replication, veri kopyalarının farklı broker'lar üzerinde saklanmasıdır.

  • Veri güvenliği ve dayanıklılık sağlar.
  • Bir broker çöktüğünde, veri kaybı önlenmiş olur.
  • Replication faktörü, her partition için kaç kopya olacağını belirtir.

Replication Faktörü Ayarları:

# Topic oluştururken replication faktörü belirtme
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic replicated-topic

❓ Soru 10: Leader ve Follower partition'lar arasındaki fark nedir?

Cevap:

  • Leader Partition: Bir partition için tüm okuma ve yazma işlemlerinden sorumlu olan partition'dır.
  • Follower Partition: Leader partition'dan veriyi çoğaltan (replicate eden) partition'lardır.

Leader partition çöktüğünde, follower'lardan biri yeni leader olarak seçilir.

🔹 Kafka İleri Düzey Konular

❓ Soru 11: Kafka'nın mesajlaşma garantileri nelerdir?

Cevap: Kafka üç farklı mesajlaşma garantisi sunar:

  • At most once: Mesajların en fazla bir kez teslim edildiği, kaybolabileceği garanti.
  • At least once: Mesajların en az bir kez teslim edildiği, tekrarlanabileceği garanti.
  • Exactly once: Mesajların tam olarak bir kez teslim edildiği garanti (Kafka Streams ve Transactions ile).

Producer için Acks Ayarları:

// At most once
props.put("acks", "0");

// At least once (default)
props.put("acks", "1");

// Exactly once / En güvenli
props.put("acks", "all");

❓ Soru 12: Kafka Connector'lar nelerdir? Ne işe yararlar?

Cevap: Kafka Connector'lar, Kafka'yı diğer sistemlerle entegre etmek için kullanılan bileşenlerdir.

  • Source Connector: Diğer sistemlerden Kafka'ya veri aktarır.
  • Sink Connector: Kafka'dan diğer sistemlere veri aktarır.

Örnek Connector Yapılandırması:

{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "user",
    "connection.password": "password",
    "mode": "bulk",
    "topic.prefix": "jdbc-",
    "tasks.max": "1"
  }
}

❓ Soru 13: Kafka Streams nedir? Ne işe yarar?

Cevap: Kafka Streams, Kafka üzerinde gerçek zamanlı veri işleme uygulamaları geliştirmek için kullanılan bir kütüphanedir.

  • Stream Processing (akış işleme) için kullanılır.
  • Scala veya Java ile yazılabilir.
  • Stateful ve stateless işlemleri destekler.

Basit Kafka Stream Örneği:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

KStream source = builder.stream("text-input");
KTable counts = source
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, value) -> value)
    .count();
counts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

❓ Soru 14: Kafka'da Compaction nedir? Ne işe yarar?

Cevap: Compaction, bir topic'te aynı anahtara sahip mesajların sadece en sonuncusunu tutma işlemidir.

  • Log compaction olarak da bilinir.
  • Değişen veriler için kullanışlıdır (örneğin, kullanıcı profilleri).
  • Topic'in son durumunu korurken depolama alanından tasarruf sağlar.

Compaction Özellikli Topic Oluşturma:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic compacted-topic --config cleanup.policy=compact

❓ Soru 15: Kafka'da Retention Period nedir? Nasıl ayarlanır?

Cevap: Retention Period, mesajların Kafka'da ne kadar süre kalacağını belirten süredir.

  • Zamana dayalı (örneğin, 7 gün) veya boyuta dayalı (örneğin, 1GB) olabilir.
  • Süre dolduğunda mesajlar otomatik olarak silinir.

Retention Ayarları:

# Topic oluştururken retention ayarlama
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic retention-topic --config retention.ms=604800000

# Mevcut topic'in retention süresini değiştirme
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name retention-topic --alter --add-config retention.ms=259200000

❓ Soru 16: Kafka'da Schema Registry nedir? Ne işe yarar?

Cevap: Schema Registry, Kafka'da kullanılan veri şemalarını (Avro, JSON, Protobuf) yönetmek için kullanılan bir servistir.

  • Veri şemalarının merkezi olarak saklanmasını sağlar.
  • Producer ve Consumer arasındaki uyumluluğu kontrol eder.
  • Şema evrimini (evolution) yönetir.

Schema Registry Kullanımı:

# Schema yükleme
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"}' \
  http://localhost:8081/subjects/users-value/versions

❓ Soru 17: Kafka'da ISR (In-Sync Replicas) nedir?

Cevap: ISR, leader partition ile senkronize olan replikaların kümesidir.

  • ISR'deki replikalar, leader partition'daki tüm mesajları kopyalamışlardır.
  • Bir replica, belirli bir süre içinde leader ile senkronize olamazsa ISR'den çıkarılır.
  • Leader seçimi ISR içindeki replikalar arasından yapılır.

ISR İle İlgili Ayarlar:

# Minimum ISR boyutu
min.insync.replicas=2

# Replica senkronizasyon zaman aşımı
replica.lag.time.max.ms=30000

❓ Soru 18: Kafka'da Consumer Rebalance nedir? Ne zaman gerçekleşir?

Cevap: Consumer Rebalance, bir consumer grubundaki partition'ların consumer'lar arasında yeniden dağıtılması işlemidir.

  • Yeni bir consumer gruba katıldığında.
  • Bir consumer gruptan ayrıldığında veya çöktüğünde.
  • Bir topic'in partition sayısı değiştiğinde.

Rebalance sırasında, consumer'lar geçici olarak veri işlemezler. Bu nedenle, uzun sürebilecek işlemler için dikkatli olunmalıdır.

❓ Soru 19: Kafka'da Static ve Dynamic Membership arasındaki fark nedir?

Cevap:

  • Static Membership: Consumer'ların sabit bir kimlik (group.instance.id) ile gruba katıldığı bir moddur. Bu modda, bir consumer geçici olarak ayrılıp geri döndüğünde, aynı partition'ları alır.
  • Dynamic Membership: Consumer'ların geçici kimliklerle gruba katıldığı varsayılan moddur. Bir consumer ayrılıp geri döndüğünde, farklı partition'lar alabilir.

Static Membership Ayarı:

props.put("group.instance.id", "consumer-1-instance");

❓ Soru 20: Kafka'da Exactly-Once Semantics nasıl sağlanır?

Cevap: Kafka'da exactly-once semantik, iki mekanizma ile sağlanır:

  • Idempotent Producer: Aynı mesajın birden fazla kez gönderilmesini önler.
  • Transactions: Birden fazla partition'a atomik yazma işlemleri sağlar.

Idempotent Producer Ayarı:

props.put("enable.idempotence", "true");

Transactional Producer Örneği:

props.put("transactional.id", "my-transactional-id");
Producer producer = new KafkaProducer<>(props);

producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic1", "key", "value"));
    producer.send(new ProducerRecord<>("topic2", "key", "value"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
}

🔹 Kafka Performans ve Ölçeklendirme

❓ Soru 21: Kafka kümesini ölçeklendirmek için nelere dikkat etmek gerekir?

Cevap: Kafka kümesini ölçeklendirirken dikkat edilmesi gereken önemli noktalar:

  • Broker Ekleme: Yeni broker'lar küme eklendikten sonra partition'lar yeniden dağıtılmalıdır.
  • Partition Sayısı: Daha fazla paralel işlem için partition sayısı artırılabilir, ancak bu sayı azaltılamaz.
  • Replication Faktörü: Artan broker sayısıyla birlikte replication faktörü artırılabilir.
  • Network ve Disk I/O: Yeni broker'ların ağ ve disk kapasitesi mevcut broker'larla uyumlu olmalıdır.

Partition Yeniden Dağıtma Komutu:

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute

❓ Soru 22: Kafka'da batch processing nedir? Performansı nasıl etkiler?

Cevap: Batch processing, Kafka'da birden fazla mesajın tek bir network isteğiyle gönderilmesidir.

  • Network trafiğini azaltır ve verimliliği artırır.
  • Gecikme (latency) ile verimlilik (throughput) arasında bir denge kurar.
  • Batch boyutu arttıkça verimlilik artar, ancak gecikme de artar.

Producer Batch Ayarları:

// Batch boyutu (bayt cinsinden)
props.put("batch.size", 16384);

// Bir mesajın ne kadar bekleneceği (milisaniye)
props.put("linger.ms", 5);

❓ Soru 23: Kafka'da partition sayısını belirlerken nelere dikkat etmek gerekir?

Cevap: Partition sayısını belirlerken dikkat edilmesi gereken faktörler:

  • Tüketim Hızı: Partition sayısı, paralel tüketim için maksimum consumer sayısını belirler.
  • Üretim Hızı: Daha fazla partition, daha yüksek paralel yazma imkanı sağlar.
  • Depolama ve Bellek: Her partition, broker'larda ek depolama ve bellek gerektirir.
  • Gelecek Büyüme: Partition sayısı azaltılamayacağı için gelecekteki ihtiyaçlar dikkate alınmalıdır.

Genel bir kural olarak, partition sayısı, beklenen maksimum paralel consumer sayısından biraz fazla olmalıdır.

❓ Soru 24: Kafka'da compression nasıl çalışır? Hangi compression algoritmaları desteklenir?

Cevap: Kafka, mesajları göndermeden önce sıkıştırabilir, bu da network trafiğini azaltır.

  • Compression, producer tarafında yapılır ve consumer tarafında otomatik olarak açılır.
  • Desteklenen algoritmalar: gzip, snappy, lz4, zstd.

Compression Ayarları:

// Compression türü
props.put("compression.type", "lz4");

// Sıkıştırma seviyesi (sadece gzip ve zstd için)
props.put("compression.level", "5");

❓ Soru 25: Kafka'da consumer performansını artırmak için neler yapılabilir?

Cevap: Consumer performansını artırmak için:

  • fetch.min.bytes ve fetch.max.wait.ms: Consumer'ın ne kadar veri bekleyeceğini kontrol eder.
  • max.partition.fetch.bytes: Bir partition'dan çekilebilecek maksimum veri boyutu.
  • max.poll.records: Bir poll() çağrısında döndürülen maksimum kayıt sayısı.
  • Concurrent Processing: Veriyi paralel işlemek için thread'ler kullanılabilir.

Consumer Performans Ayarları:

// Minimum byte bekleme
props.put("fetch.min.bytes", 1024);

// Maksimum bekleme süresi
props.put("fetch.max.wait.ms", 500);

// Bir partition'dan çekilecek maksimum veri
props.put("max.partition.fetch.bytes", 1048576);

// Bir poll'da döndürülecek maksimum kayıt
props.put("max.poll.records", 500);

❓ Soru 26: Kafka'da producer performansını artırmak için neler yapılabilir?

Cevap: Producer performansını artırmak için:

  • batch.size ve linger.ms: Daha büyük batch'ler ve daha uzun bekleme süreleri verimliliği artırır.
  • compression.type: Veri sıkıştırma network trafiğini azaltır.
  • buffer.memory: Toplam buffer belleğini artırmak.
  • acks: Daha düşük acks seviyeleri (0 veya 1) gecikmeyi azaltır.
  • retries ve retry.backoff.ms: Geçici hatalarda yeniden deneme mekanizması.

Producer Performans Ayarları:

// Toplam buffer belleği
props.put("buffer.memory", 67108864);

// Yeniden deneme sayısı
props.put("retries", 3);

// Yeniden deneme aralığı
props.put("retry.backoff.ms", 100);

❓ Soru 27: Kafka'da disk I/O performansını optimize etmek için nelere dikkat etmek gerekir?

Cevap: Disk I/O performansını optimize etmek için:

  • log.dirs: Farklı disklerde birden fazla log dizini kullanmak.
  • log.segment.bytes: Segment boyutunu artırmak.
  • log.flush.interval.messages ve log.flush.interval.ms: Disk flush sıklığını ayarlamak.
  • num.io.threads: I/O işlemleri için thread sayısını artırmak.
  • SSD Diskler: Mekanik disklere göre daha iyi I/O performansı sağlar.

Disk I/O Ayarları:

# Farklı disklerde log dizinleri
log.dirs=/disk1/kafka-logs,/disk2/kafka-logs

# Segment boyutu (varsayılan: 1GB)
log.segment.bytes=1073741824

# I/O thread sayısı
num.io.threads=8

# Flush aralığı (mesaj sayısı)
log.flush.interval.messages=10000

# Flush aralığı (milisaniye)
log.flush.interval.ms=1000

❓ Soru 28: Kafka'da network performansını optimize etmek için nelere dikkat etmek gerekir?

Cevap: Network performansını optimize etmek için:

  • socket.send.buffer.bytes ve socket.receive.buffer.bytes: Socket buffer boyutlarını artırmak.
  • socket.request.max.bytes: Maksimum istek boyutunu artırmak.
  • num.network.threads: Network işlemleri için thread sayısını artırmak.
  • compression.type: Veri sıkıştırma kullanarak network trafiğini azaltmak.
  • batch.size: Daha büyük batch'ler ile daha az network isteği.

Network Ayarları:

# Socket buffer boyutları
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000

# Maksimum istek boyutu
socket.request.max.bytes=104857600

# Network thread sayısı
num.network.threads=4

❓ Soru 29: Kafka'da JVM ayarlarını optimize etmek için nelere dikkat etmek gerekir?

Cevap: JVM ayarlarını optimize etmek için:

  • Heap Boyutu: Yeterli heap boyutu ayarlamak (genellikle 6GB ve üzeri).
  • Garbage Collector: G1GC kullanmak, büyük heap boyutları için daha uygundur.
  • JMX Port: Monitoring için JMX portunu açmak.
  • GC Logging: GC performansını izlemek için GC loglarını etkinleştirmek.

JVM Ayarları:

# Kafka başlatma komutunda JVM ayarları
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"

❓ Soru 30: Kafka'da monitoring için hangi metrikler izlenmelidir?

Cevap: Kafka monitoring için önemli metrikler:

  • UnderReplicatedPartitions: Yeterli replikası olmayan partition sayısı.
  • UnderMinIsrPartitionCount: Minimum ISR sayısının altındaki partition sayısı.
  • IsrShrinksPerSec ve IsrExpandsPerSec: ISR'deki değişim oranı.
  • ActiveControllerCount: Aktif controller sayısı (1 olmalı).
  • OfflinePartitionsCount: Çevrimdışı partition sayısı (0 olmalı).
  • RequestHandlerAvgIdlePercent: Network thread'lerin boşta kalma oranı.
  • BytesInPerSec ve BytesOutPerSec: Network trafiği.
  • MessagesInPerSec: Mesaj giriş hızı.

Bu metrikler, Kafka kümesinin sağlığı ve performansı hakkında önemli bilgiler sağlar.

🔹 Kafka Güvenlik ve İzleme

❓ Soru 31: Kafka'da güvenlik nasıl sağlanır?

Cevap: Kafka'da güvenlik sağlamak için kullanılan yöntemler:

  • SSL/TLS: Broker'lar arası ve client-broker arası iletişim için şifreleme.
  • SASL: Kimlik doğrulama mekanizmaları (PLAIN, SCRAM, GSSAPI/Kerberos).
  • ACL (Access Control Lists): Kullanıcı ve yetkilendirme yönetimi.

SSL Ayarları:

# Broker SSL ayarları
listeners=SSL://:9093
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234

SASL Ayarları:

# Broker SASL ayarları
listeners=SASL_SSL://:9093
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
security.inter.broker.protocol=SASL_SSL

❓ Soru 32: Kafka'da ACL (Access Control List) nasıl yapılandırılır?

Cevap: ACL, Kafka'da kaynaklara (topic, cluster, group vb.) erişimi kontrol etmek için kullanılır.

  • Principal (kullanıcı veya servis), kaynak ve işlem (READ, WRITE, CREATE, DESCRIBE vb.) bazında erişim kontrolü sağlar.
  • ACL'ler kafka-acls.sh komutu ile yönetilir.

ACL Örnekleri:

# User1'in test-topic üzerinde WRITE izni
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:user1 --operation Write --topic test-topic

# User1'in test-consumer-group üzerinde READ izni
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:user1 --operation Read --group test-consumer-group

# User2'nin tüm topic'leri DESCRIBE etmesine izin verme
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:user2 --operation Describe --topic *

❓ Soru 33: Kafka'da SASL/SCRAM kimlik doğrulama nasıl yapılandırılır?

Cevap: SASL/SCRAM, kullanıcı adı ve şifre ile kimlik doğrulama sağlayan bir mekanizmadır.

  • Kullanıcı bilgileri Zookeeper'da saklanır.
  • SCRAM mekanizması, şifrelerin düz metin olarak saklanmasını önler.

SCRAM Yapılandırması:

# Broker ayarları
listeners=SASL_SSL://:9093
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_SSL

# JAAS konfigürasyonu
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="admin-secret";

Kullanıcı Oluşturma:

# Kullanıcı ekleme
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice

❓ Soru 34: Kafka'da SSL/TLS nasıl yapılandırılır?

Cevap: SSL/TLS, Kafka'da iletişimi şifrelemek için kullanılır.

  • Broker'lar arası iletişimi şifrelemek için.
  • Client-broker arası iletişimi şifrelemek için.
  • İki yönlü kimlik doğrulama (mutual TLS) için.

SSL Sertifikası Oluşturma:

# CA sertifikası oluşturma
openssl req -new -newkey rsa:2048 -days 365 -x509 -keyout ca-key -out ca-cert -subj "/C=TR/ST=Istanbul/L=Istanbul/O=MyOrg/CN=CA"

# Broker sertifikası oluşturma
keytool -genkey -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity 365 -storepass test1234 -keypass test1234 -dname "CN=localhost"

# CA ile sertifikayı imzalama
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed

# Truststore oluşturma
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass test1234

Broker SSL Ayarları:

listeners=SSL://:9093
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234

❓ Soru 35: Kafka'da veri şifreleme (encryption) nasıl sağlanır?

Cevap: Kafka'da veri şifreleme için iki yaklaşım vardır:

  • Transport Level Encryption (SSL/TLS): Verinin network üzerinde şifrelenmesi.
  • Application Level Encryption: Verinin uygulama katmanında şifrelenmesi.

Uygulama Katmanında Şifreleme Örneği:

import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;

public class EncryptionUtil {
    private static final String ALGORITHM = "AES";
    
    public static String encrypt(String data, String key) throws Exception {
        SecretKeySpec secretKey = new SecretKeySpec(key.getBytes(), ALGORITHM);
        Cipher cipher = Cipher.getInstance(ALGORITHM);
        cipher.init(Cipher.ENCRYPT_MODE, secretKey);
        byte[] encryptedBytes = cipher.doFinal(data.getBytes());
        return Base64.getEncoder().encodeToString(encryptedBytes);
    }
    
    public static String decrypt(String encryptedData, String key) throws Exception {
        SecretKeySpec secretKey = new SecretKeySpec(key.getBytes(), ALGORITHM);
        Cipher cipher = Cipher.getInstance(ALGORITHM);
        cipher.init(Cipher.DECRYPT_MODE, secretKey);
        byte[] decryptedBytes = cipher.doFinal(Base64.getDecoder().decode(encryptedData));
        return new String(decryptedBytes);
    }
}

// Producer kullanımı
String encryptedData = EncryptionUtil.encrypt("sensitive data", "my-secret-key");
producer.send(new ProducerRecord<>("encrypted-topic", encryptedData));

❓ Soru 36: Kafka'da izleme (monitoring) için hangi araçlar kullanılabilir?

Cevap: Kafka monitoring için kullanılan araçlar:

  • Kafka内置指标: Kafka'nın JMX aracılığıyla sağladığı metrikler.
  • Prometheus + Grafana: Metrikleri toplamak ve görselleştirmek için.
  • Confluent Control Center: Confluent Platform'un sağladığı komerciyel monitoring aracı.
  • Kafka Manager (CMAK): Açık kaynaklı Kafka küme yönetim aracı.
  • Elasticsearch + Kibana: Logları toplamak ve analiz etmek için.
  • Jaeger/Zipkin: Dağıtık izleme (distributed tracing) için.

Prometheus JMX Exporter Yapılandırması:

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['localhost:7071'] # JMX Exporter portu

❓ Soru 37: Kafka'da log yönetimi nasıl yapılır?

Cevap: Kafka'da log yönetimi için:

  • log4j veya logback: Loglama için kullanılan kütüphaneler.
  • Log Aggregation: Logları merkezi bir sistemde toplamak (ELK, Splunk vb.).
  • Log Rotation: Log dosyalarının döndürülmesi.
  • Structured Logging: JSON formatında loglama.

log4j Yapılandırması:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"/>
    </Console>
    <RollingFile name="RollingFile" fileName="logs/kafka.log" filePattern="logs/kafka-%d{yyyy-MM-dd}-%i.log">
      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"/>
      <Policies>
        <TimeBasedTriggeringPolicy interval="1" modulate="true"/>
        <SizeBasedTriggeringPolicy size="100 MB"/>
      </Policies>
      <DefaultRolloverStrategy max="10"/>
    </RollingFile>
  </Appenders>
  <Loggers>
    <Root level="INFO">
      <AppenderRef ref="Console"/>
      <AppenderRef ref="RollingFile"/>
    </Root>
  </Loggers>
</Configuration>

❓ Soru 38: Kafka'da uyarı (alerting) sistemi nasıl kurulur?

Cevap: Kafka'da uyarı sistemi kurmak için:

  • Prometheus Alertmanager: Prometheus metriklerine dayalı uyarılar oluşturmak.
  • Grafana Alerting: Grafana üzerinden uyarı oluşturmak.
  • Burrow: Consumer lag izleme ve uyarı için özel araç.
  • Custom Scripts: Kafka metriklerini kontrol eden özel betikler.

Prometheus Alertmanager Yapılandırması:

# alertmanager.yml
global:
  smtp_smarthost: 'localhost:587'
  smtp_from: 'alertmanager@example.com'

route:
  group_by: ['alertname', 'severity']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'web.hook'

receivers:
- name: 'web.hook'
  email_configs:
  - to: 'admin@example.com'

# Prometheus alert rules
groups:
- name: kafka.rules
  rules:
  - alert: KafkaUnderReplicatedPartitions
    expr: kafka_server_replicamanager_underreplicatedpartitions > 0
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Kafka under-replicated partitions (instance {{ $labels.instance }})"
      description: "Kafka has {{ $value }} under-replicated partitions for more than 5 minutes."

❓ Soru 39: Kafka'da JMX metrikleri nasıl izlenir?

Cevap: Kafka'da JMX metriklerini izlemek için:

  • JMX Portu Açma: Kafka broker'ları için JMX portunu açmak.
  • JMX Konsolu: JConsole veya JVisualVM kullanarak metrikleri görüntülemek.
  • JMX Exporter: JMX metriklerini Prometheus formatına dönüştürmek.

JMX Portu Açma:

# Kafka başlatma komutunda JMX ayarları
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

JMX Exporter Yapılandırması:

# jmx-exporter-config.yml
rules:
- pattern: "kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value"
  name: "kafka_server_replicamanager_underreplicatedpartitions"
  type: GAUGE
  value: "{{ $value }}"

❓ Soru 40: Kafka'da audit log nasıl oluşturulur?

Cevap: Kafka'da audit log oluşturmak için:

  • Kafka Authorizer: Kimlik doğrulama ve yetkilendirme olaylarını loglamak.
  • Custom Interceptor: Producer ve Consumer interceptor'ları ile özel loglama.
  • Audit Topic: Audit olaylarını özel bir topic'e göndermek.

Audit Log Yapılandırması:

# Broker ayarları
authorizer.class.name=kafka.security.authorizer.AclAuthorizer

# Audit log ayarları
kafka.authorizer.logger.listeners=kafka.security.authorizer.AclAuthorizer$AuditLogListener
kafka.authorizer.logger.listeners.log.dir=/var/log/kafka/audit

Custom Interceptor Örneği:

public class AuditProducerInterceptor implements ProducerInterceptor {
    private static final Logger LOG = LoggerFactory.getLogger(AuditProducerInterceptor.class);
    
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        LOG.info("Sending message to topic: {}, key: {}, value: {}", 
                record.topic(), record.key(), record.value());
        return record;
    }
    
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            LOG.error("Error sending message", exception);
        } else {
            LOG.info("Message sent successfully to topic: {}, partition: {}, offset: {}", 
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
    
    @Override
    public void close() {}
    
    @Override
    public void configure(Map configs) {}
}

🔹 Kafka Senaryo ve Problem Çözme

❓ Soru 41: Consumer sürekli rebalance oluyor, ne yapmalıyım?

Cevap: Consumer sürekli rebalance oluyorsa:

  • max.poll.interval.ms: Bu süreden daha uzun süren işlemler için değeri artırın.
  • heartbeat.interval.ms: Heartbeat aralığını kısaltın.
  • session.timeout.ms: Oturum zaman aşımını artırın.
  • İşlem Süresi: Veri işleme süresini optimize edin.

Consumer Ayarları:

// Poll aralığı (varsayılan: 300000 ms)
props.put("max.poll.interval.ms", 600000);

// Heartbeat aralığı (varsayılan: 3000 ms)
props.put("heartbeat.interval.ms", 1000);

// Oturum zaman aşımı (varsayılan: 10000 ms)
props.put("session.timeout.ms", 30000);

❓ Soru 42: Kafka'da mesaj kaybı yaşıyorum, ne yapmalıyım?

Cevap: Mesaj kaybını önlemek için:

  • acks=all: Producer için tüm replikaların onayını bekle.
  • min.insync.replicas: Minimum ISR sayısını artır.
  • retries: Producer için yeniden deneme sayısını artır.
  • enable.idempotence: Idempotent producer'ı etkinleştir.

Producer Ayarları:

// Tüm replikaların onayını bekle
props.put("acks", "all");

// Yeniden deneme sayısı
props.put("retries", 3);

// Idempotent producer
props.put("enable.idempotence", "true");

Broker Ayarları:

# Minimum ISR sayısı
min.insync.replicas=2

❓ Soru 43: Kafka'da consumer lag (geride kalma) sorunu yaşıyorum, ne yapmalıyım?

Cevap: Consumer lag sorununu çözmek için:

  • Consumer Sayısını Artırma: Daha fazla consumer ekleyin.
  • Partition Sayısını Artırma: Topic için daha fazla partition oluşturun.
  • İşlem Süresini Optimize Etme: Veri işleme süresini azaltın.
  • Batch İşleme: Veriyi toplu olarak işleyin.
  • Consumer Ayarları: fetch.min.bytes, max.poll.records gibi ayarları optimize edin.

Consumer Lag Kontrolü:

# Consumer lag kontrolü
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group

Partition Sayısını Artırma:

# Partition sayısını artırma
bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 10

❓ Soru 44: Kafka broker'ı disk alanı doldu, ne yapmalıyım?

Cevap: Broker disk alanı dolduğunda:

  • Retention Süresini Azaltma: Mesajların daha hızlı silinmesi için retention süresini azaltın.
  • Log Segment Boyutunu Azaltma: Segment boyutunu küçültün.
  • Disk Temizleme: Eski log dosyalarını manuel olarak silin.
  • Yeni Disk Ekleme: Broker'a yeni disk ekleyin.
  • Topic'leri Taşıma: Bazı topic'leri başka broker'lara taşıyın.

Retention Süresini Azaltma:

# Topic için retention süresini azaltma
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=86400000

Log Segment Boyutunu Ayarlama:

# Topic için log segment boyutunu ayarlama
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config log.segment.bytes=536870912

❓ Soru 45: Kafka kümesinde broker çöktü, ne yapmalıyım?

Cevap: Kafka kümesinde broker çöktüğünde:

  • Broker'ı Yeniden Başlatma: Broker'ı yeniden başlatın.
  • Logları İnceleme: Neden çöktüğünü anlamak için logları inceleyin.
  • Replication Faktörünü Kontrol Etme: Partition'ların replikaları var mı kontrol edin.
  • Leader Seçimini İzleme: Yeni leader'ların seçilip seçilmediğini izleyin.
  • Under-Replicated Partition'ları İzleme: Yeterli replikası olmayan partition'ları izleyin.

Broker Durumunu Kontrol Etme:

# Broker durumunu kontrol etme
bin/kafka-broker-api-versions --bootstrap-server localhost:9092

# Topic durumunu kontrol etme
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

❓ Soru 46: Kafka'da duplicate mesajlar alıyorum, ne yapmalıyım?

Cevap: Duplicate mesajları önlemek için:

  • Idempotent Producer: Producer'ı idempotent hale getirin.
  • Exactly-Once Semantics: Transaction kullanarak tam olarak bir kez teslimat sağlayın.
  • Consumer Tarafında Tekilleştirme: Aynı mesajın tekrar işlenmesini önleyin.

Idempotent Producer Ayarı:

props.put("enable.idempotence", "true");

Transactional Producer Örneği:

props.put("transactional.id", "my-transactional-id");
Producer producer = new KafkaProducer<>(props);

producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic1", "key", "value"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
}

Consumer Tarafında Tekilleştirme:

public class DeduplicatingConsumer {
    private Set processedIds = new HashSet<>();
    
    public void process(ConsumerRecord record) {
        String messageId = record.key() + "-" + record.offset();
        if (processedIds.contains(messageId)) {
            return; // Zaten işlenmiş
        }
        
        // Mesajı işle
        processMessage(record.value());
        
        // İşlenmiş olarak işaretle
        processedIds.add(messageId);
    }
}

❓ Soru 47: Kafka'da mesajların sırası bozuluyor, ne yapmalıyım?

Cevap: Mesaj sırasını korumak için:

  • Partition Sayısı: Sıralı mesajlar için tek partition kullanın.
  • Key Kullanma: Aynı key'e sahip mesajlar aynı partition'a gider.
  • Max In Flight Requests: Producer'da max.in.flight.requests.per.connection=1 ayarlayın.
  • İşlem Sırası: Consumer'da mesajları sıralı olarak işleyin.

Producer Ayarları:

// Sıralı gönderim için
props.put("max.in.flight.requests.per.connection", "1");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", "true");

// Aynı key'e sahip mesajlar aynı partition'a gider
producer.send(new ProducerRecord<>("my-topic", "same-key", "value1"));
producer.send(new ProducerRecord<>("my-topic", "same-key", "value2"));

Consumer Tarafında Sıralı İşleme:

public class OrderedConsumer {
    public void consume() {
        while (true) {
            ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord record : records) {
                // Sıralı olarak işle
                processInOrder(record);
            }
        }
    }
    
    private void processInOrder(ConsumerRecord record) {
        // Mesajı sıralı olarak işle
        System.out.println("Processing: " + record.value());
    }
}

❓ Soru 48: Kafka'da mesajlar tüketiciye ulaşıyor ama işlenmiyor, ne yapmalıyım?

Cevap: Mesajlar tüketiciye ulaşıyor ama işlenmiyorsa:

  • Consumer Loglarını İnceleme: Hata veya exception olup olmadığını kontrol edin.
  • Deserializasyon Hataları: Mesaj formatının doğru olup olmadığını kontrol edin.
  • İşlem Hatası: Mesaj işlenirken hata oluşuyor mu kontrol edin.
  • Offset Commit: Offset'lerin commit edilip edilmediğini kontrol edin.

Consumer Hata Ayıklama:

try {
    ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord record : records) {
        try {
            // Mesajı işle
            processMessage(record);
        } catch (Exception e) {
            // Hata durumunda log yaz
            logger.error("Error processing message: " + record.value(), e);
            
            // Hatalı mesajı özel bir topic'e gönder
            sendToDlq(record);
        }
    }
    
    // Başarılı mesajları commit et
    consumer.commitSync();
} catch (Exception e) {
    logger.error("Error in consumer", e);
}

❓ Soru 49: Kafka'da producer mesaj gönderemiyor, ne yapmalıyım?

Cevap: Producer mesaj gönderemiyorsa:

  • Broker Bağlantısı: Broker'a bağlanıp bağlanamadığını kontrol edin.
  • Topic Varlığı: Topic'in var olup olmadığını kontrol edin.
  • Yetkilendirme: Producer'ın topic'e yazma yetkisinin olup olmadığını kontrol edin.
  • Producer Ayarları: Bootstrap servers, serializer gibi ayarların doğru olup olmadığını kontrol edin.

Bağlantı Testi:

# Broker'a bağlantı testi
telnet localhost 9092

# Topic listesini kontrol etme
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

Producer Hata Ayıklama:

try {
    ProducerRecord record = new ProducerRecord<>("my-topic", "key", "value");
    
    // Senkron gönderim ve hata kontrolü
    RecordMetadata metadata = producer.send(record).get();
    
    System.out.println("Message sent to partition " + metadata.partition() + 
                      " with offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    // Hata durumunda detaylı bilgi
    if (e.getCause() instanceof TimeoutException) {
        System.err.println("Timeout while sending message");
    } else if (e.getCause() instanceof NotLeaderForPartitionException) {
        System.err.println("Not leader for partition");
    } else {
        System.err.println("Error sending message: " + e.getCause().getMessage());
    }
}

❓ Soru 50: Kafka kümesi yüksek gecikme (latency) yaşıyor, ne yapmalıyım?

Cevap: Kafka kümesinde yüksek gecikme yaşıyorsanız:

  • Network Gecikmesi: Broker'lar arası ve client-broker arası network gecikmesini ölçün.
  • Disk I/O: Disk I/O performansını kontrol edin.
  • GC Pauses: JVM garbage collection sürelerini izleyin.
  • Broker Kaynakları: CPU, bellek ve disk kullanımını kontrol edin.
  • Producer ve Consumer Ayarları: Batch boyutu, buffer boyutu gibi ayarları optimize edin.

Gecikme Ölçümü:

# Producer gecikmesi ölçümü
bin/kafka-run-class.sh kafka.tools.EndToEndLatency \
  broker-list localhost:9092 \
  topic test-topic \
  num-records 1000 \
  record-size 1000 \
  producer-props acks=all,linger.ms=0 \
  consumer-props group.id=test-group

# Network gecikmesi testi
ping -c 10 kafka-broker-host

Producer ve Consumer Ayarları:

// Producer için düşük gecikme ayarları
props.put("linger.ms", 0); // Bekleme süresini sıfırla
props.put("batch.size", 0); // Batch'leri devre dışı bırak
props.put("compression.type", "none"); // Sıkıştırmayı devre dışı bırak

// Consumer için düşük gecikme ayarları
props.put("fetch.min.bytes", 1); // Minimum bekleme boyutu
props.put("fetch.max.wait.ms", 100); // Maksimum bekleme süresi

🔹 Kafka Entegrasyon ve Kullanım Senaryoları

❓ Soru 51: Kafka ve Spark Streaming nasıl entegre edilir?

Cevap: Kafka ve Spark Streaming entegrasyonu için:

  • Spark Kafka Connector: Spark'ın Kafka ile iletişim kurmasını sağlar.
  • Structured Streaming: Spark'ın yapısal akış işleme API'si.

Spark Streaming Kafka Entegrasyonu:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("KafkaSparkIntegration")
  .master("local[*]")
  .getOrCreate()

// Kafka'dan veri okuma
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .load()

// Mesaj değerlerini string olarak dönüştürme
val values = df.selectExpr("CAST(value AS STRING)")

// Veriyi işleme
val processed = values.withColumn("processed", upper(col("value")))

// Kafka'ya yazma
val query = processed.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

query.awaitTermination()

❓ Soru 52: Kafka ve Flink nasıl entegre edilir?

Cevap: Kafka ve Flink entegrasyonu için:

  • Flink Kafka Connector: Flink'in Kafka ile iletişim kurmasını sağlar.
  • FlinkKafkaConsumer ve FlinkKafkaProducer: Kafka'dan veri okuma ve yazma sınıfları.

Flink Kafka Entegrasyonu:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;

public class KafkaFlinkIntegration {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-consumer");
        
        // Kafka'dan veri okuma
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(
            "input-topic",
            new SimpleStringSchema(),
            properties
        );
        
        DataStream stream = env.addSource(consumer);
        
        // Veriyi işleme
        DataStream processed = stream.map(String::toUpperCase);
        
        // Kafka'ya yazma
        FlinkKafkaProducer producer = new FlinkKafkaProducer<>(
            "localhost:9092",
            "output-topic",
            new SimpleStringSchema()
        );
        
        processed.addSink(producer);
        
        env.execute("Kafka Flink Integration");
    }
}

❓ Soru 53: Kafka ve Storm nasıl entegre edilir?

Cevap: Kafka ve Storm entegrasyonu için:

  • KafkaSpout: Kafka'dan veri okumak için kullanılır.
  • KafkaBolt: Kafka'ya veri yazmak için kullanılır.

Storm Kafka Entegrasyonu:

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.StringKeyValueScheme;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaStormIntegration {
    public static void main(String[] args) throws Exception {
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        
        // Kafka Spout yapılandırması
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "input-topic", "/zookeeper", "storm-consumer");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        
        // Kafka Bolt yapılandırması
        KafkaBolt kafkaBolt = new KafkaBolt()
            .withProducerProperties(new Properties() {{
                put("bootstrap.servers", "localhost:9092");
                put("acks", "1");
                put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            }})
            .withTopicSelector("output-topic")
            .withTupleToKafkaMapper(new FieldNameTupleToKafkaMapper("key", "value"));
        
        // Topoloji oluşturma
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("upper-bolt", new UpperCaseBolt(), 1).shuffleGrouping("kafka-spout");
        builder.setBolt("kafka-bolt", kafkaBolt, 1).shuffleGrouping("upper-bolt");
        
        // Topolojiyi çalıştırma
        Config config = new Config();
        config.setDebug(true);
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka-storm-integration", config, builder.createTopology());
        
        Thread.sleep(10000);
        cluster.shutdown();
    }
}

❓ Soru 54: Kafka ve Elasticsearch nasıl entegre edilir?

Cevap: Kafka ve Elasticsearch entegrasyonu için:

  • Kafka Connect Elasticsearch Connector: Kafka'dan Elasticsearch'e veri aktarmak için kullanılır.
  • Logstash: Kafka'dan veri okuyup Elasticsearch'e yazmak için kullanılabilir.

Kafka Connect Elasticsearch Connector Yapılandırması:

{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "input-topic",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}

Logstash Konfigürasyonu:

# logstash.conf
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["input-topic"]
  }
}

filter {
  # Gerekli dönüşümleri burada yapın
  mutate {
    rename => { "message" => "event_message" }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "kafka-events"
    document_type => "_doc"
  }
}

❓ Soru 55: Kafka ve Cassandra nasıl entegre edilir?

Cevap: Kafka ve Cassandra entegrasyonu için:

  • Kafka Connect Cassandra Connector: Kafka'dan Cassandra'ya veri aktarmak için kullanılır.
  • Spark Cassandra Connector: Spark ile Kafka'dan okunan veriyi Cassandra'ya yazmak için kullanılır.

Kafka Connect Cassandra Connector Yapılandırması:

{
  "name": "cassandra-sink",
  "config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "input-topic",
    "connect.cassandra.contact.points": "localhost",
    "connect.cassandra.port": "9042",
    "connect.cassandra.key.space": "test_keyspace",
    "connect.cassandra.consistency.level": "ONE",
    "connect.cassandra.error.policy": "NOOP",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}

Spark Cassandra Connector Örneği:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector._

val spark = SparkSession.builder()
  .appName("KafkaCassandraIntegration")
  .master("local[*]")
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.connection.port", "9042")
  .getOrCreate()

// Kafka'dan veri okuma
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json("json", "name STRING, age INT").as("data"))
  .select("data.*")

// Cassandra'ya yazma
val query = df.writeStream
  .foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
    batchDF.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "users", "keyspace" -> "test_keyspace"))
      .mode("append")
      .save()
  }
  .start()

query.awaitTermination()

❓ Soru 56: Kafka ve Hadoop HDFS nasıl entegre edilir?

Cevap: Kafka ve Hadoop HDFS entegrasyonu için:

  • Kafka Connect HDFS Connector: Kafka'dan HDFS'e veri aktarmak için kullanılır.
  • Spark HDFS Connector: Spark ile Kafka'dan okunan veriyi HDFS'e yazmak için kullanılır.

Kafka Connect HDFS Connector Yapılandırması:

{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "input-topic",
    "hdfs.url": "hdfs://localhost:9000",
    "hadoop.conf.dir": "/etc/hadoop/conf",
    "flush.size": "3",
    "rotate.interval.ms": "1000",
    "logs.dir": "/kafka-data",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}

Spark HDFS Örneği:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("KafkaHdfsIntegration")
  .master("local[*]")
  .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
  .getOrCreate()

// Kafka'dan veri okuma
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(value AS STRING)")

// HDFS'e yazma
val query = df.writeStream
  .format("parquet")
  .option("path", "hdfs://localhost:9000/kafka-data")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

query.awaitTermination()

❓ Soru 57: Kafka ve MongoDB nasıl entegre edilir?

Cevap: Kafka ve MongoDB entegrasyonu için:

  • Kafka Connect MongoDB Connector: Kafka'dan MongoDB'ye veri aktarmak için kullanılır.
  • Spark MongoDB Connector: Spark ile Kafka'dan okunan veriyi MongoDB'ye yazmak için kullanılır.

Kafka Connect MongoDB Connector Yapılandırması:

{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "input-topic",
    "connection.uri": "mongodb://localhost:27017/kafka_db",
    "database": "kafka_db",
    "collection": "events",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}

Spark MongoDB Örneği:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import com.mongodb.spark.config._

val spark = SparkSession.builder()
  .appName("KafkaMongoDBIntegration")
  .master("local[*]")
  .config("spark.mongodb.input.uri", "mongodb://localhost:27017/kafka_db.events")
  .config("spark.mongodb.output.uri", "mongodb://localhost:27017/kafka_db.events")
  .getOrCreate()

// Kafka'dan veri okuma
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json("json", "name STRING, age INT").as("data"))
  .select("data.*")

// MongoDB'ye yazma
val query = df.writeStream
  .foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
    batchDF.write
      .format("com.mongodb.spark.sql.DefaultSource")
      .mode("append")
      .save()
  }
  .start()

query.awaitTermination()

❓ Soru 58: Kafka ve Redis nasıl entegre edilir?

Cevap: Kafka ve Redis entegrasyonu için:

  • Kafka Connect Redis Connector: Kafka'dan Redis'e veri aktarmak için kullanılır.
  • Redis Stream: Redis'in stream veri yapısı ile Kafka arasında veri aktarımı.

Kafka Connect Redis Connector Yapılandırması:

{
  "name": "redis-sink",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "tasks.max": "1",
    "topics": "input-topic",
    "redis.host": "localhost",
    "redis.port": "6379",
    "redis.command": "SET",
    "redis.key": "kafka-message",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}

Java Redis Entegrasyonu:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import redis.clients.jedis.Jedis;
import java.util.Collections;
import java.util.Properties;

public class KafkaRedisIntegration {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "redis-consumer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("input-topic"));
        
        Jedis jedis = new Jedis("localhost", 6379);
        
        while (true) {
            ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord record : records) {
                // Redis'e yaz
                jedis.set("kafka:" + record.offset(), record.value());
                System.out.println("Written to Redis: " + record.value());
            }
        }
    }
}

❓ Soru 59: Kafka ve PostgreSQL nasıl entegre edilir?

Cevap: Kafka ve PostgreSQL entegrasyonu için:

  • Kafka Connect JDBC Connector: Kafka'dan PostgreSQL'e veri aktarmak için kullanılır.
  • Debezium: PostgreSQL'den değişiklikleri yakalayıp Kafka'ya göndermek için kullanılır.

Kafka Connect JDBC Connector Yapılandırması:

{
  "name": "postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "input-topic",
    "connection.url": "jdbc:postgresql://localhost:5432/kafka_db",
    "connection.user": "postgres",
    "connection.password": "password",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "insert",
    "pk.mode": "none",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}

Debezium PostgreSQL Connector Yapılandırması:

{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname": "postgres",
    "database.server.name": "postgres-server",
    "plugin.name": "pgoutput"
  }
}

❓ Soru 60: Kafka ve MySQL nasıl entegre edilir?

Cevap: Kafka ve MySQL entegrasyonu için:

  • Kafka Connect JDBC Connector: Kafka'dan MySQL'e veri aktarmak için kullanılır.
  • Debezium: MySQL'den değişiklikleri yakalayıp Kafka'ya göndermek için kullanılır.

Kafka Connect JDBC Connector Yapılandırması:

{
  "name": "mysql-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "input-topic",
    "connection.url": "jdbc:mysql://localhost:3306/kafka_db",
    "connection.user": "root",
    "connection.password": "password",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "insert",
    "pk.mode": "none",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}

Debezium MySQL Connector Yapılandırması:

{
  "name": "mysql-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "mysql-server",
    "database.include.list": "kafka_db",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.kafka_db"
  }
}

🔹 Kafka İleri Düzey Senaryolar

❓ Soru 61: Kafka'da CQRS (Command Query Responsibility Segregation) pattern nasıl uygulanır?

Cevap: CQRS pattern, yazma (command) ve okuma (query) işlemlerini ayıran bir mimari pattern'idir. Kafka ile CQRS uygulamak için:

  • Command Side: Kullanıcı eylemleri (create, update, delete) için bir topic.
  • Event Store: Komutları olaylara dönüştüren ve olayları bir topic'e yazan bir servis.
  • Query Side: Olayları dinleyen ve okuma modelini güncelleyen bir servis.

CQRS Mimarisi Örneği:

// Command Service
public class CommandService {
    private final KafkaTemplate kafkaTemplate;
    
    public void createUser(CreateUserCommand command) {
        // Komutu doğrula
        // ...
        
        // Olay oluştur
        UserCreatedEvent event = new UserCreatedEvent(
            command.getUserId(),
            command.getUsername(),
            command.getEmail()
        );
        
        // Olayı Kafka'ya gönder
        kafkaTemplate.send("user-commands", event);
    }
}

// Event Processor
public class EventProcessor {
    @KafkaListener(topics = "user-commands")
    public void processUserCreatedEvent(UserCreatedEvent event) {
        // Olayı işle ve veritabanına yaz
        User user = new User(
            event.getUserId(),
            event.getUsername(),
            event.getEmail()
        );
        
        userRepository.save(user);
        
        // Okuma modelini güncelle
        UserView userView = new UserView(
            event.getUserId(),
            event.getUsername(),
            event.getEmail()
        );
        
        userViewRepository.save(userView);
    }
}

// Query Service
public class QueryService {
    public UserView getUser(String userId) {
        return userViewRepository.findById(userId);
    }
}

❓ Soru 62: Kafka'da Event Sourcing pattern nasıl uygulanır?

Cevap: Event Sourcing, uygulama durumunu olayların bir dizisi olarak saklayan bir pattern'idir. Kafka ile Event Sourcing uygulamak için:

  • Event Store: Olayları saklamak için bir Kafka topic'i.
  • Aggregate: Olayları uygulayarak mevcut durumu yeniden oluşturan bir sınıf.
  • Snapshot: Performansı artırmak için belirli aralıklarla durumun anlık görüntüsünü alma.

Event Sourcing Örneği:

// Event Interface
public interface Event {
    String getAggregateId();
    Instant getTimestamp();
}

// Aggregate Class
public class UserAggregate {
    private String userId;
    private String username;
    private String email;
    private List events = new ArrayList<>();
    
    public static UserAggregate create(String userId, String username, String email) {
        UserAggregate aggregate = new UserAggregate();
        aggregate.apply(new UserCreatedEvent(userId, username, email));
        return aggregate;
    }
    
    public void updateEmail(String email) {
        apply(new UserEmailUpdatedEvent(userId, email));
    }
    
    private void apply(Event event) {
        // Olayı uygula
        if (event instanceof UserCreatedEvent) {
            UserCreatedEvent e = (UserCreatedEvent) event;
            this.userId = e.getUserId();
            this.username = e.getUsername();
            this.email = e.getEmail();
        } else if (event instanceof UserEmailUpdatedEvent) {
            UserEmailUpdatedEvent e = (UserEmailUpdatedEvent) event;
            this.email = e.getEmail();
        }
        
        // Olayı listeye ekle
        events.add(event);
    }
    
    public List getUncommittedEvents() {
        return new ArrayList<>(events);
    }
    
    public void markEventsAsCommitted() {
        events.clear();
    }
}

// Event Store
public class EventStore {
    private final KafkaTemplate kafkaTemplate;
    
    public void saveEvents(String aggregateId, List events) {
        events.forEach(event -> {
            kafkaTemplate.send("user-events", aggregateId, event);
        });
    }
    
    public List getEvents(String aggregateId) {
        // Kafka'dan olayları oku
        // ...
        return events;
    }
}

// Aggregate Repository
public class AggregateRepository {
    private final EventStore eventStore;
    
    public void save(UserAggregate aggregate) {
        eventStore.saveEvents(aggregate.getUserId(), aggregate.getUncommittedEvents());
        aggregate.markEventsAsCommitted();
    }
    
    public UserAggregate findById(String userId) {
        List events = eventStore.getEvents(userId);
        UserAggregate aggregate = new UserAggregate();
        
        // Olayları sırayla uygula
        events.forEach(aggregate::apply);
        
        return aggregate;
    }
}

❓ Soru 63: Kafka'da Saga pattern nasıl uygulanır?

Cevap: Saga pattern, dağıtık sistemlerde transaction yönetimi için kullanılan bir pattern'idir. Kafka ile Saga uygulamak için:

  • Orchestrator-Based Saga: Merkezi bir orchestrator tüm adımları yönetir.
  • Choreography-Based Saga: Her servis kendi adımını tamamlar ve bir sonraki adımı tetikler.

Choreography-Based Saga Örneği:

// Order Service
@Service
public class OrderService {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @Transactional
    public Order createOrder(OrderRequest orderRequest) {
        // Siparişi oluştur
        Order order = new Order(orderRequest);
        orderRepository.save(order);
        
        // OrderCreatedEvent gönder
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(),
            order.getCustomerId(),
            order.getAmount()
        );
        
        kafkaTemplate.send("orders", event);
        
        return order;
    }
    
    @KafkaListener(topics = "payment-completed")
    public void handlePaymentCompleted(PaymentCompletedEvent event) {
        // Ödeme tamamlandı, siparişi güncelle
        Order order = orderRepository.findById(event.getOrderId());
        order.setStatus(OrderStatus.PAID);
        orderRepository.save(order);
        
        // OrderCompletedEvent gönder
        OrderCompletedEvent orderEvent = new OrderCompletedEvent(order.getId());
        kafkaTemplate.send("orders", orderEvent);
    }
    
    @KafkaListener(topics = "payment-failed")
    public void handlePaymentFailed(PaymentFailedEvent event) {
        // Ödeme başarısız, siparişi iptal et
        Order order = orderRepository.findById(event.getOrderId());
        order.setStatus(OrderStatus.CANCELLED);
        orderRepository.save(order);
        
        // OrderCancelledEvent gönder
        OrderCancelledEvent orderEvent = new OrderCancelledEvent(order.getId());
        kafkaTemplate.send("orders", orderEvent);
    }
}

// Payment Service
@Service
public class PaymentService {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @KafkaListener(topics = "orders")
    public void handleOrderCreated(OrderCreatedEvent event) {
        try {
            // Ödeme işlemini yap
            Payment payment = paymentService.processPayment(
                event.getOrderId(),
                event.getAmount()
            );
            
            // PaymentCompletedEvent gönder
            PaymentCompletedEvent paymentEvent = new PaymentCompletedEvent(
                payment.getOrderId(),
                payment.getId()
            );
            
            kafkaTemplate.send("payments", paymentEvent);
        } catch (PaymentException e) {
            // PaymentFailedEvent gönder
            PaymentFailedEvent paymentEvent = new PaymentFailedEvent(
                event.getOrderId(),
                e.getMessage()
            );
            
            kafkaTemplate.send("payments", paymentEvent);
        }
    }
}

❓ Soru 64: Kafka'da Dead Letter Queue (DLQ) nasıl uygulanır?

Cevap: Dead Letter Queue, işlenemeyen mesajları saklamak için kullanılan bir pattern'idir. Kafka ile DLQ uygulamak için:

  • Hata Yakalama: Mesaj işlenirken oluşan hataları yakala.
  • DLQ Topic: İşlenemeyen mesajları göndermek için özel bir topic.
  • Retry Mekanizması: Mesajları yeniden denemek için bir mekanizma.

Dead Letter Queue Örneği:

@Service
public class MessageProcessor {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @Value("${kafka.topic.dlq}")
    private String dlqTopic;
    
    @KafkaListener(topics = "input-topic")
    public void processMessage(ConsumerRecord record) {
        try {
            // Mesajı işle
            process(record.value());
            
            // Başarılı ise offset'i commit et
            // ...
        } catch (Exception e) {
            // Hata durumunda log yaz
            logger.error("Error processing message: " + record.value(), e);
            
            // Mesajı DLQ'ya gönder
            sendToDlq(record, e);
        }
    }
    
    private void sendToDlq(ConsumerRecord record, Exception e) {
        // Orijinal mesaj bilgilerini ve hatayı içeren bir DLQ mesajı oluştur
        DlqMessage dlqMessage = new DlqMessage(
            record.value(),
            record.topic(),
            record.partition(),
            record.offset(),
            e.getMessage(),
            Instant.now()
        );
        
        // DLQ topic'ine gönder
        kafkaTemplate.send(dlqTopic, dlqMessage);
    }
}

// Retry Mekanizması
@Service
public class RetryableMessageProcessor {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @Value("${kafka.topic.retry}")
    private String retryTopic;
    
    @Value("${kafka.topic.dlq}")
    private String dlqTopic;
    
    @KafkaListener(topics = "input-topic")
    public void processWithRetry(ConsumerRecord record) {
        int maxRetries = 3;
        int retryCount = 0;
        boolean success = false;
        
        while (retryCount < maxRetries && !success) {
            try {
                // Mesajı işle
                process(record.value());
                success = true;
            } catch (Exception e) {
                retryCount++;
                
                if (retryCount >= maxRetries) {
                    // Maksimum deneme sayısına ulaşıldı, DLQ'ya gönder
                    sendToDlq(record, e);
                } else {
                    // Retry topic'ine gönder
                    sendToRetryTopic(record, retryCount, e);
                    
                    // Bir sonraki deneme için bekle
                    try {
                        Thread.sleep(1000 * retryCount);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    
    private void sendToRetryTopic(ConsumerRecord record, int retryCount, Exception e) {
        RetryMessage retryMessage = new RetryMessage(
            record.value(),
            record.topic(),
            record.partition(),
            record.offset(),
            retryCount,
            e.getMessage(),
            Instant.now()
        );
        
        kafkaTemplate.send(retryTopic, retryMessage);
    }
}

❓ Soru 65: Kafka'da idempotent consumer nasıl uygulanır?

Cevap: Idempotent consumer, aynı mesajın birden fazla kez işlenmesini önleyen bir pattern'idir. Kafka ile idempotent consumer uygulamak için:

  • Mesaj ID'si: Her mesaj için benzersiz bir ID.
  • İşlenmiş ID'ler: Zaten işlenmiş mesaj ID'lerini saklayan bir mekanizma.

Idempotent Consumer Örneği:

@Service
public class IdempotentConsumer {
    @Autowired
    private ProcessedMessageRepository processedMessageRepository;
    
    @KafkaListener(topics = "input-topic")
    public void processMessage(ConsumerRecord record) {
        // Mesaj ID'sini oluştur
        String messageId = record.key() + "-" + record.offset();
        
        // Mesajın daha önce işlenip işlenmediğini kontrol et
        if (processedMessageRepository.existsById(messageId)) {
            logger.info("Message already processed: " + messageId);
            return;
        }
        
        try {
            // Mesajı işle
            process(record.value());
            
            // Mesajı işlenmiş olarak işaretle
            ProcessedMessage processedMessage = new ProcessedMessage(
                messageId,
                record.topic(),
                record.partition(),
                record.offset(),
                Instant.now()
            );
            
            processedMessageRepository.save(processedMessage);
        } catch (Exception e) {
            // Hata durumunda log yaz ve işlemi geri al
            logger.error("Error processing message: " + messageId, e);
            throw new RuntimeException("Failed to process message", e);
        }
    }
    
    private void process(String message) {
        // Mesajı işle
        // ...
    }
}

// Redis ile Idempotent Consumer
@Service
public class RedisIdempotentConsumer {
    @Autowired
    private RedisTemplate redisTemplate;
    
    @KafkaListener(topics = "input-topic")
    public void processMessage(ConsumerRecord record) {
        // Mesaj ID'sini oluştur
        String messageId = record.key() + "-" + record.offset();
        
        // Redis'te mesaj ID'sini kontrol et
        Boolean isProcessed = redisTemplate.hasKey(messageId);
        
        if (Boolean.TRUE.equals(isProcessed)) {
            logger.info("Message already processed: " + messageId);
            return;
        }
        
        try {
            // Mesajı işle
            process(record.value());
            
            // Mesajı Redis'te işlenmiş olarak işaretle
            redisTemplate.opsForValue().set(messageId, "PROCESSED");
            
            // Mesajın ne kadar süre kalacağını ayarla (örneğin 7 gün)
            redisTemplate.expire(messageId, 7, TimeUnit.DAYS);
        } catch (Exception e) {
            // Hata durumunda log yaz ve işlemi geri al
            logger.error("Error processing message: " + messageId, e);
            throw new RuntimeException("Failed to process message", e);
        }
    }
    
    private void process(String message) {
        // Mesajı işle
        // ...
    }
}

❓ Soru 66: Kafka'da backpressure nasıl yönetilir?

Cevap: Backpressure, veri üreticisinin hızını tüketicinin işleyebileceği hızla sınırlayan bir mekanizmadır. Kafka'da backpressure yönetmek için:

  • Consumer Lag İzleme: Consumer'ın geride kalmasını izleme.
  • Dynamic Throttling: Producer hızını dinamik olarak ayarlama.
  • Buffer Boyutunu Ayarlama: Producer ve consumer buffer boyutlarını ayarlama.

Backpressure Yönetimi Örneği:

@Service
public class BackpressureManager {
    @Autowired
    private KafkaAdmin kafkaAdmin;
    
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    private final double maxLagThreshold = 1000.0; // Maksimum lag eşiği
    private final double minLagThreshold = 100.0;  // Minimum lag eşiği
    
    @Scheduled(fixedRate = 5000) // Her 5 saniyede bir kontrol et
    public void manageBackpressure() {
        // Tüm consumer grupları için lag değerlerini al
        Map> consumerGroups = getConsumerGroupOffsets();
        
        for (Map.Entry> entry : consumerGroups.entrySet()) {
            String groupId = entry.getKey();
            Map offsets = entry.getValue();
            
            // Her topic için lag hesapla
            Map topicLags = calculateTopicLags(offsets);
            
            for (Map.Entry topicEntry : topicLags.entrySet()) {
                String topic = topicEntry.getKey();
                Long lag = topicEntry.getValue();
                
                // Lag eşiğine göre throttling uygula
                if (lag > maxLagThreshold) {
                    // Producer hızını düşür
                    throttleProducer(topic, 0.5); // %50 hızında
                    logger.info("Throttling producer for topic: " + topic + " due to high lag: " + lag);
                } else if (lag < minLagThreshold) {
                    // Producer hızını artır
                    throttleProducer(topic, 1.0); // Tam hız
                    logger.info("Resuming producer for topic: " + topic + " as lag is normal: " + lag);
                }
            }
        }
    }
    
    private Map calculateTopicLags(Map offsets) {
        Map topicLags = new HashMap<>();
        
        // Her topic için lag hesapla
        for (Map.Entry entry : offsets.entrySet()) {
            TopicPartition partition = entry.getKey();
            OffsetAndMetadata offsetMetadata = entry.getValue();
            
            // Topic son offset'ini al
            long endOffset = getEndOffset(partition);
            
            // Lag hesapla
            long lag = endOffset - offsetMetadata.offset();
            
            // Topic lag'ini güncelle
            topicLags.merge(partition.topic(), lag, Long::sum);
        }
        
        return topicLags;
    }
    
    private void throttleProducer(String topic, double throttleFactor) {
        // Producer'ı kısıtla
        // Bu, producer'ın linger.ms ve batch.size ayarlarını değiştirerek yapılabilir
        // veya özel bir throttling mekanizması uygulanabilir
        
        // Örnek: linger.ms değerini artır
        updateProducerConfig(topic, "linger.ms", (int) (10 / throttleFactor));
        
        // Örnek: batch.size değerini artır
        updateProducerConfig(topic, "batch.size", (int) (16384 / throttleFactor));
    }
    
    private void updateProducerConfig(String topic, String configKey, int configValue) {
        // Producer config'ini güncelle
        // Bu, Kafka Admin API veya özel bir config yönetimi ile yapılabilir
        // ...
        
        logger.info("Updated producer config for topic: " + topic + ", " + configKey + "=" + configValue);
    }
}

❓ Soru 67: Kafka'da exactly-once processing nasıl sağlanır?

Cevap: Exactly-once processing, her mesajın tam olarak bir kez işlendiğini garanti eden bir özelliktir. Kafka'da exactly-once processing sağlamak için:

  • Idempotent Producer: Aynı mesajın birden fazla kez gönderilmesini önler.
  • Transactions: Birden fazla partition'a atomik yazma işlemleri sağlar.
  • Read Committed Isolation Level: Sadece commit edilmiş mesajları okur.
  • Consumer Position Commit: Mesaj işlendikten sonra offset'leri commit eder.

Exactly-Once Processing Örneği:

@Service
public class ExactlyOnceProcessor {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @KafkaListener(topics = "input-topic")
    @Transactional
    public void processMessage(ConsumerRecord record) {
        try {
            // Mesajı işle
            String result = process(record.value());
            
            // Sonucu output topic'ine gönder
            kafkaTemplate.send(new ProducerRecord<>("output-topic", record.key(), result));
            
            // İşlem başarılı, transaction commit edilecek
        } catch (Exception e) {
            // Hata durumunda transaction rollback edilecek
            throw new RuntimeException("Failed to process message", e);
        }
    }
    
    private String process(String message) {
        // Mesajı işle ve sonuç döndür
        return "PROCESSED: " + message;
    }
}

// Transactional Producer Örneği
@Service
public class TransactionalProducerService {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @PostConstruct
    public void init() {
        // Transactional producer'ı başlat
        kafkaTemplate.setTransactionIdPrefix("tx-");
    }
    
    public void sendInTransaction(String topic, String key, String value) {
        // Transaction içinde mesaj gönder
        kafkaTemplate.executeInTransaction(template -> {
            template.send(new ProducerRecord<>(topic, key, value));
            return true;
        });
    }
}

// Exactly-Once Consumer Örneği
@Service
public class ExactlyOnceConsumer {
    @KafkaListener(topics = "input-topic")
    public void processMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
        try {
            // Mesajı işle
            process(record.value());
            
            // İşlem başarılı, acknowledgment'ı onayla
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Hata durumunda acknowledgment'ı reddet
            acknowledgment.nack(1000); // 1 saniye sonra tekrar dene
            throw new RuntimeException("Failed to process message", e);
        }
    }
    
    private void process(String message) {
        // Mesajı işle
        // ...
    }
}

❓ Soru 68: Kafka'da schema evrimi (schema evolution) nasıl yönetilir?

Cevap: Schema evrimi, veri şemalarının zamanla değişmesini yönetme işlemidir. Kafka'da schema evrimi yönetmek için:

  • Schema Registry: Şemaları merkezi olarak yönetir.
  • Uyumluluk Kuralları: Şema değişikliklerinin uyumluluğunu kontrol eder.
  • Avro, Protobuf, JSON Schema: Schema evrimini destekleyen formatlar.

Schema Evrimi Örneği:

// İlk schema
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}

// Yeni schema (yeni alan eklenmiş)
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": ["null", "int"], "default": null}
  ]
}

// Java ile Schema Registry kullanımı
public class SchemaEvolutionExample {
    public static void main(String[] args) throws IOException, RestClientException {
        // Schema Registry istemcisi oluştur
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
        
        // Schema'yı kaydet
        String schemaJson = "{\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"User\",\n" +
            "  \"fields\": [\n" +
            "    {\"name\": \"id\", \"type\": \"string\"},\n" +
            "    {\"name\": \"name\", \"type\": \"string\"},\n" +
            "    {\"name\": \"email\", \"type\": \"string\"}\n" +
            "  ]\n" +
            "}";
        
        Schema schema = new Schema.Parser().parse(schemaJson);
        SchemaMetadata metadata = client.register("users-value", schema);
        
        System.out.println("Schema registered with id: " + metadata.getId());
        
        // Schema'yı al
        Schema retrievedSchema = client.getById(metadata.getId());
        System.out.println("Retrieved schema: " + retrievedSchema.toString());
        
        // Yeni schema'yı kaydet (evrim)
        String newSchemaJson = "{\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"User\",\n" +
            "  \"fields\": [\n" +
            "    {\"name\": \"id\", \"type\": \"string\"},\n" +
            "    {\"name\": \"name\", \"type\": \"string\"},\n" +
            "    {\"name\": \"email\", \"type\": \"string\"},\n" +
            "    {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}\n" +
            "  ]\n" +
            "}";
        
        Schema newSchema = new Schema.Parser().parse(newSchemaJson);
        SchemaMetadata newMetadata = client.register("users-value", newSchema);
        
        System.out.println("New schema registered with id: " + newMetadata.getId());
        
        // Uyumluluk kontrolü
        boolean isCompatible = client.testCompatibility("users-value", newSchema);
        System.out.println("Is new schema compatible? " + isCompatible);
    }
}

❓ Soru 69: Kafka'da multi-tenancy nasıl uygulanır?

Cevap: Multi-tenancy, tek bir Kafka kümesinde birden fazla kiracı (tenant) için veri ayırma sağlayan bir yaklaşımdır. Kafka'da multi-tenancy uygulamak için:

  • Tenant Bazlı Topic'ler: Her tenant için ayrı topic'ler.
  • Tenant Bazlı Partition'lar: Her tenant için ayrı partition'lar.
  • Tenant Bazlı Consumer Grupları: Her tenant için ayrı consumer grupları.
  • Tenant Bazlı Yetkilendirme: Her tenant için ayrı ACL'ler.

Multi-Tenancy Örneği:

// Tenant bazlı topic isimlendirme
public class TopicNamingStrategy {
    public static String getTenantTopic(String tenantId, String baseTopic) {
        return tenantId + "-" + baseTopic;
    }
}

// Tenant bazlı producer
@Service
public class TenantAwareProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    public void sendToTenant(String tenantId, String topic, String key, Object value) {
        String tenantTopic = TopicNamingStrategy.getTenantTopic(tenantId, topic);
        
        // Tenant bilgisini header'a ekle
        ProducerRecord record = new ProducerRecord<>(
            tenantTopic, key, value
        );
        
        record.headers().add("tenant-id", tenantId.getBytes());
        
        kafkaTemplate.send(record);
    }
}

// Tenant bazlı consumer
@Service
public class TenantAwareConsumer {
    @KafkaListener(topics = "#{tenantAwareTopicResolver.getTopics()}")
    public void processMessage(ConsumerRecord record) {
        // Tenant ID'sini header'dan al
        String tenantId = getTenantId(record);
        
        // Tenant'a özgü iş mantığını uygula
        processForTenant(tenantId, record.value());
    }
    
    private String getTenantId(ConsumerRecord record) {
        // Topic adından tenant ID'sini çıkar
        String topic = record.topic();
        return topic.substring(0, topic.indexOf('-'));
    }
    
    private void processForTenant(String tenantId, String message) {
        // Tenant'a özgü iş mantığı
        // ...
    }
}

// Tenant bazlı yetkilendirme
@Service
public class TenantAwareAuthorizer {
    public boolean authorize(String tenantId, String resource, String operation) {
        // Tenant'ın kaynağa erişim yetkisi olup olmadığını kontrol et
        // ...
        return true;
    }
}

// Tenant bazlı konfigürasyon
@Configuration
public class TenantAwareConfig {
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic createTenantTopics() {
        // Tenant'lar için topic'leri oluştur
        return TopicBuilder.name("tenant1-orders")
                .partitions(3)
                .replicas(1)
                .build();
    }
}

❓ Soru 70: Kafka'da veri bütünlüğü (data integrity) nasıl sağlanır?

Cevap: Veri bütünlüğü, Kafka'da verinin doğru ve tutarlı kalmasını sağlayan önlemlerdir. Kafka'da veri bütünlüğü sağlamak için:

  • Checksum: Veriye checksum ekleyerek bozulmayı tespit etme.
  • Validation: Veriyi işlemeden önce doğrulama.
  • Idempotent Operations: Aynı işlemin birden fazla kez uygulanmasını önleme.
  • Transactions: Birden fazla işlemi atomik olarak gerçekleştirme.

Veri Bütünlüğü Örneği:

// Checksum ile veri bütünlüğü
public class DataIntegrityUtils {
    public static String calculateChecksum(String data) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(data.getBytes(StandardCharsets.UTF_8));
            return Hex.encodeHexString(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("Failed to calculate checksum", e);
        }
    }
    
    public static boolean verifyChecksum(String data, String checksum) {
        String calculatedChecksum = calculateChecksum(data);
        return calculatedChecksum.equals(checksum);
    }
}

// Producer ile checksum gönderme
@Service
public class ChecksumProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    public void sendWithChecksum(String topic, String key, String value) {
        // Veri ve checksum'ı içeren bir nesne oluştur
        DataWithChecksum dataWithChecksum = new DataWithChecksum(
            value,
            DataIntegrityUtils.calculateChecksum(value)
        );
        
        // Mesajı gönder
        kafkaTemplate.send(new ProducerRecord<>(topic, key, dataWithChecksum));
    }
}

// Consumer ile checksum doğrulama
@Service
public class ChecksumConsumer {
    @KafkaListener(topics = "input-topic")
    public void processMessage(ConsumerRecord record) {
        DataWithChecksum dataWithChecksum = record.value();
        
        // Checksum'ı doğrula
        if (!DataIntegrityUtils.verifyChecksum(
            dataWithChecksum.getData(),
            dataWithChecksum.getChecksum()
        )) {
            throw new DataIntegrityException("Checksum verification failed");
        }
        
        // Veriyi işle
        process(dataWithChecksum.getData());
    }
    
    private void process(String data) {
        // Veriyi işle
        // ...
    }
}

// Validation ile veri bütünlüğü
@Service
public class ValidationConsumer {
    @KafkaListener(topics = "input-topic")
    public void processMessage(ConsumerRecord record) {
        String data = record.value();
        
        // Veriyi doğrula
        ValidationResult validationResult = validate(data);
        
        if (!validationResult.isValid()) {
            // Geçersiz veriyi DLQ'ya gönder
            sendToDlq(record, validationResult.getErrorMessage());
            return;
        }
        
        // Geçerli veriyi işle
        process(data);
    }
    
    private ValidationResult validate(String data) {
        // Veriyi doğrula
        if (data == null || data.isEmpty()) {
            return ValidationResult.invalid("Data is null or empty");
        }
        
        if (data.length() > 1000) {
            return ValidationResult.invalid("Data is too long");
        }
        
        return ValidationResult.valid();
    }
}

// Transaction ile veri bütünlüğü
@Service
public class TransactionalProcessor {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    @Autowired
    private DatabaseService databaseService;
    
    @Transactional
    public void processInTransaction(String input) {
        // Veriyi işle
        String result = process(input);
        
        // Veritabanına yaz
        databaseService.save(result);
        
        // Output topic'ine gönder
        kafkaTemplate.send(new ProducerRecord<>("output-topic", result));
        
        // İşlem başarılı, transaction commit edilecek
    }
    
    private String process(String input) {
        // Veriyi işle ve sonuç döndür
        return "PROCESSED: " + input;
    }
}

RabbitMQ Mülakat Soruları

Bu dokümanda RabbitMQ mülakatlarında en çok sorulan konular soru-cevap mantığıyla açıklanmış ve örnek kodlar eklenmiştir.

🔹 Temel RabbitMQ Kavramları

❓ Soru 1: RabbitMQ nedir? Ne işe yarar?

Cevap: RabbitMQ, açık kaynaklı bir mesajlaşma kuyruğu sistemidir.

  • AMQP (Advanced Message Queuing Protocol) standardını uygular.
  • Farklı uygulamalar arasında asenkron iletişim sağlar.
  • Yüksek performanslı, güvenilir ve ölçeklenebilir bir mesajlaşma çözümüdür.

RabbitMQ Sunucusunu Başlatma Komutu:

rabbitmq-server

❓ Soru 2: RabbitMQ'da Exchange nedir? Türleri nelerdir?

Cevap: Exchange, gelen mesajları belirli kurallara göre kuyruklara yönlendiren bir bileşendir.

  • Direct Exchange: Mesajı routing key'e tam olarak eşleşen kuyruklara yönlendirir.
  • Topic Exchange: Mesajı routing key'inin bir desenine (pattern) göre kuyruklara yönlendirir.
  • Fanout Exchange: Mesajı bağlı tüm kuyruklara yayınlar.
  • Headers Exchange: Mesajı header'lara göre kuyruklara yönlendirir.

Direct Exchange Oluşturma Komutu:

rabbitmqadmin declare exchange name=direct-exchange type=direct

❓ Soru 3: RabbitMQ'da Queue nedir? Türleri nelerdir?

Cevap: Queue, mesajların depolandığı ve tüketiciye ulaştırıldığı bir yapıdır.

  • Classic Queue: Standart kuyruk türü.
  • Quorum Queue: Yüksek erişilebilirlik ve veri güvenliği için kullanılan kuyruk türü.
  • Stream Queue: Log tabanlı, yüksek performanslı kuyruk türü.

Queue Oluşturma Komutu:

rabbitmqadmin declare queue name=my-queue

❓ Soru 4: RabbitMQ'da Binding nedir?

Cevap: Binding, bir exchange ile bir kuyruk arasındaki ilişkiyi tanımlar.

  • Exchange'ten kuyruğa mesajların nasıl yönlendirileceğini belirler.
  • Direct ve Topic exchange'lerde routing key veya binding key kullanılır.

Binding Oluşturma Komutu:

rabbitmqadmin declare binding source=direct-exchange destination=my-queue routing_key=my-key

❓ Soru 5: RabbitMQ'da Producer ve Consumer arasındaki fark nedir?

Cevap:

  • Producer (Publisher): RabbitMQ'ya mesaj gönderen uygulamadır.
  • Consumer: RabbitMQ'dan mesaj alan uygulamadır.

Java Producer Örneği:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
    
    channel.exchangeDeclare("direct-exchange", "direct");
    
    String message = "Hello, RabbitMQ!";
    channel.basicPublish("direct-exchange", "my-key", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
}

Java Consumer Örneği:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
    
    channel.queueDeclare("my-queue", false, false, false, null);
    channel.queueBind("my-queue", "direct-exchange", "my-key");
    
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume("my-queue", true, deliverCallback, consumerTag -> {});
}

❓ Soru 6: RabbitMQ'da Routing Key nedir?

Cevap: Routing Key, bir mesajın hangi kuyruğa yönlendirileceğini belirten bir etikettir.

  • Direct ve Topic exchange'lerde kullanılır.
  • Direct exchange'te tam eşleşme, Topic exchange'te desen eşleşmesi sağlar.

Routing Key ile Mesaj Gönderme:

String routingKey = "user.created";
channel.basicPublish("topic-exchange", routingKey, null, message.getBytes());

❓ Soru 7: RabbitMQ'da Virtual Host nedir?

Cevap: Virtual Host, RabbitMQ sunucusu içindeki mantıksal bir bölümdür.

  • Farklı uygulamalar için izolasyon sağlar.
  • Her virtual host kendi exchange'lerine, kuyruklarına ve kullanıcılarına sahiptir.

Virtual Host Oluşturma Komutu:

rabbitmqctl add_vhost my-vhost

Virtual Host'a Kullanıcı Ekleme:

rabbitmqctl set_permissions -p my-vhost my-user ".*" ".*" ".*"

❓ Soru 8: RabbitMQ'da Connection ve Channel arasındaki fark nedir?

Cevap:

  • Connection: Uygulama ile RabbitMQ sunucusu arasındaki TCP bağlantısıdır.
  • Channel: Bir connection üzerinden açılan sanal bir bağlantıdır.

Her connection birden fazla channel içerebilir. Channel'lar, aynı TCP bağlantısı üzerinden birden fazla işlemi paralel olarak gerçekleştirmek için kullanılır.

❓ Soru 9: RabbitMQ'da Message Durability (Kalıcılık) nedir?

Cevap: Message Durability, mesajların RabbitMQ sunucusu yeniden başlatıldığında bile kaybolmamasını sağlayan bir özelliktir.

  • Persistent Messages: Disk üzerinde saklanan mesajlardır.
  • Transient Messages: Bellekte saklanan ve sunucu yeniden başlatıldığında kaybolan mesajlardır.

Kalıcı Mesaj Gönderme:

channel.basicPublish("direct-exchange", "my-key", 
                     MessageProperties.PERSISTENT_TEXT_PLAIN, 
                     message.getBytes());

❓ Soru 10: RabbitMQ'da Acknowledgment (Onay) nedir?

Cevap: Acknowledgment, bir mesajın başarıyla işlendiğini RabbitMQ'ya bildiren mekanizmadır.

  • Automatic Acknowledgment: Mesaj tüketiciye ulaştığında otomatik olarak onaylanır.
  • Manual Acknowledgment: Tüketici mesajı işledikten sonra manuel olarak onaylar.

Manual Acknowledgment Örneği:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    
    // Mesajı işle
    processMessage(message);
    
    // Onayla
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume("my-queue", false, deliverCallback, consumerTag -> {});

🔹 RabbitMQ İleri Düzey Konular

❓ Soru 11: RabbitMQ'da Prefetch Count nedir? Ne işe yarar?

Cevap: Prefetch Count, bir tüketiciye aynı anda gönderilebilecek maksimum mesaj sayısıdır.

  • Tüketici performansını optimize etmek için kullanılır.
  • Düşük değerler, mesajların daha eşit dağıtılmasını sağlar.
  • Yüksek değerler, daha yüksek verimlilik sağlar.

Prefetch Count Ayarlama:

channel.basicQos(10); // Her seferinde en fazla 10 mesaj gönder

❓ Soru 12: RabbitMQ'da Dead Letter Exchange (DLX) nedir? Ne işe yarar?

Cevap: Dead Letter Exchange, işlenemeyen veya reddedilen mesajların yönlendirildiği bir exchange'tir.

  • Mesajlar süresi dolduğunda, kuyruk dolu olduğunda veya reddedildiğinde DLX'e gönderilir.
  • İşlenemeyen mesajları analiz etmek ve yeniden işlemek için kullanılır.

DLX Ayarlama:

Map args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx-exchange");
args.put("x-dead-letter-routing-key", "dlx-key");

channel.queueDeclare("my-queue", false, false, false, args);

❓ Soru 13: RabbitMQ'da Message TTL (Time-To-Live) nedir?

Cevap: Message TTL, bir mesajın kuyrukta ne kadar süre kalacağını belirten bir süre sınırlamasıdır.

  • Süre dolduğunda mesaj kuyruktan kaldırılır veya DLX'e gönderilir.
  • Kuyruk seviyesinde veya mesaj seviyesinde ayarlanabilir.

Kuyruk TTL Ayarlama:

Map args = new HashMap<>();
args.put("x-message-ttl", 60000); // 60 saniye

channel.queueDeclare("my-queue", false, false, false, args);

Mesaj TTL Ayarlama:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .expiration("60000") // 60 saniye
    .build();

channel.basicPublish("direct-exchange", "my-key", properties, message.getBytes());

❓ Soru 14: RabbitMQ'da Publisher Confirms nedir?

Cevap: Publisher Confirms, bir mesajın RabbitMQ sunucusu tarafından alındığını onaylayan bir mekanizmadır.

  • Mesajların kaybolmamasını sağlar.
  • Asenkron veya senkron olarak kullanılabilir.

Publisher Confirms Örneği:

channel.confirmSelect();

// Senkron onay
channel.basicPublish("direct-exchange", "my-key", null, message.getBytes());
channel.waitForConfirmsOrDie(5000); // 5 saniye bekle

// Asenkron onay
channel.addConfirmListener((deliveryTag, multiple) -> {
    System.out.println("Message confirmed with tag: " + deliveryTag);
}, (deliveryTag, multiple) -> {
    System.out.println("Message not confirmed with tag: " + deliveryTag);
});

channel.basicPublish("direct-exchange", "my-key", null, message.getBytes());

❓ Soru 15: RabbitMQ'da Alternate Exchange nedir?

Cevap: Alternate Exchange, bir exchange'e gönderilen mesajların herhangi bir kuyruğa yönlendirilememesi durumunda yönlendirildiği bir exchange'tir.

  • Yanlış routing key ile gönderilen mesajları yakalamak için kullanılır.
  • Exchange oluşturulurken ayarlanır.

Alternate Exchange Ayarlama:

Map args = new HashMap<>();
args.put("alternate-exchange", "alternate-exchange");

channel.exchangeDeclare("main-exchange", "direct", true, false, args);

❓ Soru 16: RabbitMQ'da Priority Queue nedir?

Cevap: Priority Queue, mesajların önceliklerine göre işlendiği bir kuyruk türüdür.

  • Yüksek öncelikli mesajlar düşük öncelikli mesajlardan önce işlenir.
  • Kuyruk oluşturulurken maksimum öncelik seviyesi belirlenir.

Priority Queue Oluşturma:

Map args = new HashMap<>();
args.put("x-max-priority", 10); // Maksimum öncelik seviyesi

channel.queueDeclare("priority-queue", false, false, false, args);

Öncelikli Mesaj Gönderme:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .priority(5) // 1-10 arası öncelik
    .build();

channel.basicPublish("direct-exchange", "my-key", properties, message.getBytes());

❓ Soru 17: RabbitMQ'da Lazy Queues nedir?

Cevap: Lazy Queues, mesajları disk üzerinde saklayarak bellek kullanımını optimize eden kuyruk türüdür.

  • Büyük miktarda mesajı saklamak için kullanılır.
  • Mesajlar tüketiciye gönderilene kadar diskte kalır.

Lazy Queue Oluşturma:

Map args = new HashMap<>();
args.put("x-queue-mode", "lazy");

channel.queueDeclare("lazy-queue", false, false, false, args);

❓ Soru 18: RabbitMQ'da Quorum Queues nedir?

Cevap: Quorum Queues, yüksek erişilebilirlik ve veri güvenliği için tasarlanmış bir kuyruk türüdür.

  • Raft konsensus algoritmasını kullanır.
  • Çoğunluk (quorum) ile karar verir.
  • Veri kaybını önlemek için en az 3 node gerektirir.

Quorum Queue Oluşturma:

Map args = new HashMap<>();
args.put("x-queue-type", "quorum");

channel.queueDeclare("quorum-queue", false, false, false, args);

❓ Soru 19: RabbitMQ'da Shovel eklentisi nedir? Ne işe yarar?

Cevap: Shovel eklentisi, bir RabbitMQ sunucusundan diğerine veya başka bir mesajlaşma sistemine mesaj kopyalamak için kullanılır.

  • Farklı sunucular arasında mesaj senkronizasyonu sağlar.
  • Yedekleme veya veri migrasyonu için kullanılabilir.

Shovel Yapılandırması:

rabbitmqctl set_parameter shovel my-shovel \
'{"src-uri": "amqp://source-server", "src-queue": "source-queue", \
"dest-uri": "amqp://destination-server", "dest-queue": "destination-queue"}'

❓ Soru 20: RabbitMQ'da Federation eklentisi nedir? Ne işe yarar?

Cevap: Federation eklentisi, farklı RabbitMQ sunucuları arasında mesajların güvenilir bir şekilde dağıtılmasını sağlar.

  • Coğrafi olarak dağılmış sunucular için kullanılır.
  • Shovel'dan farklı olarak, federation upstream sunucudaki exchange'leri downstream sunucudaki exchange'lere bağlar.

Federation Ayarlama:

rabbitmqctl set_parameter federation-upstream my-upstream \
'{"uri":"amqp://upstream-server"}'

rabbitmqctl set_parameter federation-upstream-set my-upstream-set \
'{"upstream-set":"my-upstream"}'

rabbitmqctl set_parameter policy federation \
'{"pattern":"", "federation-upstream-set":"my-upstream-set"}'

🔹 RabbitMQ İzleme ve Yönetim

❓ Soru 21: RabbitMQ'da monitoring için hangi araçlar kullanılır?

Cevap: RabbitMQ monitoring için kullanılan araçlar:

  • RabbitMQ Management Plugin: Web tabanlı yönetim arayüzü.
  • Prometheus + Grafana: Metrikleri toplamak ve görselleştirmek için.
  • rabbitmq_exporter: Prometheus için metrikleri dışa aktaran bir araç.
  • ELK Stack: Logları toplamak ve analiz etmek için.

Management Plugin Etkinleştirme:

rabbitmq-plugins enable rabbitmq_management

Prometheus ile Entegrasyon:

rabbitmq-plugins enable rabbitmq_prometheus

❓ Soru 22: RabbitMQ'da log yönetimi nasıl yapılır?

Cevap: RabbitMQ'da log yönetimi için:

  • Log Dosyaları: RabbitMQ log dosyaları /var/log/rabbitmq/ dizininde bulunur.
  • Log Seviyeleri: debug, info, warning, error, critical.
  • Log Döndürme: Log dosyalarının belirli boyuta ulaştığında döndürülmesi.

Log Yapılandırması:

# rabbitmq.conf
log.file.level = info
log.file = /var/log/rabbitmq/rabbit.log
log.exchange = true
log.queue = true

❓ Soru 23: RabbitMQ'da memory alarm nedir?

Cevap: Memory alarm, RabbitMQ sunucusunun bellek kullanımı belirli bir eşiği aştığında tetiklenen bir uyarıdır.

  • Bellek kritik seviyeye ulaştığında, yeni mesajları kabul etmeyi durdurur.
  • Publisher'lar bloke olur ve mesaj gönderemez.
  • Memory alarmı temizlendikten sonra normal çalışmaya devam eder.

Memory Alarm Eşiği Ayarlama:

# rabbitmq.conf
vm_memory_high_watermark.relative = 0.6 # %60 bellek kullanımı

❓ Soru 24: RabbitMQ'da disk alarm nedir?

Cevap: Disk alarm, RabbitMQ sunucusunun disk alanı belirli bir eşiği aştığında tetiklenen bir uyarıdır.

  • Disk alanı kritik seviyeye ulaştığında, yeni mesajları kabul etmeyi durdurur.
  • Publisher'lar bloke olur ve mesaj gönderemez.
  • Disk alarmı temizlendikten sonra normal çalışmaya devam eder.

Disk Alarm Eşiği Ayarlama:

# rabbitmq.conf
disk_free_limit.relative = 1.0 # %1 boş disk alanı

❓ Soru 25: RabbitMQ'da kullanıcı ve yetki yönetimi nasıl yapılır?

Cevap: RabbitMQ'da kullanıcı ve yetki yönetimi için:

  • Kullanıcı Oluşturma: rabbitmqctl add_user komutu ile.
  • Yetki Atama: rabbitmqctl set_permissions komutu ile.
  • Rol Atama: rabbitmqctl set_user_tags komutu ile.

Kullanıcı Oluşturma:

rabbitmqctl add_user my-user my-password

Yetki Atama:

rabbitmqctl set_permissions -p my-vhost my-user ".*" ".*" ".*"

Rol Atama:

rabbitmqctl set_user_tags my-user administrator

❓ Soru 26: RabbitMQ'da cluster nasıl kurulur?

Cevap: RabbitMQ cluster kurmak için:

  • ERLANG Cookie: Tüm node'ların aynı ERLANG cookie'ye sahip olması gerekir.
  • Node'ları Birleştirme: rabbitmqctl join_cluster komutu ile.
  • Mirror Queues: Kuyrukların cluster içindeki tüm node'larda kopyalanması.

ERLANG Cookie Ayarlama:

# /var/lib/rabbitmq/.erlang.cookie dosyasını tüm node'larda aynı yapın

Cluster Oluşturma:

# Node 1
rabbitmq-server -detached

# Node 2
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# Node 3
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

❓ Soru 27: RabbitMQ'da yüksek erişilebilirlik (High Availability) nasıl sağlanır?

Cevap: RabbitMQ'da yüksek erişilebilirlik sağlamak için:

  • Cluster: Birden fazla node içeren bir cluster oluşturmak.
  • Mirror Queues: Kuyrukların cluster içindeki tüm node'larda kopyalanması.
  • Load Balancer: Client'ların cluster node'larına dağıtılması için.

Mirror Queue Ayarlama:

rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}'

Hem Mirror Queue Hem Synchronized Queue Ayarlama:

rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

❓ Soru 28: RabbitMQ'da backup nasıl alınır?

Cevap: RabbitMQ'da backup almak için:

  • Definitions: Exchange, kuyruk, binding ve kullanıcı tanımlarını dışa aktarmak.
  • Mnesia Database: Veritabanını kopyalamak.
  • Mesajların Backup'ı: Kuyruklardaki mesajları kurtarmak.

Definitions Dışa Aktarma:

rabbitmqctl export_definitions > definitions.json

Mnesia Database Backup:

rabbitmqctl stop
cd /var/lib/rabbitmq/mnesia/rabbit@$(hostname)
cp -r * /backup/location/
rabbitmq-server -detached

❓ Soru 29: RabbitMQ'da performans nasıl optimize edilir?

Cevap: RabbitMQ performansını optimize etmek için:

  • Prefetch Count: Tüketiciye gönderilecek mesaj sayısını ayarlamak.
  • Batch Processing: Mesajları toplu olarak işlemek.
  • Persistent vs. Non-Persistent: Gerekmedikçe kalıcı mesajlar kullanmamak.
  • Lazy Queues: Büyük miktarda mesaj için lazy kuyruklar kullanmak.
  • Connection Pooling: Bağlantı havuzu kullanmak.

Prefetch Count Ayarlama:

channel.basicQos(100); // Her seferinde en fazla 100 mesaj gönder

Lazy Queue Oluşturma:

Map args = new HashMap<>();
args.put("x-queue-mode", "lazy");

channel.queueDeclare("lazy-queue", false, false, false, args);

❓ Soru 30: RabbitMQ'da güvenlik nasıl sağlanır?

Cevap: RabbitMQ'da güvenlik sağlamak için:

  • SSL/TLS: Client ile sunucu arasındaki iletişimi şifrelemek.
  • Kullanıcı Doğrulama: Kullanıcı adı ve şifre ile doğrulama.
  • Yetkilendirme: Kullanıcıların kaynaklara erişimini kontrol etmek.
  • Virtual Hosts: Farklı uygulamaları izole etmek.

SSL/TLS Ayarlama:

# rabbitmq.conf
listeners.ssl.default = 5671
ssl_options.cacertfile = /path/to/cacert.pem
ssl_options.certfile = /path/to/cert.pem
ssl_options.keyfile = /path/to/key.pem

Java Client SSL Ayarlama:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);
factory.useSslProtocol();
factory.setUsername("my-user");
factory.setPassword("my-password");