pgmonkey (8/8) — Production Patterns

Summary: Put pgmonkey into production — transaction management, GUC session settings, connection caching, Flask and FastAPI integration patterns, error handling, and a production readiness checklist.

Key                 Value
pgmonkey version    4.0.0
Python version      3.12
Config file         pgmonkey_config.yaml
Database name       tutorial_db
Flask app file      app.py
FastAPI app file    main.py

0. Prerequisites

  • Completed Parts 1-4 (pgmonkey installed, connection types understood, pooling configured)
  • Familiarity with Part 5 (environment variables) for production deployments
  • Flask or FastAPI installed if you plan to run the web framework examples

The beginning of the tutorial series can be found here.


1. Transaction Management

pgmonkey’s context managers handle transactions automatically.

Auto-commit on clean exit:

from pgmonkey import PGConnectionManager

manager = PGConnectionManager()
conn = manager.get_database_connection('pgmonkey_config.yaml', 'normal')

with conn as c:
    with c.cursor() as cur:
        cur.execute("INSERT INTO customers (name, email) VALUES (%s, %s)",
                    ('Frank', '[email protected]'))
# Transaction committed automatically

Auto-rollback on exception:

try:
    with conn as c:
        with c.cursor() as cur:
            cur.execute("INSERT INTO customers (name, email) VALUES (%s, %s)",
                        ('Grace', '[email protected]'))
            raise ValueError("Something went wrong")
except ValueError:
    pass
# Transaction rolled back automatically — Grace was not inserted

Explicit transactions:

with conn as c:
    with c.transaction():
        with c.cursor() as cur:
            cur.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1;")
            cur.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2;")
# Both updates commit together, or both roll back

Autocommit mode is for statements that cannot run inside a transaction block (CREATE DATABASE, VACUUM, etc.):

connection_settings:
  autocommit: true

Note: Autocommit applies to the connection, not individual statements. Use a separate config file (or a separate connection_type call) for DDL operations.
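
For example, a minimal sketch of a dedicated maintenance connection, assuming a second config file (hypothetically named pgmonkey_ddl_config.yaml here) with autocommit enabled:

from pgmonkey import PGConnectionManager

manager = PGConnectionManager()
# Assumes pgmonkey_ddl_config.yaml sets connection_settings.autocommit: true
ddl_conn = manager.get_database_connection('pgmonkey_ddl_config.yaml', 'normal')

with ddl_conn as c:
    with c.cursor() as cur:
        cur.execute('VACUUM customers;')  # would fail inside a transaction block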


2. GUC Session Settings

PostgreSQL GUC (Grand Unified Configuration) parameters control per-session behavior. pgmonkey applies these automatically after connecting.

Sync connections use sync_settings:

sync_settings:
  statement_timeout: '30000'
  lock_timeout: '10000'
  idle_in_transaction_session_timeout: '5000'
  work_mem: '256MB'

Async connections use async_settings:

async_settings:
  statement_timeout: '30000'
  lock_timeout: '10000'
  idle_in_transaction_session_timeout: '5000'

Setting                               Purpose                                   Recommended
statement_timeout                     Cancel queries exceeding this time (ms)   30000 (30s) for web apps
lock_timeout                          Timeout waiting for locks (ms)            10000 (10s)
idle_in_transaction_session_timeout   Kill idle-in-transaction sessions (ms)    5000 (5s)
work_mem                              Memory per sort/hash operation            256MB for analytics, 64MB for web apps

These settings protect your application from runaway queries, deadlocks, and idle transactions that hold locks.
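
To see statement_timeout in action, here is a short sketch that deliberately lowers the timeout for one session and triggers the resulting error. pg_sleep and QueryCanceled are standard PostgreSQL/psycopg; the connection setup follows Section 1:

import psycopg
from pgmonkey import PGConnectionManager

manager = PGConnectionManager()
conn = manager.get_database_connection('pgmonkey_config.yaml', 'normal')

try:
    with conn as c:
        with c.cursor() as cur:
            cur.execute("SET statement_timeout = '1000';")  # 1 second, for demonstration
            cur.execute("SELECT pg_sleep(5);")              # exceeds the timeout
except psycopg.errors.QueryCanceled:
    print("Query cancelled by statement_timeout")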

How pgmonkey applies them:

  • For normal connections: SET commands after connect
  • For pool connections: via psycopg_pool’s configure callback on each checkout
  • For async connections: SET commands after connect
  • For async_pool connections: via psycopg_pool’s configure callback on each checkout

The pool callbacks ensure every borrowed connection has the right settings, even if the connection was previously used with different parameters.
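
A quick sanity check: borrow a pooled connection and ask the server what it sees (assuming the sync_settings above were applied; SHOW is standard SQL):

from pgmonkey import PGConnectionManager

manager = PGConnectionManager()
pool = manager.get_database_connection('pgmonkey_config.yaml', 'pool')

with pool as conn:
    with conn.cursor() as cur:
        cur.execute("SHOW statement_timeout;")
        print(cur.fetchone()[0])  # '30s' if the 30000 ms setting was applied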

Note: pgmonkey uses safe SQL composition (psycopg.sql.SQL and sql.Identifier) when applying GUC settings — no risk of SQL injection from setting names.
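
For reference, the composition pattern looks roughly like this. This illustrates the psycopg technique, not pgmonkey's exact internals, and sql.Literal is this sketch's own addition:

from psycopg import sql
from pgmonkey import PGConnectionManager

manager = PGConnectionManager()
conn = manager.get_database_connection('pgmonkey_config.yaml', 'normal')

setting, value = 'statement_timeout', '30000'
stmt = sql.SQL("SET {} = {}").format(sql.Identifier(setting), sql.Literal(value))

with conn as c:
    with c.cursor() as cur:
        cur.execute(stmt)  # renders safely as: SET "statement_timeout" = '30000'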


3. Connection Caching

pgmonkey caches connections and pools by configuration content. Calling get_database_connection() with the same config and connection type returns the same instance.

manager = PGConnectionManager()
pool_a = manager.get_database_connection('pgmonkey_config.yaml', 'pool')
pool_b = manager.get_database_connection('pgmonkey_config.yaml', 'pool')
assert pool_a is pool_b # Same pool, not a duplicate

This prevents pool storms — bursts of requests that each create a new pool and overwhelm the database with connections.

Cache management API:

# Inspect the cache
info = manager.cache_info
print(info) # {'size': 2, 'connection_types': {'pool', 'async_pool'}}
# Force a fresh connection (discards the cached one)
fresh = manager.get_database_connection('pgmonkey_config.yaml', 'pool', force_reload=True)
# Clear all cached connections (sync)
manager.clear_cache()
# Clear all cached connections (async — use inside an event loop)
await manager.clear_cache_async()

Cache keys are computed from the resolved config content (SHA-256 hash) plus the connection type. Different connection types with the same config get separate cache entries. Changing a config value produces a different cache key and a new connection.
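
For example, the same config file requested as two different connection types yields two cache entries, not one (cache_info shown in the format of the example above):

from pgmonkey import PGConnectionManager

manager = PGConnectionManager()
pool = manager.get_database_connection('pgmonkey_config.yaml', 'pool')
normal = manager.get_database_connection('pgmonkey_config.yaml', 'normal')

# Same config content, different connection types: separate cache entries
print(manager.cache_info)  # e.g. {'size': 2, 'connection_types': {'pool', 'normal'}}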

Tip: pgmonkey registers an atexit handler that closes all cached connections and pools when the process exits. You do not need to clean up manually in most applications.


4. Flask Integration

Flask uses synchronous request handling. Use a pool connection with a shared Database class.

from pgmonkey import PGConnectionManager

class Database:
    def __init__(self, config_path):
        self.manager = PGConnectionManager()
        self.config_path = config_path
        self.pool = self.manager.get_database_connection(config_path, 'pool')

    def fetch_one(self, query, params=None):
        with self.pool as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                return cur.fetchone()

    def fetch_all(self, query, params=None):
        with self.pool as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                return cur.fetchall()

    def execute(self, query, params=None):
        with self.pool as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)

Wire it into your Flask app.

from flask import Flask

app = Flask(__name__)
db = Database('pgmonkey_config.yaml')

@app.route('/customers')
def list_customers():
    rows = db.fetch_all('SELECT id, name, email FROM customers ORDER BY id;')
    return {'customers': [{'id': r[0], 'name': r[1], 'email': r[2]} for r in rows]}

The pool is created once when the Database object is instantiated. Each request borrows a connection from the pool and returns it when the route handler completes. Gunicorn workers each get their own pool instance (separate processes), and pgmonkey’s thread-safe pool handles concurrent requests within each worker.
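
Parameterized routes follow the same pattern. This hypothetical endpoint (not part of the tutorial app) shows fetch_one with a bound parameter:

@app.route('/customers/<int:customer_id>')
def get_customer(customer_id):
    row = db.fetch_one('SELECT id, name, email FROM customers WHERE id = %s;',
                       (customer_id,))
    if row is None:
        return {'error': 'customer not found'}, 404
    return {'id': row[0], 'name': row[1], 'email': row[2]}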


5. FastAPI Integration

FastAPI uses asynchronous request handling. Use an async_pool connection with an AsyncDatabase class.

from pgmonkey import PGConnectionManager

class AsyncDatabase:
    def __init__(self, config_path):
        self.manager = PGConnectionManager()
        self.config_path = config_path
        self.pool = None

    async def connect(self):
        self.pool = await self.manager.get_database_connection(
            self.config_path, 'async_pool'
        )

    async def disconnect(self):
        await self.manager.clear_cache_async()

    async def fetch_one(self, query, params=None):
        async with self.pool as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, params)
                return await cur.fetchone()

    async def fetch_all(self, query, params=None):
        async with self.pool as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, params)
                return await cur.fetchall()

    async def execute(self, query, params=None):
        async with self.pool as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, params)

Wire it into your FastAPI app.

from fastapi import FastAPI

app = FastAPI()
db = AsyncDatabase('pgmonkey_config.yaml')

@app.on_event("startup")
async def startup():
    await db.connect()

@app.on_event("shutdown")
async def shutdown():
    await db.disconnect()

@app.get("/customers")
async def list_customers():
    rows = await db.fetch_all('SELECT id, name, email FROM customers ORDER BY id;')
    return {"customers": [{"id": r[0], "name": r[1], "email": r[2]} for r in rows]}

The pool is created at application startup and shared across all request handlers. Each request borrows a connection from the async pool and returns it when the handler completes. pgmonkey’s per-instance ContextVar ensures connections are not shared across concurrent tasks.
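
The same pattern extends to parameterized endpoints. This hypothetical route (not part of the tutorial app) shows fetch_one with a path parameter:

from fastapi import HTTPException

@app.get("/customers/{customer_id}")
async def get_customer(customer_id: int):
    row = await db.fetch_one('SELECT id, name, email FROM customers WHERE id = %s;',
                             (customer_id,))
    if row is None:
        raise HTTPException(status_code=404, detail="Customer not found")
    return {"id": row[0], "name": row[1], "email": row[2]}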


6. Error Handling

pgmonkey raises standard psycopg exceptions. Catch them at your application boundary.

import psycopg
from pgmonkey import PGConnectionManager

manager = PGConnectionManager()

try:
    conn = manager.get_database_connection('pgmonkey_config.yaml', 'normal')
    with conn as c:
        with c.cursor() as cur:
            cur.execute('SELECT * FROM nonexistent_table;')
except psycopg.errors.UndefinedTable:
    print("Table does not exist")
except psycopg.OperationalError:
    print("Could not connect to the database")

Common exceptions:

Exception                              Cause
psycopg.OperationalError               Connection failed (wrong host, port, credentials)
psycopg.errors.UndefinedTable          Table does not exist
psycopg.errors.InsufficientPrivilege   User lacks required permissions
psycopg.errors.QueryCanceled           Query exceeded statement_timeout
pgmonkey.EnvInterpolationError         Missing environment variable or secret file

Tip: pgmonkey raises EnvInterpolationError at connection time — not at query time. If your environment variables are wrong, you find out immediately.
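
That makes it natural to fail fast at startup. A minimal sketch, assuming EnvInterpolationError is importable from the top-level pgmonkey package as the table above suggests:

import pgmonkey
from pgmonkey import PGConnectionManager

manager = PGConnectionManager()

try:
    conn = manager.get_database_connection('pgmonkey_config.yaml', 'normal')
except pgmonkey.EnvInterpolationError as exc:
    # A missing ${VAR} or secret file surfaces here, before any query runs
    raise SystemExit(f"Configuration error: {exc}")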


7. Production Readiness Checklist

Before deploying, verify each item.

Item                        How to verify
Credentials not hardcoded   Config uses ${VAR}, from_env, or from_file (Part 5)
SSL enabled                 sslmode set to require or higher (Part 3)
Pool sized correctly        max_size accounts for peak concurrency (Part 4)
Health checks enabled       check_on_checkout: true in pool settings (Part 4)
GUC settings configured     statement_timeout and lock_timeout set (Section 2 above)
Server config aligned       pgserverconfig --audit shows no mismatches (Part 6)
Connection caching active   Single PGConnectionManager instance shared across app
Error handling in place     psycopg exceptions caught at application boundary
Keepalives enabled          keepalives: '1' with appropriate intervals (Part 1)

Summary

You integrated pgmonkey into production applications.

  • Transactions commit on clean exit and roll back on exceptions
  • GUC session settings protect against runaway queries and deadlocks
  • Connection caching prevents pool storms and duplicate pool creation
  • Flask uses pool connections with a shared Database class
  • FastAPI uses async_pool connections with startup/shutdown lifecycle
  • Standard psycopg exceptions for error handling — no custom exception hierarchy to learn
  • The production checklist ties together every topic from all eight parts

This concludes the pgmonkey tutorial series. You started with a pip install and ended with production-ready Flask and FastAPI integration, backed by SSL, pooling, environment variable interpolation, and server-side auditing — all driven by a single YAML config file.
