• בלוג
  • עמוד 13
  • פריימוורקים אג'נטיים ושמירת הודעות בבסיס הנתונים

פריימוורקים אג'נטיים ושמירת הודעות בבסיס הנתונים

31/07/2025

אחד הפערים בין ספריות לכתיבת מערכות משולבות AI הוא האופן בו מערכות אלה שומרות הודעות בבסיס הנתונים. בואו נראה שלוש תבניות מרכזיות ומהן נלמד על מה חשוב להסתכל.

נתחיל עם OpenAI Agents SDK, ספריה יחסית חדשה של OpenAI. זאת הדוגמה מתוך התיעוד שלהם:

import asyncio

from agents import Agent, Runner, SQLiteSession

async def main():
    agent = Agent(name="Assistant", instructions="Reply very concisely.")

    # Create session instance
    session = SQLiteSession("conversation_123", "chats.db")

    # First turn
    result = await Runner.run(agent, "What city is the Golden Gate Bridge in?", session=session)
    print(result.final_output)
    # San Francisco

    # Second turn - agent automatically remembers previous context
    result = await Runner.run(agent, "What state is it in?", session=session)
    print(result.final_output)
    # California

if __name__ == '__main__':
    asyncio.run(main())

הספריה שומרת את כל ההודעות הישנות בבסיס נתונים SQLite או בזיכרון. בדף התיעוד הם מציעים לי לכתוב Custom Session Backend אם אני רוצה להשתמש בספריה בפרודקשן כדי לשמור את השיחות ב Postgresql או רדיס, אבל הם לא מספקים אחד. בכניסה לבסיס הנתונים שנוצר אני יכול לזהות שם טבלה של שיחות וטבלה של הודעות:

sqlite> .tables
agent_messages  agent_sessions
sqlite> select * from agent_messages ;
1|conversation_123|{"content": "What city is the Golden Gate Bridge in?", "role": "user"}|2025-07-30 13:35:36
2|conversation_123|{"id": "msg_688a1fa7fbe0819b9f4096e23741045f0d855ca99eba94a7", "content": [{"annotations": [], "text": "San Francisco.", "type": "output_text", "logprobs": []}], "role": "assistant", "status": "completed", "type": "message"}|2025-07-30 13:35:36
3|conversation_123|{"content": "What state is it in?", "role": "user"}|2025-07-30 13:35:37
4|conversation_123|{"id": "msg_688a1fa8fde0819b958bac6b45c84f900d855ca99eba94a7", "content": [{"annotations": [], "text": "California.", "type": "output_text", "logprobs": []}], "role": "assistant", "status": "completed", "type": "message"}|2025-07-30 13:35:37
sqlite> select * from agent_sessions ;
conversation_123|2025-07-30 13:35:36|2025-07-30 13:35:37

כל הודעה נשמרת בתור JSON בלי סכימה מסודרת. יש פה יתרון שהמימוש פשוט ואם יהיו לנו חלקים נוספים להודעה יהיה קל להוסיף הודעות חדשות עם אותם חלקים, אבל גם חיסרון שקשה להריץ שאילתות על JSON או לוודא נכונות של התוכן.

ספריה שניה היא RubyLLM בשפת רובי שמתחברת עם ריילס ומציעה את המבנה הבא:

rails g model Chat model_id:string user:references
rails g model Message chat:references role:string content:text model_id:string input_tokens:integer output_tokens:integer tool_call:references
rails g model ToolCall message:references tool_call_id:string:index name:string arguments:jsonb

השינוי הראשון שאנחנו רואים הוא הבחירה להשתמש בסכימה מסודרת, לכל הודעה יש תוכן, role, טוקנים ואיזה מודל יצר אותה, ובנוסף יש לנו טבלה של הפעלת כלים ששומרת את הפרמטרים ושמות כל הכלים שנקראו. כל ToolCall מחובר לשתי הודעות, ההודעה שהפעילה את הכלי וההודעה עם התוצאה. יש פה צעד קדימה מבחינת בטיחות המידע וחיבור לבסיסי נתונים אמיתיים. חלקים אחרים בהודעה כמו קבצים או תמונות יישמרו בטבלה נפרדת של קבצים מצורפים במנגנון Active Storage. משתמשים שרוצים יכולים להוסיף עוד עמודות לטבלת ההודעות כמו metadata.

ספריה שלישית לסקירה היא Vercel AI SDK והיא מציעה את הפיתרון המורכב מכולן. כך נראית סכימה מהדוגמה שלהם:

import {
  check,
  index,
  integer,
  jsonb,
  pgTable,
  real,
  text,
  timestamp,
  varchar,
} from "drizzle-orm/pg-core";
import { MyDataPart, MyUIMessage, MyProviderMetadata } from "../message-type";
import { generateId, ToolUIPart } from "ai";
import { sql } from "drizzle-orm";
import {
  getLocationInput,
  getLocationOutput,
  getWeatherInformationInput,
  getWeatherInformationOutput,
} from "@/ai/tools";

export const chats = pgTable("chats", {
  id: varchar()
    .primaryKey()
    .$defaultFn(() => generateId()),
});

export const messages = pgTable(
  "messages",
  {
    id: varchar()
      .primaryKey()
      .$defaultFn(() => generateId()),
    chatId: varchar()
      .references(() => chats.id, { onDelete: "cascade" })
      .notNull(),
    createdAt: timestamp().defaultNow().notNull(),
    role: varchar().$type<MyUIMessage["role"]>().notNull(),
  },
  (table) => [
    index("messages_chat_id_idx").on(table.chatId),
    index("messages_chat_id_created_at_idx").on(table.chatId, table.createdAt),
  ],
);

export const parts = pgTable(
  "parts",
  {
    id: varchar()
      .primaryKey()
      .$defaultFn(() => generateId()),
    messageId: varchar()
      .references(() => messages.id, { onDelete: "cascade" })
      .notNull(),
    type: varchar().$type<MyUIMessage["parts"][0]["type"]>().notNull(),
    createdAt: timestamp().defaultNow().notNull(),
    order: integer().notNull().default(0),

    // Text fields
    text_text: text(),

    // Reasoning fields
    reasoning_text: text(),

    // File fields
    file_mediaType: varchar(),
    file_filename: varchar(), // optional
    file_url: varchar(),

    // Source url fields
    source_url_sourceId: varchar(),
    source_url_url: varchar(),
    source_url_title: varchar(), // optional

    // Source document fields
    source_document_sourceId: varchar(),
    source_document_mediaType: varchar(),
    source_document_title: varchar(),
    source_document_filename: varchar(), // optional

    // shared tool call columns
    tool_toolCallId: varchar(),
    tool_state: varchar().$type<ToolUIPart["state"]>(),
    tool_errorText: varchar().$type<ToolUIPart["state"]>(),

    // tools inputs and outputss are stored in separate cols
    tool_getWeatherInformation_input:
      jsonb().$type<getWeatherInformationInput>(),
    tool_getWeatherInformation_output:
      jsonb().$type<getWeatherInformationOutput>(),

    tool_getLocation_input: jsonb().$type<getLocationInput>(),
    tool_getLocation_output: jsonb().$type<getLocationOutput>(),

    // Data parts
    data_weather_id: varchar().$defaultFn(() => generateId()),
    data_weather_location: varchar().$type<MyDataPart["weather"]["location"]>(),
    data_weather_weather: varchar().$type<MyDataPart["weather"]["weather"]>(),
    data_weather_temperature:
      real().$type<MyDataPart["weather"]["temperature"]>(),

    providerMetadata: jsonb().$type<MyProviderMetadata>(),
  },
  (t) => [
    // Indexes for performance optimisation
    index("parts_message_id_idx").on(t.messageId),
    index("parts_message_id_order_idx").on(t.messageId, t.order),

    // Check constraints
    check(
      "text_text_required_if_type_is_text",
      // This SQL expression enforces: if type = 'text' then text_text IS NOT NULL
      sql`CASE WHEN ${t.type} = 'text' THEN ${t.text_text} IS NOT NULL ELSE TRUE END`,
    ),
    check(
      "reasoning_text_required_if_type_is_reasoning",
      sql`CASE WHEN ${t.type} = 'reasoning' THEN ${t.reasoning_text} IS NOT NULL ELSE TRUE END`,
    ),
    check(
      "file_fields_required_if_type_is_file",
      sql`CASE WHEN ${t.type} = 'file' THEN ${t.file_mediaType} IS NOT NULL AND ${t.file_url} IS NOT NULL ELSE TRUE END`,
    ),
    check(
      "source_url_fields_required_if_type_is_source_url",
      sql`CASE WHEN ${t.type} = 'source_url' THEN ${t.source_url_sourceId} IS NOT NULL AND ${t.source_url_url} IS NOT NULL ELSE TRUE END`,
    ),
    check(
      "source_document_fields_required_if_type_is_source_document",
      sql`CASE WHEN ${t.type} = 'source_document' THEN ${t.source_document_sourceId} IS NOT NULL AND ${t.source_document_mediaType} IS NOT NULL AND ${t.source_document_title} IS NOT NULL ELSE TRUE END`,
    ),
    check(
      "tool_getWeatherInformation_fields_required",
      sql`CASE WHEN ${t.type} = 'tool-getWeatherInformation' THEN ${t.tool_toolCallId} IS NOT NULL AND ${t.tool_state} IS NOT NULL ELSE TRUE END`,
    ),
    check(
      "tool_getLocation_fields_required",
      sql`CASE WHEN ${t.type} = 'tool-getLocation' THEN ${t.tool_toolCallId} IS NOT NULL AND ${t.tool_state} IS NOT NULL ELSE TRUE END`,
    ),
    check(
      "data_weather_fields_required",
      sql`CASE WHEN ${t.type} = 'data-weather' THEN ${t.data_weather_location} IS NOT NULL AND ${t.data_weather_weather} IS NOT NULL AND ${t.data_weather_temperature} IS NOT NULL ELSE TRUE END`,
    ),
  ],
);

export type MyDBUIMessagePart = typeof parts.$inferInsert;
export type MyDBUIMessagePartSelect = typeof parts.$inferSelect;

התוכן נשמר בטבלה שנקראת parts כאשר לכל הודעה יש הרבה חלקים. טבלה זו מכילה עמודות לכל "סוג" חלק ויותר מזה לכל כלי פוטנציאלי יש שתי עמודות עבור הקלט והפלט של אותו כלי. כששאלתי את קלוד על הסכימה הוא כתב בכתב מודגש "This is over-engineered for most AI conversation needs".

מכירים גישות נוספות? ניסיתם חלק מהגישות פה ורוצים לשתף איך היה? מוזמנים לשתף בתגובות בטלגרם או ברשתות החברתיות.