pgmonkey (8/8) — Production Patterns
Summary: Put pgmonkey into production — transaction management, GUC session settings, connection caching, Flask and FastAPI integration patterns, error handling, and a production readiness checklist.
| Key | Value |
|---|---|
| pgmonkey version | 4.0.0 |
| Python version | 3.12 |
| Config file | pgmonkey_config.yaml |
| Database name | tutorial_db |
| Flask app file | app.py |
| FastAPI app file | main.py |
0. Prerequisites
- Completed Parts 1-4 (pgmonkey installed, connection types understood, pooling configured)
- Familiarity with Part 5 (environment variables) for production deployments
- Flask or FastAPI installed if you plan to run the web framework examples
Beginning of tutorial series can be found here.
1. Transaction Management
pgmonkey’s context managers handle transactions automatically.
Auto-commit on clean exit:
from pgmonkey import PGConnectionManager
manager = PGConnectionManager()
conn = manager.get_database_connection('pgmonkey_config.yaml', 'normal')
with conn as c:
with c.cursor() as cur:
cur.execute("INSERT INTO customers (name, email) VALUES (%s, %s)", ('Frank', '[email protected]'))
# Transaction committed automatically
Code language: PHP (php)
Auto-rollback on exception:
try:
with conn as c:
with c.cursor() as cur:
cur.execute("INSERT INTO customers (name, email) VALUES (%s, %s)", ('Grace', '[email protected]'))
raise ValueError("Something went wrong")
except ValueError:
pass
# Transaction rolled back automatically — Grace was not inserted
Code language: PHP (php)
Explicit transactions:
with conn as c:
with c.transaction():
with c.cursor() as cur:
cur.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1;")
cur.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2;")
# Both updates commit together, or both roll back
Code language: PHP (php)
Autocommit mode for DDL statements that cannot run inside a transaction (CREATE DATABASE, VACUUM, etc.):
connection_settings:
autocommit: true
Code language: JavaScript (javascript)
Note: Autocommit applies to the connection, not individual statements. Use a separate config file (or a separate
connection_typecall) for DDL operations.
2. GUC Session Settings
PostgreSQL GUC (Grand Unified Configuration) parameters control per-session behavior. pgmonkey applies these automatically after connecting.
Sync connections use sync_settings:
sync_settings:
statement_timeout: '30000'
lock_timeout: '10000'
idle_in_transaction_session_timeout: '5000'
work_mem: '256MB'
Code language: JavaScript (javascript)
Async connections use async_settings:
async_settings:
statement_timeout: '30000'
lock_timeout: '10000'
idle_in_transaction_session_timeout: '5000'
Code language: JavaScript (javascript)
| Setting | Purpose | Recommended |
|---|---|---|
statement_timeout | Cancel queries exceeding this time (ms) | 30000 (30s) for web apps |
lock_timeout | Timeout waiting for locks (ms) | 10000 (10s) |
idle_in_transaction_session_timeout | Kill idle-in-transaction sessions (ms) | 5000 (5s) |
work_mem | Memory per sort/hash operation | 256MB for analytics, 64MB for web apps |
These settings protect your application from runaway queries, deadlocks, and idle transactions that hold locks.
How pgmonkey applies them:
- For
normalconnections:SETcommands after connect - For
poolconnections: via psycopg_pool’sconfigurecallback on each checkout - For
asyncconnections:SETcommands after connect - For
async_poolconnections: via psycopg_pool’sconfigurecallback on each checkout
The pool callbacks ensure every borrowed connection has the right settings, even if the connection was previously used with different parameters.
Note: pgmonkey uses safe SQL composition (
psycopg.sql.SQLandsql.Identifier) when applying GUC settings — no risk of SQL injection from setting names.
3. Connection Caching
pgmonkey caches connections and pools by configuration content. Calling get_database_connection() with the same config and connection type returns the same instance.
manager = PGConnectionManager()
pool_a = manager.get_database_connection('pgmonkey_config.yaml', 'pool')
pool_b = manager.get_database_connection('pgmonkey_config.yaml', 'pool')
assert pool_a is pool_b # Same pool, not a duplicate
Code language: PHP (php)
This prevents pool storms — bursts of requests that each create a new pool and overwhelm the database with connections.
Cache management API:
# Inspect the cache
info = manager.cache_info
print(info) # {'size': 2, 'connection_types': {'pool', 'async_pool'}}
# Force a fresh connection (discards the cached one)
fresh = manager.get_database_connection('pgmonkey_config.yaml', 'pool', force_reload=True)
# Clear all cached connections (sync)
manager.clear_cache()
# Clear all cached connections (async — use inside an event loop)
await manager.clear_cache_async()
Code language: PHP (php)
Cache keys are computed from the resolved config content (SHA-256 hash) plus the connection type. Different connection types with the same config get separate cache entries. Changing a config value produces a different cache key and a new connection.
Tip: pgmonkey registers an
atexithandler that closes all cached connections and pools when the process exits. You do not need to clean up manually in most applications.
4. Flask Integration
Flask uses synchronous request handling. Use a pool connection with a shared Database class.
from pgmonkey import PGConnectionManager
class Database:
def __init__(self, config_path):
self.manager = PGConnectionManager()
self.config_path = config_path
self.pool = self.manager.get_database_connection(config_path, 'pool')
def fetch_one(self, query, params=None):
with self.pool as conn:
with conn.cursor() as cur:
cur.execute(query, params)
return cur.fetchone()
def fetch_all(self, query, params=None):
with self.pool as conn:
with conn.cursor() as cur:
cur.execute(query, params)
return cur.fetchall()
def execute(self, query, params=None):
with self.pool as conn:
with conn.cursor() as cur:
cur.execute(query, params)
Wire it into your Flask app.
from flask import Flask
app = Flask(__name__)
db = Database('pgmonkey_config.yaml')
@app.route('/customers')
def list_customers():
rows = db.fetch_all('SELECT id, name, email FROM customers ORDER BY id;')
return {'customers': [{'id': r[0], 'name': r[1], 'email': r[2]} for r in rows]}
Code language: JavaScript (javascript)
The pool is created once when the Database object is instantiated. Each request borrows a connection from the pool and returns it when the route handler completes. Gunicorn workers each get their own pool instance (separate processes), and pgmonkey’s thread-safe pool handles concurrent requests within each worker.
5. FastAPI Integration
FastAPI uses asynchronous request handling. Use an async_pool connection with an AsyncDatabase class.
from pgmonkey import PGConnectionManager
class AsyncDatabase:
def __init__(self, config_path):
self.manager = PGConnectionManager()
self.config_path = config_path
self.pool = None
async def connect(self):
self.pool = await self.manager.get_database_connection(
self.config_path, 'async_pool'
)
async def disconnect(self):
await self.manager.clear_cache_async()
async def fetch_one(self, query, params=None):
async with self.pool as conn:
async with conn.cursor() as cur:
await cur.execute(query, params)
return await cur.fetchone()
async def fetch_all(self, query, params=None):
async with self.pool as conn:
async with conn.cursor() as cur:
await cur.execute(query, params)
return await cur.fetchall()
async def execute(self, query, params=None):
async with self.pool as conn:
async with conn.cursor() as cur:
await cur.execute(query, params)
Wire it into your FastAPI app.
from fastapi import FastAPI
app = FastAPI()
db = AsyncDatabase('pgmonkey_config.yaml')
@app.on_event("startup")
async def startup():
await db.connect()
@app.on_event("shutdown")
async def shutdown():
await db.disconnect()
@app.get("/customers")
async def list_customers():
rows = await db.fetch_all('SELECT id, name, email FROM customers ORDER BY id;')
return {"customers": [{"id": r[0], "name": r[1], "email": r[2]} for r in rows]}
Code language: JavaScript (javascript)
The pool is created at application startup and shared across all request handlers. Each request borrows a connection from the async pool and returns it when the handler completes. pgmonkey’s per-instance ContextVar ensures connections are not shared across concurrent tasks.
6. Error Handling
pgmonkey raises standard psycopg exceptions. Catch them at your application boundary.
import psycopg
from pgmonkey import PGConnectionManager
manager = PGConnectionManager()
try:
conn = manager.get_database_connection('pgmonkey_config.yaml', 'normal')
with conn as c:
with c.cursor() as cur:
cur.execute('SELECT * FROM nonexistent_table;')
except psycopg.errors.UndefinedTable:
print("Table does not exist")
except psycopg.OperationalError:
print("Could not connect to the database")
Code language: JavaScript (javascript)
Common exceptions:
| Exception | Cause |
|---|---|
psycopg.OperationalError | Connection failed (wrong host, port, credentials) |
psycopg.errors.UndefinedTable | Table does not exist |
psycopg.errors.InsufficientPrivilege | User lacks required permissions |
psycopg.errors.QueryCanceled | Query exceeded statement_timeout |
pgmonkey.EnvInterpolationError | Missing environment variable or secret file |
Tip: pgmonkey raises
EnvInterpolationErrorat connection time — not at query time. If your environment variables are wrong, you find out immediately.
7. Production Readiness Checklist
Before deploying, verify each item.
| Item | How to verify |
|---|---|
| Credentials not hardcoded | Config uses ${VAR}, from_env, or from_file (Part 5) |
| SSL enabled | sslmode set to require or higher (Part 3) |
| Pool sized correctly | max_size accounts for peak concurrency (Part 4) |
| Health checks enabled | check_on_checkout: true in pool settings (Part 4) |
| GUC settings configured | statement_timeout and lock_timeout set (Section 2 above) |
| Server config aligned | pgserverconfig --audit shows no mismatches (Part 6) |
| Connection caching active | Single PGConnectionManager instance shared across app |
| Error handling in place | psycopg exceptions caught at application boundary |
| Keepalives enabled | keepalives: '1' with appropriate intervals (Part 1) |
Summary
You integrated pgmonkey into production applications.
- Transactions commit on clean exit and roll back on exceptions
- GUC session settings protect against runaway queries and deadlocks
- Connection caching prevents pool storms and duplicate pool creation
- Flask uses
poolconnections with a sharedDatabaseclass - FastAPI uses
async_poolconnections with startup/shutdown lifecycle - Standard
psycopgexceptions for error handling — no custom exception hierarchy to learn - The production checklist ties together every topic from all eight parts
This concludes the pgmonkey tutorial series. You started with a pip install and ended with production-ready Flask and FastAPI integration, backed by SSL, pooling, environment variable interpolation, and server-side auditing — all driven by a single YAML config file.
For more information:
- pgmonkey documentation: pgmonkey.net
- Source code: github.com/RexBytes/pgmonkey
- Best practice recipes: pgmonkey.net/best_practices.html
pgmonkey — All Parts
- 1 pgmonkey (1/8) — Getting Started
- 2 pgmonkey (2/8) — Connection Types
- 3 pgmonkey (3/8) — Securing Connections with SSL/TLS
- 4 pgmonkey (4/8) — Connection Pooling
- 5 pgmonkey (5/8) — Environment Variables and Secrets
- 6 pgmonkey (6/8) — The CLI Toolbox
- 7 pgmonkey (7/8) — CSV Import and Export
- 8 pgmonkey (8/8) — Production Patterns You are here
