Skip to content

Memoria

core.memory.manager

HistoryStore

Bases: Protocol

Source code in src/core/memory/manager.py
 9
10
11
12
class HistoryStore(Protocol):
    """Structural interface for a backend that keeps message history per session id."""

    def get(self, sid: str) -> List[ModelMessage]:
        """Return the message history stored for session id ``sid``."""
        ...

    def set(self, sid: str, messages: List[ModelMessage]) -> None:
        """Store ``messages`` as the history of session id ``sid``."""
        ...

    def clear(self, sid: str) -> None:
        """Remove the history kept for session id ``sid``."""
        ...

get

get(sid)
Source code in src/core/memory/manager.py
10
def get(self, sid: str) -> List[ModelMessage]:
    """Return the message history stored for session id ``sid``."""
    ...

set

set(sid, messages)
Source code in src/core/memory/manager.py
11
def set(self, sid: str, messages: List[ModelMessage]) -> None:
    """Store ``messages`` as the history of session id ``sid``."""
    ...

clear

clear(sid)
Source code in src/core/memory/manager.py
12
def clear(self, sid: str) -> None:
    """Remove the history kept for session id ``sid``."""
    ...

MemoryManager

Coordina acceso a una o varias stores de historial.

Source code in src/core/memory/manager.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class MemoryManager:
    """Coordina acceso a una o varias stores de historial."""

    def __init__(self, store: HistoryStore | list[HistoryStore] | None):
        if store is None:
            self.stores: List[HistoryStore] = []
        elif isinstance(store, list):
            self.stores = store
        else:
            self.stores = [store]

    def load(self, session_id: str) -> List[ModelMessage]:
        for store in self.stores:
            messages = store.get(session_id)
            if messages:
                return messages
        return []

    async def save_from_result(
        self, session_id: str, all_messages: List[ModelMessage], MAX_HISTORY: int = 15
    ) -> List[ModelMessage]:
        cleaned = strip_tool_traffic(all_messages)
        cropped = await keep_recent_messages(cleaned, MAX_HISTORY=MAX_HISTORY)
        for store in self.stores:
            store.set(session_id, cropped)
        return cropped

    def reset(self, session_id: str) -> None:
        for store in self.stores:
            store.clear(session_id)

stores instance-attribute

stores = []

load

load(session_id)
Source code in src/core/memory/manager.py
26
27
28
29
30
31
def load(self, session_id: str) -> List[ModelMessage]:
    """Return the first non-empty history found, querying stores in order.

    Stores after the first hit are not queried; ``[]`` is returned when no
    store has messages for ``session_id``.
    """
    histories = (backend.get(session_id) for backend in self.stores)
    return next((history for history in histories if history), [])

save_from_result async

save_from_result(session_id, all_messages, MAX_HISTORY=15)
Source code in src/core/memory/manager.py
33
34
35
36
37
38
39
40
async def save_from_result(
    self, session_id: str, all_messages: List[ModelMessage], MAX_HISTORY: int = 15
) -> List[ModelMessage]:
    """Clean, crop and persist ``all_messages`` in every store.

    The messages go through ``strip_tool_traffic`` and then
    ``keep_recent_messages`` (with ``MAX_HISTORY``); the resulting list is
    written to each store and returned.
    """
    without_tools = strip_tool_traffic(all_messages)
    trimmed = await keep_recent_messages(without_tools, MAX_HISTORY=MAX_HISTORY)
    for backend in self.stores:
        backend.set(session_id, trimmed)
    return trimmed

reset

reset(session_id)
Source code in src/core/memory/manager.py
42
43
44
def reset(self, session_id: str) -> None:
    """Clear the history of ``session_id`` in every configured store."""
    for backend in self.stores:
        backend.clear(session_id)

core.memory.in_memory

memory_store module-attribute

memory_store = InMemoryHistory()

InMemoryHistory

Source code in src/core/memory/in_memory.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class InMemoryHistory:
    """History store that keeps one message list per session id in a dict."""

    def __init__(self):
        # key: session_id -> list of ModelMessage
        self._store: dict[str, list] = {}

    def get(self, sid: str):
        """Return the messages of ``sid`` as a new list (``[]`` if unknown)."""
        # Always a fresh list, so callers cannot clobber the stored one.
        return [*self._store.get(sid, ())]

    def add(self, sid: str, messages):
        """Append ``messages`` (a list, or a single item) to the history of ``sid``.

        Falsy input is ignored and does not create an entry for ``sid``.
        """
        if messages:
            extra = messages if isinstance(messages, list) else [messages]
            # setdefault guarantees a real list lives inside the dictionary.
            self._store.setdefault(sid, []).extend(extra)

    # Handy for storing the complete history in one go.
    def set(self, sid: str, messages):
        """Replace the history of ``sid`` with ``messages``.

        Falsy input stores an empty list, a list is copied, and any other
        value is stored as a one-element list.
        """
        if not messages:
            replacement = []
        elif isinstance(messages, list):
            replacement = list(messages)
        else:
            replacement = [messages]
        self._store[sid] = replacement

    def clear(self, sid: str):
        """Drop the entry for ``sid``; unknown ids are ignored."""
        if sid in self._store:
            del self._store[sid]

get

get(sid)
Source code in src/core/memory/in_memory.py
 9
10
11
def get(self, sid: str):
    """Return the messages of ``sid`` as a new list (``[]`` if unknown)."""
    # Always a fresh list, so callers cannot clobber the stored one.
    return [*self._store.get(sid, ())]

add

add(sid, messages)
Source code in src/core/memory/in_memory.py
13
14
15
16
17
18
19
20
21
def add(self, sid: str, messages):
    """Append ``messages`` (a list, or a single item) to the history of ``sid``.

    Falsy input is ignored and does not create an entry for ``sid``.
    """
    if messages:
        extra = messages if isinstance(messages, list) else [messages]
        # setdefault guarantees a real list lives inside the dictionary.
        self._store.setdefault(sid, []).extend(extra)

set

set(sid, messages)
Source code in src/core/memory/in_memory.py
24
25
26
27
28
29
30
def set(self, sid: str, messages):
    """Replace the history of ``sid`` with ``messages``.

    Falsy input stores an empty list, a list is copied, and any other value
    is stored as a one-element list.
    """
    if not messages:
        replacement = []
    elif isinstance(messages, list):
        replacement = list(messages)
    else:
        replacement = [messages]
    self._store[sid] = replacement

clear

clear(sid)
Source code in src/core/memory/in_memory.py
32
33
def clear(self, sid: str):
    """Drop the entry for ``sid``; unknown ids are ignored."""
    if sid in self._store:
        del self._store[sid]

core.memory.redis_store

Almacenamiento de ventana de conversación en Redis.

RedisWindowStore

Bases: HistoryStore

Persistencia sencilla de mensajes usando Redis (un JSON por item).

Source code in src/core/memory/redis_store.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class RedisWindowStore(HistoryStore):
    """Simple message persistence on Redis (one JSON document per list item).

    The session id is used directly as the Redis key of the list.
    """

    def __init__(self, redis_client: Redis):
        self.redis_client = redis_client

    def get(self, session_id: str) -> List[ModelMessage]:
        """Read every item of the list at ``session_id`` and decode it."""
        raw = cast("list[str | bytes]", self.redis_client.lrange(session_id, 0, -1))
        return loads_list(raw)

    def set(self, session_id: str, messages: List[ModelMessage]) -> None:
        """Replace the list stored at ``session_id`` with ``messages``.

        An empty ``messages`` leaves the key deleted.
        """
        # Serialise before touching Redis: if encoding raises, the previously
        # stored history is left intact instead of being deleted.
        payloads = dumps_list(messages)
        self.redis_client.delete(session_id)
        if payloads:
            self.redis_client.rpush(session_id, *payloads)

    def clear(self, session_id: str) -> None:
        """Delete the key ``session_id``."""
        self.redis_client.delete(session_id)

redis_client instance-attribute

redis_client = redis_client

get

get(session_id)
Source code in src/core/memory/redis_store.py
21
22
23
def get(self, session_id: str) -> List[ModelMessage]:
    """Read every item of the list at ``session_id`` and decode it."""
    stored_items = self.redis_client.lrange(session_id, 0, -1)
    return loads_list(cast("list[str | bytes]", stored_items))

set

set(session_id, messages)
Source code in src/core/memory/redis_store.py
25
26
27
28
29
def set(self, session_id: str, messages: List[ModelMessage]) -> None:
    """Replace the list stored at ``session_id`` with ``messages``.

    An empty ``messages`` leaves the key deleted.
    """
    # Serialise before touching Redis: if encoding raises, the previously
    # stored history is left intact instead of being deleted.
    payloads = dumps_list(messages)
    self.redis_client.delete(session_id)
    if payloads:
        self.redis_client.rpush(session_id, *payloads)

clear

clear(session_id)
Source code in src/core/memory/redis_store.py
31
32
def clear(self, session_id: str) -> None:
    """Delete the key ``session_id``."""
    client = self.redis_client
    client.delete(session_id)

core.memory.redis_history

RedisHistory

Bases: HistoryStore

Historial por sesión en Redis usando lista (un JSON por item). Implementa HistoryStore (get/set/clear) y expone _load/_save como compat.

Source code in src/core/memory/redis_history.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class RedisHistory(HistoryStore):
    """
    Per-session history in Redis backed by a list (one JSON document per item).
    Implements HistoryStore (get/set/clear) and exposes _load/_save for
    compatibility.

    Each instance is bound to a single ``session_id``; calls made with any
    other id are ignored.
    """

    def __init__(self, session_id: str, redis_client: redis.Redis):
        self.session_id = session_id
        self.redis_client = redis_client

    # --- HistoryStore interface ---
    def get(self, sid: str) -> List[ModelMessage]:
        """Return the decoded history, or ``[]`` when ``sid`` is not the bound session."""
        if sid != self.session_id:
            # this history is tied to one specific session_id
            return []
        raw = cast(
            "list[str | bytes]", self.redis_client.lrange(self.session_id, 0, -1)
        )
        return loads_list(raw)

    def set(self, sid: str, messages: List[ModelMessage]) -> None:
        """Replace the stored history with ``messages``; no-op for a foreign ``sid``.

        An empty ``messages`` leaves the key deleted.
        """
        if sid != self.session_id:
            return
        # Serialise before touching Redis: if encoding raises, the previously
        # stored history is left intact instead of being deleted.
        payloads = dumps_list(messages)
        self.redis_client.delete(self.session_id)
        if payloads:
            self.redis_client.rpush(self.session_id, *payloads)

    def clear(self, sid: str) -> None:
        """Delete the stored history; no-op for a foreign ``sid``."""
        if sid != self.session_id:
            return
        self.redis_client.delete(self.session_id)

    # --- Compatibility with existing code/tests ---
    def _load(self) -> List[ModelMessage]:
        """Shortcut for ``get`` using the bound session id."""
        return self.get(self.session_id)

    def _save(self, messages: List[ModelMessage]) -> None:
        """Shortcut for ``set`` using the bound session id."""
        self.set(self.session_id, messages)

session_id instance-attribute

session_id = session_id

redis_client instance-attribute

redis_client = redis_client

get

get(sid)
Source code in src/core/memory/redis_history.py
25
26
27
28
29
30
31
32
def get(self, sid: str) -> List[ModelMessage]:
    """Return the decoded history, or ``[]`` when ``sid`` is not the bound session."""
    # this history is tied to one specific session_id
    if sid == self.session_id:
        stored_items = self.redis_client.lrange(self.session_id, 0, -1)
        return loads_list(cast("list[str | bytes]", stored_items))
    return []

set

set(sid, messages)
Source code in src/core/memory/redis_history.py
34
35
36
37
38
39
40
def set(self, sid: str, messages: List[ModelMessage]) -> None:
    """Replace the stored history with ``messages``; no-op for a foreign ``sid``.

    An empty ``messages`` leaves the key deleted.
    """
    if sid != self.session_id:
        return
    # Serialise before touching Redis: if encoding raises, the previously
    # stored history is left intact instead of being deleted.
    payloads = dumps_list(messages)
    self.redis_client.delete(self.session_id)
    if payloads:
        self.redis_client.rpush(self.session_id, *payloads)

clear

clear(sid)
Source code in src/core/memory/redis_history.py
42
43
44
45
def clear(self, sid: str) -> None:
    """Delete the stored history; no-op for a foreign ``sid``."""
    if sid == self.session_id:
        self.redis_client.delete(self.session_id)

core.memory.codec

Este módulo proporciona funciones para serializar y deserializar mensajes utilizando JSON, facilitando la comunicación entre componentes.

dumps_list

dumps_list(messages)

Convierte una lista de ModelMessage en una lista de strings JSON, uno por mensaje (formato objeto JSON).

Source code in src/core/memory/codec.py
21
22
23
24
25
26
27
28
29
30
def dumps_list(messages: List[ModelMessage]) -> list[str]:
    """
    Convert a list of ModelMessage into a list of JSON strings,
    one per message (JSON object format).

    Falsy input yields an empty list.
    """
    if messages:
        # The adapter emits one JSON array (b'[ {...}, {...} ]'); split it
        # back into one JSON string per element.
        encoded_array = ModelMessagesTypeAdapter.dump_json(messages)
        return [
            json.dumps(item, ensure_ascii=False)
            for item in json.loads(encoded_array)
        ]
    return []

loads_list

loads_list(raw_items)

Convierte una lista de strings/bytes JSON (uno por mensaje) en ModelMessage[]. Tolerante a bytes y a algunos errores de parseo.

Source code in src/core/memory/codec.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def loads_list(raw_items: list[str | bytes]) -> List[ModelMessage]:
    """
    Convierte una lista de strings/bytes JSON (uno por mensaje) en ModelMessage[].
    Tolerante a bytes y a algunos errores de parseo.
    """
    out: List[ModelMessage] = []
    for raw in raw_items:
        s = (
            raw.decode("utf-8", errors="replace")
            if isinstance(raw, (bytes, bytearray))
            else str(raw)
        )
        try:
            obj = json.loads(s)
            msg = ModelMessagesTypeAdapter.validate_python([obj])[0]
        except Exception:
            # fallback si viene raro pero casi-json
            msg = ModelMessagesTypeAdapter.validate_json(f"[{s}]")[0]
        out.append(msg)
    return out

dump_one_message_to_json

dump_one_message_to_json(msg)

Serializa un único mensaje a JSON objeto (string) usando el adapter de lista.

Source code in src/core/memory/codec.py
55
56
57
def dump_one_message_to_json(msg: ModelMessage) -> str:
    """Serialise a single message to a JSON object string via ``dumps_list``."""
    serialized = dumps_list([msg])
    return serialized[0]

attach_model_dump_json_shim

attach_model_dump_json_shim()

Añade .model_dump_json() a ModelRequest/ModelResponse (y Tool* si existen) sin romper si ya existe. Permite comparaciones en tests.

Source code in src/core/memory/codec.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def attach_model_dump_json_shim() -> None:
    """
    Add .model_dump_json() to ModelRequest/ModelResponse (plus any classes in
    the local ``_TOOL_CLASSES`` tuple, currently empty) without overriding an
    existing attribute. Allows comparisons in tests.
    """
    _TOOL_CLASSES = ()
    for target in (ModelRequest, ModelResponse) + _TOOL_CLASSES:
        if hasattr(target, "model_dump_json"):
            # Never replace an implementation the class already provides.
            continue

        def _model_dump_json(self):
            return dump_one_message_to_json(self)

        target.model_dump_json = _model_dump_json

core.memory.processors

Processor for message cleaning

keep_recent_messages async

keep_recent_messages(messages, MAX_HISTORY=15)
Source code in src/core/memory/processors.py
15
16
async def keep_recent_messages(messages: List[ModelMessage], MAX_HISTORY: int = 15) -> List[ModelMessage]:
    """Return at most the last ``MAX_HISTORY`` messages.

    Args:
        messages: Conversation history, oldest first.
        MAX_HISTORY: Maximum number of trailing messages to keep.

    Returns:
        ``messages`` itself when it already fits the limit, a new list with
        the trailing ``MAX_HISTORY`` items when it does not, and ``[]`` when
        ``MAX_HISTORY`` is zero or negative.
    """
    # ``messages[-0:]`` is the whole list, so a non-positive limit needs its
    # own branch to really mean "keep nothing".
    if MAX_HISTORY <= 0:
        return []
    if len(messages) <= MAX_HISTORY:
        return messages
    return messages[-MAX_HISTORY:]

strip_tool_traffic

strip_tool_traffic(messages)
Source code in src/core/memory/processors.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def strip_tool_traffic(messages: Sequence[ModelMessage]) -> list[ModelMessage]:
    """Return ``messages`` without tool-related traffic.

    * A ``ModelResponse`` holding any ``ToolCallPart`` is dropped entirely;
      other responses are kept as they are.
    * A ``ModelRequest`` is rebuilt from only its ``SystemPromptPart`` and
      ``UserPromptPart`` parts, and dropped when none remain.
    * Messages of any other type are dropped.
    """
    kept: list[ModelMessage] = []
    for message in messages:
        if isinstance(message, ModelResponse):
            has_tool_call = any(
                isinstance(part, ToolCallPart) for part in message.parts
            )
            if not has_tool_call:
                kept.append(message)
            continue
        if not isinstance(message, ModelRequest):
            continue
        prompt_parts: list[
            SystemPromptPart | UserPromptPart | ToolReturnPart | RetryPromptPart
        ] = [
            part
            for part in message.parts
            if isinstance(part, (SystemPromptPart, UserPromptPart))
        ]
        if prompt_parts:
            kept.append(ModelRequest(parts=prompt_parts))
    return kept