יום 19 - הרצת בקשות במקביל
בזכות השימוש ב asyncio קל להשתמש ב Agents SDK כדי להריץ מספר סוכנים במקביל. בדוגמה של היום נראה איך זה עובד עבור תרגום מאנגלית לספרדית.
1. מה אנחנו בונים
יש מספר מצבים בהם נרצה להריץ סוכנים במקביל:
כשאנחנו בונים Workflow חלק מהשלבים בו אולי לא תלויים אחד בשני. הרצה במקביל של המסלולים שאינם תלויים אחד בשני יכולה להאיץ את התהליך כולו.
כשאנחנו בונים סוכן אינטרקטיבי אנחנו יכולים להפעיל במקביל לסוכן "מעקה בטיחות" - סוכן נוסף שסורק את הקלט במקביל לסוכן הראשי רק בשביל לזהות בקשות זדוניות או לא הגיוניות. בשביל לא לפגוע בביצועים נריץ את מעקה הבטיחות במקביל לסוכן הרגיל אבל אם מעקה הבטיחות מגלה שיש בעיה אוטומטית נעצור גם את הסוכן הרגיל. למעשה אבסטרקציה של "מעקה בטיחות" כבר ממומשת בתוך OpenAI Agents בדיוק בצורה הזאת.
בפועל הרצה במקביל של מספר בקשות משתמשת בדיוק במנגנוני הרצה במקביל הרגילים של asyncio. בדוגמה שלנו נבנה סוכן תרגום שמתרגם טקסט מאנגלית לספרדית ונפעיל אותו על הטקסט 3 פעמים במקביל. את שלושת התרגומים נעביר לסוכן LLM As a judge שיגיד מי התרגום הטוב ביותר ואותו נציג למשתמש.
2. קוד התוכנית
זה הקוד המלא מבוסס על הדוגמה של Openai עם התאמות שלי:
import asyncio
from agents import Agent, Runner, trace
"""
This example shows the parallelization pattern. We run the agent three times in parallel, and pick
the best result.
"""
spanish_agent = Agent(
name="spanish_agent",
instructions="You translate the user's message to Spanish",
)
translation_picker = Agent(
name="translation_picker",
instructions="You pick the best Spanish translation from the given options.",
)
async def main():
msg = """
I was a ghost, I was alone
Given the throne, I didn't know how to believe
I was the queen that I'm meant to be
I lived two lives, tried to play both sides
But I couldn't find my own place
Called a problem child 'cause I got too wild
But now that's how I'm getting paid on stage
"""
# Ensure the entire workflow is a single trace
with trace("Parallel translation"):
results = await asyncio.gather(
Runner.run(
spanish_agent,
msg,
),
Runner.run(
spanish_agent,
msg,
),
Runner.run(
spanish_agent,
msg,
),
)
outputs = [
res.final_output
for res in results
]
translations = "\n\n".join(outputs)
print(f"\n\nTranslations:\n\n{translations}")
best_translation = await Runner.run(
translation_picker,
f"Input: {msg}\n\nTranslations:\n{translations}",
)
print("\n\n-----")
print(f"Best translation: {best_translation.final_output}")
if __name__ == "__main__":
asyncio.run(main())
נשים לב:
בשביל להריץ דברים במקביל אני משתמש בסך הכל ב asyncio.gather - פונקציית ההרצה המקבילית הרגילה של asyncio.
אין צורך להשתמש כאן ב session כי כל סוכן מטפל רק בבקשה אחת. אין התיחסות לשיחה ולהודעות היסטוריות.
סך הכל זה כל הקוד שהייתי צריך בשביל להריץ שלוש בקשות במקביל ולקבל שלוש תוצאות:
results = await asyncio.gather(
Runner.run(
spanish_agent,
msg,
),
Runner.run(
spanish_agent,
msg,
),
Runner.run(
spanish_agent,
msg,
),
)
3. עכשיו אתם
כתבו תהליך יצירת פוסט לבלוג שמורכב ממספר משימות, את חלקן נבצע במקביל ואת חלקן בטור:
התהליך מבצע חיפוש ברשת על נושא שניתן לו.
התהליך מריץ במקביל 5 סוכנים שממציאים רעיונות לפוסטים, ואז אוסף את כל הרעיונות ובוחר את הטוב ביותר.
ממשיכים לכתיבת הפוסט - מבקשים מ AI אחד לייצר פוסט מלא ונעזרים ב LLM As a judge כדי לשפר אותו.
לאחר מכן ממקבלים את 3 ויוצרים 3 זוגות של סוכני כתיבה ומשוב שרצים במקביל.
אחרי שקיבלנו 3 עותקים טובים לפוסט נותנים את שלושתם לסוכן אחרון שבוחר את הטוב ביותר או משלב את שלושתם כדי ליצור גירסה חדשה ומשופרת.