"""Alembic migration environment.

Resolves a synchronous PostgreSQL URL, wires it into the Alembic config,
and runs migrations in offline or online mode depending on how Alembic
was invoked.
"""

import asyncio
import os
from logging.config import fileConfig
from urllib.parse import quote

from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config

from alembic import context

# Import your models.
# NOTE(review): these are imported only for their side effect — presumably so
# that every table is attached to Base.metadata before it is handed to Alembic
# as target_metadata below.
from app.core.database import Base
from app.models import User, Team, Tree, Session, Attachment, InviteCode
from app.models.email_verification_token import EmailVerificationToken
from app.models.tree_embedding import TreeEmbedding
from app.models.copilot_conversation import CopilotConversation
from app.models.assistant_chat import AssistantChat
from app.models.survey_response import SurveyResponse
from app.models.survey_invite import SurveyInvite
from app.models.ai_suggestion import AISuggestion  # noqa: F401
from app.models.kb_import import KBImport, KBImportNode  # noqa: F401
from app.models.script_template import ScriptCategory, ScriptTemplate, ScriptGeneration  # noqa: F401
from app.models.psa_connection import PsaConnection  # noqa: F401
from app.models.ai_session import AISession  # noqa: F401
from app.models.ai_session_step import AISessionStep  # noqa: F401
from app.models.psa_post_log import PsaPostLog  # noqa: F401
from app.models.psa_member_mapping import PsaMemberMapping  # noqa: F401
from app.models.script_builder_session import ScriptBuilderSession, ScriptBuilderMessage  # noqa: F401
from app.models.session_branch import SessionBranch  # noqa: F401
from app.models.fork_point import ForkPoint  # noqa: F401
from app.models.session_handoff import SessionHandoff  # noqa: F401
from app.models.session_resolution_output import SessionResolutionOutput  # noqa: F401

from app.core.config import settings


def _alembic_sync_url() -> str:
    """Return a psycopg2-compatible sync URL for Alembic.

    Priority order:

    1. Railway native PG env vars (PGHOST/PGPASSWORD/PGUSER/PGDATABASE) —
       these are auto-linked per-environment by the Railway PostgreSQL
       service, so they always carry the correct superuser credentials for
       the current environment (production, PR preview, etc.), including
       fresh DBs with no custom roles yet.
    2. ADMIN_DATABASE_URL (resolutionflow_admin, BYPASSRLS) converted to a
       sync driver — used for local dev and stable environments where the
       role exists.
    3. DATABASE_URL_SYNC — last resort / legacy fallback.

    Returns:
        A ``postgresql://`` connection URL. When built from the PG* env
        vars, the user name and password are percent-encoded so that
        reserved characters in them cannot corrupt the URL.
    """
    pg_host = os.getenv("PGHOST")
    pg_user = os.getenv("PGUSER")
    pg_password = os.getenv("PGPASSWORD")
    pg_db = os.getenv("PGDATABASE")
    # `or` (rather than a getenv default) also covers PGPORT being set to "".
    pg_port = os.getenv("PGPORT") or "5432"

    if all([pg_host, pg_user, pg_password, pg_db]):
        # Credentials are raw values, not URL fragments: characters such as
        # "@", ":", "/" or "%" must be percent-encoded or the URL is parsed
        # incorrectly. safe="" makes quote() encode "/" as well.
        user = quote(pg_user, safe="")
        password = quote(pg_password, safe="")
        return f"postgresql://{user}:{password}@{pg_host}:{pg_port}/{pg_db}"

    admin_url = settings.ADMIN_DATABASE_URL
    # NOTE(review): an ADMIN_DATABASE_URL without "+asyncpg" is skipped here
    # and DATABASE_URL_SYNC is used instead — confirm this is intended.
    if admin_url and "+asyncpg" in admin_url:
        return admin_url.replace("postgresql+asyncpg://", "postgresql://")

    return settings.DATABASE_URL_SYNC


# this is the Alembic Config object
config = context.config

# Override sqlalchemy.url with the sync version for migrations.
# The value goes through ConfigParser interpolation, so a literal "%" (as
# found in percent-encoded credentials) has to be written as "%%";
# get_main_option() hands back the single-"%" form.
config.set_main_option("sqlalchemy.url", _alembic_sync_url().replace("%", "%%"))

# Interpret the config file for Python logging.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    Configures the context with only a URL (no live connection) and renders
    bound parameters inline via ``literal_binds``.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    """Configure the Alembic context on *connection* and run the migrations.

    Args:
        connection: An open SQLAlchemy connection to migrate against.
    """
    context.configure(connection=connection, target_metadata=target_metadata)

    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    """Run migrations in 'online' mode with async engine.

    Connects with ``settings.DATABASE_URL`` and drives the synchronous
    ``do_run_migrations`` through ``run_sync``. Nothing in this module calls
    this coroutine; ``run_migrations_online`` uses a sync engine instead.
    """
    from sqlalchemy.ext.asyncio import create_async_engine

    connectable = create_async_engine(
        settings.DATABASE_URL,
        poolclass=pool.NullPool,
    )

    try:
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose the engine even when a migration raises.
        await connectable.dispose()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    Creates a synchronous engine from ``_alembic_sync_url()`` with
    ``NullPool`` and runs the migrations on a single connection.
    """
    from sqlalchemy import create_engine

    connectable = create_engine(
        _alembic_sync_url(),
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        do_run_migrations(connection)


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()