פיתוח קוד אסינכרוני ב Python עם asyncio


1. שלום עולם אסיכנרוני

תכנות אסינכרוני הוא פרדיגמה שנועדה לבצע מספר משימות במקביל בלי שימוש ב Threads או Processes. השיטה עובדת על מנגנון של מערכת ההפעלה שנקרא בסביבת חלונות IO Completion Ports ובסביבת יוניקס Select. מנגנון זה בשתי הסביבות מאפשר לתוכנית להתחיל לבצע פעולת I/O ולקבל הודעה ממערכת ההפעלה כשהפעולה מסתיימת.

המבנה של תוכנית אסינכרונית יהיה מבוסס על אירועים ו Call Backs: אנחנו נתחיל לבצע פעולה ונבקש מהתוכנית להודיע לנו כשהפעולה מסתיימת. פייתון יעביר את הבקשה שלנו למערכת ההפעלה ובתורו יחכה להודעה ממנה. בשביל לבצע מספר פעולות במקביל פשוט נחזור על התהליך, כלומר נבקש לבצע מספר פעולות ולקבל עדכון כשאחת מהן (או כולן) יסתיימו. אחרי שסיימנו עם כל הבקשות פייתון ייכנס למצב המתנה בלולאה שנקראת Main Loop. הלולאה היא בעצם פקודת המתנה שמחכה לקבל עדכון ממערכת ההפעלה, כשקיבלה את העדכון מעבירה אותו לחלק הרלוונטי בתוכנית שלנו ולאחר מכן חוזרת להמתין לאירועים נוספים.

הסיבה העיקרית שמתכנתים מעדיפים מבנה אסינכרוני על פני עבודה עם Threads היא שבקוד אסינכרוני אין צורך לסנכרן בין תהליכים. פונקציות ה Callback, כלומר פונקציות הטיפול באירועים שיקראו אוטומטית כשפעולה אסיכנרונית מסתיימת, מתבצעות מתוך Thread יחיד ולכן לא יהיה מצב ששתי פונקציות כאלה פועלות באותו זמן, ניגשות לאותם אזורים בזיכרון או מייצרות התנגשות מכל סיבה אחרת. פונקציות ה Callback ייקראו תמיד אחת אחרי השניה, ולעולם בתוכנית אסינכרונית לא יהיה מצב של "הפסקה באמצע" פונקציה אחת כדי להפעיל פונקציה אחרת.

מצד שני הסיבה העיקרית שמתכנתים לא אוהבים תכנות אסינכרוני היא הצורך להעביר את המידע שלנו בין שני אזורים בתוכנית: האזור שמתחיל את הפעולה, ופונקציית ה Callback שמטפלת בתוצאת הפעולה. ב Python פקודה חדשה בשפה שנקראת await עוזרת לנו להתמודד עם בעיה זו וגורמת לקוד אסינכרוני להיראות כמו קוד מבוסס Threads.

לא כל פעולה יכולה לרוץ מתוך קוד אסינכרוני: בפייתון יש פעולות שנקראות פעולות "חוסמות", ואלה בעצם כל הפקודות הרגילות של פייתון. כשאנחנו קוראים שורה מקובץ למשל אנחנו יודעים שהשורה הבאה בסקריפט תקבל כבר את השורה שקראנו. פקודת הקריאה תסתיים רק כשהשורה נמצאת אצלנו ביד. בעבודה אסינכרונית אנחנו נצטרך להשתמש בפקודות חדשות לגמרי מהפקודות שהכרנו, פקודות שלא יחכו לסיום הקריאה כדי להחזיר תוצאה אלא יחזירו בצורה מיידית איזושהי הבטחה לחישוב עתידי, ורק בהמשך התוכנית במועד מאוחר יותר אותה הבטחה תתממש ותתן לנו ביד את התוצאה.

הפעולות המרכזיות שנריץ בצורה אסינכרונית יהיו פעולות I/O כלומר פעולות על קבצים ופעולות על הרשת.

בואו נתחיל עם דוגמא פשוטה כדי לראות איך נראה קוד אסינכרוני. התוכנית הבאה מייצרת שרת TCP שמחכה ללקוחות וכל פעם שמגיע לקוח השרת יקרא שורת טקסט מהלקוח וישלח אותה חזרה לאותו לקוח - מה שנקרא Echo Server:

import asyncio
import logging

async def client_connected_handler(reader, writer):
    line = await reader.readline()
    writer.write(line)
    await writer.drain()
    writer.close()

async def tcp_echo_server():
    print("Starting a server")
    server = await asyncio.start_server(client_connected_handler, host='0.0.0.0', port='8080')
    await server.serve_forever()

logging.basicConfig(level=logging.DEBUG)
asyncio.run(tcp_echo_server(), debug=True)

מבחינת התיאוריה הקוד משתמש במספר מבנים חדשים:

  1. כל פונקציה שמוגדרת עם המילה async בהתחלה נקראת Coroutine. קורוטינה היא פונקציה שמחזירה הבטחה לעתיד, כך שאם מפעילים אותה באופן רגיל היא תחזור מיד ולא תעשה כלום, אבל אם נפעיל אותה עם await לפניה היא תעשה משהו אחר וקסום.

  2. הדבר הקסום שקורה כש coroutine מופעלת עם await הוא פיצול של הקוד: כל הקוד שאחרי המילה await כאילו עובר לפונקציה אחרת או לאזור אחר בזיכרון. אפשר לחשוב עליו כאילו הוא נשמר בקופסא. פייתון מחכה שהקורוטינה תסתיים, ורק אחרי שאותה קורוטינה הסתיימה פייתון ימשיך לבצע את הקוד ששמנו בצד.

  3. אם קורה מצב בו מספר קורוטינות ממתינות עם await במקביל, פייתון יחזור לבצע את הקוד של הקורוטינה הראשונה שמוכנה. בזמן ביצוע קוד ההמשך קורוטינות אחרות יצטרכו להמשיך לחכות, ורק אחרי שפייתון יסיים הוא ימשיך לקוד ההמשך של הקורוטינה הבאה שמוכנה.

לכן ה Flow של הקוד יעבוד בערך כך:

  1. הדבר הראשון שיקרה הוא הפעלה של הקורוטינה tcp_echo_server. אנחנו מפעילים אותה בתור תוכנית ראשית עם פקודת asyncio הבאה:
asyncio.run(tcp_echo_server(), debug=True)

בתוכנית asyncio תהיה לנו פקודת asyncio.run אחת בתחילת התוכנית כדי להתניע את כל המנגנון האסינכרוני.

  1. לאחר מכן מתוך הפונקציה tcp_echo_server תיקרא שורת ההדפסה ואחריה פקודת await על קורוטינה נוספת:
    server = await asyncio.start_server(client_connected_handler, host='0.0.0.0', port='8080')

כאן פייתון יעצור להמתין אבל מאחר ואין אף קורוטינה אחרת שמתבצעת במקביל נקודת ההמשך הבאה היחידה היא השורה הבאה בקוד. לכן אחרי ש start_server תסיים את העבודה ותתחיל את השרת נוכל להמשיך לשורה שאחריה.

  1. השורה הבאה מפעילה קורוטינה חדשה:
    await server.serve_forever()

זו כבר קורוטינה ספציפית של שרת שלעולם אינה חוזרת. היא פשוט ממתינה ללקוחות שיתחברו לשרת שלנו, וכאשר לקוח יגיע היא תפעיל את הקורוטינה שהעברנו בתור פרמטר callback ל start_server.

  1. הדבר הבא שיקרה הוא שלקוח יתחבר. אני לא יודע מתי זה יקרה אבל מבחינת השרת אנחנו פשוט מחכים עד שזה יקרה. ברגע שלקוח מתחבר אוטומטית מופעלת הקורוטינה client_connected_handler וכאן יתחיל הכיף האמיתי. כבר בשורה הראשונה שלה אנחנו מוצאים את הפקודה:
    line = await reader.readline()

עכשיו יש לנו שני דברים שיכולים לקרות: או שנסיים לקרוא שורה מהלקוח שכבר התחבר, או שלקוח חדש יתחבר לשרת שלנו. אלה בעצם שתי קורוטינות שרצות עכשיו במקביל. אני לא יודע איזה מהם תסתיים קודם וגם המחשב לא - זה כבר תלוי במה הולך לקרות בעולם, אבל בואו נניח שלקוח חדש התחבר עכשיו למערכת. מבחינת הקוד של client_connected_handler כל מה שקורה אחרי פקודת await עליה אנחנו ממתינים הוא עדיין סגור בקופסא בצד, וקריאה חדשה לפונקציה מתחילה.

  1. הלקוח השני הגיע בתורו לאותה שורה ראשונה:
    line = await reader.readline()

ועכשיו יש לנו כבר שלוש פעולות אסינכרוניות שמתבצעות ברגע זה במקביל: ההמתנה ללקוח חדש, ההמתנה לקלט מהלקוח הראשון וההמתנה לקלט מהלקוח השני. אני לא יודע איזה פעולה תסתיים קודם, אבל ברגע שאחת הפעולות תסתיים מיד פייתון ימשיך לבצע את הקוד שמופיע אחרי ה await שמתאים לה.

2. קוד אסינכרוני וסינכרון

היתרון הגדול בקוד אסינכרוני על פני קוד מבוסס Threads הוא שאין צורך לסנכרן בין הפעולות ואין כמעט סיכוי להתנגשויות. נמשיך עם השרת שלנו ובואו נדמיין שאנחנו רוצים לספר כמה לקוחות מחוברים בכל רגע נתון. כיוון טוב יכול להיות להוסיף משתנה גלובאלי והתוכנית תיראה כך:

import asyncio
import logging

active_clients = 0 

async def client_connected_handler(reader, writer):
    global active_clients
    active_clients += 1
    print(f"New client connected. Now there are {active_clients} clients")

    line = await reader.readline()
    writer.write(line)
    await writer.drain()
    writer.close()
    await writer.wait_closed()
    active_clients -= 1
    print(f"Client left. Now there are {active_clients} clients")

async def tcp_echo_server():
    print("Starting a server")
    server = await asyncio.start_server(client_connected_handler, host='0.0.0.0', port='8080')
    await server.serve_forever()

logging.basicConfig(level=logging.DEBUG)
asyncio.run(tcp_echo_server(), debug=True)

ופה כדאי לשאול - מה אם שני לקוחות מתחברים בדיוק באותו זמן? האם פעולות ה +1 של שני התהליכים לא יתנגשו? האם נקבל עדיין את התוצאה הנכונה?

התשובה כמובן חיובית: בעבודה אסינכרונית המקום היחיד בו פייתון יכול לעצור ריצה של קטע קוד אחד ולהתחיל טיפול בקטע קוד אחר הוא בתוך await יזום. לכן בעוד שבעבודה עם Threads היינו צריכים סינכרון על המשתנה הגלובלי כדי שפעולת ה +1 לא תיחתך באמצע, במעבר לקוד אסינכרוני זה כבר לא נחוץ. עד שנגיע לשורה 11 ונמתין לקלט המשתנה הגלובלי כבר יסיים לקבל את הערך החדש שלו ולכן המספר תמיד יישאר מעודכן ומדויק.

3. קריאה למספר פעולות await במקביל בעצמנו - Tasks

ראינו שקוד אסינכרוני ועבודה עם המבנים האסינכרוניים של פייתון מאפשרים לנו לבצע פעולות במקביל בצורה מהירה ובלי שנצטרך לדאוג לסינכרון. בתוכניות אסינכרוניות מורכבות יותר נרצה הרבה פעמים לקבל שליטה יותר מדויקת במשימות אליהן אנחנו ממתינים עם await ואולי להפעיל מספר await-ים בו זמנית בעצמנו.

ניקח כדוגמא את המודול aiofiles ונראה איך הוא עוזר לנו ליצור מספר קבצים במקביל. הקוד הבא משתמש במודול כדי ליצור שלושה קבצים ולהדפיס את שמות הקבצים שנוצרו:

import asyncio, aiofiles

async def create_file(name):
    async with aiofiles.open(name, mode='w') as f:
        for _ in range(1_000):
            await f.write('hello async\n')
        print(f"File {name} is ready")

async def main():
    await create_file('one.txt')
    await create_file('two.txt')
    await create_file('three.txt')

asyncio.run(main(), debug=True)

לא משנה כמה פעמים נפעיל את הקוד ברור שתמיד ההדפסות יופיעו באותו סדר. פקודות ה create_file מופיעות אחת אחרי השניה ולכן בזמן שאנחנו ב await לפקודה הראשונה האחרות עדיין לא התחילו.

ככל שהפעולות לוקחות יותר זמן כך גדל הסיכוי שנעדיף להפעיל אותן בצורה מקבילית - כלומר ליצור את כל שלושת הקבצים "יחד" ומי שיסיים קודם יסיים. בפייתון אנחנו יכולים להפעיל קורוטינה בלי להמתין שתסתיים עם הפקודה asyncio.create_task. הפקודה מקבלת קורוטינה ומחזירה אוביקט חדש מסוג Task. אנחנו יכולים להחליט לחכות על אוביקט זה עד שיסתיים או לבטל אותו עם הפקודה cancel.

נשנה את תוכנית הקבצים כך שתעבוד עם Tasks ותיצור את שלושת הקבצים במקביל:

import asyncio, aiofiles

async def create_file(name):
    async with aiofiles.open(name, mode='w') as f:
        for i in range(1_000):
            await f.write('hello async\n')
        print(f"File {name} is ready")

async def main():
    t1 = asyncio.create_task(create_file('one.txt'))
    t2 = asyncio.create_task(create_file('two.txt'))
    t3 = asyncio.create_task(create_file('three.txt'))

    await t1
    await t2
    await t3

asyncio.run(main(), debug=True)

בגירסא זו של התוכנית אפשר לראות שההדפסות על סיום יצירת הקבצים מופיעות במקביל וסדר ההדפסות יכול להשתנות. גם מבחינת זמן ריצה גירסא זו מהירה יותר שכן היא יכולה להתחיל ליצור את כל שלושת הקבצים באותו רגע במקום לחכות לסיום יצירת קובץ לפני שמתחילים ליצור את זה שאחריו.

4. איסוף תשובות ממספר בקשות אסיכנרוניות במקביל - gather

את אותו היגיון של הפעלת משימות נוכל לשכלל עם הפקודה gather: פקודה זו נועדה להפעיל במקביל מספר משימות שמחזירות ערך, לדוגמא קריאת מידע מ HTTP API. המודול aiohttp מאפשר לשלוח בקשות HTTP בצורה נוחה לממשקי רשת שונים. הפקודה get שלו תשלח בקשת get לנתיב לבחירתכם, ותחזיר אוביקט תוצאה שלו פונקציה בשם text שתחזיר את טקסט התוצאה.

התוכנית הבאה משתמשת ב aiohttp כדי לשלוף שמות של מספר דמויות ממלחמת הכוכבים באמצעות האתר swapi.co:

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        for i in range(1, 5):
            resp = await fetch(session, f'http://swapi.co/api/people/{i}')
            print(resp['name'])

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

אנחנו יכולים לראות שזה עבד אבל ההדפסה כמו בדוגמא עם הקבצים עובדת בצורה סידרתית: קודם מביאים את המידע עבור דמות מספר 1, אחרי זה עבור דמות 2 וכך הלאה.

אנחנו כבר יודעים להשתמש ב create_task כדי להריץ את כל הבקשות במקביל, אבל כאן יש לנו בעיה חדשה - כל אחת מהמשימות צריכה להחזיר ערך. תשמחו לשמוע שלאוביקט Task ב Python יש מאפיין בשם result דרכו אפשר לקבל את התוצאה, ולכן אפשר יהיה להשתמש ב Task כדי לפתור את הבעיה, אבל אני רוצה להראות כאן מבנה יותר מתוחכם שנקרא asyncio.gather. במבנה זה אנחנו מעבירים מספר אוביקטים שאפשר להמתין עליהם (לדוגמא מספר קורוטינות) ומחזירה רשימה של תוצאות.

נעדכן את הקוד כך שישתמש ב Gather וידפיס בסוף את השמות:

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[
            fetch(session, f'http://swapi.co/api/people/{i}') for i in range(1, 5)
            ])

        print([x['name'] for x in results])

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

הפקודה gather תחכה עד שכל בקשות הרשת יסתיימו והמערך results מכיל את כל התוצאות לפי הסדר בו הוצאנו אותן. בהשוואה של שתי התוכניות ברוב המקרים התוכנית השניה תסתיים מהר יותר בצורה משמעותית.

5. טיפול ב Exceptions בקוד אסינכרוני

אחד הדברים שחשוב לשים לב אליהם הוא זריקת Exceptions מתוך קוד אסינכרוני. לכל אחת מהדרכים שראינו להפעיל קוד אסינכרוני יש שיטה משלה לטפל ב Exceptions.

בהפעלת קורוטינה עם await, אם אותה קורוטינה תזרוק Exception הקוד שהפעיל את ה await יכול לתפוס ולטפל ב Exception. אנחנו בודקים ותופסים Exception בצורה רגילה עם try ו except:

async def main():
    try:
        await create_file('one.txt')
        await create_file('two.txt')
        await create_file('three.txt')
    except Exception as e:
        print("exception...")

בעבודה עם Task העסק קצת יותר מסובך. ה Task שומר אצלו את ה Exception שהתקבל. הפרמטר debug=True שהעברנו ל asyncio.run מוודא שכל Task שקיבל Exception גם עשה משהו עם ה Exception הזה באמצעות פניה לפונקציה exception של ה Task. המטרה היא לעזור לנו לכתוב קוד שלא משאיר Exceptions שלא טופלו. לכן בכל מצב של Task שזורק Exception נרצה להפעיל את הפונקציה exception של ה Task ולטפל ב Exception באמצעות try/except. הנה דוגמא המבוססת על קוד יצירת הקבצים שכתבנו:

import asyncio, aiofiles

async def create_file(name):
    async with aiofiles.open(name, mode='w') as f:
        for i in range(1_000):
            await f.write('hello async\n')
        raise Exception('Ouch!')

async def main():
    t1 = asyncio.create_task(create_file('one.txt'))
    t2 = asyncio.create_task(create_file('two.txt'))
    t3 = asyncio.create_task(create_file('three.txt'))

    try:
        await t1
    except Exception:
        print("----- exception t1")
        print(t1.exception())        

    try:
        await t2
    except Exception:
        print("----- exception t2")
        print(t2.exception())

    try:
        await t3
    except Exception:
        print("----- exception t3")
        print(t3.exception())

asyncio.run(main())

כמובן שאפשר לאחד את כל ה try/except שלנו לבלוק אחד רק שאז צריך לבדוק מי המשימה שהתרסקה. בעיקרון במצבים כאלה הכי קל להשתמש בלולאה:

import asyncio, aiofiles

async def create_file(name):
    async with aiofiles.open(name, mode='w') as f:
        for i in range(1_000):
            await f.write('hello async\n')
        raise Exception('Ouch!')

async def main():
    tasks = [asyncio.create_task(create_file(filename)) for filename in ['one.txt', 'two.txt', 'three.txt']]

    for task in tasks:
        try:
            await task
        except Exception:
            print("----- exception")
            print(task.exception())


asyncio.run(main())

6. לסיכום: טיפים ב Best Practices לעבודה אסינכרונית

בפיתוח קוד אסינכרוני חשוב לשים לב לנקודות הבאות:

  1. יש להשתמש רק במבנים אסינכרוניים בתוך התוכניות שלכם. אין שום טעם לעטוף בקוד אסינכרוני פונקציה שקוראת מקובץ בצורה "חוסמת" מאחר והקריאה תיתקע ולא תתבצע במקביל. זאת הסיבה שהיינו צריכים את aiohttp ואת aiofiles

  2. יש לוודא שלא הולכות לכם לאיבוד ה Exceptions. ראינו כאן את try/except ויש עוד לא מעט מנגנונים לטיפול ב Exceptions בקוד אסינכרוני. התחילו מלאפיין מה אתם רוצים לעשות כשמשהו זורק Exception והשתמשו במנגנון המתאים למימוש התנהגות זו.

  3. שימו לב מתי אתם משתמשים ב Task, מתי ב await על קורוטינה ומתי ב gather. לכל מבנה התנהגות משלו: כשאנחנו רוצים קשר בין משימה אחת לזו שבאה אחריה נשתמש ב await על כל אחת בנפרד, כשאנחנו רוצים שהמשימות ירוצו באותו זמן נשתמש ב Task או ב gather.

  4. שימו לב שהקוד שאתם מריצים בתוך קורוטינה חייב להסתיים מהר ולא לכלול לוגיקה חישובית מורכבת. בזמן הפעלת קורוטינה אחת אף קורוטינה אחרת לא יכולה להתקדם, ולכן אנחנו רוצים שקוד הטיפול שבא אחרי await יסתיים כמה שיותר מהר.