Physical Address

304 North Cardinal St.
Dorchester Center, MA 02124

RabbitMQ Eğitimi

RabbitMQ, dağıtık sistemler için güvenilir mesajlaşma ve kuyruk yönetimi sağlayan açık kaynaklı bir mesaj aracısıdır. Uygulamalar arasında veri ve görev iletimini kolaylaştırarak, iş yüklerini dağıtmak, asenkron işlem yapmak ve sistemlerin ölçeklenebilirliğini artırmak için kullanılır. Yüksek performans, güvenilirlik ve mesajlaşma esnekliği sunan RabbitMQ, mikro hizmet mimarileri ve bulut tabanlı çözümler için ideal bir çözümdür.

1. Mesaj Kuyruğu (Message Queue) Nedir?

Mesaj kuyruğu, iki veya daha fazla yazılımın asenkron olarak veri alışverişi yapmasını sağlayan bir iletişim modelidir. Bu modelde, bir yazılım bir kuyruk aracılığıyla başka bir yazılıma mesaj gönderir. Kuyrukta depolanan mesajlar, tüketici yazılım tarafından işlenir. RabbitMQ gibi mesaj kuyruğu sistemleri, birbirinden bağımsız sistemler arasında veri alışverişi sağlamak için kullanılır. Bu bağımsızlık; yazılımın platformu, dili, veritabanı veya başka özelliklerinden kaynaklanabilir.

Mesaj Kuyruğu İşleyişi:
  • Gönderen (Producer/Publisher): Mesajı kuyrukta depolayan yazılım.
  • Tüketen (Consumer): Mesajı kuyruğa düştükten sonra alan ve işleyen yazılım.

Bir örnek üzerinden ilerlersek:
Bir web uygulaması bir işlem yaptıktan sonra, bu işlemle ilgili verileri bir kuyruk aracılığıyla başka bir yazılıma gönderir. Kuyruğu takip eden ikinci yazılım, bu mesajı alır ve belirli işlemleri gerçekleştirir. Örneğin, bir web uygulaması kullanıcının yaptığı bir siparişin faturasını oluşturmak için kuyrukta bir mesaj bırakır, fatura oluşturma servisi bu mesajı işleyerek fatura işlemini tamamlar. Bu işlem asenkron olarak gerçekleşir, yani web uygulaması fatura oluşturma sürecini beklemez, fatura daha sonra oluşturulur ve kullanıcıya bildirilir.

2. Mesaj Aracısı (Message Broker) Nedir?

Mesaj aracısı, mesaj kuyruğunu yöneten sistemdir. RabbitMQ, Kafka gibi sistemler mesaj aracısı olarak adlandırılır. Mesaj aracısı, üretici ve tüketici arasında köprü görevi görür. Mesajları doğru şekilde kuyrukta depolar, gerektiğinde yönlendirir ve tüketicinin alması için hazır hale getirir.

RabbitMQ, bir mesaj aracısı olarak şu özelliklere sahiptir:

  • Cross-Platform Desteği: Farklı işletim sistemlerinde kullanılabilir.
  • Open Source: Açık kaynaklıdır, yani kullanıcılar tarafından geliştirilebilir ve düzenlenebilir.
  • Zengin Dokümantasyon: Kullanıcıların kolayca öğrenip kullanabilmesi için geniş bir dokümantasyon sunar.
  • Cloud Desteği: RabbitMQ, bulut hizmeti olarak da kullanılabilir ve ölçeklenebilir.
3. Mesaj Kuyruğunun ve Mesaj Aracısının Kullanım Amacı

Bazı senaryolarda sistemler arasındaki iletişim asenkron olmalıdır. Örneğin, bir e-ticaret sitesinde kullanıcı bir ürün siparişi verdiğinde, sipariş onayı aldıktan sonra fatura oluşturma, e-posta gönderme gibi işlemler sistemin arka planında çalışır. Bu işlemleri anlık olarak yapmak kullanıcıyı bekletmeye ve sistem performansını düşürmeye neden olur. İşte bu noktada mesaj kuyrukları devreye girer.

Örneğin:

  • Kullanıcı ödeme yaptıktan sonra fatura oluşturma işlemi için kuyrukta bir mesaj bırakılır.
  • Fatura servisi bu mesajı alır, işlemi gerçekleştirir ve kullanıcıya fatura iletilir.
    Bu süreç asenkron olduğundan, kullanıcı ödeme yaptıktan sonra beklemeden diğer işlemlerini yapabilir.
Senkron ve Asenkron İletişim
  • Senkron İletişim: İki yazılım birbirinden hemen sonuç bekler. Örneğin, bir web sayfası bir API’den veri istediğinde, yanıt gelene kadar bekler.
  • Asenkron İletişim: İki yazılım birbirinden hemen yanıt beklemez. Bir yazılım diğerine bir mesaj gönderir ve işine devam eder. Mesajı alan yazılım, uygun bir zamanda bu mesaja yanıt verir. Mesaj kuyruğu sistemleri bu modeli kullanır.
4. RabbitMQ’nun Özellikleri

RabbitMQ’nun sağladığı temel özellikler şunlardır:

  • Asenkron İletişim: Uzun süren işlemleri asenkron bir şekilde gerçekleştirir ve böylece sistem performansını artırır.
  • Ölçeklenebilirlik: RabbitMQ ile mikroservis mimarilerinde uygulamaların ölçeklenebilirliğini artırabilirsiniz. Örneğin, bir uygulamanın yoğunluğu artarsa, RabbitMQ kuyrukları ile işlemleri farklı servisler arasında dağıtabilirsiniz.
  • Güvenilir Mesaj İletimi: RabbitMQ, mesajların güvenli bir şekilde iletilmesini ve tüketilmesini garanti eder. Mesajların sırasıyla işlenmesini sağlar (FIFO – First In First Out).
5. RabbitMQ’nun Kullanım Senaryoları

RabbitMQ, özellikle şu tür senaryolarda tercih edilir:

  • Asenkron İşlem Gerektiren Durumlar: Fatura oluşturma, PDF dosya dönüştürme, e-posta gönderimi gibi işlemler asenkron yapılabilir.
  • Yüksek Performans Gerektiren Sistemler: Kullanıcının beklemesini gerektiren işlemler (özellikle ağır işleme sürecine sahip operasyonlar) RabbitMQ ile yönetilebilir.
  • Farklı Platformlar Arası İletişim: RabbitMQ, farklı platformlarda (Java, .NET, Python, vs.) yazılmış uygulamaların birbiriyle iletişim kurmasını sağlar.
6. RabbitMQ’nun Mimarisi ve İşleyişi

RabbitMQ, bir mesajın üreticiden (publisher) tüketiciye (consumer) iletilmesini şu yapılarla gerçekleştirir:

  • Publisher: Mesajı gönderen yazılımdır.
  • Exchange: Mesajların yönlendirildiği yapıdır. Exchange, gelen mesajları belirli kurallara göre kuyruklara yönlendirir.
  • Queue: Mesajların depolandığı yerdir.
  • Consumer: Mesajı alan ve işleyen yazılımdır.

Mesajın iletim süreci şu şekildedir:

  1. Publisher, bir mesaj üretir ve bunu Exchange’e gönderir.
  2. Exchange, mesajı doğru kuyruğa (queue) yönlendirir.
  3. Tüketici (consumer) kuyruğa düşen mesajı alır ve işler.
7. RabbitMQ Kullanımına Dair Örnek

Aşağıda basit bir RabbitMQ kullanımı örneği verilmiştir:

Producer (Mesaj Üreten) Kod Örneği (Python):

import pika

# RabbitMQ'ya bağlanma
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Bir kuyruk oluşturma
channel.queue_declare(queue='hello')

# Mesaj gönderme
channel.basic_publish(exchange='', routing_key='hello', body='Merhaba RabbitMQ!')
print(" [x] Mesaj gönderildi: 'Merhaba RabbitMQ!'")

# Bağlantıyı kapatma
connection.close()

Consumer (Mesaj Tüketen) Kod Örneği (Python):

import pika

# RabbitMQ'ya bağlanma
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Kuyruğu oluşturma
channel.queue_declare(queue='hello')

# Mesajı alma
def callback(ch, method, properties, body):
    print(f" [x] Mesaj alındı: {body}")

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Mesaj bekleniyor. Çıkmak için CTRL+C yapın')
channel.start_consuming()

Bu örneklerde, bir Producer (mesaj üreten) bir mesaj gönderir ve Consumer (mesaj tüketen) bu mesajı alır ve işleyebilir. RabbitMQ’nun kullanımının temel mantığı bu şekildedir.

RabbitMQ Ortamının Docker ile Konteynerleştirilmesi ve CloudAMQP Üzerinden İncelenmesi
1. Docker ile RabbitMQ’nun Kurulumu

Docker kullanarak RabbitMQ’yu yerel ortamda çalıştırmak oldukça kolaydır. Docker, uygulamaları konteyner içinde çalıştıran bir platformdur ve bu, RabbitMQ’yu izole bir ortamda çalıştırmak için idealdir. İlk olarak, RabbitMQ’nun Docker ortamında nasıl kullanılacağını inceleyelim.

Adımlar:
  1. Docker’ı Kurun: Eğer sisteminizde Docker kurulu değilse, Docker’ın resmi sitesinden Docker’ı indirip kurabilirsiniz.
  2. RabbitMQ Docker Komutunu Çalıştırın: RabbitMQ’yu Docker ortamında çalıştırmak için aşağıdaki komut kullanılır:
   docker run -d --hostname my-rabbit --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Bu komut, RabbitMQ’yu çalıştırır ve RabbitMQ’nun yönetim paneline (port 15672) ve mesajların iletimi için gerekli olan AMQP portuna (port 5672) erişim sağlar.

  1. Yönetim Paneline Erişim: Docker konteynerı başlatıldıktan sonra, tarayıcınızdan http://localhost:15672 adresine giderek RabbitMQ’nun yönetim paneline erişebilirsiniz. Varsayılan giriş bilgileri:
  • Kullanıcı Adı: guest
  • Şifre: guest
  1. Yönetim Panelini Keşfetme: Giriş yaptıktan sonra, RabbitMQ’nun yönetim panelinde queue’ları (kuyrukları), exchange’leri ve bağlı client’ları görebilirsiniz. Bu panel üzerinden mesaj akışını ve kuyruğu takip edebilirsiniz.
Docker İle RabbitMQ Kullanımı için Ek Bilgiler:
  • Docker, yerel ortamda izole bir yapı sunarak RabbitMQ’nun hızlı ve kolay bir şekilde kurulmasına olanak tanır.
  • RabbitMQ’nun yönetim paneli üzerinden detaylı yapılandırmalar yapılabilir. Örneğin, kullanıcılar oluşturulabilir, yetkilendirme ayarları yapılabilir, kuyruğa mesajlar gönderilebilir ve mesajlar tüketilebilir.
2. CloudAMQP Üzerinden RabbitMQ Kullanımı

Docker dışında bir diğer seçenek ise RabbitMQ’yu CloudAMQP gibi bulut tabanlı bir hizmet üzerinden kullanmaktır. CloudAMQP, RabbitMQ’nun bulut ortamında çalışmasını sağlayan bir hizmettir ve herhangi bir yerel kurulum gerektirmez.

Adımlar:
  1. CloudAMQP Hesabı Oluşturun: CloudAMQP web sitesine gidip bir hesap oluşturabilirsiniz. Ücretsiz bir plan seçerek RabbitMQ’yu sınırlı bir şekilde kullanabilirsiniz.
  2. Yeni Bir Instance Oluşturun: Hesabınızı oluşturduktan sonra, CloudAMQP panelinde yeni bir RabbitMQ instance’ı (örnek) oluşturmanız gerekmektedir.
  • Plan Seçimi: Başlangıç için ücretsiz olan “Little Lemur” planını seçebilirsiniz. Bu plan sınırlı bir kapasiteye sahiptir, ancak temel RabbitMQ işlemleri için yeterlidir.
  • Instance İsimlendirme: Örneğin, instance’a “example” adını verebilirsiniz.
  1. Instance Yönetim Paneline Erişim: Instance oluşturulduktan sonra, CloudAMQP panelinden RabbitMQ instance’ınıza erişebilirsiniz. Burada bağlantı bilgileri, kullanıcı adı ve şifre gibi detaylar yer alır. Bu bilgileri kullanarak uygulamalarınızı bu instance’a bağlayabilirsiniz.
  2. RabbitMQ Yönetim Paneli: CloudAMQP üzerinden oluşturduğunuz RabbitMQ instance’ı için bir yönetim paneli bulunmaktadır. Bu panele CloudAMQP hesabınız üzerinden erişebilir ve Docker’da olduğu gibi queue’ları, exchange’leri ve diğer bileşenleri yönetebilirsiniz.
CloudAMQP Avantajları:
  • Kurulum Gerektirmez: CloudAMQP, herhangi bir yerel kurulum gerektirmez. Tek yapmanız gereken, bir hesap oluşturmak ve bir instance başlatmaktır.
  • Esneklik: CloudAMQP ile RabbitMQ’yu bulut tabanlı projelerinizde kolayca kullanabilirsiniz. Ücretli planlara geçerek daha fazla kaynak ve kapasite kullanabilirsiniz.
  • Farklı Bulut Sağlayıcıları: CloudAMQP, farklı bulut sağlayıcılarını destekler (AWS, Azure, Google Cloud). Böylece projelerinizi ihtiyaçlarınıza göre yapılandırabilirsiniz.
3. RabbitMQ Kullanımı: Docker ve CloudAMQP Arasındaki Farklar
  • Docker: RabbitMQ’yu yerel ortamda test etmek ve küçük projeler için idealdir. Hızlı kurulum ve izole bir ortam sağlar.
  • CloudAMQP: Daha büyük projeler veya ölçeklenebilir bulut tabanlı sistemler için uygundur. Ücretsiz planlar başlangıç için yeterlidir, ancak daha yüksek trafik gereksinimlerinde ücretli planlar gerekebilir.

Docker ve CloudAMQP’nin her ikisi de RabbitMQ’yu çalıştırmak için uygun seçeneklerdir ve kullanıcının ihtiyacına göre tercih edilebilir. Docker daha çok yerel testler ve geliştirme için idealdir, CloudAMQP ise gerçek zamanlı bulut uygulamaları için tercih edilebilir.

4. RabbitMQ Bağlantı Bilgileri ve Yapılandırma

CloudAMQP veya Docker ile RabbitMQ’yu kullanırken uygulamanızın bu servislere nasıl bağlanacağını bilmeniz gerekir. Her iki ortamda da bağlantı bilgileri önemlidir.

  • Docker için Bağlantı: Docker ile çalışırken RabbitMQ’ya localhost üzerinden bağlanabilirsiniz. Örneğin:
  import pika

  connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  channel = connection.channel()
  • CloudAMQP için Bağlantı: CloudAMQP size bir URL sağlar ve uygulamanızı bu URL üzerinden bağlayabilirsiniz. Örneğin:
  import pika

  url = "amqp://username:password@hostname/vhost"
  connection = pika.BlockingConnection(pika.URLParameters(url))
  channel = connection.channel()

Bu bağlantı bilgileri, uygulamanızın RabbitMQ sunucusuna bağlanıp mesaj alışverişi yapmasını sağlar.

RabbitMQ’da Exchange ve Binding Kavramları ile İşleyişleri
1. Exchange Nedir?

Exchange, RabbitMQ’da mesajların hangi kuyruğa yönlendirileceğini belirleyen bir yapılandırmadır. Publisher (mesajı gönderen) tarafından gönderilen mesajlar önce Exchange’e gelir ve Exchange, bu mesajları belirli kurallara göre uygun kuyruklara iletir.

Exchange’in Temel Görevi:
  • Publisher’dan gelen mesajların kuyruklara yönlendirilmesi.
  • Mesajın hangi kuyruklara gitmesi gerektiğini belirler.
  • Farklı Exchange türleri, mesajların farklı kriterlere göre kuyruklara yönlendirilmesini sağlar.

RabbitMQ’da birden fazla kuyruk olabilir ve hangi mesajın hangi kuyruğa gideceği, Exchange tarafından belirlenir. Exchange, mesajı doğrudan bir kuyrukla eşleştirebilir veya daha karmaşık senaryolarda mesajları birden fazla kuyruğa yönlendirebilir.

Routing Key ve Header Kavramları:
  • Routing Key: Mesajların hangi kuyruğa yönlendirileceğini belirleyen anahtardır. Exchange, bu anahtara göre mesajı doğru kuyruğa iletir. Ancak, tüm Exchange türleri Routing Key kullanmaz.
  • Header: Bazı Exchange türleri Routing Key yerine mesaj başlıklarını (header) kullanarak mesajları yönlendirir. Header’lar, key-value (anahtar-değer) formatında olup, mesajların kuyruklara yönlendirilmesinde esneklik sağlar.
2. Binding Nedir?

Binding, Exchange ile kuyruklar arasındaki ilişkiyi tanımlayan bir kavramdır. Bir kuyruk, bir Exchange’e bağlanır ve bu bağlanma işlemi Binding olarak adlandırılır. Bir Exchange birden fazla kuyruğa bağlanabilir ve mesajlar bu kuyruğa bağlanan Binding’lere göre yönlendirilir.

Binding Nasıl Çalışır?
  • Exchange ve kuyruklar arasında bir köprü oluşturur.
  • Bir mesaj gönderildiğinde, hangi kuyruğa yönlendirileceğini belirleyen bir haritadır.
  • Routing Key veya Header kullanılarak mesajlar kuyruklara yönlendirilir.

Bir Exchange’e birden fazla kuyruk bağlanabilir ve bu durumda, Exchange gelen mesajları hangi kuyruğa yönlendireceğini Binding’e göre belirler. Binding işlemi, mesajların doğru kuyruğa yönlendirilmesini sağlar.

3. Exchange Türleri

RabbitMQ’da farklı türlerde Exchange’ler bulunur. Bu Exchange türleri, mesajların kuyruklara nasıl yönlendirileceğini belirler ve her türün kendine özgü işleyişi vardır.

3.1. Direct Exchange

Direct Exchange, mesajların doğrudan belirli bir Routing Key’e sahip kuyruklara iletilmesini sağlar. Bu tür Exchange, en basit ve yaygın kullanılan türlerden biridir.

  • Routing Key: Bu Exchange türünde mesajlar, Routing Key kullanılarak doğrudan bir kuyruğa yönlendirilir. Örneğin, bir mesajın Routing Key’i “dosya_hatasi” ise, sadece bu anahtarla bağlanmış olan kuyruklara mesaj gönderilir.

Kullanım Senaryoları:

  • Hata mesajlarının izlenmesi: Dosya yükleme hatası, veritabanı bağlantı hatası gibi farklı hata türleri için ayrı kuyruklar oluşturulabilir ve bu kuyruklara hata türüne göre mesajlar yönlendirilir.

Kod Örneği (Python):

import pika

# RabbitMQ'ya bağlan
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Direct Exchange oluştur
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Kuyruk oluştur ve bind et
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')

# Mesaj gönder
channel.basic_publish(exchange='direct_logs', routing_key='error', body='Hata Mesajı!')
3.2. Fanout Exchange

Fanout Exchange, mesajların tüm bağlı kuyruklara iletilmesini sağlar. Bu tür Exchange, herhangi bir Routing Key kullanmaz ve mesajlar Exchange’e bağlı olan tüm kuyruklara yönlendirilir.

  • Özellikler: Mesajlar, hangi kuyruğa gönderileceği konusunda ayrım yapmaksızın tüm kuyruklara dağıtılır.

Kullanım Senaryoları:

  • Tüm sistemlere ortak bir bildiri gönderme: Örneğin, mikroservis mimarisinde tüm servislere ortak bir duyuru gönderilmek istendiğinde Fanout Exchange kullanılabilir.
3.3. Topic Exchange

Topic Exchange, Routing Key kullanarak mesajların kuyruklara yönlendirilmesini sağlar, ancak burada Routing Key daha karmaşıktır. Routing Key, bir desen (pattern) olarak tanımlanır ve mesajlar bu desenlere göre yönlendirilir.

  • Routing Key: Bu türde Routing Key, joker karakterler (*, #) ile birlikte kullanılır. Örneğin, “dosya.*.hatasi” şeklinde bir anahtar, “dosya.yukleme.hatasi” veya “dosya.silme.hatasi” gibi mesajları yakalayabilir.

Kullanım Senaryoları:

  • Farklı mesaj türlerini ayırt etme: Örneğin, bir e-ticaret sisteminde siparişler onaylandı, iptal edildi veya iade edildi gibi farklı durumlar için ayrı kuyruklar oluşturulabilir ve siparişler ilgili kuyruğa yönlendirilebilir.
3.4. Headers Exchange

Headers Exchange, Routing Key yerine mesaj başlıklarını (header) kullanarak mesajları yönlendirir. Mesaj başlıkları, key-value formatında belirlenir ve mesajlar bu başlıkların eşleşmesine göre kuyruklara iletilir.

  • Özellikler: Mesaj başlıklarına dayalı yönlendirme yapar. Header değerleri ile kuyruklar arasında bir eşleştirme yapılır.

Kullanım Senaryoları:

  • Log sistemlerinde belirli başlıklara göre mesajları ayırt etmek: Örneğin, log seviyelerine göre mesajlar (hata, uyarı, bilgi) farklı kuyruklara yönlendirilebilir.
4. Exchange ve Binding Kavramlarının Kullanım Alanları

RabbitMQ’daki Exchange türleri, farklı mesaj yönlendirme senaryolarına uyacak şekilde tasarlanmıştır:

  • Direct Exchange: Spesifik bir kuyruğa doğrudan mesaj göndermek için kullanılır.
  • Fanout Exchange: Mesajları geniş bir şekilde, tüm kuyruklara dağıtmak için kullanılır.
  • Topic Exchange: Karmaşık desenlere göre mesaj yönlendirme gerektiğinde kullanılır.
  • Headers Exchange: Mesaj başlıklarına göre yönlendirme yapmak için idealdir.

RabbitMQ ile Etkili Publisher ve Consumer Uygulamaları Oluşturma
1. RabbitMQ Kütüphanesinin Yüklenmesi

Öncelikle, RabbitMQ’yu .NET Core uygulamalarıyla kullanabilmek için gerekli kütüphanenin yüklenmesi gerekmektedir. Bu kütüphane, NuGet paketi olarak indirilebilir:

Install-Package RabbitMQ.Client

Bu kütüphane, hem Publisher hem de Consumer uygulamalarında kullanılacak.

2. Publisher Uygulaması

Publisher uygulaması, bir kuyruğa mesaj gönderme işlemini gerçekleştirir. Bu uygulamanın temel adımları şu şekildedir:

a. RabbitMQ Bağlantısının Kurulması

Publisher uygulaması, RabbitMQ sunucusuna bağlanarak bir kanal açar. Bu kanal üzerinden kuyruğa mesajlar gönderilir. Bağlantı oluşturmak için şu adımlar izlenir:

var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
    // Kuyruk oluşturma
    channel.QueueDeclare(queue: "example_queue",
                         durable: false,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    // Mesaj gönderme
    string message = "Merhaba, RabbitMQ!";
    var body = Encoding.UTF8.GetBytes(message);

    channel.BasicPublish(exchange: "",
                         routingKey: "example_queue",
                         basicProperties: null,
                         body: body);
    Console.WriteLine(" [x] Gönderilen Mesaj: {0}", message);
}

Yukarıdaki kod, “example_queue” isimli bir kuyruk oluşturur ve bu kuyruğa bir mesaj gönderir. RabbitMQ’da mesajlar byte array olarak işlenir, bu yüzden mesajlar gönderilmeden önce byte[] formatına dönüştürülür.

b. Kuyruk Oluşturma
channel.QueueDeclare(queue: "example_queue",
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

Bu kod parçası, RabbitMQ’da “example_queue” isimli bir kuyruk oluşturur. Kuyruğun durable özelliği, mesajların RabbitMQ yeniden başlatıldığında korunup korunmayacağını belirtir. Exclusive ise kuyruğun sadece bu bağlantı için geçerli olup olmadığını belirler. autoDelete true olduğunda, kuyruk en son consumer bağlantısını kapattığında silinir.

3. Consumer Uygulaması

Consumer uygulaması, kuyruktaki mesajları okuma işlemini gerçekleştirir. Publisher uygulamasında kuyruğa gönderilen mesajlar, Consumer tarafından işlenir.

a. RabbitMQ Bağlantısının Kurulması

Publisher ile benzer şekilde, Consumer da RabbitMQ’ya bağlanır ve bir kanal açar. Ancak burada fark, kuyruktan mesajları almak için BasicConsume metodunun kullanılmasıdır:

var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "example_queue",
                         durable: false,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(" [x] Alınan Mesaj: {0}", message);
    };

    channel.BasicConsume(queue: "example_queue",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" Mesaj bekleniyor...");
    Console.ReadLine();
}
b. Mesaj Okuma

Yukarıdaki kodda, EventingBasicConsumer sınıfı kullanılarak RabbitMQ kuyruğundaki mesajlar alınıyor. Mesaj alındığında, Received olayı tetiklenir ve gelen mesaj byte array formatından string formatına dönüştürülerek konsola yazdırılır.

4. Publisher ve Consumer Arasındaki İletişim

RabbitMQ, Publisher ve Consumer arasında asenkron bir iletişim sağlar. Publisher tarafından kuyruğa gönderilen mesajlar, Consumer tarafından sırasıyla alınır ve işlenir. Bu iletişim, BasicPublish ve BasicConsume metodları ile sağlanır.

RabbitMQ: İleri Düzey Kuyruk Mimarisi ve Uygulamaları

RabbitMQ, mesajları kuyruğa ekleme ve tüketme süreçlerinde oldukça esnek bir yapıya sahiptir. Ancak uygulamaların büyümesi ve ölçeklenmesi durumunda daha gelişmiş kuyruk mimarileri gereklidir.

Gelişmiş Kuyruk Mimarisi Nedir?

RabbitMQ’nun ana fikri, yoğun kaynak gerektiren işleri hemen tamamlamak yerine, bu işleri daha sonra yapılacak şekilde planlamaktır. Bu nedenle kuyruklar kullanılır ve mesajlar kuyruğa atılır, ardından tüketiciler (consumer) bu mesajları asenkron olarak işler.

Gelişmiş kuyruk mimarisi, aşağıdaki başlıkları içerir:

  • Mesajların ve kuyrukların kalıcılığı (durability)
  • Mesajların birden fazla tüketiciye dağıtım stratejisi
  • Mesaj onaylama sistemi (message acknowledgment)
  • Tüketici işlemlerinde mesajların kuyruktan silinmesi

Bu yapıların her biri, ölçeklenebilir ve güvenilir bir mesaj kuyruğu sistemi kurmak için hayati önem taşır.

Round Robin Mesaj Dağıtımı

RabbitMQ’nun varsayılan davranışı, mesajları round robin (sıralı) bir şekilde tüketicilere dağıtmaktır. Bu, bir mesajın ilk tüketiciye, ikinci mesajın ikinci tüketiciye, vb. sırayla gönderilmesi anlamına gelir. Bu sıralı dağıtım, ölçeklenebilirlik ve performans için gereklidir.

Kod Örneği: Round Robin Dağıtımı

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "hello",
                         durable: false,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(" [x] Received {0}", message);
    };
    channel.BasicConsume(queue: "hello",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}

Yukarıdaki örnekte, her tüketici sırayla mesajları alır. Her bir mesaj, round robin mantığıyla farklı tüketicilere gönderilir.

Mesaj Onaylama Sistemi (Message Acknowledgment)

RabbitMQ’nun varsayılan davranışı, bir mesaj tüketiciye gönderildikten sonra kuyruktan silinmesidir. Ancak bu durum, tüketici işlemi sırasında hata meydana gelirse veri kaybına neden olabilir. Bu nedenle mesaj onaylama sistemi kullanılır. Mesajlar, ancak tüketici başarıyla işlediğini onayladıktan sonra kuyruktan silinir.

Kod Örneği: Mesaj Onaylama

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);

    // Mesaj başarıyla işlendikten sonra onay gönder
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

Burada, BasicAck fonksiyonu mesajın başarıyla işlenip işlenmediğini RabbitMQ’ya bildirir. Eğer BasicAck çağrılmazsa, mesaj kuyruktan silinmez ve başka bir tüketiciye yeniden gönderilir.

Mesaj Kalıcılığı (Message Durability)

RabbitMQ, varsayılan olarak kuyruk ve mesajları bellekte tutar. Bu, RabbitMQ sunucusu kapandığında mesajların kaybolmasına neden olabilir. Mesajların ve kuyrukların kalıcı olabilmesi için durable ve persistent yapılandırmalarının yapılması gereklidir.

Kod Örneği: Kalıcı Kuyruk ve Mesaj

// Kalıcı kuyruk oluşturma
channel.QueueDeclare(queue: "durable_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

// Kalıcı mesaj gönderme
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
var body = Encoding.UTF8.GetBytes("Kalıcı mesaj");
channel.BasicPublish(exchange: "", routingKey: "durable_queue", basicProperties: properties, body: body);

Bu örnekte, durable parametresi ile kuyruk kalıcı hale getirilir. Ayrıca, mesajın kalıcı olması için Persistent özelliği true olarak ayarlanmıştır. Bu şekilde, RabbitMQ sunucusu yeniden başlatılsa bile mesajlar kaybolmaz.

Fair Dispatch (Adil Dağıtım)

Varsayılan Round Robin dağıtımı, tüm tüketicilere sırayla mesaj gönderir, ancak bazı tüketiciler daha yavaş olabilir ve bu durumda eşit olmayan bir yük dağılımı ortaya çıkabilir. Fair Dispatch, her bir tüketicinin yalnızca işleyebildiği kadar mesaj almasını sağlar.

Kod Örneği: Fair Dispatch

// Prefetch count ile adil dağıtım
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

Bu yapılandırmada, her bir tüketici sadece bir mesaj alır ve mesajı işleyene kadar yeni bir mesaj gönderilmez. Bu, yükün daha adil bir şekilde dağıtılmasını sağlar.

RabbitMQ’da Mesajları Reddetme (Negative Acknowledgment)

Tüketici, bazı durumlarda mesajı işleyemeyebilir ve bu durumda mesajı yeniden kuyruğa göndermek isteyebilir. BasicNack ile mesajın işlenmediğini ve tekrar işlenmesi gerektiğini bildirebiliriz.

Kod Örneği: Mesajı Reddetme

// Mesajı işleyememe durumu
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);

BasicNack ile mesajı yeniden kuyruğa ekleyebiliriz. Eğer requeue parametresi true ise, mesaj tekrar kuyruğa eklenir ve başka bir tüketici tarafından işlenir.

RabbitMQ: Direct Exchange Mekanizması ve Çalışma Prensibi
Exchange Nedir?

RabbitMQ’da mesajlar Exchange adı verilen yapılar üzerinden kuyruklara yönlendirilir. Bir Exchange, mesajları alır ve belirlenen kurallara göre bu mesajları ilgili kuyruğa (queue) gönderir. Direct Exchange, bu Exchange türlerinden biridir ve mesajların belirli bir anahtar (routing key) kullanılarak hedef kuyruklara yönlendirilmesini sağlar.

Direct Exchange Nedir?

Direct Exchange, mesajların belirli bir routing key (yönlendirme anahtarı) aracılığıyla doğrudan belirli bir kuyruğa gönderilmesini sağlayan Exchange türüdür. Bu davranışta, birden fazla kuyruk olabilir, ancak gönderilen mesajlar yalnızca belirtilen anahtar (routing key) ile uyuşan kuyruğa gider.

Direct Exchange’in ana mantığı şudur:

  • Publisher (mesajı gönderen taraf), mesajı gönderirken bir routing key belirler.
  • Bu routing key, belirli bir kuyruğa yönlendirilir.
  • Kuyruk, bu anahtarla eşleşirse, mesaj o kuyruk tarafından tüketilir.
Direct Exchange Yapısı

Bir RabbitMQ uygulamasında Direct Exchange kullanmak için aşağıdaki adımlar takip edilir:

  1. Exchange oluşturulur: Bu, mesajların yönlendirileceği bir Exchange tanımıdır.
  2. Kuyruk oluşturulur: Mesajların iletileceği bir kuyruk tanımlanır.
  3. Binding işlemi yapılır: Exchange ve kuyruk arasında bir bağlantı (binding) yapılır. Bu işlemde routing key kullanılır.
  4. Mesaj gönderilir: Publisher, ilgili Exchange üzerinden mesaj gönderir ve mesaj, belirtilen routing key’e sahip olan kuyrukta son bulur.
Direct Exchange ile Mesaj Gönderme

Direct Exchange ile mesaj göndermenin temel adımlarını ve kod örneklerini aşağıda açıklıyoruz.

1. Exchange Oluşturma

Öncelikle bir Direct Exchange oluşturulur. Bu Exchange, mesajları routing key kullanarak belirli kuyruklara yönlendirecektir.

// Exchange oluşturma
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

Yukarıdaki kodda, direct_logs adında bir Direct Exchange oluşturulmuştur. Exchange türü “direct” olarak belirtilmiştir.

2. Kuyruk Oluşturma

Mesajların yönlendirileceği bir kuyruk oluşturulur.

// Kuyruk oluşturma
channel.QueueDeclare(queue: "error_logs", durable: false, exclusive: false, autoDelete: false, arguments: null);

Bu örnekte, error_logs adında bir kuyruk oluşturulmuştur.

3. Exchange ve Kuyruk Arasında Binding Yapma

Exchange ile kuyruk arasında bir bağlantı kurulur. Bu bağlantı, routing key kullanılarak yapılır.

// Binding işlemi
channel.QueueBind(queue: "error_logs", exchange: "direct_logs", routingKey: "error");

Yukarıdaki kodda, error_logs kuyruğu, direct_logs Exchange’ine “error” routing key ile bağlanmıştır. Bu, sadece routing key “error” olan mesajların bu kuyruğa gitmesini sağlar.

4. Mesaj Gönderme

Publisher, oluşturulan Direct Exchange üzerinden bir mesaj gönderir ve bir routing key belirtir. Mesaj, bu key ile eşleşen kuyruğa yönlendirilir.

// Mesaj gönderme
string message = "Error: Something went wrong!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "direct_logs", routingKey: "error", basicProperties: null, body: body);

Yukarıdaki örnekte, error routing key’ine sahip bir mesaj gönderilmektedir. Bu mesaj, error key ile bağlı olan kuyrukta (yani error_logs kuyruğunda) son bulacaktır.

Consumer (Tüketici) Tarafında Mesaj Alımı

Tüketici tarafında, belirli bir kuyruktaki mesajlar alınır ve işlenir.

// Tüketici oluşturma
var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);
};

// Kuyruğu dinleme
channel.BasicConsume(queue: "error_logs", autoAck: true, consumer: consumer);

Bu kod, error_logs kuyruğunu dinler ve yeni bir mesaj geldiğinde, mesajı alır ve ekrana yazdırır.

Örnek Senaryo

Bir e-ticaret uygulamasında, siparişlerle ilgili log mesajlarının farklı tiplerde olabileceğini varsayalım: info, warning, error. Her bir log tipi için ayrı kuyruklar oluşturup, bu logları farklı yönlendirme anahtarları ile ilgili kuyruklara gönderebiliriz.

  • info_logs kuyruğu: info routing key’ine sahip mesajları alır.
  • warning_logs kuyruğu: warning routing key’ine sahip mesajları alır.
  • error_logs kuyruğu: error routing key’ine sahip mesajları alır.

Bu yapı, her bir log tipine özel işlemler yapılmasını sağlar. Örneğin, error logları için bir hata çözümleme servisi devreye girebilir.

RabbitMQ: Fanout Exchange Mekanizması ve Yayın Davranışı
Fanout Exchange Nedir?

Fanout Exchange, mesajların herhangi bir routing key kullanmaksızın Exchange’e bağlı olan tüm kuyruklara gönderilmesini sağlar. Bu Exchange türü, mesajın hangi kuyruğa gideceğini ayırt etmez; bir mesajı alır ve tüm bağlı kuyruklara iletir. Broadcast (yayın) mantığıyla çalışır, yani mesaj bir defa yayınlanır ve birden fazla kuyruğa yönlendirilir.

Fanout Exchange şu durumlar için idealdir:

  • Bir mesajı birden fazla kuyruğa dağıtmanız gerekiyorsa.
  • Routing key kullanmadan tüm bağlı kuyruğa aynı mesajı iletmek istediğinizde.
Fanout Exchange’in Özellikleri
  • Routing key kullanılmaz: Fanout Exchange’de routing key önemli değildir. Mesajlar, bağlı tüm kuyruklara otomatik olarak iletilir.
  • Yayın tipi mesajlaşma sağlar: Tek bir mesaj birden fazla kuyruğa dağıtılır.
  • Broadcast mantığıyla çalışır: Fanout Exchange, aynı mesajı birden fazla tüketiciye yönlendirmek için kullanılır.
Fanout Exchange Kullanımı

Fanout Exchange’i kullanarak bir mesajı tüm kuyruklara iletmek için aşağıdaki adımları izliyoruz:

1. Exchange Oluşturma

Öncelikle bir Fanout Exchange oluştururuz. Bu Exchange, mesajları bağlı olan tüm kuyruklara yönlendirecektir.

// Fanout Exchange oluşturma
channel.ExchangeDeclare(exchange: "logs", type: "fanout");

Bu örnekte, logs adında bir Fanout Exchange oluşturulmuştur. type parametresi olarak "fanout" belirtilmiştir, çünkü bu bir Fanout Exchange’dir.

2. Kuyruklar Oluşturma ve Binding Yapma

Fanout Exchange‘e bağlı kuyruklar oluşturulur ve bu kuyruklar Exchange‘e bağlanır (binding yapılır).

// Kuyruk oluşturma ve Exchange'e bağlama
string queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");

Bu kodda, rastgele bir isimle bir kuyruk oluşturulur ve bu kuyruk logs adlı Fanout Exchange’e bağlanır. routingKey parametresi boş bırakılmıştır, çünkü Fanout Exchange’de routing key kullanılmaz.

3. Mesaj Gönderme

Publisher, Fanout Exchange üzerinden bir mesaj gönderir. Mesaj, tüm bağlı kuyruklara iletilecektir.

// Mesaj gönderme
string message = "info: Hello World!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);

Yukarıdaki örnekte, logs Exchange’ine bir mesaj gönderilmektedir. routingKey boş bırakılmıştır, çünkü Fanout Exchange routing key’e ihtiyaç duymaz.

4. Consumer (Tüketici) Tarafında Mesaj Alımı

Tüketiciler (Consumers), ilgili kuyruktan mesajları alır ve işleme koyar.

// Tüketici oluşturma
var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);
};

// Kuyruğu dinleme
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

Bu kod, belirlenen kuyruktaki mesajları dinler ve yeni bir mesaj geldiğinde bu mesajı alır, ardından ekrana yazdırır.

Fanout Exchange’in Pratik Kullanımı

Şimdi Fanout Exchange’in daha pratik bir senaryoda nasıl kullanılabileceğine bakalım. Diyelim ki bir yazılım güncelleme sisteminiz var ve bir sunucuya yeni bir güncelleme geldiğinde bu bilgiyi aynı anda farklı servislere iletmek istiyorsunuz. Örneğin, bir servisin e-posta bildirimleri, bir diğer servisin ise log tutma işlemleri olabilir. Bu senaryoda Fanout Exchange kullanmak ideal olacaktır.

  1. Publisher tarafı, güncelleme bilgisi içeren bir mesajı Fanout Exchange üzerinden gönderir.
  2. Consumer tarafında ise, birden fazla kuyruk vardır. Örneğin, e-posta bildirim kuyruğu ve log kuyruğu. Her iki kuyruk da aynı mesajı alır, ancak farklı işler yapar. E-posta servisi bu mesajı kullanarak bildirim gönderir, log servisi ise log tutar.
Fanout Exchange ile Birden Fazla Kuyruk Kullanımı

Aşağıda, bir Fanout Exchange kullanarak birden fazla kuyruğa mesaj gönderen bir uygulama örneği bulunmaktadır.

Publisher (Mesaj Gönderen)

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Fanout Exchange oluştur
channel.ExchangeDeclare(exchange: "logs", type: "fanout");

// Mesaj gönder
string message = "info: Hello World!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);

Consumer (Mesaj Alan)

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Rastgele bir kuyruk oluştur
string queueName = channel.QueueDeclare().QueueName;

// Kuyruğu Fanout Exchange'e bağla
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");

// Mesajları dinle
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);
};

channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");

Bu örnekte, bir mesaj Fanout Exchange aracılığıyla birden fazla kuyrukta dinlenebilir ve işlenebilir.

RabbitMQ: Topic Exchange Mekanizması ve Esnek Yönlendirme
Topic Exchange Nedir?

Topic Exchange, mesajların routing key‘lere dayalı olarak kuyruklara iletilmesini sağlar. Routing key, nokta (.) ile ayrılmış kelimelerden oluşur ve kuyruklar, bu routing key‘lere göre mesajları alır. Routing key kullanarak daha esnek bir mesaj yönlendirme mekanizması kurabiliriz.

Topic Exchange‘in ana özellikleri:

  • Wildcard destekler: Routing key‘ler joker karakterler kullanılarak belirlenebilir.
    • * (yıldız): Noktayla ayrılmış bir kelimeyi temsil eder.
    • # (dies): Noktayla ayrılmış bir veya daha fazla kelimeyi temsil eder.
  • Mesajlar, routing key‘lerine göre uygun kuyruklara iletilir.
Topic Exchange Kullanımı

Topic Exchange‘i kullanarak mesajları uygun kuyruklara yönlendirmek için şu adımları izleriz:

1. Exchange Oluşturma

İlk adım olarak, bir Topic Exchange oluşturalım.

// Topic Exchange oluşturma
channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");

Bu kodda, topic_logs adında bir Topic Exchange oluşturulmuştur. Exchange’in türü topic olarak belirtilmiştir.

2. Kuyruk Oluşturma ve Binding Yapma

Her bir kuyruk, belirli bir routing key ile Topic Exchange‘e bağlanmalıdır.

// Kuyruk oluşturma ve Topic Exchange'e bağlama
string queueName = channel.QueueDeclare().QueueName;

// Kuyruğu Topic Exchange'e bağlama (binding)
channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "kern.*");

Bu kodda, rastgele bir isimle bir kuyruk oluşturulmuş ve bu kuyruk kern.* pattern’ine uygun tüm mesajları almak üzere Topic Exchange‘e bağlanmıştır. *, noktayla ayrılmış bir kelimenin yerine geçer. Yani, kern.* pattern’i, kern.info, kern.warning gibi routing key‘lere sahip mesajları alacaktır.

3. Mesaj Gönderme

Publisher, belirli bir routing key kullanarak Topic Exchange üzerinden mesaj gönderir.

// Mesaj gönderme
string routingKey = "kern.info";
string message = "A kernel info log.";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body);

Bu örnekte, routing key olarak kern.info kullanılarak bir mesaj gönderilmiştir. Bu mesaj, kern.* pattern’ine uyan tüm kuyruklara iletilecektir.

4. Tüketici Tarafında Mesaj Alımı

Tüketiciler, belirli bir routing key pattern’ine uygun mesajları almak üzere ayarlanır.

// Tüketici oluşturma
var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);
};

// Kuyruğu dinleme
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

Bu kod, belirtilen kuyruktaki mesajları dinler ve gelen mesajı ekrana yazdırır.

Topic Exchange ile Esnek Mesaj Yönlendirme

Topic Exchange‘i kullanarak mesaj yönlendirmesini oldukça esnek hale getirebiliriz. Örneğin, bir log sistemi oluşturduğunuzu düşünelim. Routing key‘ler ile farklı seviyelerde loglar (info, warning, error) ve farklı bileşenlerden (kernel, auth, database) gelen mesajları yönlendirebiliriz.

Bu yapı, şu şekilde oluşturulabilir:

  • kern.*: Kernel ile ilgili tüm logları alır.
  • *.error: Hangi bileşenden olursa olsun, hata (error) loglarını alır.
  • auth.#: Auth ile ilgili tüm logları alır, çünkü # birden fazla kelimeyi temsil eder.
Örnek Senaryo

Aşağıda, Topic Exchange‘i kullanarak esnek bir mesaj yönlendirme sistemi kuran bir örnek bulunmaktadır:

Publisher (Mesaj Gönderen)

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Topic Exchange oluştur
channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");

// Mesaj gönder
string routingKey = "auth.error";
string message = "An authentication error occurred.";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "topic_logs", routingKey: routingKey, basicProperties: null, body: body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);

Consumer (Mesaj Alan)

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Rastgele bir kuyruk oluştur
string queueName = channel.QueueDeclare().QueueName;

// Kuyruğu Topic Exchange'e bağla
channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.error");

// Mesajları dinle
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received '{0}':'{1}'", ea.RoutingKey, message);
};

channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");

Bu senaryoda, routing key olarak auth.error kullanılarak bir mesaj gönderilmiştir. Tüketici ise *.error pattern’ine uygun mesajları almaktadır, dolayısıyla auth.error mesajını alır ve işler.

Topic Exchange ile Routing Key Pattern’ları

Topic Exchange, routing key‘lerde joker karakterler kullanarak mesaj yönlendirmeyi esnek hale getirir:

  • * (yıldız): Noktayla ayrılmış bir kelimeyi temsil eder.
  • # (dies): Noktayla ayrılmış bir veya daha fazla kelimeyi temsil eder.

Bu joker karakterler sayesinde oldukça esnek bir mesaj yönlendirme mekanizması kurabiliriz. Örneğin:

  • kern.*: Kernel ile ilgili herhangi bir log alır (kern.info, kern.error gibi).
  • *.error: Bileşeni fark etmeksizin tüm error loglarını alır (auth.error, db.error gibi).
  • auth.#: Auth ile ilgili tüm logları alır (auth.info, auth.warning, auth.error gibi).
RabbitMQ: Header Exchange Mekanizması ve Özelleştirilmiş Yönlendirme
Header Exchange Nedir?

Header Exchange, mesajları header‘da tanımlı anahtar-değer çiftlerine (key-value pairs) göre kuyruklara ileten bir RabbitMQ Exchange türüdür. Header Exchange kullanırken, mesajların yönlendirileceği kuyruklar, mesajın içinde bulunan header verilerine göre belirlenir. Diğer Exchange türlerinden farklı olarak, routing key kullanılmaz; bunun yerine mesajın başlık kısmında tanımlanan özel bilgiler (header) kullanılır.

Header Exchange’in Diğer Exchange Türleri ile Karşılaştırılması

Direct ve Topic Exchange‘lerde mesaj yönlendirmesi routing key‘e dayalı olarak yapılır. Mesajın hangi kuyruklara iletileceği routing key‘in değeri üzerinden belirlenir. Ancak Header Exchange‘de yönlendirme tamamen header bilgilerine dayalıdır ve routing key kullanılmaz.

Mesela, Direct Exchange‘de bir mesajın yönlendirileceği kuyruk routing key ile eşleştirilirken, Header Exchange‘de kuyruk ile mesajın başlığındaki bilgiler eşleştiğinde mesaj ilgili kuyruğa gönderilir. Bu, daha esnek ve belirli ihtiyaçlara göre şekillendirilebilir bir mesaj yönlendirme sistemi sunar.

Header Exchange Kullanımı

Header Exchange‘i kullanarak mesaj yönlendirmek için şu adımları izlememiz gerekir:

1. Header Exchange Oluşturma

İlk olarak, bir Header Exchange oluşturulur. Bu Exchange, mesajları başlık bilgilerine göre yönlendirecektir.

// Header Exchange oluşturma
channel.ExchangeDeclare(exchange: "header_logs", type: "headers");

Bu kod, header_logs adında bir Header Exchange oluşturur. Exchange’in türü headers olarak belirtilmiştir.

2. Kuyruk Oluşturma ve Header Tanımlama

Her kuyruk, belirli header bilgilerine göre Header Exchange‘e bağlanmalıdır. Yani, mesajda tanımlı header bilgileri ile kuyrukta tanımlı header bilgileri eşleşirse mesaj ilgili kuyruğa yönlendirilir.

// Kuyruk oluşturma ve Header Exchange'e bağlama
string queueName = channel.QueueDeclare().QueueName;

var args = new Dictionary<string, object>
{
    { "x-match", "all" },  // all: Tüm header'lar eşleşmeli, any: Bir header eşleşse yeterli
    { "format", "pdf" },
    { "type", "report" }
};

// Kuyruğu Header Exchange'e bağlama
channel.QueueBind(queue: queueName, exchange: "header_logs", routingKey: "", arguments: args);

Bu örnekte, x-match parametresi ile eşleştirme kuralları belirlenmiştir:

  • x-match: "all": Mesajın tüm header’larının belirtilen kuyrukla eşleşmesi gerekir.
  • x-match: "any": Mesajın herhangi bir header’ı eşleşirse yeterlidir.

Ayrıca, format ve type header’ları tanımlanmış ve bu kuyruk yalnızca bu başlık bilgilerine sahip mesajları alacaktır.

3. Mesaj Gönderme

Publisher, mesajı göndermeden önce mesajın başlık kısmına (header) gerekli bilgileri ekler.

// Mesaj gönderme
var properties = channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>
{
    { "format", "pdf" },
    { "type", "report" }
};

string message = "PDF report generated.";
var body = Encoding.UTF8.GetBytes(message);

// Header bilgileri ile mesaj gönderme
channel.BasicPublish(exchange: "header_logs", routingKey: "", basicProperties: properties, body: body);

Bu örnekte, mesajın başlık kısmına format ve type bilgileri eklenmiştir. Mesaj, yalnızca bu header bilgilerine uygun kuyruklara iletilecektir.

4. Tüketici Tarafında Mesaj Alımı

Tüketiciler, belirtilen header bilgilerine sahip mesajları alacak şekilde ayarlanır.

// Tüketici oluşturma
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received '{0}'", message);
};

// Kuyruğu dinleme
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

Bu kod, belirtilen kuyruktaki mesajları dinler ve gelen mesajı ekrana yazdırır.

Header Exchange ile Esnek Yönlendirme

Header Exchange, mesaj yönlendirme konusunda esnek bir yapı sunar. Farklı mesajları, başlıklarında tanımlı bilgiler üzerinden doğru kuyruklara yönlendirebilirsiniz. Özellikle x-match parametresi sayesinde yönlendirme işlemini çok daha kontrollü bir şekilde yapabilirsiniz.

x-match değerleri:

  • all: Tüm header bilgileri eşleşmelidir.
  • any: Herhangi bir header bilgisi eşleşirse mesaj iletilir.
Örnek Senaryo

Bir raporlama sistemi düşünelim. PDF ve Excel formatlarında raporlar üretiliyor. Ayrıca bu raporlar, farklı kategorilere (finans, insan kaynakları, vb.) göre ayrılıyor. Bu sistemde, farklı format ve kategoriye sahip raporlar doğru kuyruklara yönlendirilmelidir.

Publisher (Mesaj Gönderen)

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Header Exchange oluştur
channel.ExchangeDeclare(exchange: "header_logs", type: "headers");

// Mesaj gönder
var properties = channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>
{
    { "format", "pdf" },
    { "category", "finance" }
};

string message = "Finance PDF report.";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "header_logs", routingKey: "", basicProperties: properties, body: body);
Console.WriteLine(" [x] Sent '{0}'", message);

Consumer (Mesaj Alan)

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Rastgele bir kuyruk oluştur ve Header Exchange'e bağla
string queueName = channel.QueueDeclare().QueueName;

var args = new Dictionary<string, object>
{
    { "x-match", "all" },
    { "format", "pdf" },
    { "category", "finance" }
};

// Kuyruğu Header Exchange'e bağla
channel.QueueBind(queue: queueName, exchange: "header_logs", routingKey: "", arguments: args);

// Mesajları dinle
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received '{0}'", message);
};

channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");

Bu senaryoda, mesajın header kısmına format: "pdf" ve category: "finance" bilgileri eklenmiştir. Bu mesaj, yalnızca bu başlık bilgilerine uygun kuyruklara iletilecektir.

RabbitMQ: Etkili Mesaj Tasarımı ve Uygulama Yöntemleri
Mesaj Tasarımı Nedir?

Mesaj tasarımı, belirli bir senaryoya göre mesajların iletilmesi ve işlenmesi için kullanılan yapılandırılmış bir modeldir. Yazılım dünyasındaki tasarım desenlerine benzer şekilde, mesaj tasarımları da belirli problemleri çözmek için uygulanır. Bu tasarımlar, mesajların nasıl bir yapıda olacağını, iki veya daha fazla servis arasında nasıl bir iletişim kurulacağını belirler.

Yaygın Mesaj Tasarımları

RabbitMQ’da kullanılan en yaygın mesaj tasarımları şunlardır:

  1. Point-to-Point (P2P)
  2. Publish/Subscribe
  3. Work Queue (İş Kuyruğu)
  4. Request/Response

Bu tasarımlar farklı senaryolara göre şekillenir ve her biri mesajların farklı şekillerde işlenmesini sağlar.

1. Point-to-Point Tasarımı

Point-to-Point (P2P), bir mesajın doğrudan bir kuyruğa gönderilmesi ve bu kuyruğun işleyen bir tüketici (consumer) tarafından alınması prensibine dayanır. Bu tasarımda, mesaj yalnızca tek bir tüketici tarafından işlenir. Eğer bir mesajın yalnızca tek bir tüketici tarafından alınması gerekiyorsa, bu tasarım uygundur.

Uygulama Adımları:
  • Publisher (mesajı gönderen), mesajı belirli bir kuyruğa gönderir.
  • Tüketici (consumer), bu kuyruğu dinleyerek mesajları alır.
// Publisher
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "example_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);

string message = "Hello, World!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "", routingKey: "example_queue", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);

// Consumer
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "example_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "example_queue", autoAck: true, consumer: consumer);
2. Publish/Subscribe Tasarımı

Publish/Subscribe tasarımında, publisher mesajı bir exchange (dağıtıcı) üzerinden birçok kuyruğa gönderir. Bu tasarım, bir mesajın birden fazla tüketici tarafından işlenmesi gerektiği senaryolar için uygundur. Bu modelde mesajlar, exchange’e bağlı olan tüm kuyruklara iletilir.

Uygulama Adımları:
  • Publisher, mesajı bir exchange‘e gönderir.
  • Exchange, mesajı bağlı olan tüm kuyruklara iletir.
  • Tüm tüketiciler bu mesajı alır.
// Publisher
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);

string message = "Log message!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);

// Consumer
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName;

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);
};

channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
3. Work Queue Tasarımı

Work Queue tasarımı, birden fazla tüketici arasında iş yükünü eşit dağıtmayı amaçlar. Bu tasarımda, bir mesaj birden fazla tüketici tarafından değil, yalnızca biri tarafından işlenir. Bu, paralel işleme ve iş yükü dağılımı için idealdir.

Uygulama Adımları:
  • Publisher, mesajı kuyruğa gönderir.
  • Tüketicilerden yalnızca biri mesajı alır ve işler.
// Publisher
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

string message = "Task message";
var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
Console.WriteLine(" [x] Sent {0}", message);

// Consumer
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);

    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
4. Request/Response Tasarımı

Request/Response tasarımı, publisher’ın bir istekte bulunup, tüketiciden bir yanıt (response) beklediği senaryolarda kullanılır. Publisher, bir kuyruktan mesaj gönderir ve tüketici bu mesajı işleyip yanıtını başka bir kuyruktan döner.

Uygulama Adımları:
  • Publisher, istek mesajını bir kuyruğa gönderir.
  • Tüketici, bu isteği işler ve yanıtı başka bir kuyruktan gönderir.
  • Publisher, yanıtı bekler ve alır.
// Publisher
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

var replyQueueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel);

var correlationId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = correlationId;

string message = "Request message";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: body);

consumer.Received += (model, ea) =>
{
    if (ea.BasicProperties.CorrelationId == correlationId)
    {
        var response = Encoding.UTF8.GetString(ea.Body.ToArray());
        Console.WriteLine(" [x] Received response: {0}", response);
    }
};
channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer);

// Consumer
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var response = "";
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received request: {0}", message);

    response = "Response to " + message;

    var reply

Props = channel.CreateBasicProperties();
    replyProps.CorrelationId = ea.BasicProperties.CorrelationId;

    var responseBytes = Encoding.UTF8.GetBytes(response);

    channel.BasicPublish(exchange: "", routingKey: ea.BasicProperties.ReplyTo, basicProperties: replyProps, body: responseBytes);
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

Enterprise Service Bus (ESB) Nedir ve Nasıl Çalışır?

Enterprise Service Bus (ESB), farklı yazılım sistemlerinin birbiriyle iletişim kurmasını sağlayan bir mimaridir. Bu yapı, servisler arasında entegrasyon sağlayarak iletişim süreçlerini kolaylaştırır. Farklı sistemler arasında kullanılan çeşitli protokolleri soyutlayarak tek bir iletişim katmanı üzerinden veri alışverişi yapılmasını mümkün kılar. Böylece, sistemler arası bağımlılığı en aza indirir ve iletişim standartlarını belirler.

ESB’nin Avantajları:
  • Farklı servislerin birbiriyle daha kolay ve esnek bir şekilde entegre olmasını sağlar.
  • Sistemlerin tek bir teknolojiye bağımlı olmasını engeller.
  • Kafka, RabbitMQ gibi mesaj broker’lar arasında geçişi kolaylaştırır.
  • Mikro servis mimarilerinde servisler arasında iletişimi standart hale getirir.

Örnek:
İki mikro servis arasında güvenli ve standart bir şekilde veri alışverişi sağlamak istiyorsanız, ESB kullanarak bu süreci yönetebilirsiniz. ESB, verilerin hangi formatta, hangi güvenlik protokolleriyle gönderileceğini belirler ve bu işlemi tek bir mimari çatıda toplar.

MassTransit Nedir?

MassTransit, özellikle .NET için geliştirilmiş olan, açık kaynak kodlu bir mesajlaşma kütüphanesidir. MassTransit, dağıtık sistemler arasında asenkron, mesaj tabanlı iletişimi sağlar. Dağıtık sistemlerde yüksek kullanılabilirlik, ölçeklenebilirlik ve güvenilirlik sağlayarak mikro servis mimarilerinde ideal bir çözümdür.

MassTransit’in Özellikleri:
  • Mesaj tabanlı, gevşek bağlı ve asenkron olarak tasarlanmış bir sistemdir.
  • Mikro servis mimarisine uygun olarak yüksek dereceli güvenilirlik sağlar.
  • Güçlü mesaj desenlerini destekler (Saga Pattern, Orchestration, Choreography).
  • Dağıtık transaction desteği sunar.
  • Test edilebilirlik ve monitoring (izleme) özelliklerine sahiptir.

Kod Örneği:
MassTransit’i RabbitMQ ile entegre ederek basit bir mesaj gönderme ve alma işlemi gerçekleştirelim.

Publisher (Mesaj Gönderici):

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "example_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);

string message = "Hello, World!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "", routingKey: "example_queue", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);

Consumer (Mesaj Alıcı):

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "example_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "example_queue", autoAck: true, consumer: consumer);
RabbitMQ ile MassTransit Entegrasyonu

MassTransit, RabbitMQ gibi mesaj broker’lar ile kolayca entegre edilebilir. MassTransit, dağıtık mikro servis yapılarında, mesaj iletimini yönetmek için kullanılan bir kütüphanedir. Bu entegrasyon, servisler arası iletişim süreçlerinde mesajlaşmanın kolay bir şekilde yapılandırılmasını sağlar.

MassTransit RabbitMQ Entegrasyonu ile Mesaj Gönderme:
MassTransit kullanarak, mesajlar publish/subscribe (yayınla/abone ol) modeline göre işlenir. Publish, bir mesajın birden fazla kuyruk tarafından alınmasını sağlar.

// MassTransit Publisher (Mesaj Gönderici)
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(new Uri("rabbitmq://localhost"), h => { });
});

await busControl.Publish<IMessage>(new { Text = "Hello, MassTransit!" });

MassTransit Consumer (Mesaj Alıcı):

public class ExampleConsumer : IConsumer<IMessage>
{
    public Task Consume(ConsumeContext<IMessage> context)
    {
        Console.WriteLine("Received message: {0}", context.Message.Text);
        return Task.CompletedTask;
    }
}

// Consumer configuration
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.ReceiveEndpoint("example_queue", e =>
    {
        e.Consumer<ExampleConsumer>();
    });
});

await busControl.StartAsync();
Send ve Publish Farkı
  • Send: Bir mesajın belirli bir kuyruğa gönderilmesini sağlar. Mesaj, belirli bir kuyrukta işlenir ve sadece o kuyruğa özgüdür.
  • Publish: Bir mesajın, ilgili tüm kuyruklara gönderilmesini sağlar. Bu model, event-driven (olay güdümlü) sistemlerde, bir olayın tüm abonelere iletilmesini sağlar.

Send Kullanımı:

await sendEndpoint.Send<IMessage>(new { Text = "Direct Message to a Queue" });

Publish Kullanımı:

await busControl.Publish<IMessage>(new { Text = "Message to All Subscribers" });
Enterprise Service Bus & MassTransit Neden Kullanılır?

Enterprise Service Bus, özellikle mikro servis mimarilerinde, farklı servislerin birbiriyle iletişimini kolaylaştırır. MassTransit ise .NET uygulamalarında dağıtık sistemler arasında mesaj tabanlı iletişimi sağlar ve RabbitMQ gibi mesaj broker’larla entegrasyon imkanı sunar. Bu araçlar, özellikle aşağıdaki durumlarda tercih edilir:

  • Mikro servislerin birbiriyle haberleşmesinde bağımsızlık ve esneklik sağlamak.
  • Mesajlaşma süreçlerini standardize etmek.
  • Dağıtık sistemler arasında asenkron iletişim sağlamak.
  • Servisler arası güvenilir ve ölçeklenebilir bir yapı kurmak.
MassTransit ile Request/Response Pattern Uygulama Rehberi
Request/Response Pattern Nedir?

Request/Response Pattern, bir sistemin bir istek (request) göndermesi ve bu isteğe bir yanıt (response) alması sürecini ifade eder. Bu pattern genellikle iki bileşen arasında asenkron bir iletişim sağlamak için kullanılır. İstek, belirli bir işlem yapılması amacıyla gönderilir, yanıt ise işlem sonucunu döndürür.

Bu desenin başlıca avantajları:

  • Asenkron iletişim sağlayarak sistemlerin birbirini beklemeden çalışmasına imkan tanır.
  • Ölçeklenebilirliği artırır.
  • Servislerin sorumluluklarını net bir şekilde ayırır.

MassTransit ile bu desenin uygulanmasını adım adım inceleyelim.

Adım 1: Mesaj Kontratlarının Oluşturulması

İlk adım olarak, isteğin ve yanıtın formatını belirleyen mesaj kontratlarını oluşturmalıyız. Bu kontratlar, iki servis arasındaki iletişimi belirler. Bir servis, belirli bir formatta istek gönderir ve diğer servis bu isteği yine belirli bir formatta yanıtlar.

Request ve Response Mesajlarının Oluşturulması:

public record RequestMessage(string Text, int MessageNo);
public record ResponseMessage(string Text);

Bu örnekte, RequestMessage sınıfı, gönderilecek isteğin formatını tanımlar. İki alan içerir:

  • Text: İsteğin içeriği.
  • MessageNo: Mesajın sıra numarası.

ResponseMessage sınıfı ise yanıtın formatını belirler ve yalnızca bir Text alanı içerir.

Adım 2: Publisher (İstek Gönderici) Oluşturulması

Publisher, isteği gönderen bileşendir. Bu bileşen, belirli bir kuyruğa mesaj gönderir ve yanıtı bekler. MassTransit ile bu işlem oldukça basittir.

Publisher’ın Yapılandırılması:
Publisher, MassTransit’in RequestClient yapısını kullanarak isteği gönderir ve yanıtı bekler. Bu işlem asenkron olarak gerçekleştirilir.

var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(new Uri("rabbitmq://localhost"), h => { });
});

await bus.StartAsync();

try
{
    var requestClient = bus.CreateRequestClient<RequestMessage>(new Uri("rabbitmq://localhost/request_queue"));

    int i = 1;
    while (true)
    {
        var request = new RequestMessage($"Mesaj {i}", i);
        var response = await requestClient.GetResponse<ResponseMessage>(request);

        Console.WriteLine($"Response received: {response.Message.Text}");

        i++;
        await Task.Delay(2000); // 2 saniyede bir mesaj gönder
    }
}
finally
{
    await bus.StopAsync();
}

Bu kodda, MassTransit RequestClient kullanılarak request_queue kuyruğuna mesaj gönderilir. Gönderilen her mesajın ardından, yanıtı (ResponseMessage) almak için beklenir.

Adım 3: Consumer (Yanıtlayıcı) Oluşturulması

Consumer (kanser), gelen isteği işleyen ve yanıtı gönderen bileşendir. Bu bileşen, isteği aldıktan sonra gerekli işlemleri yapar ve yanıtı geri gönderir.

Consumer’ın Yapılandırılması:
Consumer, belirli bir kuyruğu dinleyerek isteği alır ve işleme tabi tutar. Sonrasında ise yanıtı gönderen Respond fonksiyonu kullanılır.

public class RequestMessageConsumer : IConsumer<RequestMessage>
{
    public async Task Consume(ConsumeContext<RequestMessage> context)
    {
        Console.WriteLine($"Received request: {context.Message.Text}, MessageNo: {context.Message.MessageNo}");

        // İsteği işleme ve yanıt gönderme
        var response = new ResponseMessage($"Yanıt: Mesaj {context.Message.MessageNo} işlendi.");
        await context.RespondAsync(response);
    }
}

Bu örnekte, RequestMessageConsumer sınıfı, RequestMessage türündeki mesajları dinler. Her mesaj geldiğinde, bu mesaj işlenir ve bir ResponseMessage ile yanıtlanır.

Adım 4: Consumer’ın Başlatılması

Consumer’ın mesaj kuyruğunu dinlemesi ve gelen isteklere yanıt vermesi için, RabbitMQ ile bağlantı kurup yapılandırılması gerekir.

Consumer’ı Başlatma:

var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(new Uri("rabbitmq://localhost"), h => { });

    cfg.ReceiveEndpoint("request_queue", ep =>
    {
        ep.Consumer<RequestMessageConsumer>();
    });
});

await bus.StartAsync();

Console.WriteLine("Consumer is running. Press any key to exit");
Console.ReadKey();

await bus.StopAsync();

Bu kodda, request_queue isimli bir kuyruk dinlenir ve bu kuyruktan gelen istekler RequestMessageConsumer tarafından işlenir.

Adım 5: Test ve Çalıştırma

Her iki uygulamayı da çalıştırdığınızda, Publisher belirli aralıklarla istek gönderir ve Consumer bu isteklere yanıt verir. Publisher, yanıtı aldıktan sonra ekrana yazar. Mesajlar asenkron olarak işlenir ve her iki bileşen de bağımsız olarak çalışır.

Örnek Çıktı:

Received request: Mesaj 1, MessageNo: 1
Response received: Yanıt: Mesaj 1 işlendi.

Received request: Mesaj 2, MessageNo: 2
Response received: Yanıt: Mesaj 2 işlendi.

Bu örnek, isteğin başarılı bir şekilde alındığını ve yanıtın geri döndürüldüğünü gösterir.

MassTransit Kütüphanesinde Message Acknowledgment (Mesaj Onayı) Üzerine
Mesaj Onayı Nedir?

RabbitMQ veya benzeri mesaj kuyruklama sistemlerinde, bir mesaj bir consumer (tüketici) tarafından alındığında, bu mesajın başarılı bir şekilde işlendiğini onaylamak gerekir. Bu onay, mesajın kuyruktan silinmesini sağlar. Eğer mesaj onaylanmazsa, mesaj tekrar kuyruğa geri gönderilir (requeue) ve başka bir tüketici tarafından işlenebilir.

Mesaj onayı, bir consumer‘ın mesajı işleyip işlemediğini belirlemek için kullanılır. Eğer mesaj düzgün işlenmezse, consumer tekrar denemek üzere mesajı yeniden kuyruğa alabilir. Bu mekanizma, mesajların kaybolmamasını ve güvenli bir şekilde işlenmesini sağlar.

MassTransit’te Mesaj Onayı

MassTransit kütüphanesi, RabbitMQ gibi mesajlaşma sistemleriyle çalışırken message acknowledgment mekanizmasını otomatik olarak yönetir. Bu sayede, geliştiricilerin manuel olarak mesaj onayı yapmalarına gerek kalmaz.

MassTransit’in varsayılan davranışı şu şekildedir:

  • Bir mesaj, bir consumer tarafından alındığında, işlem başarıyla tamamlanana kadar mesaj onaylanmaz.
  • Mesaj başarıyla işlendiğinde, MassTransit bu mesajı otomatik olarak onaylar ve kuyruktan siler.
  • Eğer işlem sırasında bir hata oluşursa, MassTransit mesajı kuyruktan silmez ve mesaj yeniden işlenmek üzere kuyruğa döner.
Örnek Senaryo: Mesaj Onayı Süreci
  1. Bir mesaj RabbitMQ kuyruğuna gelir.
  2. MassTransit Consumer bu mesajı alır ve işlemeye başlar.
  3. İşlem tamamlanana kadar mesaj kuyruktan silinmez.
  4. Eğer işlem başarıyla tamamlarsa, mesaj otomatik olarak onaylanır ve kuyruktan silinir.
  5. İşlem sırasında bir hata meydana gelirse, mesaj tekrar kuyruğa eklenir ve başka bir consumer tarafından işlenmek üzere hazır hale gelir.

MassTransit, bu süreci otomatik olarak yönetir ve geliştiricinin manuel olarak BasicAck veya BasicNack gibi RabbitMQ işlevlerini çağırmasına gerek kalmaz.

Kod Örneği: Mesaj Onay Mekanizması

Aşağıdaki örnek, MassTransit kullanarak bir mesajı işleyen ve otomatik olarak onaylayan bir consumer‘ı göstermektedir.

public class ExampleConsumer : IConsumer<ExampleMessage>
{
    public async Task Consume(ConsumeContext<ExampleMessage> context)
    {
        try
        {
            // Mesajı işliyoruz
            Console.WriteLine($"Mesaj alındı: {context.Message.Text}");

            // İşlem başarılı olursa mesaj otomatik olarak kuyruktan silinir
            await Task.CompletedTask;
        }
        catch (Exception ex)
        {
            // Hata oluşursa mesaj tekrar işlenmek üzere kuyrukta kalır
            Console.WriteLine($"Hata: {ex.Message}");
            throw;
        }
    }
}

Bu örnekte, ExampleConsumer bir mesajı alır ve işler. Eğer mesajın işlenmesi sırasında herhangi bir hata olmazsa, MassTransit mesajı otomatik olarak onaylar. Ancak, bir hata meydana gelirse, mesaj yeniden işlenmek üzere kuyrukta kalır.

Mesaj İşleme Sırasında Hata Yönetimi

MassTransit, mesaj işleme sırasında oluşan hataları otomatik olarak yönetir. Bir mesajın işlenmesi sırasında bir hata oluştuğunda, MassTransit şu adımları takip eder:

  1. Mesaj işlenmeye başlanır.
  2. Eğer işleme sırasında bir hata oluşursa, bu hata yakalanır ve mesaj tekrar kuyruğa döner (requeue).
  3. MassTransit, mesajı tekrar işlemek için belirli bir süre boyunca yeniden denemeye devam eder. Yeniden deneme politikasını özelleştirebilirsiniz (örneğin, mesajın kaç kez yeniden işleneceği veya ne kadar süre sonra tekrar deneneceği).

Aşağıdaki örnekte, bir hata oluştuğunda mesajın yeniden kuyruğa alınmasını gösteriyoruz:

public class FaultTolerantConsumer : IConsumer<ExampleMessage>
{
    public async Task Consume(ConsumeContext<ExampleMessage> context)
    {
        try
        {
            // Mesajı işlemeye çalış
            Console.WriteLine($"Mesaj alındı: {context.Message.Text}");

            // İşlem başarılı olursa, mesaj kuyruktan silinir
            await Task.CompletedTask;
        }
        catch (Exception ex)
        {
            // Hata oluşursa mesajı tekrar işleme
            Console.WriteLine($"İşleme sırasında hata oluştu: {ex.Message}");

            // Hata yakalanır ve MassTransit mesajı tekrar işlemeye çalışır
            throw; // Hata yeniden fırlatılır, mesaj kuyrukta kalır
        }
    }
}
Mesajların Tekrar İşlenmesi (Retry)

MassTransit, bir mesaj işlenirken hata oluştuğunda, mesajın kaç kez tekrar işleneceğine dair politikaları yapılandırmanıza izin verir. Bu politika, mesajların kaybolmasını engeller ve başarısız mesajların güvenilir bir şekilde işlenmesini sağlar.

Örneğin, aşağıdaki kodda mesajın 3 kez tekrar işlenmesi sağlanmıştır:

cfg.ReceiveEndpoint("example_queue", e =>
{
    e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
    e.Consumer<ExampleConsumer>();
});

Bu örnekte, mesaj işleme başarısız olursa 3 kez, her biri 5 saniye arayla tekrar denenecektir.

Kaynakça:

Öne geçmenin sırrı, başlamaktır.

Mark Twain

Bir sonraki yazıda görüşmek dileğiyle!”

Leave a Reply

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir


7 + 10 = ?