Implement a SQLAlchemy repository
This guide shows how to write a SqlAlchemyXxxRepository that uses SqlAlchemyQueryExecutor for raw-SQL persistence — no ORM, no magic.
Prerequisites
- A domain already scaffolded (
entity.py,repository.py,use_case.py,handler.py) - A running nene2 app using
AppSettingswithDB_ADAPTER=sqlite(ormysql/pgsql)
1. Create sqlalchemy_repository.py
Schema helper
Define ensure_schema() at the top of the file. Call it once at startup from create_app().
python
from nene2.database import DatabaseQueryExecutorInterface
def ensure_schema(executor: DatabaseQueryExecutorInterface) -> None:
executor.write(
"""
CREATE TABLE IF NOT EXISTS books (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
author TEXT NOT NULL,
isbn TEXT NOT NULL,
published_year INTEGER NOT NULL
)
"""
)Put
ensure_schema()insqlalchemy_repository.py(not a separateschema.py), so the schema definition lives next to the repository that owns the table. For multi-domain apps with a shared schema file, keep oneensure_schema()per domain and call them all fromcreate_app().
Row-to-entity helper
fetch_one / fetch_all return dict[str, Any]. Use a private static method to centralise the mapping and keep each query method lean.
python
from typing import Any
from .entity import Book
from .repository import BookRepositoryInterface
class SqlAlchemyBookRepository(BookRepositoryInterface):
def __init__(self, executor: DatabaseQueryExecutorInterface) -> None:
self._executor = executor
@staticmethod
def _to_book(row: dict[str, Any]) -> Book:
return Book(
id=row["id"],
title=row["title"],
author=row["author"],
isbn=row["isbn"],
published_year=row["published_year"],
)Use
dict[str, Any]— notdict[str, object].fetch_one()/fetch_all()returndict[str, Any], sorow["id"]isAnywhich is assignable tointundermypy --strictwithout any casts. Usingdict[str, object]instead requires# type: ignore[call-overload]and triggers follow-upunused-ignoreerrors.
Full implementation
python
class SqlAlchemyBookRepository(BookRepositoryInterface):
def __init__(self, executor: DatabaseQueryExecutorInterface) -> None:
self._executor = executor
def find_all(self, limit: int, offset: int) -> list[Book]:
rows = self._executor.fetch_all(
"SELECT id, title, author, isbn, published_year FROM books "
"ORDER BY id LIMIT :limit OFFSET :offset",
{"limit": limit, "offset": offset},
)
return [self._to_book(row) for row in rows]
def count_all(self) -> int:
row = self._executor.fetch_one("SELECT COUNT(*) AS cnt FROM books")
return int(row["cnt"]) if row else 0
def find_by_id(self, book_id: int) -> Book | None:
row = self._executor.fetch_one(
"SELECT id, title, author, isbn, published_year FROM books WHERE id = :id",
{"id": book_id},
)
return self._to_book(row) if row else None
def save(self, book: Book) -> Book:
new_id = self._executor.write(
"INSERT INTO books (title, author, isbn, published_year) "
"VALUES (:title, :author, :isbn, :published_year)",
{"title": book.title, "author": book.author,
"isbn": book.isbn, "published_year": book.published_year},
)
return Book(id=new_id, title=book.title, author=book.author,
isbn=book.isbn, published_year=book.published_year)
def update(self, book: Book) -> Book:
self._executor.write(
"UPDATE books SET title=:title, author=:author, isbn=:isbn, "
"published_year=:published_year WHERE id=:id",
{"title": book.title, "author": book.author,
"isbn": book.isbn, "published_year": book.published_year, "id": book.id},
)
return book
def delete(self, book_id: int) -> None:
self._executor.write("DELETE FROM books WHERE id = :id", {"id": book_id})
@staticmethod
def _to_book(row: dict[str, Any]) -> Book:
return Book(
id=row["id"],
title=row["title"],
author=row["author"],
isbn=row["isbn"],
published_year=row["published_year"],
)2. Wire into create_app()
python
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool
from nene2.config import AppSettings
from nene2.database import SqlAlchemyQueryExecutor
from bookshelf.repository import BookRepositoryInterface, InMemoryBookRepository
from bookshelf.sqlalchemy_repository import SqlAlchemyBookRepository, ensure_schema
def _build_repository(cfg: AppSettings) -> BookRepositoryInterface:
if cfg.db_adapter == "sqlite":
is_memory = cfg.db_name == ":memory:"
engine = create_engine(
cfg.db_url,
connect_args={"check_same_thread": False},
poolclass=StaticPool if is_memory else None,
)
executor = SqlAlchemyQueryExecutor(engine)
ensure_schema(executor) # creates table on first run
return SqlAlchemyBookRepository(executor)
return InMemoryBookRepository() # fallback for tests / local devWrap the if/else branch in a helper function like
_build_repository()that returns the interface type. This is cleaner than declaringrepo: BookRepositoryInterfacebefore an if/else block increate_app()— both approaches satisfymypy --strict, but the helper keepscreate_app()readable.If you prefer inline branching, declare the type first:
pythonrepo: BookRepositoryInterface if cfg.db_adapter == "sqlite": repo = SqlAlchemyBookRepository(executor) else: repo = InMemoryBookRepository()
StaticPoolis required for SQLite in-memory databases (DB_NAME=:memory:) to prevent SQLAlchemy from opening multiple connections — each of which would see an empty database. File-based SQLite and other adapters do not need it.
3. write() return value
executor.write() returns:
| Operation | Return value |
|---|---|
INSERT | lastrowid — the auto-generated integer primary key of the new row |
UPDATE / DELETE | rowcount — number of rows affected (0 if nothing matched) |
Use lastrowid to reconstruct the entity after INSERT:
python
new_id = self._executor.write("INSERT INTO ...", {...})
return Book(id=new_id, ...)Use rowcount to detect missing rows on UPDATE / DELETE:
python
affected = self._executor.write("UPDATE books SET ... WHERE id = :id", {"id": book_id})
if affected == 0:
raise BookNotFoundException(book_id)
lastrowidis guaranteed to be a positiveintfor single-row INSERTs on SQLite, MySQL, and PostgreSQL. It is0for multi-row INSERTs or when the table has noAUTOINCREMENT/SERIALcolumn — avoid those patterns if you need the ID back.
4. Entities with datetime fields
When your entity has a created_at: datetime field backed by a database-generated DEFAULT CURRENT_TIMESTAMP, use parse_db_datetime() from nene2.database.
Why it is needed
SQLite stores CURRENT_TIMESTAMP as a plain string ("2026-05-20 12:34:56"), not as a Python datetime object. datetime.fromisoformat() parses the string but returns a naive datetime (no timezone), so the JSON response leaks an ambiguous timestamp. parse_db_datetime() handles all three cases transparently:
| Driver | Raw value | After parse_db_datetime() |
|---|---|---|
| SQLite | "2026-05-20 12:34:56" (str) | datetime(…, tzinfo=UTC) |
| MySQL/PostgreSQL | naive datetime object | datetime(…, tzinfo=UTC) |
| MySQL/PostgreSQL | aware datetime object | unchanged |
Schema
python
"created_at DATETIME DEFAULT CURRENT_TIMESTAMP"SELECT-after-INSERT pattern
After write() you only get back the lastrowid, not the DB-generated created_at. Do a second fetch_one() to retrieve the full row:
python
from datetime import datetime
from typing import Any
from nene2.database import DatabaseQueryExecutorInterface, parse_db_datetime
from .entity import Post
def _to_post(row: dict[str, Any]) -> Post:
return Post(
id=row["id"],
title=row["title"],
body=row["body"],
created_at=parse_db_datetime(row["created_at"]),
)
class SqlAlchemyPostRepository(PostRepositoryInterface):
def save(self, title: str, body: str) -> Post:
new_id = self._executor.write(
"INSERT INTO posts (title, body) VALUES (:title, :body)",
{"title": title, "body": body},
)
row = self._executor.fetch_one(
"SELECT id, title, body, created_at FROM posts WHERE id = :id",
{"id": new_id},
)
if row is None:
raise RuntimeError(f"Row {new_id} not found after INSERT into posts")
return _to_post(row)The
if row is None: raise RuntimeError(...)guard is needed becausefetch_one()returnsdict | None. The row cannot actually beNoneright after INSERT — the guard exists to satisfy the type checker. PreferRuntimeErroroverassert:assertis stripped bypython -Oand flagged by ruff's S101 rule in non-test code.
InMemory repository with datetime
The InMemoryXxxRepository should generate the timestamp in Python:
python
from datetime import datetime, timezone
def save(self, title: str, body: str) -> Post:
now = datetime.now(timezone.utc)
post = Post(id=self._next_id, title=title, body=body, created_at=now)
self._store[self._next_id] = post
self._next_id += 1
return postJSON serialisation
datetime.isoformat() on a UTC-aware datetime produces "2026-05-20T12:34:56+00:00". Return it as a string in the response dict:
python
def _post_dict(post: Post) -> dict[str, object]:
return {
"id": post.id,
"title": post.title,
"body": post.body,
"created_at": post.created_at.isoformat(), # "2026-05-20T12:34:56+00:00"
}5. Nested resources — ownership validation in DELETE
When a resource is nested under a parent (e.g. DELETE /posts/{post_id}/comments/{comment_id}), always validate that the child belongs to the parent in the UseCase, not just in the database.
Wrong — ignores post_id
python
# handler
@router.delete("/posts/{post_id}/comments/{comment_id}", status_code=204)
async def delete_comment(post_id: int, comment_id: int) -> None:
delete_use_case.execute(DeleteCommentInput(comment_id)) # post_id unused!This allows DELETE /posts/1/comments/5 to delete comment 5 even when it belongs to post 2.
Correct — validate ownership in the UseCase
python
# use_case.py
@dataclass(frozen=True, slots=True)
class DeleteCommentInput:
post_id: int
comment_id: int
class DeleteCommentUseCase:
def execute(self, input_: DeleteCommentInput) -> None:
comment = self._repository.find_by_id(input_.comment_id)
if comment is None or comment.post_id != input_.post_id:
raise CommentNotFoundException(input_.comment_id)
self._repository.delete(input_.comment_id)
# handler
@router.delete("/posts/{post_id}/comments/{comment_id}", status_code=204)
async def delete_comment(post_id: int, comment_id: int) -> None:
delete_use_case.execute(DeleteCommentInput(post_id, comment_id))The same pattern applies to GET and PUT on nested resources: always pass
post_idinto the UseCase and verifycomment.post_id == input_.post_id.
6. Use InMemoryXxxRepository in tests
Never mock the database. Use the in-memory implementation for unit tests:
python
from bookshelf.repository import InMemoryBookRepository
from bookshelf.use_case import CreateBookUseCase, CreateBookInput
def test_create_book() -> None:
repo = InMemoryBookRepository()
use_case = CreateBookUseCase(repo)
book = use_case.execute(CreateBookInput(
title="Clean Code", author="Robert C. Martin",
isbn="978-0132350884", published_year=2008,
))
assert book.id == 1
assert book.title == "Clean Code"For SQLAlchemy repository tests, use an in-memory SQLite engine:
python
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool
from nene2.database import SqlAlchemyQueryExecutor
from bookshelf.sqlalchemy_repository import SqlAlchemyBookRepository, ensure_schema
def _make_repo() -> SqlAlchemyBookRepository:
engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
executor = SqlAlchemyQueryExecutor(engine)
ensure_schema(executor)
return SqlAlchemyBookRepository(executor)7. Atomic multi-write operations with transactional()
When a UseCase needs to write to multiple tables atomically, use SqlAlchemyTransactionManager.transactional() together with _in_tx repository methods.
Define _in_tx methods on the interface
Add dedicated methods that accept an explicit executor parameter. These are called only inside a transactional() callback — never outside one.
python
from nene2.database import DatabaseQueryExecutorInterface
from abc import ABC, abstractmethod
class AccountRepositoryInterface(ABC):
@abstractmethod
def find_by_id(self, account_id: int) -> Account | None: ...
# _in_tx variants — executor is provided by the transactional() callback
@abstractmethod
def find_by_id_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int
) -> Account | None: ...
@abstractmethod
def update_balance_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int, delta_cents: int
) -> None: ...Implement _in_tx methods in the SQLAlchemy repository
The _in_tx methods use the passed-in executor instead of self._executor, so they share the same connection and participate in the same transaction.
python
class SqlAlchemyAccountRepository(AccountRepositoryInterface):
def __init__(self, executor: SqlAlchemyQueryExecutor) -> None:
self._executor = executor
def find_by_id(self, account_id: int) -> Account | None:
row = self._executor.fetch_one(
"SELECT id, name, balance_cents FROM accounts WHERE id = :id",
{"id": account_id},
)
return self._to_entity(row) if row else None
def find_by_id_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int
) -> Account | None:
row = executor.fetch_one(
"SELECT id, name, balance_cents FROM accounts WHERE id = :id",
{"id": account_id},
)
return self._to_entity(row) if row else None
def update_balance_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int, delta_cents: int
) -> None:
executor.write(
"UPDATE accounts SET balance_cents = balance_cents + :delta WHERE id = :id",
{"delta": delta_cents, "id": account_id},
)Wire the UseCase with SqlAlchemyTransactionManager
python
from nene2.database import SqlAlchemyTransactionManager
engine = create_engine(cfg.db_url, connect_args={"check_same_thread": False})
transaction_manager = SqlAlchemyTransactionManager(engine)
transfer_use_case = TransferUseCase(transaction_manager, account_repo, transfer_repo)Implement InMemory _in_tx for unit tests
The InMemory implementation ignores the executor — operations go directly to the in-memory store. InMemoryTransactionManager calls the callback immediately with a no-op executor.
python
from nene2.database import DatabaseQueryExecutorInterface, DatabaseTransactionManagerInterface
from collections.abc import Callable
class InMemoryAccountRepository(AccountRepositoryInterface):
def find_by_id_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int
) -> Account | None:
return self._accounts.get(account_id)
def update_balance_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int, delta_cents: int
) -> None:
account = self._accounts[account_id]
self._accounts[account_id] = Account(
id=account.id, name=account.name, balance_cents=account.balance_cents + delta_cents
)
class _NoOpExecutor(DatabaseQueryExecutorInterface):
def fetch_all(self, sql: str, params: dict[str, object] | None = None) -> list[dict[str, object]]:
return []
def fetch_one(self, sql: str, params: dict[str, object] | None = None) -> dict[str, object] | None:
return None
def write(self, sql: str, params: dict[str, object] | None = None) -> int:
return 0
class InMemoryTransactionManager(DatabaseTransactionManagerInterface):
def transactional[T](self, callback: Callable[[DatabaseQueryExecutorInterface], T]) -> T:
return callback(_NoOpExecutor())
def begin(self) -> None: pass
def commit(self) -> None: pass
def rollback(self) -> None: passRollback on exception:
SqlAlchemyTransactionManager.transactional()usesengine.begin()— any exception inside the callback triggers an automatic rollback. Domain exceptions (AccountNotFoundException, etc.) propagate normally after rollback.
6. Using MySQL 8
Required packages
MySQL 8 uses caching_sha2_password authentication by default. Install both pymysql and cryptography — without cryptography the connection fails with Authentication plugin 'caching_sha2_password' is not supported.
bash
uv add pymysql cryptographyConnection URL
python
url = f"mysql+pymysql://{user}:{password}@{host}:{port}/{name}"
engine = create_engine(url, pool_pre_ping=True)pool_pre_ping=True is recommended for MySQL — it tests the connection before use to handle stale connections after the server's wait_timeout.
Health check
DatabaseHealthCheck takes a SqlAlchemyQueryExecutor, not the engine directly:
python
from nene2.database import DatabaseHealthCheck, SqlAlchemyQueryExecutor
executor = SqlAlchemyQueryExecutor(engine)
health = DatabaseHealthCheck(executor) # ← executor, not engine
app.add_api_route("/health", health.check, methods=["GET"])CURRENT_TIMESTAMP type difference
SQLite returns CURRENT_TIMESTAMP as a str; MySQL returns a naive datetime object. Use parse_db_datetime() in _to_entity() to handle both transparently:
python
from nene2.database import parse_db_datetime
def _to_entity(row: dict[str, Any]) -> Product:
return Product(
id=int(row["id"]),
created_at=parse_db_datetime(row["created_at"]), # works for both SQLite and MySQL
)Docker Compose example
yaml
services:
mysql:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: rootpass
MYSQL_DATABASE: mydb
MYSQL_USER: appuser
MYSQL_PASSWORD: apppass
ports:
- "3310:3306"
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-uappuser", "-papppass"]
interval: 5s
timeout: 5s
retries: 10
app:
build: .
depends_on:
mysql:
condition: service_healthy