• בלוג
  • קריאה וכתיבה ל Kafka מתוך Shell Script

קריאה וכתיבה ל Kafka מתוך Shell Script

קפקא היא פלטפורמה לעיבוד זרם נתונים שזה אומר שהיא מקבלת הודעות מכל מיני מקורות ומעבירה אותן הלאה לתוכנות אחרות שיודעות לעשות איתן משהו. אפשר לחשוב עליה כמו על תור הודעות על סטרואידים. בהמלצת חבר התחלתי לשחק איתה והפוסט הזה מסכם את השעות הראשונות שלי עם קפקא (אז תהיו בני אדם ותקנו אותי בתגובות אם תמצאו אי דיוקים).

1. איך מתקינים

בעידן של דוקר התקנות הן כבר לא ממש בעיה. כל מה שהייתי צריך בשביל להרים שרת קפקא זה ליצור את קובץ ה docker-compose.yml הבא בתיקיה ריקה:

version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

ואז להפעיל מאותה תיקיה:

docker-compose up -d

הפלט הוא:

Creating network "kafka-docker_default" with the default driver
Creating kafka-docker_zookeeper_1 ... done
Creating kafka-docker_kafka_1     ... done

ואם זה מה שקרה גם אצלכם על המכונה אנחנו מוכנים להמשיך.

2. כתיבת הודעה ראשונה לנושא

בקפקא הם קוראים לשרת ברוקר ובקובץ הקונפיגורציה שבנינו הברוקר מקשיב על פורט 9093 להודעות שמגיעות מחוץ ל docker. התוכנה הפשוטה ביותר שמצאתי לעבודה עם קפקא משורת הפקודה נקראת kafkacat. אני התקנתי אותה בנפרד אבל אפשר גם למצוא אימג'ים שלה על דוקרהאב.

אחרי ההתקנה וכדי לבדוק שקפקא שלכם באוויר נוכל לכתוב:

kafkacat -L -b localhost:9093

וזה הפלט שמסמן לנו שאנחנו בכיוון טוב:

Metadata for all topics (from broker 1001: localhost:9093/1001):
 1 brokers:
  broker 1001 at localhost:9093 (controller)
 0 topics:

בקפקא אנחנו שולחים הודעה לתוך Topic (ויש גם דבר שנקרא Partition בתוך הטופיק אבל עליו נצטרך לדבר ביום אחר). לקוחות אחרים יכולים להאזין לטופיק ולקבל הודעות שנכתבו אליו. בואו ננסה את זה - הפקודה הבאה שולחת הודעת Hello World לטופיק שנקרא test:

echo Hello World | kafkacat -b localhost:9093 -t test

ושימו לב עכשיו מה קורה כשאני מדפיס את ה Metadata כמו שעשיתי קודם:

$ kafkacat -L -b localhost:9093
Metadata for all topics (from broker 1001: localhost:9093/1001):
 1 brokers:
  broker 1001 at localhost:9093 (controller)
 1 topics:
  topic "test" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001

עכשיו השרת כבר יודע שיש טופיק בשם test ויש בו partition יחיד.

3. קריאת ההודעה שכתבנו

בעזרת kafkacat אפשר גם לקרוא את ההודעה שכתבנו. קפקא שומר את כל ההודעות (בהגדרה הדיפולטית זה יהיה למשך שבוע) אז אין בעיה לשחק עם זה ולקרוא את ההודעה שוב ושוב. נקליד את הפקודה:

kafkacat -b localhost:9093 -t test -e

ונקבל את הפלט:

% Auto-selecting Consumer mode (use -P or -C to override)
Hello World
% Reached end of topic test [0] at offset 1: exiting

באופן אוטומטי קפקאקט קרא את כל ההודעות מנושא test ואז יצא. כל הפעלה חוזרת של הפקודה תדפיס בדיוק את אותו פלט בגלל שההודעה נשמרת על קפקא.

אגב- בהגדרות הדוקר שפירסמתי כאן למצב פיתוח כל פעם שאתם סוגרים את הדוקר-קומפוז ומפעילים מחדש כל ההודעות יימחקו. זה בכוונה. במצב פרודקשן צריך לדאוג לו למערכת קבצים מסודרת שהמידע עליה ישרוד כיבוי והדלקה של השרת.

4. כתיבה וקריאה מסונכרנות

הכיף בעבודה עם תור הודעות מגיע כשרוצים לסנכרן בין מספר תהליכים: יהיו לנו תהליכים ששולחים הודעות, תהליכים אחרים שמקבלים אותן ועושים איתן משהו וככה נוכל לבנות טופולוגיה של עיבוד נתונים מקבילי.

בקפקא תהליך שקורא הודעות נקרא Consumer וכזה שכותב הודעות נקרא Producer. אפשר לחבר יחד מספר Consumers ולתת לכולם מזהה שנקרא Group ID. במצב כזה קפקא ישלח את ההודעה רק לאחד מהצרכנים מהקבוצה, וגם יסמן לעצמו שהוא כבר נתן את ההודעה הזאת לקבוצה הזאת כך שפעם הבאה שצרכן מהקבוצה מתחבר הוא לא יקבל את אותה הודעה.

בואו נבדוק את ההתנהגות בעזרת הפעלת kafkacat במקביל מכמה חלונות.

בחלון הראשון יהיה לי Producer שיקרא שורות מ Standard Input וישלח כל שורה כהודעה ל Kafka. אפעיל אותו עם הפקודה:

kafkacat -P -b localhost:9093 -t chat

בשני חלונות נוספים יהיו לי Consumers שיקראו הודעות מקפקא, ישנו אותן לכתיב של האקרים וידפיסו למסך. בהם אפעיל את הפקודה:

kafkacat -q -C -b localhost:9093 -G c1 chat -u | tr aleto @1370

כמה מתגים ששווה לשים לב אליהם בפקודה החדשה:

  1. המתג -q גורם לקפקאקט לא להדפיס כל מיני שטויות שלו לפני ואחרי ההודעה
  2. המתג -G מוסיף מזהה קבוצה כדי שלא נקבל הודעות שכבר טיפלנו בהן (או שמישהו אחר מהקבוצה שלנו טיפל בהן)
  3. המתג -u גורם לקפקאקט לוותר על Buffering כדי שהודעות יישלחו מיד להמשיך ה Pipe לפקודה tr. בלעדיו לא היינו רואים פלט על המסך עד שהבאפר היה מתמלא.

כשתפעילו את הפקודה ב-3 חלונות תוכלו לכתוב טקסט בחלון הראשון ולראות אותו מודפס בכתיב 1337 באחד משני החלונות האחרים.

5. תוכניות לעתיד

האמת שהשעות הראשונות עם קפקא הפתיעו אותי לטובה. חששתי מטכנולוגיה מבוססת Java מסורבלת ומצאתי שרת שקל מאוד להפעיל אותו וקל מאוד לעבוד איתו ועד עכשיו כל מה שניסיתי לכתוב עבד כמעט בפעם הראשונה.

עדיין אני רק בתחילת הדרך וצריך עוד לחבר את זה לקוד אמיתי ולהבין איך מקנפגים אותו לעבוד נכון בפרודקשן, שני דברים עם הרבה פוטנציאל לבעיות. אמשיך לעדכן.