100 adet ileri seviye soru ve cevapla hazırlanmış kapsamlı arşiv
70 adet ileri seviye Kafka mülakat sorusu ve cevabı
30 adet ileri seviye RabbitMQ mülakat sorusu ve cevabı
Bu dokümanda Apache Kafka mülakatlarında en çok sorulan konular soru-cevap mantığıyla açıklanmış ve örnek kodlar eklenmiştir.
Cevap: Apache Kafka, açık kaynaklı bir dağıtık olay akışı platformudur.
Kafka Sunucusunu Başlatma Komutu:
bin/kafka-server-start.sh config/server.properties
Cevap: Topic, Kafka'da mesajların kategorize edildiği mantıksal birimdir.
Topic Oluşturma Komutu:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic my-topic
Cevap: Partition, bir topic'in bölümleridir ve paralel işlemeye olanak tanır.
Cevap:
Java Producer Örneği:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
producer.close();
Cevap: Consumer Group, aynı topic'i tüketen consumer'ların koleksiyonudur.
Consumer Group ile Consumer Başlatma:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
Cevap: Broker, Kafka kümesindeki bir sunucudur.
Cevap: Zookeeper, Kafka kümesi için koordinasyon hizmeti sağlar.
Cevap: Offset, bir partition içindeki her mesajın benzersiz konumunu belirten bir sayıdır.
Cevap: Replication, veri kopyalarının farklı broker'lar üzerinde saklanmasıdır.
Replication Faktörü Ayarları:
# Topic oluştururken replication faktörü belirtme bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic replicated-topic
Cevap:
Leader partition çöktüğünde, follower'lardan biri yeni leader olarak seçilir.
Cevap: Kafka üç farklı mesajlaşma garantisi sunar:
Producer için Acks Ayarları:
// At most once
props.put("acks", "0");
// At least once (default)
props.put("acks", "1");
// Exactly once / En güvenli
props.put("acks", "all");
Cevap: Kafka Connector'lar, Kafka'yı diğer sistemlerle entegre etmek için kullanılan bileşenlerdir.
Örnek Connector Yapılandırması:
{
"name": "jdbc-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "user",
"connection.password": "password",
"mode": "bulk",
"topic.prefix": "jdbc-",
"tasks.max": "1"
}
}
Cevap: Kafka Streams, Kafka üzerinde gerçek zamanlı veri işleme uygulamaları geliştirmek için kullanılan bir kütüphanedir.
Basit Kafka Stream Örneği:
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); KStreamsource = builder.stream("text-input"); KTable counts = source .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .groupBy((key, value) -> value) .count(); counts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
Cevap: Compaction, bir topic'te aynı anahtara sahip mesajların sadece en sonuncusunu tutma işlemidir.
Compaction Özellikli Topic Oluşturma:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic compacted-topic --config cleanup.policy=compact
Cevap: Retention Period, mesajların Kafka'da ne kadar süre kalacağını belirten süredir.
Retention Ayarları:
# Topic oluştururken retention ayarlama bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic retention-topic --config retention.ms=604800000 # Mevcut topic'in retention süresini değiştirme bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name retention-topic --alter --add-config retention.ms=259200000
Cevap: Schema Registry, Kafka'da kullanılan veri şemalarını (Avro, JSON, Protobuf) yönetmek için kullanılan bir servistir.
Schema Registry Kullanımı:
# Schema yükleme
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"}' \
http://localhost:8081/subjects/users-value/versions
Cevap: ISR, leader partition ile senkronize olan replikaların kümesidir.
ISR İle İlgili Ayarlar:
# Minimum ISR boyutu min.insync.replicas=2 # Replica senkronizasyon zaman aşımı replica.lag.time.max.ms=30000
Cevap: Consumer Rebalance, bir consumer grubundaki partition'ların consumer'lar arasında yeniden dağıtılması işlemidir.
Rebalance sırasında, consumer'lar geçici olarak veri işlemezler. Bu nedenle, uzun sürebilecek işlemler için dikkatli olunmalıdır.
Cevap:
Static Membership Ayarı:
props.put("group.instance.id", "consumer-1-instance");
Cevap: Kafka'da exactly-once semantik, iki mekanizma ile sağlanır:
Idempotent Producer Ayarı:
props.put("enable.idempotence", "true");
Transactional Producer Örneği:
props.put("transactional.id", "my-transactional-id");
Producer producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic1", "key", "value"));
producer.send(new ProducerRecord<>("topic2", "key", "value"));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
Cevap: Kafka kümesini ölçeklendirirken dikkat edilmesi gereken önemli noktalar:
Partition Yeniden Dağıtma Komutu:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute
Cevap: Batch processing, Kafka'da birden fazla mesajın tek bir network isteğiyle gönderilmesidir.
Producer Batch Ayarları:
// Batch boyutu (bayt cinsinden)
props.put("batch.size", 16384);
// Bir mesajın ne kadar bekleneceği (milisaniye)
props.put("linger.ms", 5);
Cevap: Partition sayısını belirlerken dikkat edilmesi gereken faktörler:
Genel bir kural olarak, partition sayısı, beklenen maksimum paralel consumer sayısından biraz fazla olmalıdır.
Cevap: Kafka, mesajları göndermeden önce sıkıştırabilir, bu da network trafiğini azaltır.
Compression Ayarları:
// Compression türü
props.put("compression.type", "lz4");
// Sıkıştırma seviyesi (sadece gzip ve zstd için)
props.put("compression.level", "5");
Cevap: Consumer performansını artırmak için:
Consumer Performans Ayarları:
// Minimum byte bekleme
props.put("fetch.min.bytes", 1024);
// Maksimum bekleme süresi
props.put("fetch.max.wait.ms", 500);
// Bir partition'dan çekilecek maksimum veri
props.put("max.partition.fetch.bytes", 1048576);
// Bir poll'da döndürülecek maksimum kayıt
props.put("max.poll.records", 500);
Cevap: Producer performansını artırmak için:
Producer Performans Ayarları:
// Toplam buffer belleği
props.put("buffer.memory", 67108864);
// Yeniden deneme sayısı
props.put("retries", 3);
// Yeniden deneme aralığı
props.put("retry.backoff.ms", 100);
Cevap: Disk I/O performansını optimize etmek için:
Disk I/O Ayarları:
# Farklı disklerde log dizinleri log.dirs=/disk1/kafka-logs,/disk2/kafka-logs # Segment boyutu (varsayılan: 1GB) log.segment.bytes=1073741824 # I/O thread sayısı num.io.threads=8 # Flush aralığı (mesaj sayısı) log.flush.interval.messages=10000 # Flush aralığı (milisaniye) log.flush.interval.ms=1000
Cevap: Network performansını optimize etmek için:
Network Ayarları:
# Socket buffer boyutları socket.send.buffer.bytes=1024000 socket.receive.buffer.bytes=1024000 # Maksimum istek boyutu socket.request.max.bytes=104857600 # Network thread sayısı num.network.threads=4
Cevap: JVM ayarlarını optimize etmek için:
JVM Ayarları:
# Kafka başlatma komutunda JVM ayarları export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G" export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
Cevap: Kafka monitoring için önemli metrikler:
Bu metrikler, Kafka kümesinin sağlığı ve performansı hakkında önemli bilgiler sağlar.
Cevap: Kafka'da güvenlik sağlamak için kullanılan yöntemler:
SSL Ayarları:
# Broker SSL ayarları listeners=SSL://:9093 ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234 ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks ssl.truststore.password=test1234
SASL Ayarları:
# Broker SASL ayarları listeners=SASL_SSL://:9093 sasl.mechanism.inter.broker.protocol=GSSAPI sasl.enabled.mechanisms=GSSAPI security.inter.broker.protocol=SASL_SSL
Cevap: ACL, Kafka'da kaynaklara (topic, cluster, group vb.) erişimi kontrol etmek için kullanılır.
ACL Örnekleri:
# User1'in test-topic üzerinde WRITE izni bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:user1 --operation Write --topic test-topic # User1'in test-consumer-group üzerinde READ izni bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:user1 --operation Read --group test-consumer-group # User2'nin tüm topic'leri DESCRIBE etmesine izin verme bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:user2 --operation Describe --topic *
Cevap: SASL/SCRAM, kullanıcı adı ve şifre ile kimlik doğrulama sağlayan bir mekanizmadır.
SCRAM Yapılandırması:
# Broker ayarları listeners=SASL_SSL://:9093 sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 security.inter.broker.protocol=SASL_SSL # JAAS konfigürasyonu listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="admin" \ password="admin-secret";
Kullanıcı Oluşturma:
# Kullanıcı ekleme bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
Cevap: SSL/TLS, Kafka'da iletişimi şifrelemek için kullanılır.
SSL Sertifikası Oluşturma:
# CA sertifikası oluşturma openssl req -new -newkey rsa:2048 -days 365 -x509 -keyout ca-key -out ca-cert -subj "/C=TR/ST=Istanbul/L=Istanbul/O=MyOrg/CN=CA" # Broker sertifikası oluşturma keytool -genkey -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity 365 -storepass test1234 -keypass test1234 -dname "CN=localhost" # CA ile sertifikayı imzalama keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed # Truststore oluşturma keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass test1234
Broker SSL Ayarları:
listeners=SSL://:9093 ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234 ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks ssl.truststore.password=test1234
Cevap: Kafka'da veri şifreleme için iki yaklaşım vardır:
Uygulama Katmanında Şifreleme Örneği:
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;
public class EncryptionUtil {
private static final String ALGORITHM = "AES";
public static String encrypt(String data, String key) throws Exception {
SecretKeySpec secretKey = new SecretKeySpec(key.getBytes(), ALGORITHM);
Cipher cipher = Cipher.getInstance(ALGORITHM);
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
byte[] encryptedBytes = cipher.doFinal(data.getBytes());
return Base64.getEncoder().encodeToString(encryptedBytes);
}
public static String decrypt(String encryptedData, String key) throws Exception {
SecretKeySpec secretKey = new SecretKeySpec(key.getBytes(), ALGORITHM);
Cipher cipher = Cipher.getInstance(ALGORITHM);
cipher.init(Cipher.DECRYPT_MODE, secretKey);
byte[] decryptedBytes = cipher.doFinal(Base64.getDecoder().decode(encryptedData));
return new String(decryptedBytes);
}
}
// Producer kullanımı
String encryptedData = EncryptionUtil.encrypt("sensitive data", "my-secret-key");
producer.send(new ProducerRecord<>("encrypted-topic", encryptedData));
Cevap: Kafka monitoring için kullanılan araçlar:
Prometheus JMX Exporter Yapılandırması:
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['localhost:7071'] # JMX Exporter portu
Cevap: Kafka'da log yönetimi için:
log4j Yapılandırması:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/kafka.log" filePattern="logs/kafka-%d{yyyy-MM-dd}-%i.log">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"/>
<Policies>
<TimeBasedTriggeringPolicy interval="1" modulate="true"/>
<SizeBasedTriggeringPolicy size="100 MB"/>
</Policies>
<DefaultRolloverStrategy max="10"/>
</RollingFile>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="Console"/>
<AppenderRef ref="RollingFile"/>
</Root>
</Loggers>
</Configuration>
Cevap: Kafka'da uyarı sistemi kurmak için:
Prometheus Alertmanager Yapılandırması:
# alertmanager.yml
global:
smtp_smarthost: 'localhost:587'
smtp_from: 'alertmanager@example.com'
route:
group_by: ['alertname', 'severity']
group_wait: 10s
group_interval: 10s
repeat_interval: 1h
receiver: 'web.hook'
receivers:
- name: 'web.hook'
email_configs:
- to: 'admin@example.com'
# Prometheus alert rules
groups:
- name: kafka.rules
rules:
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_replicamanager_underreplicatedpartitions > 0
for: 5m
labels:
severity: critical
annotations:
summary: "Kafka under-replicated partitions (instance {{ $labels.instance }})"
description: "Kafka has {{ $value }} under-replicated partitions for more than 5 minutes."
Cevap: Kafka'da JMX metriklerini izlemek için:
JMX Portu Açma:
# Kafka başlatma komutunda JMX ayarları export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
JMX Exporter Yapılandırması:
# jmx-exporter-config.yml
rules:
- pattern: "kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value"
name: "kafka_server_replicamanager_underreplicatedpartitions"
type: GAUGE
value: "{{ $value }}"
Cevap: Kafka'da audit log oluşturmak için:
Audit Log Yapılandırması:
# Broker ayarları authorizer.class.name=kafka.security.authorizer.AclAuthorizer # Audit log ayarları kafka.authorizer.logger.listeners=kafka.security.authorizer.AclAuthorizer$AuditLogListener kafka.authorizer.logger.listeners.log.dir=/var/log/kafka/audit
Custom Interceptor Örneği:
public class AuditProducerInterceptor implements ProducerInterceptor{ private static final Logger LOG = LoggerFactory.getLogger(AuditProducerInterceptor.class); @Override public ProducerRecord onSend(ProducerRecord record) { LOG.info("Sending message to topic: {}, key: {}, value: {}", record.topic(), record.key(), record.value()); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (exception != null) { LOG.error("Error sending message", exception); } else { LOG.info("Message sent successfully to topic: {}, partition: {}, offset: {}", metadata.topic(), metadata.partition(), metadata.offset()); } } @Override public void close() {} @Override public void configure(Map configs) {} }
Cevap: Consumer sürekli rebalance oluyorsa:
Consumer Ayarları:
// Poll aralığı (varsayılan: 300000 ms)
props.put("max.poll.interval.ms", 600000);
// Heartbeat aralığı (varsayılan: 3000 ms)
props.put("heartbeat.interval.ms", 1000);
// Oturum zaman aşımı (varsayılan: 10000 ms)
props.put("session.timeout.ms", 30000);
Cevap: Mesaj kaybını önlemek için:
Producer Ayarları:
// Tüm replikaların onayını bekle
props.put("acks", "all");
// Yeniden deneme sayısı
props.put("retries", 3);
// Idempotent producer
props.put("enable.idempotence", "true");
Broker Ayarları:
# Minimum ISR sayısı min.insync.replicas=2
Cevap: Consumer lag sorununu çözmek için:
Consumer Lag Kontrolü:
# Consumer lag kontrolü bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group
Partition Sayısını Artırma:
# Partition sayısını artırma bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 10
Cevap: Broker disk alanı dolduğunda:
Retention Süresini Azaltma:
# Topic için retention süresini azaltma bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=86400000
Log Segment Boyutunu Ayarlama:
# Topic için log segment boyutunu ayarlama bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config log.segment.bytes=536870912
Cevap: Kafka kümesinde broker çöktüğünde:
Broker Durumunu Kontrol Etme:
# Broker durumunu kontrol etme bin/kafka-broker-api-versions --bootstrap-server localhost:9092 # Topic durumunu kontrol etme bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
Cevap: Duplicate mesajları önlemek için:
Idempotent Producer Ayarı:
props.put("enable.idempotence", "true");
Transactional Producer Örneği:
props.put("transactional.id", "my-transactional-id");
Producer producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic1", "key", "value"));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
Consumer Tarafında Tekilleştirme:
public class DeduplicatingConsumer {
private Set processedIds = new HashSet<>();
public void process(ConsumerRecord record) {
String messageId = record.key() + "-" + record.offset();
if (processedIds.contains(messageId)) {
return; // Zaten işlenmiş
}
// Mesajı işle
processMessage(record.value());
// İşlenmiş olarak işaretle
processedIds.add(messageId);
}
}
Cevap: Mesaj sırasını korumak için:
Producer Ayarları:
// Sıralı gönderim için
props.put("max.in.flight.requests.per.connection", "1");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", "true");
// Aynı key'e sahip mesajlar aynı partition'a gider
producer.send(new ProducerRecord<>("my-topic", "same-key", "value1"));
producer.send(new ProducerRecord<>("my-topic", "same-key", "value2"));
Consumer Tarafında Sıralı İşleme:
public class OrderedConsumer {
public void consume() {
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
// Sıralı olarak işle
processInOrder(record);
}
}
}
private void processInOrder(ConsumerRecord record) {
// Mesajı sıralı olarak işle
System.out.println("Processing: " + record.value());
}
}
Cevap: Mesajlar tüketiciye ulaşıyor ama işlenmiyorsa:
Consumer Hata Ayıklama:
try {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
try {
// Mesajı işle
processMessage(record);
} catch (Exception e) {
// Hata durumunda log yaz
logger.error("Error processing message: " + record.value(), e);
// Hatalı mesajı özel bir topic'e gönder
sendToDlq(record);
}
}
// Başarılı mesajları commit et
consumer.commitSync();
} catch (Exception e) {
logger.error("Error in consumer", e);
}
Cevap: Producer mesaj gönderemiyorsa:
Bağlantı Testi:
# Broker'a bağlantı testi telnet localhost 9092 # Topic listesini kontrol etme bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Producer Hata Ayıklama:
try {
ProducerRecord record = new ProducerRecord<>("my-topic", "key", "value");
// Senkron gönderim ve hata kontrolü
RecordMetadata metadata = producer.send(record).get();
System.out.println("Message sent to partition " + metadata.partition() +
" with offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
// Hata durumunda detaylı bilgi
if (e.getCause() instanceof TimeoutException) {
System.err.println("Timeout while sending message");
} else if (e.getCause() instanceof NotLeaderForPartitionException) {
System.err.println("Not leader for partition");
} else {
System.err.println("Error sending message: " + e.getCause().getMessage());
}
}
Cevap: Kafka kümesinde yüksek gecikme yaşıyorsanız:
Gecikme Ölçümü:
# Producer gecikmesi ölçümü bin/kafka-run-class.sh kafka.tools.EndToEndLatency \ broker-list localhost:9092 \ topic test-topic \ num-records 1000 \ record-size 1000 \ producer-props acks=all,linger.ms=0 \ consumer-props group.id=test-group # Network gecikmesi testi ping -c 10 kafka-broker-host
Producer ve Consumer Ayarları:
// Producer için düşük gecikme ayarları
props.put("linger.ms", 0); // Bekleme süresini sıfırla
props.put("batch.size", 0); // Batch'leri devre dışı bırak
props.put("compression.type", "none"); // Sıkıştırmayı devre dışı bırak
// Consumer için düşük gecikme ayarları
props.put("fetch.min.bytes", 1); // Minimum bekleme boyutu
props.put("fetch.max.wait.ms", 100); // Maksimum bekleme süresi
Cevap: Kafka ve Spark Streaming entegrasyonu için:
Spark Streaming Kafka Entegrasyonu:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val spark = SparkSession.builder()
.appName("KafkaSparkIntegration")
.master("local[*]")
.getOrCreate()
// Kafka'dan veri okuma
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-topic")
.load()
// Mesaj değerlerini string olarak dönüştürme
val values = df.selectExpr("CAST(value AS STRING)")
// Veriyi işleme
val processed = values.withColumn("processed", upper(col("value")))
// Kafka'ya yazma
val query = processed.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
query.awaitTermination()
Cevap: Kafka ve Flink entegrasyonu için:
Flink Kafka Entegrasyonu:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class KafkaFlinkIntegration {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-consumer");
// Kafka'dan veri okuma
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(
"input-topic",
new SimpleStringSchema(),
properties
);
DataStream stream = env.addSource(consumer);
// Veriyi işleme
DataStream processed = stream.map(String::toUpperCase);
// Kafka'ya yazma
FlinkKafkaProducer producer = new FlinkKafkaProducer<>(
"localhost:9092",
"output-topic",
new SimpleStringSchema()
);
processed.addSink(producer);
env.execute("Kafka Flink Integration");
}
}
Cevap: Kafka ve Storm entegrasyonu için:
Storm Kafka Entegrasyonu:
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.StringKeyValueScheme;
import org.apache.storm.topology.TopologyBuilder;
public class KafkaStormIntegration {
public static void main(String[] args) throws Exception {
BrokerHosts hosts = new ZkHosts("localhost:2181");
// Kafka Spout yapılandırması
SpoutConfig spoutConfig = new SpoutConfig(hosts, "input-topic", "/zookeeper", "storm-consumer");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// Kafka Bolt yapılandırması
KafkaBolt kafkaBolt = new KafkaBolt()
.withProducerProperties(new Properties() {{
put("bootstrap.servers", "localhost:9092");
put("acks", "1");
put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}})
.withTopicSelector("output-topic")
.withTupleToKafkaMapper(new FieldNameTupleToKafkaMapper("key", "value"));
// Topoloji oluşturma
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
builder.setBolt("upper-bolt", new UpperCaseBolt(), 1).shuffleGrouping("kafka-spout");
builder.setBolt("kafka-bolt", kafkaBolt, 1).shuffleGrouping("upper-bolt");
// Topolojiyi çalıştırma
Config config = new Config();
config.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka-storm-integration", config, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
Cevap: Kafka ve Elasticsearch entegrasyonu için:
Kafka Connect Elasticsearch Connector Yapılandırması:
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "input-topic",
"connection.url": "http://localhost:9200",
"type.name": "_doc",
"key.ignore": "true",
"schema.ignore": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
Logstash Konfigürasyonu:
# logstash.conf
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["input-topic"]
}
}
filter {
# Gerekli dönüşümleri burada yapın
mutate {
rename => { "message" => "event_message" }
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "kafka-events"
document_type => "_doc"
}
}
Cevap: Kafka ve Cassandra entegrasyonu için:
Kafka Connect Cassandra Connector Yapılandırması:
{
"name": "cassandra-sink",
"config": {
"connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "input-topic",
"connect.cassandra.contact.points": "localhost",
"connect.cassandra.port": "9042",
"connect.cassandra.key.space": "test_keyspace",
"connect.cassandra.consistency.level": "ONE",
"connect.cassandra.error.policy": "NOOP",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
Spark Cassandra Connector Örneği:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector._
val spark = SparkSession.builder()
.appName("KafkaCassandraIntegration")
.master("local[*]")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.getOrCreate()
// Kafka'dan veri okuma
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(value AS STRING) as json")
.select(from_json("json", "name STRING, age INT").as("data"))
.select("data.*")
// Cassandra'ya yazma
val query = df.writeStream
.foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
batchDF.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "users", "keyspace" -> "test_keyspace"))
.mode("append")
.save()
}
.start()
query.awaitTermination()
Cevap: Kafka ve Hadoop HDFS entegrasyonu için:
Kafka Connect HDFS Connector Yapılandırması:
{
"name": "hdfs-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "input-topic",
"hdfs.url": "hdfs://localhost:9000",
"hadoop.conf.dir": "/etc/hadoop/conf",
"flush.size": "3",
"rotate.interval.ms": "1000",
"logs.dir": "/kafka-data",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
Spark HDFS Örneği:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("KafkaHdfsIntegration")
.master("local[*]")
.config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
.getOrCreate()
// Kafka'dan veri okuma
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(value AS STRING)")
// HDFS'e yazma
val query = df.writeStream
.format("parquet")
.option("path", "hdfs://localhost:9000/kafka-data")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
query.awaitTermination()
Cevap: Kafka ve MongoDB entegrasyonu için:
Kafka Connect MongoDB Connector Yapılandırması:
{
"name": "mongodb-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"topics": "input-topic",
"connection.uri": "mongodb://localhost:27017/kafka_db",
"database": "kafka_db",
"collection": "events",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
Spark MongoDB Örneği:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import com.mongodb.spark.config._
val spark = SparkSession.builder()
.appName("KafkaMongoDBIntegration")
.master("local[*]")
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/kafka_db.events")
.config("spark.mongodb.output.uri", "mongodb://localhost:27017/kafka_db.events")
.getOrCreate()
// Kafka'dan veri okuma
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(value AS STRING) as json")
.select(from_json("json", "name STRING, age INT").as("data"))
.select("data.*")
// MongoDB'ye yazma
val query = df.writeStream
.foreachBatch { (batchDF: org.apache.spark.sql.DataFrame, batchId: Long) =>
batchDF.write
.format("com.mongodb.spark.sql.DefaultSource")
.mode("append")
.save()
}
.start()
query.awaitTermination()
Cevap: Kafka ve Redis entegrasyonu için:
Kafka Connect Redis Connector Yapılandırması:
{
"name": "redis-sink",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"tasks.max": "1",
"topics": "input-topic",
"redis.host": "localhost",
"redis.port": "6379",
"redis.command": "SET",
"redis.key": "kafka-message",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
Java Redis Entegrasyonu:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import redis.clients.jedis.Jedis;
import java.util.Collections;
import java.util.Properties;
public class KafkaRedisIntegration {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "redis-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("input-topic"));
Jedis jedis = new Jedis("localhost", 6379);
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
// Redis'e yaz
jedis.set("kafka:" + record.offset(), record.value());
System.out.println("Written to Redis: " + record.value());
}
}
}
}
Cevap: Kafka ve PostgreSQL entegrasyonu için:
Kafka Connect JDBC Connector Yapılandırması:
{
"name": "postgres-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "input-topic",
"connection.url": "jdbc:postgresql://localhost:5432/kafka_db",
"connection.user": "postgres",
"connection.password": "password",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "insert",
"pk.mode": "none",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
Debezium PostgreSQL Connector Yapılandırması:
{
"name": "postgres-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "postgres",
"database.server.name": "postgres-server",
"plugin.name": "pgoutput"
}
}
Cevap: Kafka ve MySQL entegrasyonu için:
Kafka Connect JDBC Connector Yapılandırması:
{
"name": "mysql-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "input-topic",
"connection.url": "jdbc:mysql://localhost:3306/kafka_db",
"connection.user": "root",
"connection.password": "password",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "insert",
"pk.mode": "none",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
Debezium MySQL Connector Yapılandırması:
{
"name": "mysql-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "password",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"database.include.list": "kafka_db",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.kafka_db"
}
}
Cevap: CQRS pattern, yazma (command) ve okuma (query) işlemlerini ayıran bir mimari pattern'idir. Kafka ile CQRS uygulamak için:
CQRS Mimarisi Örneği:
// Command Service
public class CommandService {
private final KafkaTemplate kafkaTemplate;
public void createUser(CreateUserCommand command) {
// Komutu doğrula
// ...
// Olay oluştur
UserCreatedEvent event = new UserCreatedEvent(
command.getUserId(),
command.getUsername(),
command.getEmail()
);
// Olayı Kafka'ya gönder
kafkaTemplate.send("user-commands", event);
}
}
// Event Processor
public class EventProcessor {
@KafkaListener(topics = "user-commands")
public void processUserCreatedEvent(UserCreatedEvent event) {
// Olayı işle ve veritabanına yaz
User user = new User(
event.getUserId(),
event.getUsername(),
event.getEmail()
);
userRepository.save(user);
// Okuma modelini güncelle
UserView userView = new UserView(
event.getUserId(),
event.getUsername(),
event.getEmail()
);
userViewRepository.save(userView);
}
}
// Query Service
public class QueryService {
public UserView getUser(String userId) {
return userViewRepository.findById(userId);
}
}
Cevap: Event Sourcing, uygulama durumunu olayların bir dizisi olarak saklayan bir pattern'idir. Kafka ile Event Sourcing uygulamak için:
Event Sourcing Örneği:
// Event Interface
public interface Event {
String getAggregateId();
Instant getTimestamp();
}
// Aggregate Class
public class UserAggregate {
private String userId;
private String username;
private String email;
private List events = new ArrayList<>();
public static UserAggregate create(String userId, String username, String email) {
UserAggregate aggregate = new UserAggregate();
aggregate.apply(new UserCreatedEvent(userId, username, email));
return aggregate;
}
public void updateEmail(String email) {
apply(new UserEmailUpdatedEvent(userId, email));
}
private void apply(Event event) {
// Olayı uygula
if (event instanceof UserCreatedEvent) {
UserCreatedEvent e = (UserCreatedEvent) event;
this.userId = e.getUserId();
this.username = e.getUsername();
this.email = e.getEmail();
} else if (event instanceof UserEmailUpdatedEvent) {
UserEmailUpdatedEvent e = (UserEmailUpdatedEvent) event;
this.email = e.getEmail();
}
// Olayı listeye ekle
events.add(event);
}
public List getUncommittedEvents() {
return new ArrayList<>(events);
}
public void markEventsAsCommitted() {
events.clear();
}
}
// Event Store
public class EventStore {
private final KafkaTemplate kafkaTemplate;
public void saveEvents(String aggregateId, List events) {
events.forEach(event -> {
kafkaTemplate.send("user-events", aggregateId, event);
});
}
public List getEvents(String aggregateId) {
// Kafka'dan olayları oku
// ...
return events;
}
}
// Aggregate Repository
public class AggregateRepository {
private final EventStore eventStore;
public void save(UserAggregate aggregate) {
eventStore.saveEvents(aggregate.getUserId(), aggregate.getUncommittedEvents());
aggregate.markEventsAsCommitted();
}
public UserAggregate findById(String userId) {
List events = eventStore.getEvents(userId);
UserAggregate aggregate = new UserAggregate();
// Olayları sırayla uygula
events.forEach(aggregate::apply);
return aggregate;
}
}
Cevap: Saga pattern, dağıtık sistemlerde transaction yönetimi için kullanılan bir pattern'idir. Kafka ile Saga uygulamak için:
Choreography-Based Saga Örneği:
// Order Service
@Service
public class OrderService {
@Autowired
private KafkaTemplate kafkaTemplate;
@Transactional
public Order createOrder(OrderRequest orderRequest) {
// Siparişi oluştur
Order order = new Order(orderRequest);
orderRepository.save(order);
// OrderCreatedEvent gönder
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getCustomerId(),
order.getAmount()
);
kafkaTemplate.send("orders", event);
return order;
}
@KafkaListener(topics = "payment-completed")
public void handlePaymentCompleted(PaymentCompletedEvent event) {
// Ödeme tamamlandı, siparişi güncelle
Order order = orderRepository.findById(event.getOrderId());
order.setStatus(OrderStatus.PAID);
orderRepository.save(order);
// OrderCompletedEvent gönder
OrderCompletedEvent orderEvent = new OrderCompletedEvent(order.getId());
kafkaTemplate.send("orders", orderEvent);
}
@KafkaListener(topics = "payment-failed")
public void handlePaymentFailed(PaymentFailedEvent event) {
// Ödeme başarısız, siparişi iptal et
Order order = orderRepository.findById(event.getOrderId());
order.setStatus(OrderStatus.CANCELLED);
orderRepository.save(order);
// OrderCancelledEvent gönder
OrderCancelledEvent orderEvent = new OrderCancelledEvent(order.getId());
kafkaTemplate.send("orders", orderEvent);
}
}
// Payment Service
@Service
public class PaymentService {
@Autowired
private KafkaTemplate kafkaTemplate;
@KafkaListener(topics = "orders")
public void handleOrderCreated(OrderCreatedEvent event) {
try {
// Ödeme işlemini yap
Payment payment = paymentService.processPayment(
event.getOrderId(),
event.getAmount()
);
// PaymentCompletedEvent gönder
PaymentCompletedEvent paymentEvent = new PaymentCompletedEvent(
payment.getOrderId(),
payment.getId()
);
kafkaTemplate.send("payments", paymentEvent);
} catch (PaymentException e) {
// PaymentFailedEvent gönder
PaymentFailedEvent paymentEvent = new PaymentFailedEvent(
event.getOrderId(),
e.getMessage()
);
kafkaTemplate.send("payments", paymentEvent);
}
}
}
Cevap: Dead Letter Queue, işlenemeyen mesajları saklamak için kullanılan bir pattern'idir. Kafka ile DLQ uygulamak için:
Dead Letter Queue Örneği:
@Service
public class MessageProcessor {
@Autowired
private KafkaTemplate kafkaTemplate;
@Value("${kafka.topic.dlq}")
private String dlqTopic;
@KafkaListener(topics = "input-topic")
public void processMessage(ConsumerRecord record) {
try {
// Mesajı işle
process(record.value());
// Başarılı ise offset'i commit et
// ...
} catch (Exception e) {
// Hata durumunda log yaz
logger.error("Error processing message: " + record.value(), e);
// Mesajı DLQ'ya gönder
sendToDlq(record, e);
}
}
private void sendToDlq(ConsumerRecord record, Exception e) {
// Orijinal mesaj bilgilerini ve hatayı içeren bir DLQ mesajı oluştur
DlqMessage dlqMessage = new DlqMessage(
record.value(),
record.topic(),
record.partition(),
record.offset(),
e.getMessage(),
Instant.now()
);
// DLQ topic'ine gönder
kafkaTemplate.send(dlqTopic, dlqMessage);
}
}
// Retry Mekanizması
@Service
public class RetryableMessageProcessor {
@Autowired
private KafkaTemplate kafkaTemplate;
@Value("${kafka.topic.retry}")
private String retryTopic;
@Value("${kafka.topic.dlq}")
private String dlqTopic;
@KafkaListener(topics = "input-topic")
public void processWithRetry(ConsumerRecord record) {
int maxRetries = 3;
int retryCount = 0;
boolean success = false;
while (retryCount < maxRetries && !success) {
try {
// Mesajı işle
process(record.value());
success = true;
} catch (Exception e) {
retryCount++;
if (retryCount >= maxRetries) {
// Maksimum deneme sayısına ulaşıldı, DLQ'ya gönder
sendToDlq(record, e);
} else {
// Retry topic'ine gönder
sendToRetryTopic(record, retryCount, e);
// Bir sonraki deneme için bekle
try {
Thread.sleep(1000 * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
}
}
private void sendToRetryTopic(ConsumerRecord record, int retryCount, Exception e) {
RetryMessage retryMessage = new RetryMessage(
record.value(),
record.topic(),
record.partition(),
record.offset(),
retryCount,
e.getMessage(),
Instant.now()
);
kafkaTemplate.send(retryTopic, retryMessage);
}
}
Cevap: Idempotent consumer, aynı mesajın birden fazla kez işlenmesini önleyen bir pattern'idir. Kafka ile idempotent consumer uygulamak için:
Idempotent Consumer Örneği:
@Service
public class IdempotentConsumer {
@Autowired
private ProcessedMessageRepository processedMessageRepository;
@KafkaListener(topics = "input-topic")
public void processMessage(ConsumerRecord record) {
// Mesaj ID'sini oluştur
String messageId = record.key() + "-" + record.offset();
// Mesajın daha önce işlenip işlenmediğini kontrol et
if (processedMessageRepository.existsById(messageId)) {
logger.info("Message already processed: " + messageId);
return;
}
try {
// Mesajı işle
process(record.value());
// Mesajı işlenmiş olarak işaretle
ProcessedMessage processedMessage = new ProcessedMessage(
messageId,
record.topic(),
record.partition(),
record.offset(),
Instant.now()
);
processedMessageRepository.save(processedMessage);
} catch (Exception e) {
// Hata durumunda log yaz ve işlemi geri al
logger.error("Error processing message: " + messageId, e);
throw new RuntimeException("Failed to process message", e);
}
}
private void process(String message) {
// Mesajı işle
// ...
}
}
// Redis ile Idempotent Consumer
@Service
public class RedisIdempotentConsumer {
@Autowired
private RedisTemplate redisTemplate;
@KafkaListener(topics = "input-topic")
public void processMessage(ConsumerRecord record) {
// Mesaj ID'sini oluştur
String messageId = record.key() + "-" + record.offset();
// Redis'te mesaj ID'sini kontrol et
Boolean isProcessed = redisTemplate.hasKey(messageId);
if (Boolean.TRUE.equals(isProcessed)) {
logger.info("Message already processed: " + messageId);
return;
}
try {
// Mesajı işle
process(record.value());
// Mesajı Redis'te işlenmiş olarak işaretle
redisTemplate.opsForValue().set(messageId, "PROCESSED");
// Mesajın ne kadar süre kalacağını ayarla (örneğin 7 gün)
redisTemplate.expire(messageId, 7, TimeUnit.DAYS);
} catch (Exception e) {
// Hata durumunda log yaz ve işlemi geri al
logger.error("Error processing message: " + messageId, e);
throw new RuntimeException("Failed to process message", e);
}
}
private void process(String message) {
// Mesajı işle
// ...
}
}
Cevap: Backpressure, veri üreticisinin hızını tüketicinin işleyebileceği hızla sınırlayan bir mekanizmadır. Kafka'da backpressure yönetmek için:
Backpressure Yönetimi Örneği:
@Service
public class BackpressureManager {
@Autowired
private KafkaAdmin kafkaAdmin;
@Autowired
private KafkaTemplate kafkaTemplate;
private final double maxLagThreshold = 1000.0; // Maksimum lag eşiği
private final double minLagThreshold = 100.0; // Minimum lag eşiği
@Scheduled(fixedRate = 5000) // Her 5 saniyede bir kontrol et
public void manageBackpressure() {
// Tüm consumer grupları için lag değerlerini al
Map> consumerGroups = getConsumerGroupOffsets();
for (Map.Entry> entry : consumerGroups.entrySet()) {
String groupId = entry.getKey();
Map offsets = entry.getValue();
// Her topic için lag hesapla
Map topicLags = calculateTopicLags(offsets);
for (Map.Entry topicEntry : topicLags.entrySet()) {
String topic = topicEntry.getKey();
Long lag = topicEntry.getValue();
// Lag eşiğine göre throttling uygula
if (lag > maxLagThreshold) {
// Producer hızını düşür
throttleProducer(topic, 0.5); // %50 hızında
logger.info("Throttling producer for topic: " + topic + " due to high lag: " + lag);
} else if (lag < minLagThreshold) {
// Producer hızını artır
throttleProducer(topic, 1.0); // Tam hız
logger.info("Resuming producer for topic: " + topic + " as lag is normal: " + lag);
}
}
}
}
private Map calculateTopicLags(Map offsets) {
Map topicLags = new HashMap<>();
// Her topic için lag hesapla
for (Map.Entry entry : offsets.entrySet()) {
TopicPartition partition = entry.getKey();
OffsetAndMetadata offsetMetadata = entry.getValue();
// Topic son offset'ini al
long endOffset = getEndOffset(partition);
// Lag hesapla
long lag = endOffset - offsetMetadata.offset();
// Topic lag'ini güncelle
topicLags.merge(partition.topic(), lag, Long::sum);
}
return topicLags;
}
private void throttleProducer(String topic, double throttleFactor) {
// Producer'ı kısıtla
// Bu, producer'ın linger.ms ve batch.size ayarlarını değiştirerek yapılabilir
// veya özel bir throttling mekanizması uygulanabilir
// Örnek: linger.ms değerini artır
updateProducerConfig(topic, "linger.ms", (int) (10 / throttleFactor));
// Örnek: batch.size değerini artır
updateProducerConfig(topic, "batch.size", (int) (16384 / throttleFactor));
}
private void updateProducerConfig(String topic, String configKey, int configValue) {
// Producer config'ini güncelle
// Bu, Kafka Admin API veya özel bir config yönetimi ile yapılabilir
// ...
logger.info("Updated producer config for topic: " + topic + ", " + configKey + "=" + configValue);
}
}
Cevap: Exactly-once processing, her mesajın tam olarak bir kez işlendiğini garanti eden bir özelliktir. Kafka'da exactly-once processing sağlamak için:
Exactly-Once Processing Örneği:
@Service
public class ExactlyOnceProcessor {
@Autowired
private KafkaTemplate kafkaTemplate;
@KafkaListener(topics = "input-topic")
@Transactional
public void processMessage(ConsumerRecord record) {
try {
// Mesajı işle
String result = process(record.value());
// Sonucu output topic'ine gönder
kafkaTemplate.send(new ProducerRecord<>("output-topic", record.key(), result));
// İşlem başarılı, transaction commit edilecek
} catch (Exception e) {
// Hata durumunda transaction rollback edilecek
throw new RuntimeException("Failed to process message", e);
}
}
private String process(String message) {
// Mesajı işle ve sonuç döndür
return "PROCESSED: " + message;
}
}
// Transactional Producer Örneği
@Service
public class TransactionalProducerService {
@Autowired
private KafkaTemplate kafkaTemplate;
@PostConstruct
public void init() {
// Transactional producer'ı başlat
kafkaTemplate.setTransactionIdPrefix("tx-");
}
public void sendInTransaction(String topic, String key, String value) {
// Transaction içinde mesaj gönder
kafkaTemplate.executeInTransaction(template -> {
template.send(new ProducerRecord<>(topic, key, value));
return true;
});
}
}
// Exactly-Once Consumer Örneği
@Service
public class ExactlyOnceConsumer {
@KafkaListener(topics = "input-topic")
public void processMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
try {
// Mesajı işle
process(record.value());
// İşlem başarılı, acknowledgment'ı onayla
acknowledgment.acknowledge();
} catch (Exception e) {
// Hata durumunda acknowledgment'ı reddet
acknowledgment.nack(1000); // 1 saniye sonra tekrar dene
throw new RuntimeException("Failed to process message", e);
}
}
private void process(String message) {
// Mesajı işle
// ...
}
}
Cevap: Schema evrimi, veri şemalarının zamanla değişmesini yönetme işlemidir. Kafka'da schema evrimi yönetmek için:
Schema Evrimi Örneği:
// İlk schema
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}
// Yeni schema (yeni alan eklenmiş)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "age", "type": ["null", "int"], "default": null}
]
}
// Java ile Schema Registry kullanımı
public class SchemaEvolutionExample {
public static void main(String[] args) throws IOException, RestClientException {
// Schema Registry istemcisi oluştur
SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
// Schema'yı kaydet
String schemaJson = "{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"User\",\n" +
" \"fields\": [\n" +
" {\"name\": \"id\", \"type\": \"string\"},\n" +
" {\"name\": \"name\", \"type\": \"string\"},\n" +
" {\"name\": \"email\", \"type\": \"string\"}\n" +
" ]\n" +
"}";
Schema schema = new Schema.Parser().parse(schemaJson);
SchemaMetadata metadata = client.register("users-value", schema);
System.out.println("Schema registered with id: " + metadata.getId());
// Schema'yı al
Schema retrievedSchema = client.getById(metadata.getId());
System.out.println("Retrieved schema: " + retrievedSchema.toString());
// Yeni schema'yı kaydet (evrim)
String newSchemaJson = "{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"User\",\n" +
" \"fields\": [\n" +
" {\"name\": \"id\", \"type\": \"string\"},\n" +
" {\"name\": \"name\", \"type\": \"string\"},\n" +
" {\"name\": \"email\", \"type\": \"string\"},\n" +
" {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}\n" +
" ]\n" +
"}";
Schema newSchema = new Schema.Parser().parse(newSchemaJson);
SchemaMetadata newMetadata = client.register("users-value", newSchema);
System.out.println("New schema registered with id: " + newMetadata.getId());
// Uyumluluk kontrolü
boolean isCompatible = client.testCompatibility("users-value", newSchema);
System.out.println("Is new schema compatible? " + isCompatible);
}
}
Cevap: Multi-tenancy, tek bir Kafka kümesinde birden fazla kiracı (tenant) için veri ayırma sağlayan bir yaklaşımdır. Kafka'da multi-tenancy uygulamak için:
Multi-Tenancy Örneği:
// Tenant bazlı topic isimlendirme
public class TopicNamingStrategy {
public static String getTenantTopic(String tenantId, String baseTopic) {
return tenantId + "-" + baseTopic;
}
}
// Tenant bazlı producer
@Service
public class TenantAwareProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendToTenant(String tenantId, String topic, String key, Object value) {
String tenantTopic = TopicNamingStrategy.getTenantTopic(tenantId, topic);
// Tenant bilgisini header'a ekle
ProducerRecord record = new ProducerRecord<>(
tenantTopic, key, value
);
record.headers().add("tenant-id", tenantId.getBytes());
kafkaTemplate.send(record);
}
}
// Tenant bazlı consumer
@Service
public class TenantAwareConsumer {
@KafkaListener(topics = "#{tenantAwareTopicResolver.getTopics()}")
public void processMessage(ConsumerRecord record) {
// Tenant ID'sini header'dan al
String tenantId = getTenantId(record);
// Tenant'a özgü iş mantığını uygula
processForTenant(tenantId, record.value());
}
private String getTenantId(ConsumerRecord record) {
// Topic adından tenant ID'sini çıkar
String topic = record.topic();
return topic.substring(0, topic.indexOf('-'));
}
private void processForTenant(String tenantId, String message) {
// Tenant'a özgü iş mantığı
// ...
}
}
// Tenant bazlı yetkilendirme
@Service
public class TenantAwareAuthorizer {
public boolean authorize(String tenantId, String resource, String operation) {
// Tenant'ın kaynağa erişim yetkisi olup olmadığını kontrol et
// ...
return true;
}
}
// Tenant bazlı konfigürasyon
@Configuration
public class TenantAwareConfig {
@Bean
public KafkaAdmin kafkaAdmin() {
Map configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic createTenantTopics() {
// Tenant'lar için topic'leri oluştur
return TopicBuilder.name("tenant1-orders")
.partitions(3)
.replicas(1)
.build();
}
}
Cevap: Veri bütünlüğü, Kafka'da verinin doğru ve tutarlı kalmasını sağlayan önlemlerdir. Kafka'da veri bütünlüğü sağlamak için:
Veri Bütünlüğü Örneği:
// Checksum ile veri bütünlüğü
public class DataIntegrityUtils {
public static String calculateChecksum(String data) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(data.getBytes(StandardCharsets.UTF_8));
return Hex.encodeHexString(hash);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Failed to calculate checksum", e);
}
}
public static boolean verifyChecksum(String data, String checksum) {
String calculatedChecksum = calculateChecksum(data);
return calculatedChecksum.equals(checksum);
}
}
// Producer ile checksum gönderme
@Service
public class ChecksumProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendWithChecksum(String topic, String key, String value) {
// Veri ve checksum'ı içeren bir nesne oluştur
DataWithChecksum dataWithChecksum = new DataWithChecksum(
value,
DataIntegrityUtils.calculateChecksum(value)
);
// Mesajı gönder
kafkaTemplate.send(new ProducerRecord<>(topic, key, dataWithChecksum));
}
}
// Consumer ile checksum doğrulama
@Service
public class ChecksumConsumer {
@KafkaListener(topics = "input-topic")
public void processMessage(ConsumerRecord record) {
DataWithChecksum dataWithChecksum = record.value();
// Checksum'ı doğrula
if (!DataIntegrityUtils.verifyChecksum(
dataWithChecksum.getData(),
dataWithChecksum.getChecksum()
)) {
throw new DataIntegrityException("Checksum verification failed");
}
// Veriyi işle
process(dataWithChecksum.getData());
}
private void process(String data) {
// Veriyi işle
// ...
}
}
// Validation ile veri bütünlüğü
@Service
public class ValidationConsumer {
@KafkaListener(topics = "input-topic")
public void processMessage(ConsumerRecord record) {
String data = record.value();
// Veriyi doğrula
ValidationResult validationResult = validate(data);
if (!validationResult.isValid()) {
// Geçersiz veriyi DLQ'ya gönder
sendToDlq(record, validationResult.getErrorMessage());
return;
}
// Geçerli veriyi işle
process(data);
}
private ValidationResult validate(String data) {
// Veriyi doğrula
if (data == null || data.isEmpty()) {
return ValidationResult.invalid("Data is null or empty");
}
if (data.length() > 1000) {
return ValidationResult.invalid("Data is too long");
}
return ValidationResult.valid();
}
}
// Transaction ile veri bütünlüğü
@Service
public class TransactionalProcessor {
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private DatabaseService databaseService;
@Transactional
public void processInTransaction(String input) {
// Veriyi işle
String result = process(input);
// Veritabanına yaz
databaseService.save(result);
// Output topic'ine gönder
kafkaTemplate.send(new ProducerRecord<>("output-topic", result));
// İşlem başarılı, transaction commit edilecek
}
private String process(String input) {
// Veriyi işle ve sonuç döndür
return "PROCESSED: " + input;
}
}
Bu dokümanda RabbitMQ mülakatlarında en çok sorulan konular soru-cevap mantığıyla açıklanmış ve örnek kodlar eklenmiştir.
Cevap: RabbitMQ, açık kaynaklı bir mesajlaşma kuyruğu sistemidir.
RabbitMQ Sunucusunu Başlatma Komutu:
rabbitmq-server
Cevap: Exchange, gelen mesajları belirli kurallara göre kuyruklara yönlendiren bir bileşendir.
Direct Exchange Oluşturma Komutu:
rabbitmqadmin declare exchange name=direct-exchange type=direct
Cevap: Queue, mesajların depolandığı ve tüketiciye ulaştırıldığı bir yapıdır.
Queue Oluşturma Komutu:
rabbitmqadmin declare queue name=my-queue
Cevap: Binding, bir exchange ile bir kuyruk arasındaki ilişkiyi tanımlar.
Binding Oluşturma Komutu:
rabbitmqadmin declare binding source=direct-exchange destination=my-queue routing_key=my-key
Cevap:
Java Producer Örneği:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare("direct-exchange", "direct");
String message = "Hello, RabbitMQ!";
channel.basicPublish("direct-exchange", "my-key", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
Java Consumer Örneği:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare("my-queue", false, false, false, null);
channel.queueBind("my-queue", "direct-exchange", "my-key");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume("my-queue", true, deliverCallback, consumerTag -> {});
}
Cevap: Routing Key, bir mesajın hangi kuyruğa yönlendirileceğini belirten bir etikettir.
Routing Key ile Mesaj Gönderme:
String routingKey = "user.created";
channel.basicPublish("topic-exchange", routingKey, null, message.getBytes());
Cevap: Virtual Host, RabbitMQ sunucusu içindeki mantıksal bir bölümdür.
Virtual Host Oluşturma Komutu:
rabbitmqctl add_vhost my-vhost
Virtual Host'a Kullanıcı Ekleme:
rabbitmqctl set_permissions -p my-vhost my-user ".*" ".*" ".*"
Cevap:
Her connection birden fazla channel içerebilir. Channel'lar, aynı TCP bağlantısı üzerinden birden fazla işlemi paralel olarak gerçekleştirmek için kullanılır.
Cevap: Message Durability, mesajların RabbitMQ sunucusu yeniden başlatıldığında bile kaybolmamasını sağlayan bir özelliktir.
Kalıcı Mesaj Gönderme:
channel.basicPublish("direct-exchange", "my-key",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
Cevap: Acknowledgment, bir mesajın başarıyla işlendiğini RabbitMQ'ya bildiren mekanizmadır.
Manual Acknowledgment Örneği:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// Mesajı işle
processMessage(message);
// Onayla
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume("my-queue", false, deliverCallback, consumerTag -> {});
Cevap: Prefetch Count, bir tüketiciye aynı anda gönderilebilecek maksimum mesaj sayısıdır.
Prefetch Count Ayarlama:
channel.basicQos(10); // Her seferinde en fazla 10 mesaj gönder
Cevap: Dead Letter Exchange, işlenemeyen veya reddedilen mesajların yönlendirildiği bir exchange'tir.
DLX Ayarlama:
Mapargs = new HashMap<>(); args.put("x-dead-letter-exchange", "dlx-exchange"); args.put("x-dead-letter-routing-key", "dlx-key"); channel.queueDeclare("my-queue", false, false, false, args);
Cevap: Message TTL, bir mesajın kuyrukta ne kadar süre kalacağını belirten bir süre sınırlamasıdır.
Kuyruk TTL Ayarlama:
Mapargs = new HashMap<>(); args.put("x-message-ttl", 60000); // 60 saniye channel.queueDeclare("my-queue", false, false, false, args);
Mesaj TTL Ayarlama:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000") // 60 saniye
.build();
channel.basicPublish("direct-exchange", "my-key", properties, message.getBytes());
Cevap: Publisher Confirms, bir mesajın RabbitMQ sunucusu tarafından alındığını onaylayan bir mekanizmadır.
Publisher Confirms Örneği:
channel.confirmSelect();
// Senkron onay
channel.basicPublish("direct-exchange", "my-key", null, message.getBytes());
channel.waitForConfirmsOrDie(5000); // 5 saniye bekle
// Asenkron onay
channel.addConfirmListener((deliveryTag, multiple) -> {
System.out.println("Message confirmed with tag: " + deliveryTag);
}, (deliveryTag, multiple) -> {
System.out.println("Message not confirmed with tag: " + deliveryTag);
});
channel.basicPublish("direct-exchange", "my-key", null, message.getBytes());
Cevap: Alternate Exchange, bir exchange'e gönderilen mesajların herhangi bir kuyruğa yönlendirilememesi durumunda yönlendirildiği bir exchange'tir.
Alternate Exchange Ayarlama:
Mapargs = new HashMap<>(); args.put("alternate-exchange", "alternate-exchange"); channel.exchangeDeclare("main-exchange", "direct", true, false, args);
Cevap: Priority Queue, mesajların önceliklerine göre işlendiği bir kuyruk türüdür.
Priority Queue Oluşturma:
Mapargs = new HashMap<>(); args.put("x-max-priority", 10); // Maksimum öncelik seviyesi channel.queueDeclare("priority-queue", false, false, false, args);
Öncelikli Mesaj Gönderme:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.priority(5) // 1-10 arası öncelik
.build();
channel.basicPublish("direct-exchange", "my-key", properties, message.getBytes());
Cevap: Lazy Queues, mesajları disk üzerinde saklayarak bellek kullanımını optimize eden kuyruk türüdür.
Lazy Queue Oluşturma:
Mapargs = new HashMap<>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("lazy-queue", false, false, false, args);
Cevap: Quorum Queues, yüksek erişilebilirlik ve veri güvenliği için tasarlanmış bir kuyruk türüdür.
Quorum Queue Oluşturma:
Mapargs = new HashMap<>(); args.put("x-queue-type", "quorum"); channel.queueDeclare("quorum-queue", false, false, false, args);
Cevap: Shovel eklentisi, bir RabbitMQ sunucusundan diğerine veya başka bir mesajlaşma sistemine mesaj kopyalamak için kullanılır.
Shovel Yapılandırması:
rabbitmqctl set_parameter shovel my-shovel \
'{"src-uri": "amqp://source-server", "src-queue": "source-queue", \
"dest-uri": "amqp://destination-server", "dest-queue": "destination-queue"}'
Cevap: Federation eklentisi, farklı RabbitMQ sunucuları arasında mesajların güvenilir bir şekilde dağıtılmasını sağlar.
Federation Ayarlama:
rabbitmqctl set_parameter federation-upstream my-upstream \
'{"uri":"amqp://upstream-server"}'
rabbitmqctl set_parameter federation-upstream-set my-upstream-set \
'{"upstream-set":"my-upstream"}'
rabbitmqctl set_parameter policy federation \
'{"pattern":"", "federation-upstream-set":"my-upstream-set"}'
Cevap: RabbitMQ monitoring için kullanılan araçlar:
Management Plugin Etkinleştirme:
rabbitmq-plugins enable rabbitmq_management
Prometheus ile Entegrasyon:
rabbitmq-plugins enable rabbitmq_prometheus
Cevap: RabbitMQ'da log yönetimi için:
Log Yapılandırması:
# rabbitmq.conf log.file.level = info log.file = /var/log/rabbitmq/rabbit.log log.exchange = true log.queue = true
Cevap: Memory alarm, RabbitMQ sunucusunun bellek kullanımı belirli bir eşiği aştığında tetiklenen bir uyarıdır.
Memory Alarm Eşiği Ayarlama:
# rabbitmq.conf vm_memory_high_watermark.relative = 0.6 # %60 bellek kullanımı
Cevap: Disk alarm, RabbitMQ sunucusunun disk alanı belirli bir eşiği aştığında tetiklenen bir uyarıdır.
Disk Alarm Eşiği Ayarlama:
# rabbitmq.conf disk_free_limit.relative = 1.0 # %1 boş disk alanı
Cevap: RabbitMQ'da kullanıcı ve yetki yönetimi için:
Kullanıcı Oluşturma:
rabbitmqctl add_user my-user my-password
Yetki Atama:
rabbitmqctl set_permissions -p my-vhost my-user ".*" ".*" ".*"
Rol Atama:
rabbitmqctl set_user_tags my-user administrator
Cevap: RabbitMQ cluster kurmak için:
ERLANG Cookie Ayarlama:
# /var/lib/rabbitmq/.erlang.cookie dosyasını tüm node'larda aynı yapın
Cluster Oluşturma:
# Node 1 rabbitmq-server -detached # Node 2 rabbitmq-server -detached rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@node1 rabbitmqctl start_app # Node 3 rabbitmq-server -detached rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@node1 rabbitmqctl start_app
Cevap: RabbitMQ'da yüksek erişilebilirlik sağlamak için:
Mirror Queue Ayarlama:
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}'
Hem Mirror Queue Hem Synchronized Queue Ayarlama:
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Cevap: RabbitMQ'da backup almak için:
Definitions Dışa Aktarma:
rabbitmqctl export_definitions > definitions.json
Mnesia Database Backup:
rabbitmqctl stop cd /var/lib/rabbitmq/mnesia/rabbit@$(hostname) cp -r * /backup/location/ rabbitmq-server -detached
Cevap: RabbitMQ performansını optimize etmek için:
Prefetch Count Ayarlama:
channel.basicQos(100); // Her seferinde en fazla 100 mesaj gönder
Lazy Queue Oluşturma:
Mapargs = new HashMap<>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("lazy-queue", false, false, false, args);
Cevap: RabbitMQ'da güvenlik sağlamak için:
SSL/TLS Ayarlama:
# rabbitmq.conf listeners.ssl.default = 5671 ssl_options.cacertfile = /path/to/cacert.pem ssl_options.certfile = /path/to/cert.pem ssl_options.keyfile = /path/to/key.pem
Java Client SSL Ayarlama:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);
factory.useSslProtocol();
factory.setUsername("my-user");
factory.setPassword("my-password");