• בלוג
  • שתי דרכים להגדיר תורים ב RabbitMQ

שתי דרכים להגדיר תורים ב RabbitMQ

15/04/2022

מערכת ניהול התורים RabbitMQ מציעה לנו שתי דרכים מרכזיות לבניית קונפיגורציה. במדריך זה נבנה תוכניות פשוטות לשליחה וקבלת הודעות ונראה מהן שתי הדרכים להגדרת התורים ומתי נרצה להשתמש בכל שיטה.

1. מה אנחנו בונים

הדוגמה שלנו היום מורכבת משתי תוכניות - אחת כותבת הודעה ל RabbitMQ והשניה מקבלת הודעה ומדפיסה אותה למסך. אבל, התוכניות האלה מחוברות ב Fanout Exchange, מה שאומר שכל הודעה שנכתבת נשלחת אוטומטית לכל התוכניות שמקשיבות על התור. מבנה זה יאפשר לי להריץ מספר עותקים של התוכנית השניה ולקבל את ההודעות בכל עותק. בנוסף אוכל בעתיד להוסיף עוד תוכניות שקוראות והן יוכלו לעשות דברים אחרים עם ההודעות.

מבנה זה מתאים לדוגמה למערכת לוגים מבוזרת, שם תהליכים מוסימים יוכלו לכתוב הודעות לוג, ותהליכים אחרים יהיו אחראיים לכתיבת הודעות הלוג ליעד שלהן - למשל תהליך מסוים יכתוב את כל ההודעות לקובץ על הדיסק, תהליך אחר יכתוב את ההודעות לטבלה בבסיס הנתונים ואחר ישלח את הלוגים לאחסון בענן.

החלק בקוד בשפת רובי ששולח הודעות לתור נראה כך:

loop do
  x.publish("#{rand} - Hello World")
  sleep 1
end

והחלק בקוד בשפת רובי שקורא מהתור ומדפיס הודעות למסך הוא:

queue.subscribe(block: true) do |_delivery_info, _properties, body|
  puts " [x] Received #{body}"
end

2. הגדרת התורים בתוך קוד התוכנית

גישה ראשונה לבניית המערכת היא להגדיר את התורים בתוך קוד התוכנית. הייתרון בגישה זו הוא שהכל נמצא במקום אחד ואנחנו (המתכנתים) מרגישים בשליטה על המערכת. אם צריך לשנות משהו מספיק לשנות רק קוד. הקוד להקמת התורים בתוכנית ששולחת הודעות יהיה:

channel = connection.create_channel
queue = channel.queue('')
x = channel.fanout('loggers')
queue.bind(x)

וזה מביא אותנו גם לחיסרון בגישה - את אותו הקוד שמקים את התורים אנחנו צריכים לכתוב בכל אחת מהתוכניות שעובדות עם RabbitMQ. כלומר גם בתוכנית שקוראת הודעות נצטרך לכתוב בלוק כזה:

channel = connection.create_channel
queue = channel.queue('')
x = channel.fanout('loggers')
queue.bind(x)

ככל שיותר תוכניות יעבדו עם התורים וככל שהן כתובות ביותר שפות ומתוחזקות על ידי צוותים שונים, כך הגדרת הקשרים בין התורים בקוד הופכת פחות פרקטית. וזה מביא אותנו לגישה השניה-

3. הגדרת התורים בתוך Docker Image

דרך נוספת להגדיר את התורים והקשרים ביניהם ב RabbitMQ היא לבנות אימג' של רביט שהתורים כבר מוגדרים בו. כאן יש לנו קצת יותר עבודה בהקמה אבל יהיה יותר קל לסנכרן בין תוכניות שונות.

אני אוהב להשתמש ב Management Plugin של RabbitMQ בשביל להגדיר את התורים והקשרים ביניהם דרך ממשק גרפי. אני מריץ שרת RabbitMQ שפלאגין הניהול שלו מופעל עם דוקר באמצעות הפקודה:

$ docker run --rm -p 5672:5672 -p 15672:15672 rabbitmq:3-management

עכשיו אני יכול להיכנס לממשק הניהול דרך הדפדפן בכתובת:

localhost:15672

התחברות עם משתמש וסיסמה guest ו guest תכניס אותי למסך ניהול התורים. שם אני מבצע את הפעולות הבאות:

  1. בטאב Queues אני יוצר Queue חדש וקורא לו messages.

  2. בטאב Exchanges אני יוצר Exchange חדש מסוג Fanout וקורא לו loggers. בתפריט bindings של ה Exchange אני יוצר Binding חדש, מחבר את ה Exchange לתור שיצרתי ובוחר בתור מפתח ניתוב את המילה logmessage.

  3. בטאב Overview אני בוחר באפשרות Export definitions ולוחץ על כפתור Download broker definitios.

זה נתן לי קובץ JSON שנראה כך:

{
  "rabbit_version": "3.9.15",
  "rabbitmq_version": "3.9.15",
  "product_name": "RabbitMQ",
  "product_version": "3.9.15",
  "users": [
    {
      "name": "guest",
      "password_hash": "KpcShz9x5hNdr6gP7/4UAuZGM4hwt1HzI8ENYfnbBo5ZuyGG",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": [
        "administrator"
      ],
      "limits": {}
    }
  ],
  "vhosts": [
    {
      "name": "/"
    }
  ],
  "permissions": [
    {
      "user": "guest",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "topic_permissions": [],
  "parameters": [],
  "global_parameters": [
    {
      "name": "internal_cluster_id",
      "value": "rabbitmq-cluster-id-aUEEtfFz1z464iTHnlF0yw"
    }
  ],
  "policies": [],
  "queues": [
    {
      "name": "messages",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-queue-type": "classic"
      }
    }
  ],
  "exchanges": [
    {
      "name": "loggers",
      "vhost": "/",
      "type": "fanout",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "loggers",
      "vhost": "/",
      "destination": "messages",
      "destination_type": "queue",
      "routing_key": "logmessage",
      "arguments": {}
    }
  ]
}

אני שומר את הקובץ בתיקיה חדשה ולידו יוצר קובץ בשם Dockerfile עם התוכן הבא:

FROM rabbitmq:3

COPY 20-load-my-definitions.conf /etc/rabbitmq/conf.d/
COPY definitions.json /etc/rabbitmq/

RUN chown rabbitmq:rabbitmq /etc/rabbitmq/definitions.json

ונשאר לנו רק להוסיף עוד קובץ אחד - קובץ הקונפיגורציה שטוען את ההגדרות מה json. נוסיף לתיקיה את הקובץ 20-load-my-definitions.conf עם התוכן הבא:

load_definitions = /etc/rabbitmq/definitions.json

כשכל הקבצים במקום אני יכול לבנות את האימג' עם:

$ docker build . -t my-rabbitmq-example

ומריץ עם:

$ docker run --rm -p 5672:5672 my-rabbitmq-example

עכשיו אם אני מנסה להתחבר עם קוד מהסוג הזה:

channel = connection.create_channel
x = channel.fanout('loggers')

אני אקבל שגיאה. בגלל שה Exchange כבר הוגדר אני חייב לציין בדיוק את הפרמטרים שאיתם ה Exchange הוגדר כדי שאוכל לעבוד איתו. התוכנית ששולחת הודעות לכן צריכה להשתנות והקוד שלה יראה כך:

#!/usr/bin/env ruby
require 'bunny'

connection = Bunny.new
connection.start

channel = connection.create_channel
x = channel.fanout('loggers', durable: true)

loop do
  x.publish("#{rand} - Hello World", routing_key: :logmessage)
  sleep 1
end

connection.close

שימו לב שבלוק יצירת החיבורים הפעם קצר יותר ומורכב רק משורת החיבור ל Exchange המתאים. את הקשר בין התור ל Exchange כבר יצרנו בשלב יצירת שרת ה RabbitMQ. מפתח הניתוב logmessage נלקח ישירות מההגדרות שיצרנו בזמן בניית האימג'.

גם בצד המקבל התוכנית יכולה לוותר על יצירת החיבור בין התור ל Exchange והקוד המקבל נראה כך:

#!/usr/bin/env ruby
require 'bunny'

connection = Bunny.new
connection.start

channel = connection.create_channel
queue = channel.queue('messages', durable: true)

begin
  puts ' [*] Waiting for messages. To exit press CTRL+C'
  queue.subscribe(block: true) do |_delivery_info, _properties, body|
    puts " [x] Received #{body}"
  end
rescue Interrupt =>
  connection.close
  exit(0)
end

4. מתי נבחר בכל שיטה

היכרות עם שתי הגישות להגדרת שרת RabbitMQ עוזרת לנו לבחור את שיטת העבודה שמתאימה לנו בכל מצב:

  1. כשיש לי מעט תוכניות שמשתמשות ב RabbitMQ, כשיש מעט חיבורים וכשאותם מתכנתים עובדים על הקוד של כל התוכניות (שכתובות באותה שפת תכנות) - יהיה לי יותר קל להשתמש בגישה הראשונה. אני אכתוב את קוד החיבור ל RabbitMQ בקובץ אחד ואטען אותו מכל תוכנית שצריכה להתחבר למערכת ההודעות. כשצריך לשנות משהו במבנה מספיק לשנות שם ולהפעיל מחדש את המערכת.

  2. כשמערכת ההודעות משמשת לסינכרון בין מספר מערכות שנכתבות על ידי מספר צוותים, וככל שהקשרים בין התורים יהיו מסובכים יותר, כך כדאי לי לבנות את כל ההגדרות של RabbitMQ בנפרד וכך להכריח את התוכניות לעבוד לפי הקונפיגורציה שעליה החלטנו. מתכנתים בצוותים השונים יכולים להיות רגועים שהגדרות התור לא ישתנו פתאום, כי שינוי הגדרות הוא קשה יותר ודורש בניה מחדש של האימג'.

נ.ב. אם אתם אוהבים את RabbitMQ ורוצים לשמוע עוד רעיונות לעבודה יעילה באמצעותו, מוזמנים להירשם לוובינר שאעביר בנושא בתחילת החודש הבא בקישור: https://www.tocode.co.il/workshops/115