• בלוג
  • צעדים ראשונים עם grpc

צעדים ראשונים עם grpc

ספריית gRPC היא ספריית קוד פתוח מבית גוגל שתפקידה לחבר בין סרביסים במערכת שלכם בצורה יעילה ומהירה. אפשר לחשוב עליה בתור תחליף מהיר יותר ל REST APIs כאשר הרווח המרכזי של gRPC מבחינת ביצועים מגיע מהשימוש ב HTTP/2 ובשליטה טובה יותר של הספריה בפרוטוקול. מבחינת חווית משתמש הספריה מאפשרת כתיבת תוכניות מבוזרות כאילו אנחנו רצים על אותה מכונה באמצעות שימוש נרחב ב Stub-ים.

הספריה מגיעה עם תמיכה רשמית ב 11 שפות תכנות ואפשר למצוא חיבורים לעוד עשרות בחיפוש פשוט ברשת. בפוסט זה אציג שתי דוגמאות בשפת Python לשימוש ב gRPC בהתחלה כדי לכתוב שרת Echo ולאחר מכן להפוך אותו ל Chat.

1. התקנה

בהנחה שיש לכם python3 מקונפג כמו שצריך אפשר להתקין הכל עם פקודת pip פשוטה:

$ pip install grpclib protobuf grpcio-tools

בכתיבת קוד gRPC אנחנו מתחילים בכתיבת קובץ פרוטוקול (עם סיומת proto) ואז משתמשים בשפה שבחרנו כדי ליצור קוד סטאב עבור הפרוטוקול לפרויקט שלנו. זה אומר שלכל קובץ פרוטוקול יהיו לנו שני קבצים מג'ונרטים אותם נטען מהקוד, וכל פעם שמשנים את הפרוטוקול צריך להפעיל מחדש את הפקודה שמייצרת את אותם שני קבצים.

קובץ פרוטוקול של gRPC מתאר את התקשורת בין השרת ללקוח - ובטרמינולוגיה הזאת הלקוח פונה לשרת ומבקש להריץ קוד מסוים, השרת מריץ את הקוד ושולח חזרה תשובה. פרוטוקול התקשורת מורכב מרשימת הפונקציות שהשרת יכול להפעיל ורשימת הפרמטרים של כל פונקציה.

2. תוכנית ראשונה: שרת Echo

הנה לדוגמה תוכנית ראשונה תהיה תוכנית Echo, כלומר תהיה לנו פונקציה אחת על השרת שבסך הכל תשלח ללקוח חזרה בדיוק את מה שהוא שלח אליי. קובץ הפרוטוקול של התוכנית הוא:

syntax = "proto3";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string text = 1;
}

message HelloReply {
  string message = 1;
}

בתרגום לעברית:

  1. המילה service מגדירה אוסף פעולות ששרת יודע לעשות (אפשר לחשוב על זה כ Class בשרת).

  2. המילה message מגדירה הודעה שפעולה יודעת לקבל או להחזיר.

  3. בתוך message אפשר לכתוב מספר שדות שיהיו בהודעה. לכל שדה יש מספר וסוג.

בדוגמה שלנו המחלקה על השרת נקראת Greeter והיא חושפת פונקציה אחת בשם SayHello. פונקציה זו מקבלת הודעה מסוג HelloRequest ומחזירה הודעה מסוג HelloReply.

שתי ההודעות מוגדרות בהמשך הקובץ, לשתיהן שדה אחד מסוג טקסט אבל עם שם שונה.

מיטיבי הלכת בין הקוראים מוזמנים למצוא את ההוראות המלאות לכתיבת קבצי proto בתיעוד שלהם בקישור: https://developers.google.com/protocol-buffers/docs/proto3.

אחרי שכתבנו את קובץ ה proto אנחנו צריכים לייצר ממנו את קבצי הסטאב. בפייתון אני רושם משורת הפקודה:

$ python -m grpc_tools.protoc -I. --python_out=. --grpclib_python_out=. sayhello.proto

זה ייצור את הקבצים sayhello_grpc.py ו sayhello_pb2.py, שניהם לא נועדו לבני אדם. אנחנו פשוט נשלב אותם בתוכנית ה Python שלנו כדי לחבר בין השרת ללקוח.

בשביל לבנות קוד צד שרת gRPC אני צריך להגדיר מחלקה שתממש את הפונקציות שהגדרתי בפרוטוקול. אני משתמש בבסיס שנוצר עבורי בקובץ sayhello_grpc.py ומרחיב אותו. מחלקת צד שרת נראית כך:

class Greeter(GreeterBase):
    async def SayHello(self, stream):
        request = await stream.recv_message()
        message = f'You Said: "{request.text}"'
        await stream.send_message(HelloReply(message=message))

הפונקציה SayHello מתאימה לפונקציה שהגדרתי בקובץ הפרוטוקול. הפרמטר stream הוא שמאפשר לי לבצע פעולות הקשורות לפרוטוקול ובמקרה שלנו stream.recv_message() מקבלת את הבקשה מהלקוח ו stream.send_message שולחת את התשובה. זה מבנה מעניין כי בדרך כלל אנחנו חושבים על פונקציה בתור משהו ש"מחזיר" ערך, ופה בגלל שמדובר בפעולה מבוזרת החזרת הערך היא למעשה שליחת הודעה חזרה עם הערך החוזר, במקום פקודת return.

קוד צד השרת המלא עם הקוד שמפעיל את המחלקה נראה כך:

# File: server.py

import asyncio

from grpclib.utils import graceful_exit
from grpclib.server import Server

# generated by protoc
from sayhello_pb2 import HelloReply
from sayhello_grpc import GreeterBase


class Greeter(GreeterBase):
    async def SayHello(self, stream):
        request = await stream.recv_message()
        message = f'You Said: "{request.text}"'
        await stream.send_message(HelloReply(message=message))


async def main(*, host='127.0.0.1', port=50051):
    server = Server([Greeter()])
    # Note: graceful_exit isn't supported in Windows
    with graceful_exit([server]):
        await server.start(host, port)
        print(f'Serving on {host}:{port}')
        await server.wait_closed()


if __name__ == '__main__':
    asyncio.run(main())

בצד הלקוח אני מושך דבר שנקרא Stub מתוך אותו קובץ grpc שנוצר אוטומטית, ובעזרתו מפעיל פונקציה מרוחקת. הפעם הקוד נראה הרבה יותר טבעי לפייתון וכבר לא מרגישים את העבודה המבוזרת:

reply = await greeter.SayHello(HelloRequest(text=line.strip()))

וזה קוד צד הלקוח המלא:

# File: client.py

import asyncio

from grpclib.client import Channel

# generated by protoc
from sayhello_pb2 import HelloRequest, HelloReply
from sayhello_grpc import GreeterStub
import sys

async def main():
    async with Channel('127.0.0.1', 50051) as channel:
        greeter = GreeterStub(channel)

        for line in sys.stdin:
            reply = await greeter.SayHello(HelloRequest(text=line.strip()))
            print(reply.message)


if __name__ == '__main__':
    asyncio.run(main())

אני שמרתי את קוד צד השרת בקובץ server.py ואת קוד צד הלקוח בקובץ client.py ואז אפשר משני חלונות שונים להפעיל את השרת והלקוח, לכתוב משהו במסוף בצד הלקוח ולראות את ההדפסה שחוזרת מהשרת.

3. תוכנית שניה: שרת Chat

נמשיך למשהו קצת יותר מתוחכם והוא תוכנית צ'ט. בניגוד לשרת Echo שבסך הכל שולח לך בחזרה את מה שכתבת, תוכנית צ'ט צריכה להכיר את כל הלקוחות המחוברים וכל פעם שאחד מהם שולח הודעה היא צריכה להודיע לכל הלקוחות האחרים. יש פה שני אתגרים חדשים גם בצד השרת וגם בצד הלקוח:

  1. בצד הלקוח אני כבר לא יודע מתי אני אקבל את ההודעה הבאה. אני לא יכול לסמוך על זה שהודעה תגיע רק בתור תשובה להודעה שאני שולח, כי יש אנשים אחרים בחדר ששולחים ומקבלים הודעות כל הזמן.

  2. בצד השרת אני כבר לא יכול לשלוח הודעה חזרה רק למי שהרגע פנה אליי, וצריך לשמור את כל הלקוחות המחוברים בזיכרון כדי שכל פעם שאחד שולח הודעה אוכל להחזיר את ההודעה גם לכל האחרים.

נתחיל עם הפרוטוקול. בגלל שזה בסך הכל משחק עם gRPC מספיקות לי שתי פונקציות:

syntax = "proto3";

service ChatRoom {
  rpc Subscribe (Empty) returns (stream Message) {}
  rpc SendMessage (Message) returns (Empty) {}
}

message Message {
  string text = 1;
}

message Empty {}

לקוח יקרא לפונקציה Subscribe והתשובה שהוא יקבל תהיה stream של הודעות. ב gRPC, המילה stream אומרת שאני יכול לשלוח יותר מתשובה אחת. במקביל לקוחות אחרים או אותו לקוח יכולים לקרוא לפונקציה SendMessage כדי לשלוח הודעות. כשמישהו קורא ל SendMessage אז כל אלה שעכשיו תקועים על Subscribe מקבלים את ההודעה החדשה, בלי לסיים את התקשורת.

נמשיך עם קוד צד השרת. בצד השרת אני שומר במחלקה מבנה נתונים של מילון שמחזיק את כל הלקוחות המחוברים. המפתח הוא כתובת הלקוח והערך הוא תור הודעות שיש לי עבור אותו לקוח. כל פעם שלקוח חדש מתחבר אני יוצר עבורו שורה חדשה במילון ונכנס ללולאה שבה כל פעם שיש הודעה חדשה אני שולח אותה. זה התפקיד של הפונקציה Subscribe.

הפונקציה השניה, SendMessage פשוט מוסיפה את ההודעה לתור של כל הלקוחות המחוברים.

קוד צד השרת המלא נראה כך:

# File: chat_server.py

import asyncio

from grpclib.server import Server, Stream
from grpclib.utils import graceful_exit

from chat_grpc import ChatRoomBase
from chat_pb2 import Message, Empty

class ChatRoom(ChatRoomBase):
    def __init__(self):
        self.chatters = {}

    async def Subscribe(
            self,
            stream: Stream[Empty, Message]
            ) -> None:

        peer = stream.peer.addr()
        try:
            self.chatters[peer] = asyncio.Queue()
            request = await stream.recv_message()

            while True:
                message = await self.chatters[peer].get()
                await stream.send_message(Message(text=message))
                self.chatters[peer].task_done()
        finally:
            print(f"Bye bye {peer}")
            del(self.chatters[peer])

    async def SendMessage(
            self,
            stream: Stream[Message, Empty]
            ) -> None:
        request = await stream.recv_message()
        for q in self.chatters.values():
            q.put_nowait(request.text)
        await stream.send_message(Empty())


async def main(*, host: str = '127.0.0.1', port: int = 50051) -> None:
    server = Server([ChatRoom()])
    with graceful_exit([server]):
        await server.start(host, port)
        print(f'Serving on {host}:{port}')
        await server.wait_closed()


if __name__ == '__main__':
    asyncio.run(main())

בצד הלקוח האתגר הוא לבצע שתי משימות במקביל: אני גם צריך להתחבר לשרת ולהדפיס כל הודעה שחוזרת מ Subscribe, וגם צריך להקשיב לשורות שמשתמש מקליד ב stdin וכל פעם שמשתמש מסיים לכתוב שורה לשלוח אותה לשרת. הקסם שעושה את שני הדברים במקביל הוא הפונקציה wait:

async def main() -> None:
    await asyncio.wait([
        asyncio.create_task(chat()),
        asyncio.create_task(read_messages_and_send()),
        ])

זו פונקציה שמקבלת מספר פונקציות אסינכרוניות ומריצה את כולן במקביל. אצלי הפונקציה chat היא זו שמדפיסה את ההודעות מהשרת והפונקציה read_messages_and_send קוראת הודעות מהמקלדת ושולחת אותן לשרת. קוד התוכנית המלא בקובץ client.py נראה כך:

# File: chat_client.py

import asyncio
import sys

from grpclib.client import Channel

from chat_grpc import ChatRoomStub
from chat_pb2 import Message, Empty

async def connect_stdin_stdout():
    loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
    return reader, writer


async def chat() -> None:
    async with Channel('127.0.0.1', '50051') as channel:
        stub = ChatRoomStub(channel)

        async with stub.Subscribe.open() as stream:
            await stream.send_message(Empty(), end=True)
            async for reply in stream:
                print(reply)

async def read_messages_and_send() -> None:
    reader, writer = await connect_stdin_stdout()
    async with Channel('127.0.0.1', '50051') as channel:
        stub = ChatRoomStub(channel)
        while True:
            res = await reader.read(100)
            if not res:
                break
            await stub.SendMessage(Message(text=res))


async def main() -> None:
    await asyncio.wait([
        asyncio.create_task(chat()),
        asyncio.create_task(read_messages_and_send()),
        ])



if __name__ == '__main__':
    asyncio.run(main())

מוזמנים לנסות להריץ אצלכם בחלון אחד את השרת ובמספר חלונות אחרים את תוכנית הלקוח ולראות איך החלונות מדברים אחד עם השני.

4. מה אהבתי

סך הכל gRPC היא ספריה נחמדה לחיבור בין מספר סרביסים. העובדה שיש לה תמיכה באינסוף שפות תכנות אומרת שיהיה לנו קל לחבר קוד שכתוב בשפות שונות, וקובץ ה proto נותן לי דרך סטנדרטית להגדיר ממשק עבור הסרביס שלי.

5. מה לא אהבתי

למרות שאנחנו ב 2021, הספריה מרגישה קצת כמו חזרה בזמן. בעיקר היתה חסרה לי תמיכה מובנית בניהול הרשאות (כן ראיתי שיש משהו מובנה של גוגל ואפשרות להוסיף פלאגינים. זה לא נראה נוח), וכמו ב REST, גם כאן הממשק מאוד קשיח. כל שינוי קטן במידע מחייב שינוי של הפרוטוקול ותיאום בין מספר סרביסים.

הבחירה ב gRPC היא בחירה בביצועים על חשבון גמישות. כשאתם בסיטואציה שהביצועים קריטיים וצריכים להעביר הרבה מידע ומהר יכול להיות ש gRPC יהיה פיתרון טוב. ברוב הסיטואציות GraphQL ייתן תוצאה יותר טובה בגלל נוחות העבודה והגמישות של הפרוטוקול.