• בלוג
  • על השגיאה got Future attached to a different loop ב Python

על השגיאה got Future attached to a different loop ב Python

31/12/2019

פוסט זה כולל טיפ קצר לעבודה יעילה עם Python. אם אתם רוצים ללמוד פייתון יותר לעומק אני ממליץ על קורס Python כאן באתר.
הקורס כולל עשרות שיעורי וידאו והמון תרגול מעשי וילמד אתכם Python בצורה מקצועית מההתחלה ועד הנושאים המתקדמים.

הרבה פעמים בתכנות יש שגיאות שנראות ממבט ראשון לא קשורות לחיים, אבל אחרי שאנחנו מבינים מה הקוד שלנו באמת עושה אנחנו מגלים שהשגיאה היא תיאור מדויק של מה שקרה לא בסדר, ושל הדרך לתקן אותו.

כך השגיאה שבכותרת ששמחתי לפגוש היום בקוד פייתון שכתבתי. התחלתי עם גירסא פשוטה לפונקציה download הבאה שלוקחת נתיב ושומרת אותו לקובץ מקומי על המחשב באמצעות Python ו asyncio:

import aiohttp, asyncio, aiofiles

async def download(url, to):
    print(f"Download {url} to {to}")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status == 200:
                f = await aiofiles.open(to, mode='wb')
                await f.write(await resp.read())
                await f.close()

הפונקציה עובדת ואפילו אפשר להשתמש בה כדי להוריד כמה URL-ים במקביל, למשל הקוד הבא מוריד מספר פוסטים מהבלוג:

async def main():
    await asyncio.wait([
        asyncio.create_task(download('https://www.tocode.co.il/blog/2019-12-git-rm-vs-reset', 'post1.html')),
        asyncio.create_task(download('https://www.tocode.co.il/blog/2019-12-keep-on-learning', 'post2.html')),
        asyncio.create_task(download('https://www.tocode.co.il/blog/2019-12-python-memory-management', 'post3.html')),
        ])

asyncio.run(main())

אבל אתם יודעים איך זה כשלוקחים קוד דוגמא ומעבירים אותו לעולם האמיתי אנחנו צריכים להתחיל להלביש עליו גם בגדים של העולם האמיתי. במקרה של asyncio ותקשורת הבגדים האלה נקראים Throttling, שזה הרעיון שאנחנו לא רוצים להוציא יותר מדי בקשות רשת במקביל. גם בגלל שזה לא בריא למחשב שלנו, אבל בעיקר בגלל שזה לא בריא לשרת וגם לא מנומס. פה בטוקוד יש מנגנון אוטומטי שחוסם את כתובת ה IP שלכם לכמה שעות אם אתם שולחים יותר מדי בקשות במקביל (הכנסתי אותו אחרי שכמה טורקים הפילו לי את האתר פעם אחר פעם עם כלים אוטומטיים שחיפשו פירצות אבטחה).

בכל מקרה ובחזרה לפייתון חשבתי להשתמש ב asyncio.Semaphore כדי לשלוט בכמות הבקשות שנשלחות במקביל: כל בקשה לוקחת אסימון מהסמפור, ואם אין אסימון פנוי מחכים שבקשה אחרת תסיים ותחזיר אחד. הניסיון הראשון שלי לכתוב את זה נראה כך:

# DO NOT USE - CODE WITH BUG
import aiohttp, asyncio, aiofiles

throttle = asyncio.Semaphore(2)

async def download(url, to):
    async with throttle:
        print(f"Download {url} to {to}")
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status == 200:
                    f = await aiofiles.open(to, mode='wb')
                    await f.write(await resp.read())
                    await f.close()

async def main():
    await asyncio.wait([
        asyncio.create_task(download('https://www.tocode.co.il/blog/2019-12-git-rm-vs-reset', 'post1.html')),
        asyncio.create_task(download('https://www.tocode.co.il/blog/2019-12-keep-on-learning', 'post2.html')),
        asyncio.create_task(download('https://www.tocode.co.il/blog/2019-12-python-memory-management', 'post3.html')),
        ])

asyncio.run(main())

והשגיאה לא איחרה לבוא:

Download https://www.tocode.co.il/blog/2019-12-git-rm-vs-reset to post1.html
Download https://www.tocode.co.il/blog/2019-12-keep-on-learning to post2.html
Task exception was never retrieved
future: <Task finished name='Task-4' coro=<download() done, defined at post.py:5> exception=RuntimeError("Task <Task pending name='Task-4' coro=<download() running at post.py:6> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py:507]> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "post.py", line 6, in download
    async with throttle:
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/locks.py", line 97, in __aenter__
    await self.acquire()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/locks.py", line 496, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-4' coro=<download() running at post.py:6> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py:507]> got Future <Future pending> attached to a different loop

מה קרה כאן? ההודעה אומרת שיש לנו Future Object שמחובר ל Event Loop אחר. פה המקום להזכיר ש asyncio מאפשר לנו ליצור מספר Event Loops במקביל. כל פעם שאנחנו יוצרים Task או Future Object אותו אוביקט מחובר לאיזושהי Event Loop. הקוד הראשי:

asyncio.run(main())

יוצר את ה Event Loop הראשון במערכת ומריץ בתוכה את הפונקציה main.

בחזרה לשגיאה - אנחנו יכולים לראות שהיא הופיעה רק בקובץ השלישי שאנחנו מנסים להוריד, לכן בוודאות היא הגיעה מה Semaphore, וספציפית מהניסיון להמתין על ה Semaphore (כי שתי הבקשות הראשונות עברו חלק). מזה קל להסיק שהדבר שאנחנו מנסים להמתין עליו אבל לא מחובר ל Event Loop הנוכחי הוא בדיוק ה Semaphore.

ואיך Semaphore (או כל אוביקט אחר) יודע לאיזה Event Loop הוא מחובר? ל asyncio יש פונקציה גלובלית בשם get_event_loop() שמחזירה בדיוק את זה. הבנאי של Semaphore קורא לפונקציה זו ושומר את ה Event Loop. אני מעדכן קצת את הקוד כדי לראות מה ה Event Loop שאותו בנאי יקבל ולהשוות אותה ל Event Loop הראשית של התוכנית:

import aiohttp, asyncio, aiofiles

print("Event Loop when creating the semaphore: ", id(asyncio.get_event_loop()))
throttle = asyncio.Semaphore(2)

async def main():
    print("Event Loop in main()", id(asyncio.get_event_loop()))

asyncio.run(main())

והנה התוצאה:

Event Loop when creating the semaphore:  4516635360
Event Loop in main() 4533746416

עכשיו הסיפור ברור: יצרתי את ה Semaphore מוקדם מדי ולכן הוא מחובר ל Event Loop שונה מזו של ה main. בשביל לתקן את הקוד צריך רק להזיז את יצירת ה Semaphore פנימה לתוך ה main:

import aiohttp, asyncio, aiofiles

throttle = None

async def download(url, to):
    async with throttle:
        print(f"Download {url} to {to}")
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status == 200:
                    f = await aiofiles.open(to, mode='wb')
                    await f.write(await resp.read())
                    await f.close()

async def main():
    global throttle
    throttle = asyncio.Semaphore(2)
    await asyncio.wait([
        asyncio.create_task(download('https://www.tocode.co.il/blog/2019-12-git-rm-vs-reset', 'post1.html')),
        asyncio.create_task(download('https://www.tocode.co.il/blog/2019-12-keep-on-learning', 'post2.html')),
        asyncio.create_task(download('https://www.tocode.co.il/blog/2019-12-python-memory-management', 'post3.html')),
        ])

asyncio.run(main())

נ.ב. הרבה פעמים נרצה מנגנונים יותר מתוחכמים כדי להאט את התקשורת, למשל נרצה להגדיר שלא נוציא יותר מ-5 בקשות במקביל אבל גם בשום מקרה לא נוציא יותר מ 10 בקשות בשניה (גם אם השרת עונה ממש מהר לשאילתות שלנו). ספריה נחמדה שעוזרת ליצור Throttling מתוחכם יותר ל asyncio נקראת asyncio-throttle ושווה לבדוק אותה לפני שמתחילים לבנות מנגנון לבד בעולם האמיתי.