Skip to content

Infra

infrastructure.settings

ENV_PATH module-attribute

ENV_PATH = join(dirname(dirname(__file__)), '.env')

settings module-attribute

settings = Settings()

Settings

Bases: BaseSettings

Source code in src/infrastructure/settings.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class Settings(BaseSettings):
    """Application configuration loaded through pydantic-settings.

    Each field lists two accepted variable names via ``AliasChoices``: the
    ``BLAKIA_``-prefixed name and the unprefixed legacy name. Values are
    also read from the file referenced by ``ENV_PATH``.
    """

    # --- Core / APIs ---
    # Accepts BLAKIA_OPENAI_API_KEY or OPENAI_API_KEY.
    openai_api_key: str = Field(
        validation_alias=AliasChoices(
            "BLAKIA_OPENAI_API_KEY",
            "OPENAI_API_KEY",
        ),
        default="",
    )
    telegram_bot_token: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_TELEGRAM_BOT_TOKEN",
            "TELEGRAM_BOT_TOKEN",
        ),
        default=None,
    )
    empresas_api_token: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_EMPRESAS_API_TOKEN",
            "EMPRESAS_API_TOKEN",
        ),
        default=None,
    )
    generic_webhook_api_key: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_GENERIC_WEBHOOK_API_KEY",
            "GENERIC_WEBHOOK_API_KEY",
        ),
        default=None,
    )

    # --- WhatsApp / Meta ---
    whatsapp_token: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_WHATSAPP_TOKEN",
            "WHATSAPP_TOKEN",
        ),
        default=None,
    )
    whatsapp_phone_id: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_WHATSAPP_PHONE_ID",
            "WHATSAPP_PHONE_ID",
        ),
        default=None,
    )
    whatsapp_verify_token: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_WHATSAPP_VERIFY_TOKEN",
            "WHATSAPP_VERIFY_TOKEN",
        ),
        default=None,
    )
    whatsapp_api_version: str = Field(
        validation_alias=AliasChoices(
            "BLAKIA_WHATSAPP_API_VERSION",
            "WHATSAPP_API_VERSION",
        ),
        default="v19.0",
    )
    meta_app_secret: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_META_APP_SECRET",
            "META_APP_SECRET",
        ),
        default=None,
    )

    # --- Memory / Redis ---
    memory_backend: str = Field(
        validation_alias=AliasChoices(
            "BLAKIA_MEMORY_BACKEND",
            "MEMORY_BACKEND",
        ),
        default="in_memory",
    )
    redis_url: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_REDIS_URL",
            "REDIS_URL",
        ),
        default=None,
    )
    redis_host: str = Field(
        validation_alias=AliasChoices(
            "BLAKIA_REDIS_HOST",
            "REDIS_HOST",
        ),
        default="localhost",
    )
    redis_port: int = Field(
        validation_alias=AliasChoices(
            "BLAKIA_REDIS_PORT",
            "REDIS_PORT",
        ),
        default=6379,
    )
    redis_db: int = Field(
        validation_alias=AliasChoices(
            "BLAKIA_REDIS_DB",
            "REDIS_DB",
        ),
        default=0,
    )
    redis_password: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_REDIS_PASSWORD",
            "REDIS_PASSWORD",
        ),
        default=None,
    )

    # --- Langfuse ---
    langfuse_public_key: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_LANGFUSE_PUBLIC_KEY",
            "LANGFUSE_PUBLIC_KEY",
        ),
        default=None,
    )
    langfuse_secret_key: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_LANGFUSE_SECRET_KEY",
            "LANGFUSE_SECRET_KEY",
        ),
        default=None,
    )
    langfuse_host: str = Field(
        validation_alias=AliasChoices(
            "BLAKIA_LANGFUSE_HOST",
            "LANGFUSE_HOST",
        ),
        default="https://cloud.langfuse.com",
    )

    # --- OpenTelemetry ---
    otel_traces_exporter: Optional[Literal["otlp", "none"]] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_OTEL_TRACES_EXPORTER",
            "OTEL_TRACES_EXPORTER",
        ),
        default="otlp",
    )
    otel_exporter_otlp_protocol: Optional[Literal["http/protobuf", "grpc"]] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_OTEL_EXPORTER_OTLP_PROTOCOL",
            "OTEL_EXPORTER_OTLP_PROTOCOL",
        ),
        default="http/protobuf",
    )
    otel_service_name: str = Field(
        validation_alias=AliasChoices(
            "BLAKIA_OTEL_SERVICE_NAME",
            "OTEL_SERVICE_NAME",
        ),
        default="cordobai-agent",
    )
    otel_resource_attributes: Optional[str] = Field(
        validation_alias=AliasChoices(
            "BLAKIA_OTEL_RESOURCE_ATTRIBUTES",
            "OTEL_RESOURCE_ATTRIBUTES",
        ),
        default=None,
    )

    # --- Meta ---
    env: str = Field(
        validation_alias=AliasChoices("BLAKIA_ENV", "ENV"),
        default="dev",
    )

    model_config = SettingsConfigDict(
        env_file=ENV_PATH,
        case_sensitive=False,
        # The previous configuration used "forbid"; relaxed for the transition.
        extra="ignore",
        # Default prefix (legacy names are still accepted via AliasChoices).
        env_prefix="BLAKIA_",
    )

    @model_validator(mode="after")
    def _validate_meta_whatsapp(self):
        """Require ``meta_app_secret`` whenever any WhatsApp credential is set.

        Returns:
            The validated settings instance, unchanged.

        Raises:
            ValueError: If a WhatsApp token, phone id or verify token is set
                while ``meta_app_secret`` is empty or missing.
        """
        wab_credentials = (
            self.whatsapp_token,
            self.whatsapp_phone_id,
            self.whatsapp_verify_token,
        )
        # Nothing WhatsApp-related configured: the secret is not needed.
        if not any(wab_credentials):
            return self
        if self.meta_app_secret:
            return self
        raise ValueError("META_APP_SECRET/BLAKIA_META_APP_SECRET es obligatorio para verificar X-Hub-Signature-256")

openai_api_key class-attribute instance-attribute

openai_api_key = Field(
    default="",
    validation_alias=AliasChoices(
        "BLAKIA_OPENAI_API_KEY", "OPENAI_API_KEY"
    ),
)

telegram_bot_token class-attribute instance-attribute

telegram_bot_token = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_TELEGRAM_BOT_TOKEN", "TELEGRAM_BOT_TOKEN"
    ),
)

empresas_api_token class-attribute instance-attribute

empresas_api_token = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_EMPRESAS_API_TOKEN", "EMPRESAS_API_TOKEN"
    ),
)

generic_webhook_api_key class-attribute instance-attribute

generic_webhook_api_key = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_GENERIC_WEBHOOK_API_KEY",
        "GENERIC_WEBHOOK_API_KEY",
    ),
)

whatsapp_token class-attribute instance-attribute

whatsapp_token = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_WHATSAPP_TOKEN", "WHATSAPP_TOKEN"
    ),
)

whatsapp_phone_id class-attribute instance-attribute

whatsapp_phone_id = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_WHATSAPP_PHONE_ID", "WHATSAPP_PHONE_ID"
    ),
)

whatsapp_verify_token class-attribute instance-attribute

whatsapp_verify_token = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_WHATSAPP_VERIFY_TOKEN",
        "WHATSAPP_VERIFY_TOKEN",
    ),
)

whatsapp_api_version class-attribute instance-attribute

whatsapp_api_version = Field(
    default="v19.0",
    validation_alias=AliasChoices(
        "BLAKIA_WHATSAPP_API_VERSION",
        "WHATSAPP_API_VERSION",
    ),
)

meta_app_secret class-attribute instance-attribute

meta_app_secret = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_META_APP_SECRET", "META_APP_SECRET"
    ),
)

memory_backend class-attribute instance-attribute

memory_backend = Field(
    default="in_memory",
    validation_alias=AliasChoices(
        "BLAKIA_MEMORY_BACKEND", "MEMORY_BACKEND"
    ),
)

redis_url class-attribute instance-attribute

redis_url = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_REDIS_URL", "REDIS_URL"
    ),
)

redis_host class-attribute instance-attribute

redis_host = Field(
    default="localhost",
    validation_alias=AliasChoices(
        "BLAKIA_REDIS_HOST", "REDIS_HOST"
    ),
)

redis_port class-attribute instance-attribute

redis_port = Field(
    default=6379,
    validation_alias=AliasChoices(
        "BLAKIA_REDIS_PORT", "REDIS_PORT"
    ),
)

redis_db class-attribute instance-attribute

redis_db = Field(
    default=0,
    validation_alias=AliasChoices(
        "BLAKIA_REDIS_DB", "REDIS_DB"
    ),
)

redis_password class-attribute instance-attribute

redis_password = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_REDIS_PASSWORD", "REDIS_PASSWORD"
    ),
)

langfuse_public_key class-attribute instance-attribute

langfuse_public_key = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_LANGFUSE_PUBLIC_KEY", "LANGFUSE_PUBLIC_KEY"
    ),
)

langfuse_secret_key class-attribute instance-attribute

langfuse_secret_key = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_LANGFUSE_SECRET_KEY", "LANGFUSE_SECRET_KEY"
    ),
)

langfuse_host class-attribute instance-attribute

langfuse_host = Field(
    default="https://cloud.langfuse.com",
    validation_alias=AliasChoices(
        "BLAKIA_LANGFUSE_HOST", "LANGFUSE_HOST"
    ),
)

otel_traces_exporter class-attribute instance-attribute

otel_traces_exporter = Field(
    default="otlp",
    validation_alias=AliasChoices(
        "BLAKIA_OTEL_TRACES_EXPORTER",
        "OTEL_TRACES_EXPORTER",
    ),
)

otel_exporter_otlp_protocol class-attribute instance-attribute

otel_exporter_otlp_protocol = Field(
    default="http/protobuf",
    validation_alias=AliasChoices(
        "BLAKIA_OTEL_EXPORTER_OTLP_PROTOCOL",
        "OTEL_EXPORTER_OTLP_PROTOCOL",
    ),
)

otel_service_name class-attribute instance-attribute

otel_service_name = Field(
    default="cordobai-agent",
    validation_alias=AliasChoices(
        "BLAKIA_OTEL_SERVICE_NAME", "OTEL_SERVICE_NAME"
    ),
)

otel_resource_attributes class-attribute instance-attribute

otel_resource_attributes = Field(
    default=None,
    validation_alias=AliasChoices(
        "BLAKIA_OTEL_RESOURCE_ATTRIBUTES",
        "OTEL_RESOURCE_ATTRIBUTES",
    ),
)

env class-attribute instance-attribute

env = Field(
    default="dev",
    validation_alias=AliasChoices("BLAKIA_ENV", "ENV"),
)

model_config class-attribute instance-attribute

model_config = SettingsConfigDict(
    env_file=ENV_PATH,
    case_sensitive=False,
    extra="ignore",
    env_prefix="BLAKIA_",
)

infrastructure.server

app module-attribute

app = build_app()

metrics_response

metrics_response()
Source code in src/infrastructure/server.py
17
18
19
def metrics_response() -> Response:
    """Return a placeholder plain-text response for ``/metrics``.

    Minimal fallback: it exposes no real metrics, but keeps ``/metrics``
    from breaking.
    """
    placeholder_body = "# prometheus_client no instalado\n"
    return Response(placeholder_body, media_type="text/plain")

build_app

build_app()
Source code in src/infrastructure/server.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def build_app() -> FastAPI:
    """Assemble the FastAPI application: webhook routers plus operational endpoints.

    Returns:
        The configured ``FastAPI`` instance.
    """
    api = FastAPI(title="BlakIA Agent", version="0.1.0")

    # ---------- Business routes (webhooks) ----------
    # The generic and WhatsApp routers are always mounted.
    api.include_router(generic_router, prefix="/webhooks/generic", tags=["generic"])
    api.include_router(wab_router, prefix="/webhooks/whatsapp", tags=["whatsapp"])

    # Telegram is mounted only when the handler module exposes a truthy
    # TELEGRAM_BOT_TOKEN. Per the original note the handler is a no-op without
    # a token, but it is better not to mount its routes at all.
    telegram_token = getattr(tg_handler, "TELEGRAM_BOT_TOKEN", None)
    if not telegram_token:
        # Lightweight console notice for development.
        print("⚠️ Telegram router NOT loaded: no TELEGRAM_BOT_TOKEN configured")
    else:
        api.include_router(tg_handler.router, prefix="/webhooks/telegram", tags=["telegram"])

    # ---------- Operational ----------
    @api.get("/", tags=["ops"])
    def index():
        # Points callers at the docs, health and metrics endpoints.
        payload = {"status": "ok", "docs": "/docs", "health": "/health", "metrics": "/metrics"}
        return JSONResponse(payload)

    @api.get("/health", tags=["ops"])
    def health():
        # If settings become available here later, env could be included too.
        payload = {"ok": True}
        return JSONResponse(payload)

    @api.get("/metrics", tags=["ops"])
    def metrics():
        response = metrics_response()
        return response

    return api