משחקים עם מקביליות בסקאלה (חלק 2)

בחלק הקודם של הפוסט כתבתי על pmap ואיך אפשר להשתמש בו כדי לחלק פעולה חישובית למספר תהליכונים כדי לשפר ביצועים. היום אני רוצה לדבר על עבודת IO, על המגבלה של Thread Pool במיקבול משימות הקשורות ל IO ועל הפיתרון עם Virtual Threads.

1. מה בעצם הבעיה

נתחיל עם הקוד הבא בסקאלה שמשתמש ב Thread Pool כדי להוריד במקביל מידע מ 50 עמודים באתר מסוים (כן הנתונים באתר מזויפים אבל זה לא משנה):

object futures {
  given ExecutionContext = ExecutionContext.global

  private def printUrlContent(url: String): Unit =
    Using(Source.fromURL(url)) { reader =>
      reader.getLines().mkString("\n")
    }
    println(s"Done: ${url}")


  @main
  def test(): Unit =
    val s1 = System.nanoTime()
    1.to(50)
      .map {i => s"https://dummyjson.com/products/${i}" }
      .map { url => Future { printUrlContent(url)} }
      .foreach { f => Await.result(f, Duration.Inf) }

    val s2 = System.nanoTime()
    println(s"Took ${s2 - s1} ns")

}

אז בגדול אנחנו מחשבים 50 כתובות אינטרנט מאתר dummyjson.com, שולחים בקשת HTTP לכל כתובת ובודקים כמה זמן לקח למשוך את כל המידע.

הרצה של התוכנית מציגה תוצאה מעניינת - אנחנו רואים את הפלט נכתב למסך בקבוצות של URL-ים, כלומר מחכים קצת זמן, רואים הדפסה של מספר שורות ואז מחכים עוד קצת ועוד הדפסה. זה קורה בגלל המבנה של Thread Pool והשילוב עם עבודת IO. בסוג עבודה כזה מה שחוסם אותנו הוא הרשת ולא המחשב שלנו, ולכן כל התהליכונים עובדים על ניוטרל ובעצם מחכים למידע. הבעיה שה Thread Pool מוכן להריץ רק מספר מוגבל של תהליכונים ולכן צריך לחכות עד שתהליכון יסיים ויתפנה לפני שמתחילים להוריד את הנתיב הבא.

2. מעבר ל Virtual Threads

וכאן אנחנו מגלים את הפוטנציאל של Virtual Threads. בדרך כלל כל Thread תופס משאבים ולכן לא מומלץ ליצור יותר מדי מהם, בשלב מסוים הם יצרו עומס על מערכת ההפעלה ורק יאטו את התוכנית. תהליכון וירטואלי הוא מנגנון של ה JVM שלא יוצר Thread במערכת ההפעלה (זה נקרא fiber בהרבה מקומות אחרים) ולכן אפשר ליצור המון ממנו. נכון אם הפעולה שאנחנו מנסים לעשות היא משימה חישובית להוסיף עוד תהליכונים או תהליכונים וירטואליים לא ישפר את מצבנו, כי עדיין יהיה עומס על המעבד, אבל כשמדובר בפעולות IO ממילא המעבד לא עושה כלום והאיטיות מגיעה מהרשת ולא מהמחשב שלנו - ולכן תהליכונים וירטואליים יכולים לשנות את התמונה.

בסקאלה (וגם ב Java) יש קלאס בשם ExecutorService שאחראי על יצירת התהליכונים ולכן כל מה שצריך בשביל להתחיל לעבוד עם תהליכונים וירטואליים הוא לשנות את ה ExecutorService שלנו. עדכון התוכנית לעבודה עם תהליכונים וירטואליים הוא בסך הכל שינוי של השורה הראשונה ל:

given ExecutionContext = ExecutionContext.fromExecutorService(Executors.newVirtualThreadPerTaskExecutor())

והתוכנית המלאה:

object futures {
  given ExecutionContext = ExecutionContext.fromExecutorService(Executors.newVirtualThreadPerTaskExecutor())
//  given ExecutionContext = ExecutionContext.global

  private def printUrlContent(url: String): Unit =
    Using(Source.fromURL(url)) { reader =>
      reader.getLines().mkString("\n")
    }
    println(s"Done: ${url}")


  @main
  def test(): Unit =
    val s1 = System.nanoTime()
    1.to(50)
      .map {i => s"https://dummyjson.com/products/${i}" }
      .map { url => Future { printUrlContent(url)} }
      .foreach { f => Await.result(f, Duration.Inf) }

    val s2 = System.nanoTime()
    println(s"Took ${s2 - s1} ns")

}

גירסה זו של התוכנית רצה במחצית הזמן, ומה שיותר מעניין הוא שאין בה את החלוקה לקבוצות שאפיינה את העבודה עם Thread Pool. כל ההודעות נכתבות למסך יחד, בגלל שמראש יצרנו Thread וירטואלי לכל משימה.