Physical Address
304 North Cardinal St.
Dorchester Center, MA 02124
Physical Address
304 North Cardinal St.
Dorchester Center, MA 02124
Mesaj kuyruğu, iki veya daha fazla yazılımın asenkron olarak veri alışverişi yapmasını sağlayan bir iletişim modelidir. Bu modelde, bir yazılım bir kuyruk aracılığıyla başka bir yazılıma mesaj gönderir. Kuyrukta depolanan mesajlar, tüketici yazılım tarafından işlenir. RabbitMQ gibi mesaj kuyruğu sistemleri, birbirinden bağımsız sistemler arasında veri alışverişi sağlamak için kullanılır. Bu bağımsızlık; yazılımın platformu, dili, veritabanı veya başka özelliklerinden kaynaklanabilir.
Bir örnek üzerinden ilerlersek:
Bir web uygulaması bir işlem yaptıktan sonra, bu işlemle ilgili verileri bir kuyruk aracılığıyla başka bir yazılıma gönderir. Kuyruğu takip eden ikinci yazılım, bu mesajı alır ve belirli işlemleri gerçekleştirir. Örneğin, bir web uygulaması kullanıcının yaptığı bir siparişin faturasını oluşturmak için kuyrukta bir mesaj bırakır, fatura oluşturma servisi bu mesajı işleyerek fatura işlemini tamamlar. Bu işlem asenkron olarak gerçekleşir, yani web uygulaması fatura oluşturma sürecini beklemez, fatura daha sonra oluşturulur ve kullanıcıya bildirilir.
Mesaj aracısı, mesaj kuyruğunu yöneten sistemdir. RabbitMQ, Kafka gibi sistemler mesaj aracısı olarak adlandırılır. Mesaj aracısı, üretici ve tüketici arasında köprü görevi görür. Mesajları doğru şekilde kuyrukta depolar, gerektiğinde yönlendirir ve tüketicinin alması için hazır hale getirir.
RabbitMQ, bir mesaj aracısı olarak şu özelliklere sahiptir:
Bazı senaryolarda sistemler arasındaki iletişim asenkron olmalıdır. Örneğin, bir e-ticaret sitesinde kullanıcı bir ürün siparişi verdiğinde, sipariş onayı aldıktan sonra fatura oluşturma, e-posta gönderme gibi işlemler sistemin arka planında çalışır. Bu işlemleri anlık olarak yapmak kullanıcıyı bekletmeye ve sistem performansını düşürmeye neden olur. İşte bu noktada mesaj kuyrukları devreye girer.
Örneğin:
RabbitMQ’nun sağladığı temel özellikler şunlardır:
RabbitMQ, özellikle şu tür senaryolarda tercih edilir:
RabbitMQ, bir mesajın üreticiden (publisher) tüketiciye (consumer) iletilmesini şu yapılarla gerçekleştirir:
Mesajın iletim süreci şu şekildedir:
Aşağıda basit bir RabbitMQ kullanımı örneği verilmiştir:
Producer (Mesaj Üreten) Kod Örneği (Python):
import pika
# RabbitMQ'ya bağlanma
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Bir kuyruk oluşturma
channel.queue_declare(queue='hello')
# Mesaj gönderme
channel.basic_publish(exchange='', routing_key='hello', body='Merhaba RabbitMQ!')
print(" [x] Mesaj gönderildi: 'Merhaba RabbitMQ!'")
# Bağlantıyı kapatma
connection.close()
Consumer (Mesaj Tüketen) Kod Örneği (Python):
import pika
# RabbitMQ'ya bağlanma
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Kuyruğu oluşturma
channel.queue_declare(queue='hello')
# Mesajı alma
def callback(ch, method, properties, body):
print(f" [x] Mesaj alındı: {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Mesaj bekleniyor. Çıkmak için CTRL+C yapın')
channel.start_consuming()
Bu örneklerde, bir Producer
(mesaj üreten) bir mesaj gönderir ve Consumer
(mesaj tüketen) bu mesajı alır ve işleyebilir. RabbitMQ’nun kullanımının temel mantığı bu şekildedir.
Docker kullanarak RabbitMQ’yu yerel ortamda çalıştırmak oldukça kolaydır. Docker, uygulamaları konteyner içinde çalıştıran bir platformdur ve bu, RabbitMQ’yu izole bir ortamda çalıştırmak için idealdir. İlk olarak, RabbitMQ’nun Docker ortamında nasıl kullanılacağını inceleyelim.
docker run -d --hostname my-rabbit --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Bu komut, RabbitMQ’yu çalıştırır ve RabbitMQ’nun yönetim paneline (port 15672) ve mesajların iletimi için gerekli olan AMQP portuna (port 5672) erişim sağlar.
http://localhost:15672
adresine giderek RabbitMQ’nun yönetim paneline erişebilirsiniz. Varsayılan giriş bilgileri:Docker dışında bir diğer seçenek ise RabbitMQ’yu CloudAMQP gibi bulut tabanlı bir hizmet üzerinden kullanmaktır. CloudAMQP, RabbitMQ’nun bulut ortamında çalışmasını sağlayan bir hizmettir ve herhangi bir yerel kurulum gerektirmez.
Docker ve CloudAMQP’nin her ikisi de RabbitMQ’yu çalıştırmak için uygun seçeneklerdir ve kullanıcının ihtiyacına göre tercih edilebilir. Docker daha çok yerel testler ve geliştirme için idealdir, CloudAMQP ise gerçek zamanlı bulut uygulamaları için tercih edilebilir.
CloudAMQP veya Docker ile RabbitMQ’yu kullanırken uygulamanızın bu servislere nasıl bağlanacağını bilmeniz gerekir. Her iki ortamda da bağlantı bilgileri önemlidir.
localhost
üzerinden bağlanabilirsiniz. Örneğin: import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
URL
sağlar ve uygulamanızı bu URL üzerinden bağlayabilirsiniz. Örneğin: import pika
url = "amqp://username:password@hostname/vhost"
connection = pika.BlockingConnection(pika.URLParameters(url))
channel = connection.channel()
Bu bağlantı bilgileri, uygulamanızın RabbitMQ sunucusuna bağlanıp mesaj alışverişi yapmasını sağlar.
Exchange, RabbitMQ’da mesajların hangi kuyruğa yönlendirileceğini belirleyen bir yapılandırmadır. Publisher (mesajı gönderen) tarafından gönderilen mesajlar önce Exchange’e gelir ve Exchange, bu mesajları belirli kurallara göre uygun kuyruklara iletir.
RabbitMQ’da birden fazla kuyruk olabilir ve hangi mesajın hangi kuyruğa gideceği, Exchange tarafından belirlenir. Exchange, mesajı doğrudan bir kuyrukla eşleştirebilir veya daha karmaşık senaryolarda mesajları birden fazla kuyruğa yönlendirebilir.
Binding, Exchange ile kuyruklar arasındaki ilişkiyi tanımlayan bir kavramdır. Bir kuyruk, bir Exchange’e bağlanır ve bu bağlanma işlemi Binding olarak adlandırılır. Bir Exchange birden fazla kuyruğa bağlanabilir ve mesajlar bu kuyruğa bağlanan Binding’lere göre yönlendirilir.
Bir Exchange’e birden fazla kuyruk bağlanabilir ve bu durumda, Exchange gelen mesajları hangi kuyruğa yönlendireceğini Binding’e göre belirler. Binding işlemi, mesajların doğru kuyruğa yönlendirilmesini sağlar.
RabbitMQ’da farklı türlerde Exchange’ler bulunur. Bu Exchange türleri, mesajların kuyruklara nasıl yönlendirileceğini belirler ve her türün kendine özgü işleyişi vardır.
Direct Exchange, mesajların doğrudan belirli bir Routing Key’e sahip kuyruklara iletilmesini sağlar. Bu tür Exchange, en basit ve yaygın kullanılan türlerden biridir.
Kullanım Senaryoları:
Kod Örneği (Python):
import pika
# RabbitMQ'ya bağlan
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Direct Exchange oluştur
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# Kuyruk oluştur ve bind et
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')
# Mesaj gönder
channel.basic_publish(exchange='direct_logs', routing_key='error', body='Hata Mesajı!')
Fanout Exchange, mesajların tüm bağlı kuyruklara iletilmesini sağlar. Bu tür Exchange, herhangi bir Routing Key kullanmaz ve mesajlar Exchange’e bağlı olan tüm kuyruklara yönlendirilir.
Kullanım Senaryoları:
Topic Exchange, Routing Key kullanarak mesajların kuyruklara yönlendirilmesini sağlar, ancak burada Routing Key daha karmaşıktır. Routing Key, bir desen (pattern) olarak tanımlanır ve mesajlar bu desenlere göre yönlendirilir.
*
, #
) ile birlikte kullanılır. Örneğin, “dosya.*.hatasi” şeklinde bir anahtar, “dosya.yukleme.hatasi” veya “dosya.silme.hatasi” gibi mesajları yakalayabilir.Kullanım Senaryoları:
Headers Exchange, Routing Key yerine mesaj başlıklarını (header) kullanarak mesajları yönlendirir. Mesaj başlıkları, key-value formatında belirlenir ve mesajlar bu başlıkların eşleşmesine göre kuyruklara iletilir.
Kullanım Senaryoları:
RabbitMQ’daki Exchange türleri, farklı mesaj yönlendirme senaryolarına uyacak şekilde tasarlanmıştır:
Öncelikle, RabbitMQ’yu .NET Core uygulamalarıyla kullanabilmek için gerekli kütüphanenin yüklenmesi gerekmektedir. Bu kütüphane, NuGet paketi olarak indirilebilir:
Install-Package RabbitMQ.Client
Bu kütüphane, hem Publisher hem de Consumer uygulamalarında kullanılacak.
Publisher uygulaması, bir kuyruğa mesaj gönderme işlemini gerçekleştirir. Bu uygulamanın temel adımları şu şekildedir:
Publisher uygulaması, RabbitMQ sunucusuna bağlanarak bir kanal açar. Bu kanal üzerinden kuyruğa mesajlar gönderilir. Bağlantı oluşturmak için şu adımlar izlenir:
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
// Kuyruk oluşturma
channel.QueueDeclare(queue: "example_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
// Mesaj gönderme
string message = "Merhaba, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "example_queue",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Gönderilen Mesaj: {0}", message);
}
Yukarıdaki kod, “example_queue” isimli bir kuyruk oluşturur ve bu kuyruğa bir mesaj gönderir. RabbitMQ’da mesajlar byte array olarak işlenir, bu yüzden mesajlar gönderilmeden önce byte[] formatına dönüştürülür.
channel.QueueDeclare(queue: "example_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
Bu kod parçası, RabbitMQ’da “example_queue” isimli bir kuyruk oluşturur. Kuyruğun durable özelliği, mesajların RabbitMQ yeniden başlatıldığında korunup korunmayacağını belirtir. Exclusive ise kuyruğun sadece bu bağlantı için geçerli olup olmadığını belirler. autoDelete true olduğunda, kuyruk en son consumer bağlantısını kapattığında silinir.
Consumer uygulaması, kuyruktaki mesajları okuma işlemini gerçekleştirir. Publisher uygulamasında kuyruğa gönderilen mesajlar, Consumer tarafından işlenir.
Publisher ile benzer şekilde, Consumer da RabbitMQ’ya bağlanır ve bir kanal açar. Ancak burada fark, kuyruktan mesajları almak için BasicConsume metodunun kullanılmasıdır:
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "example_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Alınan Mesaj: {0}", message);
};
channel.BasicConsume(queue: "example_queue",
autoAck: true,
consumer: consumer);
Console.WriteLine(" Mesaj bekleniyor...");
Console.ReadLine();
}
Yukarıdaki kodda, EventingBasicConsumer sınıfı kullanılarak RabbitMQ kuyruğundaki mesajlar alınıyor. Mesaj alındığında, Received olayı tetiklenir ve gelen mesaj byte array formatından string formatına dönüştürülerek konsola yazdırılır.
RabbitMQ, Publisher ve Consumer arasında asenkron bir iletişim sağlar. Publisher tarafından kuyruğa gönderilen mesajlar, Consumer tarafından sırasıyla alınır ve işlenir. Bu iletişim, BasicPublish ve BasicConsume metodları ile sağlanır.
RabbitMQ, mesajları kuyruğa ekleme ve tüketme süreçlerinde oldukça esnek bir yapıya sahiptir. Ancak uygulamaların büyümesi ve ölçeklenmesi durumunda daha gelişmiş kuyruk mimarileri gereklidir.
RabbitMQ’nun ana fikri, yoğun kaynak gerektiren işleri hemen tamamlamak yerine, bu işleri daha sonra yapılacak şekilde planlamaktır. Bu nedenle kuyruklar kullanılır ve mesajlar kuyruğa atılır, ardından tüketiciler (consumer) bu mesajları asenkron olarak işler.
Gelişmiş kuyruk mimarisi, aşağıdaki başlıkları içerir:
Bu yapıların her biri, ölçeklenebilir ve güvenilir bir mesaj kuyruğu sistemi kurmak için hayati önem taşır.
RabbitMQ’nun varsayılan davranışı, mesajları round robin (sıralı) bir şekilde tüketicilere dağıtmaktır. Bu, bir mesajın ilk tüketiciye, ikinci mesajın ikinci tüketiciye, vb. sırayla gönderilmesi anlamına gelir. Bu sıralı dağıtım, ölçeklenebilirlik ve performans için gereklidir.
Kod Örneği: Round Robin Dağıtımı
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
Yukarıdaki örnekte, her tüketici sırayla mesajları alır. Her bir mesaj, round robin mantığıyla farklı tüketicilere gönderilir.
RabbitMQ’nun varsayılan davranışı, bir mesaj tüketiciye gönderildikten sonra kuyruktan silinmesidir. Ancak bu durum, tüketici işlemi sırasında hata meydana gelirse veri kaybına neden olabilir. Bu nedenle mesaj onaylama sistemi kullanılır. Mesajlar, ancak tüketici başarıyla işlediğini onayladıktan sonra kuyruktan silinir.
Kod Örneği: Mesaj Onaylama
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
// Mesaj başarıyla işlendikten sonra onay gönder
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
Burada, BasicAck
fonksiyonu mesajın başarıyla işlenip işlenmediğini RabbitMQ’ya bildirir. Eğer BasicAck
çağrılmazsa, mesaj kuyruktan silinmez ve başka bir tüketiciye yeniden gönderilir.
RabbitMQ, varsayılan olarak kuyruk ve mesajları bellekte tutar. Bu, RabbitMQ sunucusu kapandığında mesajların kaybolmasına neden olabilir. Mesajların ve kuyrukların kalıcı olabilmesi için durable ve persistent yapılandırmalarının yapılması gereklidir.
Kod Örneği: Kalıcı Kuyruk ve Mesaj
// Kalıcı kuyruk oluşturma
channel.QueueDeclare(queue: "durable_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
// Kalıcı mesaj gönderme
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
var body = Encoding.UTF8.GetBytes("Kalıcı mesaj");
channel.BasicPublish(exchange: "", routingKey: "durable_queue", basicProperties: properties, body: body);
Bu örnekte, durable
parametresi ile kuyruk kalıcı hale getirilir. Ayrıca, mesajın kalıcı olması için Persistent
özelliği true
olarak ayarlanmıştır. Bu şekilde, RabbitMQ sunucusu yeniden başlatılsa bile mesajlar kaybolmaz.
Varsayılan Round Robin dağıtımı, tüm tüketicilere sırayla mesaj gönderir, ancak bazı tüketiciler daha yavaş olabilir ve bu durumda eşit olmayan bir yük dağılımı ortaya çıkabilir. Fair Dispatch, her bir tüketicinin yalnızca işleyebildiği kadar mesaj almasını sağlar.
Kod Örneği: Fair Dispatch
// Prefetch count ile adil dağıtım
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Bu yapılandırmada, her bir tüketici sadece bir mesaj alır ve mesajı işleyene kadar yeni bir mesaj gönderilmez. Bu, yükün daha adil bir şekilde dağıtılmasını sağlar.
Tüketici, bazı durumlarda mesajı işleyemeyebilir ve bu durumda mesajı yeniden kuyruğa göndermek isteyebilir. BasicNack ile mesajın işlenmediğini ve tekrar işlenmesi gerektiğini bildirebiliriz.
Kod Örneği: Mesajı Reddetme
// Mesajı işleyememe durumu
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
BasicNack
ile mesajı yeniden kuyruğa ekleyebiliriz. Eğer requeue
parametresi true
ise, mesaj tekrar kuyruğa eklenir ve başka bir tüketici tarafından işlenir.
RabbitMQ’da mesajlar Exchange adı verilen yapılar üzerinden kuyruklara yönlendirilir. Bir Exchange, mesajları alır ve belirlenen kurallara göre bu mesajları ilgili kuyruğa (queue) gönderir. Direct Exchange, bu Exchange türlerinden biridir ve mesajların belirli bir anahtar (routing key) kullanılarak hedef kuyruklara yönlendirilmesini sağlar.
Direct Exchange, mesajların belirli bir routing key (yönlendirme anahtarı) aracılığıyla doğrudan belirli bir kuyruğa gönderilmesini sağlayan Exchange türüdür. Bu davranışta, birden fazla kuyruk olabilir, ancak gönderilen mesajlar yalnızca belirtilen anahtar (routing key) ile uyuşan kuyruğa gider.
Direct Exchange’in ana mantığı şudur:
Bir RabbitMQ uygulamasında Direct Exchange kullanmak için aşağıdaki adımlar takip edilir:
Direct Exchange ile mesaj göndermenin temel adımlarını ve kod örneklerini aşağıda açıklıyoruz.
Öncelikle bir Direct Exchange oluşturulur. Bu Exchange, mesajları routing key kullanarak belirli kuyruklara yönlendirecektir.
// Exchange oluşturma
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
Yukarıdaki kodda, direct_logs
adında bir Direct Exchange oluşturulmuştur. Exchange türü “direct” olarak belirtilmiştir.
Mesajların yönlendirileceği bir kuyruk oluşturulur.
// Kuyruk oluşturma
channel.QueueDeclare(queue: "error_logs", durable: false, exclusive: false, autoDelete: false, arguments: null);
Bu örnekte, error_logs
adında bir kuyruk oluşturulmuştur.
Exchange ile kuyruk arasında bir bağlantı kurulur. Bu bağlantı, routing key kullanılarak yapılır.
// Binding işlemi
channel.QueueBind(queue: "error_logs", exchange: "direct_logs", routingKey: "error");
Yukarıdaki kodda, error_logs
kuyruğu, direct_logs
Exchange’ine “error” routing key ile bağlanmıştır. Bu, sadece routing key “error” olan mesajların bu kuyruğa gitmesini sağlar.
Publisher, oluşturulan Direct Exchange üzerinden bir mesaj gönderir ve bir routing key belirtir. Mesaj, bu key ile eşleşen kuyruğa yönlendirilir.
// Mesaj gönderme
string message = "Error: Something went wrong!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs", routingKey: "error", basicProperties: null, body: body);
Yukarıdaki örnekte, error
routing key’ine sahip bir mesaj gönderilmektedir. Bu mesaj, error
key ile bağlı olan kuyrukta (yani error_logs
kuyruğunda) son bulacaktır.
Tüketici tarafında, belirli bir kuyruktaki mesajlar alınır ve işlenir.
// Tüketici oluşturma
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
// Kuyruğu dinleme
channel.BasicConsume(queue: "error_logs", autoAck: true, consumer: consumer);
Bu kod, error_logs
kuyruğunu dinler ve yeni bir mesaj geldiğinde, mesajı alır ve ekrana yazdırır.
Bir e-ticaret uygulamasında, siparişlerle ilgili log mesajlarının farklı tiplerde olabileceğini varsayalım: info, warning, error. Her bir log tipi için ayrı kuyruklar oluşturup, bu logları farklı yönlendirme anahtarları ile ilgili kuyruklara gönderebiliriz.
Bu yapı, her bir log tipine özel işlemler yapılmasını sağlar. Örneğin, error logları için bir hata çözümleme servisi devreye girebilir.
Fanout Exchange, mesajların herhangi bir routing key kullanmaksızın Exchange’e bağlı olan tüm kuyruklara gönderilmesini sağlar. Bu Exchange türü, mesajın hangi kuyruğa gideceğini ayırt etmez; bir mesajı alır ve tüm bağlı kuyruklara iletir. Broadcast (yayın) mantığıyla çalışır, yani mesaj bir defa yayınlanır ve birden fazla kuyruğa yönlendirilir.
Fanout Exchange şu durumlar için idealdir:
Fanout Exchange’i kullanarak bir mesajı tüm kuyruklara iletmek için aşağıdaki adımları izliyoruz:
Öncelikle bir Fanout Exchange oluştururuz. Bu Exchange, mesajları bağlı olan tüm kuyruklara yönlendirecektir.
// Fanout Exchange oluşturma
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
Bu örnekte, logs
adında bir Fanout Exchange oluşturulmuştur. type
parametresi olarak "fanout"
belirtilmiştir, çünkü bu bir Fanout Exchange’dir.
Fanout Exchange‘e bağlı kuyruklar oluşturulur ve bu kuyruklar Exchange‘e bağlanır (binding yapılır).
// Kuyruk oluşturma ve Exchange'e bağlama
string queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
Bu kodda, rastgele bir isimle bir kuyruk oluşturulur ve bu kuyruk logs adlı Fanout Exchange’e bağlanır. routingKey parametresi boş bırakılmıştır, çünkü Fanout Exchange’de routing key kullanılmaz.
Publisher, Fanout Exchange üzerinden bir mesaj gönderir. Mesaj, tüm bağlı kuyruklara iletilecektir.
// Mesaj gönderme
string message = "info: Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
Yukarıdaki örnekte, logs
Exchange’ine bir mesaj gönderilmektedir. routingKey boş bırakılmıştır, çünkü Fanout Exchange routing key’e ihtiyaç duymaz.
Tüketiciler (Consumers), ilgili kuyruktan mesajları alır ve işleme koyar.
// Tüketici oluşturma
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
// Kuyruğu dinleme
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Bu kod, belirlenen kuyruktaki mesajları dinler ve yeni bir mesaj geldiğinde bu mesajı alır, ardından ekrana yazdırır.
Şimdi Fanout Exchange’in daha pratik bir senaryoda nasıl kullanılabileceğine bakalım. Diyelim ki bir yazılım güncelleme sisteminiz var ve bir sunucuya yeni bir güncelleme geldiğinde bu bilgiyi aynı anda farklı servislere iletmek istiyorsunuz. Örneğin, bir servisin e-posta bildirimleri, bir diğer servisin ise log tutma işlemleri olabilir. Bu senaryoda Fanout Exchange kullanmak ideal olacaktır.
Aşağıda, bir Fanout Exchange kullanarak birden fazla kuyruğa mesaj gönderen bir uygulama örneği bulunmaktadır.
Publisher (Mesaj Gönderen)
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// Fanout Exchange oluştur
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
// Mesaj gönder
string message = "info: Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
Consumer (Mesaj Alan)
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// Rastgele bir kuyruk oluştur
string queueName = channel.QueueDeclare().QueueName;
// Kuyruğu Fanout Exchange'e bağla
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
// Mesajları dinle
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
Bu örnekte, bir mesaj Fanout Exchange aracılığıyla birden fazla kuyrukta dinlenebilir ve işlenebilir.
Topic Exchange, mesajların routing key‘lere dayalı olarak kuyruklara iletilmesini sağlar. Routing key, nokta (.) ile ayrılmış kelimelerden oluşur ve kuyruklar, bu routing key‘lere göre mesajları alır. Routing key kullanarak daha esnek bir mesaj yönlendirme mekanizması kurabiliriz.
Topic Exchange‘in ana özellikleri:
*
(yıldız): Noktayla ayrılmış bir kelimeyi temsil eder.#
(dies): Noktayla ayrılmış bir veya daha fazla kelimeyi temsil eder.Topic Exchange‘i kullanarak mesajları uygun kuyruklara yönlendirmek için şu adımları izleriz:
İlk adım olarak, bir Topic Exchange oluşturalım.
// Topic Exchange oluşturma
channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
Bu kodda, topic_logs
adında bir Topic Exchange oluşturulmuştur. Exchange’in türü topic olarak belirtilmiştir.
Her bir kuyruk, belirli bir routing key ile Topic Exchange‘e bağlanmalıdır.
// Kuyruk oluşturma ve Topic Exchange'e bağlama
string queueName = channel.QueueDeclare().QueueName;
// Kuyruğu Topic Exchange'e bağlama (binding)
channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "kern.*");
Bu kodda, rastgele bir isimle bir kuyruk oluşturulmuş ve bu kuyruk kern.*
pattern’ine uygun tüm mesajları almak üzere Topic Exchange‘e bağlanmıştır. *
, noktayla ayrılmış bir kelimenin yerine geçer. Yani, kern.*
pattern’i, kern.info
, kern.warning
gibi routing key‘lere sahip mesajları alacaktır.
Publisher, belirli bir routing key kullanarak Topic Exchange üzerinden mesaj gönderir.
// Mesaj gönderme
string routingKey = "kern.info";
string message = "A kernel info log.";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body);
Bu örnekte, routing key olarak kern.info
kullanılarak bir mesaj gönderilmiştir. Bu mesaj, kern.*
pattern’ine uyan tüm kuyruklara iletilecektir.
Tüketiciler, belirli bir routing key pattern’ine uygun mesajları almak üzere ayarlanır.
// Tüketici oluşturma
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
// Kuyruğu dinleme
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Bu kod, belirtilen kuyruktaki mesajları dinler ve gelen mesajı ekrana yazdırır.
Topic Exchange‘i kullanarak mesaj yönlendirmesini oldukça esnek hale getirebiliriz. Örneğin, bir log sistemi oluşturduğunuzu düşünelim. Routing key‘ler ile farklı seviyelerde loglar (info, warning, error) ve farklı bileşenlerden (kernel, auth, database) gelen mesajları yönlendirebiliriz.
Bu yapı, şu şekilde oluşturulabilir:
kern.*
: Kernel ile ilgili tüm logları alır.*.error
: Hangi bileşenden olursa olsun, hata (error) loglarını alır.auth.#
: Auth ile ilgili tüm logları alır, çünkü #
birden fazla kelimeyi temsil eder.Aşağıda, Topic Exchange‘i kullanarak esnek bir mesaj yönlendirme sistemi kuran bir örnek bulunmaktadır:
Publisher (Mesaj Gönderen)
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// Topic Exchange oluştur
channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
// Mesaj gönder
string routingKey = "auth.error";
string message = "An authentication error occurred.";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
Consumer (Mesaj Alan)
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// Rastgele bir kuyruk oluştur
string queueName = channel.QueueDeclare().QueueName;
// Kuyruğu Topic Exchange'e bağla
channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.error");
// Mesajları dinle
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received '{0}':'{1}'", ea.RoutingKey, message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
Bu senaryoda, routing key olarak auth.error
kullanılarak bir mesaj gönderilmiştir. Tüketici ise *.error
pattern’ine uygun mesajları almaktadır, dolayısıyla auth.error
mesajını alır ve işler.
Topic Exchange, routing key‘lerde joker karakterler kullanarak mesaj yönlendirmeyi esnek hale getirir:
*
(yıldız): Noktayla ayrılmış bir kelimeyi temsil eder.#
(dies): Noktayla ayrılmış bir veya daha fazla kelimeyi temsil eder.Bu joker karakterler sayesinde oldukça esnek bir mesaj yönlendirme mekanizması kurabiliriz. Örneğin:
kern.*
: Kernel ile ilgili herhangi bir log alır (kern.info
, kern.error
gibi).*.error
: Bileşeni fark etmeksizin tüm error loglarını alır (auth.error
, db.error
gibi).auth.#
: Auth ile ilgili tüm logları alır (auth.info
, auth.warning
, auth.error
gibi).Header Exchange, mesajları header‘da tanımlı anahtar-değer çiftlerine (key-value pairs) göre kuyruklara ileten bir RabbitMQ Exchange türüdür. Header Exchange kullanırken, mesajların yönlendirileceği kuyruklar, mesajın içinde bulunan header verilerine göre belirlenir. Diğer Exchange türlerinden farklı olarak, routing key kullanılmaz; bunun yerine mesajın başlık kısmında tanımlanan özel bilgiler (header) kullanılır.
Direct ve Topic Exchange‘lerde mesaj yönlendirmesi routing key‘e dayalı olarak yapılır. Mesajın hangi kuyruklara iletileceği routing key‘in değeri üzerinden belirlenir. Ancak Header Exchange‘de yönlendirme tamamen header bilgilerine dayalıdır ve routing key kullanılmaz.
Mesela, Direct Exchange‘de bir mesajın yönlendirileceği kuyruk routing key ile eşleştirilirken, Header Exchange‘de kuyruk ile mesajın başlığındaki bilgiler eşleştiğinde mesaj ilgili kuyruğa gönderilir. Bu, daha esnek ve belirli ihtiyaçlara göre şekillendirilebilir bir mesaj yönlendirme sistemi sunar.
Header Exchange‘i kullanarak mesaj yönlendirmek için şu adımları izlememiz gerekir:
İlk olarak, bir Header Exchange oluşturulur. Bu Exchange, mesajları başlık bilgilerine göre yönlendirecektir.
// Header Exchange oluşturma
channel.ExchangeDeclare(exchange: "header_logs", type: "headers");
Bu kod, header_logs
adında bir Header Exchange oluşturur. Exchange’in türü headers olarak belirtilmiştir.
Her kuyruk, belirli header bilgilerine göre Header Exchange‘e bağlanmalıdır. Yani, mesajda tanımlı header bilgileri ile kuyrukta tanımlı header bilgileri eşleşirse mesaj ilgili kuyruğa yönlendirilir.
// Kuyruk oluşturma ve Header Exchange'e bağlama
string queueName = channel.QueueDeclare().QueueName;
var args = new Dictionary<string, object>
{
{ "x-match", "all" }, // all: Tüm header'lar eşleşmeli, any: Bir header eşleşse yeterli
{ "format", "pdf" },
{ "type", "report" }
};
// Kuyruğu Header Exchange'e bağlama
channel.QueueBind(queue: queueName, exchange: "header_logs", routingKey: "", arguments: args);
Bu örnekte, x-match
parametresi ile eşleştirme kuralları belirlenmiştir:
x-match: "all"
: Mesajın tüm header’larının belirtilen kuyrukla eşleşmesi gerekir.x-match: "any"
: Mesajın herhangi bir header’ı eşleşirse yeterlidir.Ayrıca, format
ve type
header’ları tanımlanmış ve bu kuyruk yalnızca bu başlık bilgilerine sahip mesajları alacaktır.
Publisher, mesajı göndermeden önce mesajın başlık kısmına (header) gerekli bilgileri ekler.
// Mesaj gönderme
var properties = channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>
{
{ "format", "pdf" },
{ "type", "report" }
};
string message = "PDF report generated.";
var body = Encoding.UTF8.GetBytes(message);
// Header bilgileri ile mesaj gönderme
channel.BasicPublish(exchange: "header_logs", routingKey: "", basicProperties: properties, body: body);
Bu örnekte, mesajın başlık kısmına format ve type bilgileri eklenmiştir. Mesaj, yalnızca bu header bilgilerine uygun kuyruklara iletilecektir.
Tüketiciler, belirtilen header bilgilerine sahip mesajları alacak şekilde ayarlanır.
// Tüketici oluşturma
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received '{0}'", message);
};
// Kuyruğu dinleme
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Bu kod, belirtilen kuyruktaki mesajları dinler ve gelen mesajı ekrana yazdırır.
Header Exchange, mesaj yönlendirme konusunda esnek bir yapı sunar. Farklı mesajları, başlıklarında tanımlı bilgiler üzerinden doğru kuyruklara yönlendirebilirsiniz. Özellikle x-match parametresi sayesinde yönlendirme işlemini çok daha kontrollü bir şekilde yapabilirsiniz.
x-match değerleri:
Bir raporlama sistemi düşünelim. PDF ve Excel formatlarında raporlar üretiliyor. Ayrıca bu raporlar, farklı kategorilere (finans, insan kaynakları, vb.) göre ayrılıyor. Bu sistemde, farklı format ve kategoriye sahip raporlar doğru kuyruklara yönlendirilmelidir.
Publisher (Mesaj Gönderen)
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// Header Exchange oluştur
channel.ExchangeDeclare(exchange: "header_logs", type: "headers");
// Mesaj gönder
var properties = channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>
{
{ "format", "pdf" },
{ "category", "finance" }
};
string message = "Finance PDF report.";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "header_logs", routingKey: "", basicProperties: properties, body: body);
Console.WriteLine(" [x] Sent '{0}'", message);
Consumer (Mesaj Alan)
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// Rastgele bir kuyruk oluştur ve Header Exchange'e bağla
string queueName = channel.QueueDeclare().QueueName;
var args = new Dictionary<string, object>
{
{ "x-match", "all" },
{ "format", "pdf" },
{ "category", "finance" }
};
// Kuyruğu Header Exchange'e bağla
channel.QueueBind(queue: queueName, exchange: "header_logs", routingKey: "", arguments: args);
// Mesajları dinle
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received '{0}'", message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
Bu senaryoda, mesajın header kısmına format: "pdf"
ve category: "finance"
bilgileri eklenmiştir. Bu mesaj, yalnızca bu başlık bilgilerine uygun kuyruklara iletilecektir.
Mesaj tasarımı, belirli bir senaryoya göre mesajların iletilmesi ve işlenmesi için kullanılan yapılandırılmış bir modeldir. Yazılım dünyasındaki tasarım desenlerine benzer şekilde, mesaj tasarımları da belirli problemleri çözmek için uygulanır. Bu tasarımlar, mesajların nasıl bir yapıda olacağını, iki veya daha fazla servis arasında nasıl bir iletişim kurulacağını belirler.
RabbitMQ’da kullanılan en yaygın mesaj tasarımları şunlardır:
Bu tasarımlar farklı senaryolara göre şekillenir ve her biri mesajların farklı şekillerde işlenmesini sağlar.
Point-to-Point (P2P), bir mesajın doğrudan bir kuyruğa gönderilmesi ve bu kuyruğun işleyen bir tüketici (consumer) tarafından alınması prensibine dayanır. Bu tasarımda, mesaj yalnızca tek bir tüketici tarafından işlenir. Eğer bir mesajın yalnızca tek bir tüketici tarafından alınması gerekiyorsa, bu tasarım uygundur.
// Publisher
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "example_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
string message = "Hello, World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "example_queue", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
// Consumer
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "example_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "example_queue", autoAck: true, consumer: consumer);
Publish/Subscribe tasarımında, publisher mesajı bir exchange (dağıtıcı) üzerinden birçok kuyruğa gönderir. Bu tasarım, bir mesajın birden fazla tüketici tarafından işlenmesi gerektiği senaryolar için uygundur. Bu modelde mesajlar, exchange’e bağlı olan tüm kuyruklara iletilir.
// Publisher
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
string message = "Log message!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
// Consumer
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Work Queue tasarımı, birden fazla tüketici arasında iş yükünü eşit dağıtmayı amaçlar. Bu tasarımda, bir mesaj birden fazla tüketici tarafından değil, yalnızca biri tarafından işlenir. Bu, paralel işleme ve iş yükü dağılımı için idealdir.
// Publisher
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
string message = "Task message";
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
Console.WriteLine(" [x] Sent {0}", message);
// Consumer
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
Request/Response tasarımı, publisher’ın bir istekte bulunup, tüketiciden bir yanıt (response) beklediği senaryolarda kullanılır. Publisher, bir kuyruktan mesaj gönderir ve tüketici bu mesajı işleyip yanıtını başka bir kuyruktan döner.
// Publisher
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var replyQueueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel);
var correlationId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = correlationId;
string message = "Request message";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: body);
consumer.Received += (model, ea) =>
{
if (ea.BasicProperties.CorrelationId == correlationId)
{
var response = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(" [x] Received response: {0}", response);
}
};
channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer);
// Consumer
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var response = "";
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received request: {0}", message);
response = "Response to " + message;
var reply
Props = channel.CreateBasicProperties();
replyProps.CorrelationId = ea.BasicProperties.CorrelationId;
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange: "", routingKey: ea.BasicProperties.ReplyTo, basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
Enterprise Service Bus (ESB), farklı yazılım sistemlerinin birbiriyle iletişim kurmasını sağlayan bir mimaridir. Bu yapı, servisler arasında entegrasyon sağlayarak iletişim süreçlerini kolaylaştırır. Farklı sistemler arasında kullanılan çeşitli protokolleri soyutlayarak tek bir iletişim katmanı üzerinden veri alışverişi yapılmasını mümkün kılar. Böylece, sistemler arası bağımlılığı en aza indirir ve iletişim standartlarını belirler.
Örnek:
İki mikro servis arasında güvenli ve standart bir şekilde veri alışverişi sağlamak istiyorsanız, ESB kullanarak bu süreci yönetebilirsiniz. ESB, verilerin hangi formatta, hangi güvenlik protokolleriyle gönderileceğini belirler ve bu işlemi tek bir mimari çatıda toplar.
MassTransit, özellikle .NET için geliştirilmiş olan, açık kaynak kodlu bir mesajlaşma kütüphanesidir. MassTransit, dağıtık sistemler arasında asenkron, mesaj tabanlı iletişimi sağlar. Dağıtık sistemlerde yüksek kullanılabilirlik, ölçeklenebilirlik ve güvenilirlik sağlayarak mikro servis mimarilerinde ideal bir çözümdür.
Kod Örneği:
MassTransit’i RabbitMQ ile entegre ederek basit bir mesaj gönderme ve alma işlemi gerçekleştirelim.
Publisher (Mesaj Gönderici):
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "example_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
string message = "Hello, World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "example_queue", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
Consumer (Mesaj Alıcı):
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "example_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "example_queue", autoAck: true, consumer: consumer);
MassTransit, RabbitMQ gibi mesaj broker’lar ile kolayca entegre edilebilir. MassTransit, dağıtık mikro servis yapılarında, mesaj iletimini yönetmek için kullanılan bir kütüphanedir. Bu entegrasyon, servisler arası iletişim süreçlerinde mesajlaşmanın kolay bir şekilde yapılandırılmasını sağlar.
MassTransit RabbitMQ Entegrasyonu ile Mesaj Gönderme:
MassTransit kullanarak, mesajlar publish/subscribe (yayınla/abone ol) modeline göre işlenir. Publish, bir mesajın birden fazla kuyruk tarafından alınmasını sağlar.
// MassTransit Publisher (Mesaj Gönderici)
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri("rabbitmq://localhost"), h => { });
});
await busControl.Publish<IMessage>(new { Text = "Hello, MassTransit!" });
MassTransit Consumer (Mesaj Alıcı):
public class ExampleConsumer : IConsumer<IMessage>
{
public Task Consume(ConsumeContext<IMessage> context)
{
Console.WriteLine("Received message: {0}", context.Message.Text);
return Task.CompletedTask;
}
}
// Consumer configuration
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint("example_queue", e =>
{
e.Consumer<ExampleConsumer>();
});
});
await busControl.StartAsync();
Send Kullanımı:
await sendEndpoint.Send<IMessage>(new { Text = "Direct Message to a Queue" });
Publish Kullanımı:
await busControl.Publish<IMessage>(new { Text = "Message to All Subscribers" });
Enterprise Service Bus, özellikle mikro servis mimarilerinde, farklı servislerin birbiriyle iletişimini kolaylaştırır. MassTransit ise .NET uygulamalarında dağıtık sistemler arasında mesaj tabanlı iletişimi sağlar ve RabbitMQ gibi mesaj broker’larla entegrasyon imkanı sunar. Bu araçlar, özellikle aşağıdaki durumlarda tercih edilir:
Request/Response Pattern, bir sistemin bir istek (request) göndermesi ve bu isteğe bir yanıt (response) alması sürecini ifade eder. Bu pattern genellikle iki bileşen arasında asenkron bir iletişim sağlamak için kullanılır. İstek, belirli bir işlem yapılması amacıyla gönderilir, yanıt ise işlem sonucunu döndürür.
Bu desenin başlıca avantajları:
MassTransit ile bu desenin uygulanmasını adım adım inceleyelim.
İlk adım olarak, isteğin ve yanıtın formatını belirleyen mesaj kontratlarını oluşturmalıyız. Bu kontratlar, iki servis arasındaki iletişimi belirler. Bir servis, belirli bir formatta istek gönderir ve diğer servis bu isteği yine belirli bir formatta yanıtlar.
Request ve Response Mesajlarının Oluşturulması:
public record RequestMessage(string Text, int MessageNo);
public record ResponseMessage(string Text);
Bu örnekte, RequestMessage
sınıfı, gönderilecek isteğin formatını tanımlar. İki alan içerir:
Text
: İsteğin içeriği.MessageNo
: Mesajın sıra numarası.ResponseMessage
sınıfı ise yanıtın formatını belirler ve yalnızca bir Text
alanı içerir.
Publisher, isteği gönderen bileşendir. Bu bileşen, belirli bir kuyruğa mesaj gönderir ve yanıtı bekler. MassTransit ile bu işlem oldukça basittir.
Publisher’ın Yapılandırılması:
Publisher, MassTransit’in RequestClient
yapısını kullanarak isteği gönderir ve yanıtı bekler. Bu işlem asenkron olarak gerçekleştirilir.
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri("rabbitmq://localhost"), h => { });
});
await bus.StartAsync();
try
{
var requestClient = bus.CreateRequestClient<RequestMessage>(new Uri("rabbitmq://localhost/request_queue"));
int i = 1;
while (true)
{
var request = new RequestMessage($"Mesaj {i}", i);
var response = await requestClient.GetResponse<ResponseMessage>(request);
Console.WriteLine($"Response received: {response.Message.Text}");
i++;
await Task.Delay(2000); // 2 saniyede bir mesaj gönder
}
}
finally
{
await bus.StopAsync();
}
Bu kodda, MassTransit RequestClient
kullanılarak request_queue
kuyruğuna mesaj gönderilir. Gönderilen her mesajın ardından, yanıtı (ResponseMessage
) almak için beklenir.
Consumer (kanser), gelen isteği işleyen ve yanıtı gönderen bileşendir. Bu bileşen, isteği aldıktan sonra gerekli işlemleri yapar ve yanıtı geri gönderir.
Consumer’ın Yapılandırılması:
Consumer, belirli bir kuyruğu dinleyerek isteği alır ve işleme tabi tutar. Sonrasında ise yanıtı gönderen Respond
fonksiyonu kullanılır.
public class RequestMessageConsumer : IConsumer<RequestMessage>
{
public async Task Consume(ConsumeContext<RequestMessage> context)
{
Console.WriteLine($"Received request: {context.Message.Text}, MessageNo: {context.Message.MessageNo}");
// İsteği işleme ve yanıt gönderme
var response = new ResponseMessage($"Yanıt: Mesaj {context.Message.MessageNo} işlendi.");
await context.RespondAsync(response);
}
}
Bu örnekte, RequestMessageConsumer
sınıfı, RequestMessage
türündeki mesajları dinler. Her mesaj geldiğinde, bu mesaj işlenir ve bir ResponseMessage
ile yanıtlanır.
Consumer’ın mesaj kuyruğunu dinlemesi ve gelen isteklere yanıt vermesi için, RabbitMQ ile bağlantı kurup yapılandırılması gerekir.
Consumer’ı Başlatma:
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri("rabbitmq://localhost"), h => { });
cfg.ReceiveEndpoint("request_queue", ep =>
{
ep.Consumer<RequestMessageConsumer>();
});
});
await bus.StartAsync();
Console.WriteLine("Consumer is running. Press any key to exit");
Console.ReadKey();
await bus.StopAsync();
Bu kodda, request_queue
isimli bir kuyruk dinlenir ve bu kuyruktan gelen istekler RequestMessageConsumer
tarafından işlenir.
Her iki uygulamayı da çalıştırdığınızda, Publisher belirli aralıklarla istek gönderir ve Consumer bu isteklere yanıt verir. Publisher, yanıtı aldıktan sonra ekrana yazar. Mesajlar asenkron olarak işlenir ve her iki bileşen de bağımsız olarak çalışır.
Örnek Çıktı:
Received request: Mesaj 1, MessageNo: 1
Response received: Yanıt: Mesaj 1 işlendi.
Received request: Mesaj 2, MessageNo: 2
Response received: Yanıt: Mesaj 2 işlendi.
Bu örnek, isteğin başarılı bir şekilde alındığını ve yanıtın geri döndürüldüğünü gösterir.
RabbitMQ veya benzeri mesaj kuyruklama sistemlerinde, bir mesaj bir consumer (tüketici) tarafından alındığında, bu mesajın başarılı bir şekilde işlendiğini onaylamak gerekir. Bu onay, mesajın kuyruktan silinmesini sağlar. Eğer mesaj onaylanmazsa, mesaj tekrar kuyruğa geri gönderilir (requeue) ve başka bir tüketici tarafından işlenebilir.
Mesaj onayı, bir consumer‘ın mesajı işleyip işlemediğini belirlemek için kullanılır. Eğer mesaj düzgün işlenmezse, consumer tekrar denemek üzere mesajı yeniden kuyruğa alabilir. Bu mekanizma, mesajların kaybolmamasını ve güvenli bir şekilde işlenmesini sağlar.
MassTransit kütüphanesi, RabbitMQ gibi mesajlaşma sistemleriyle çalışırken message acknowledgment mekanizmasını otomatik olarak yönetir. Bu sayede, geliştiricilerin manuel olarak mesaj onayı yapmalarına gerek kalmaz.
MassTransit’in varsayılan davranışı şu şekildedir:
MassTransit, bu süreci otomatik olarak yönetir ve geliştiricinin manuel olarak BasicAck
veya BasicNack
gibi RabbitMQ işlevlerini çağırmasına gerek kalmaz.
Aşağıdaki örnek, MassTransit kullanarak bir mesajı işleyen ve otomatik olarak onaylayan bir consumer‘ı göstermektedir.
public class ExampleConsumer : IConsumer<ExampleMessage>
{
public async Task Consume(ConsumeContext<ExampleMessage> context)
{
try
{
// Mesajı işliyoruz
Console.WriteLine($"Mesaj alındı: {context.Message.Text}");
// İşlem başarılı olursa mesaj otomatik olarak kuyruktan silinir
await Task.CompletedTask;
}
catch (Exception ex)
{
// Hata oluşursa mesaj tekrar işlenmek üzere kuyrukta kalır
Console.WriteLine($"Hata: {ex.Message}");
throw;
}
}
}
Bu örnekte, ExampleConsumer
bir mesajı alır ve işler. Eğer mesajın işlenmesi sırasında herhangi bir hata olmazsa, MassTransit mesajı otomatik olarak onaylar. Ancak, bir hata meydana gelirse, mesaj yeniden işlenmek üzere kuyrukta kalır.
MassTransit, mesaj işleme sırasında oluşan hataları otomatik olarak yönetir. Bir mesajın işlenmesi sırasında bir hata oluştuğunda, MassTransit şu adımları takip eder:
Aşağıdaki örnekte, bir hata oluştuğunda mesajın yeniden kuyruğa alınmasını gösteriyoruz:
public class FaultTolerantConsumer : IConsumer<ExampleMessage>
{
public async Task Consume(ConsumeContext<ExampleMessage> context)
{
try
{
// Mesajı işlemeye çalış
Console.WriteLine($"Mesaj alındı: {context.Message.Text}");
// İşlem başarılı olursa, mesaj kuyruktan silinir
await Task.CompletedTask;
}
catch (Exception ex)
{
// Hata oluşursa mesajı tekrar işleme
Console.WriteLine($"İşleme sırasında hata oluştu: {ex.Message}");
// Hata yakalanır ve MassTransit mesajı tekrar işlemeye çalışır
throw; // Hata yeniden fırlatılır, mesaj kuyrukta kalır
}
}
}
MassTransit, bir mesaj işlenirken hata oluştuğunda, mesajın kaç kez tekrar işleneceğine dair politikaları yapılandırmanıza izin verir. Bu politika, mesajların kaybolmasını engeller ve başarısız mesajların güvenilir bir şekilde işlenmesini sağlar.
Örneğin, aşağıdaki kodda mesajın 3 kez tekrar işlenmesi sağlanmıştır:
cfg.ReceiveEndpoint("example_queue", e =>
{
e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
e.Consumer<ExampleConsumer>();
});
Bu örnekte, mesaj işleme başarısız olursa 3 kez, her biri 5 saniye arayla tekrar denenecektir.
Bir sonraki yazıda görüşmek dileğiyle!”