66 SQL Injection Code Audit Exercises: From Beginner to Expert π
Master SQL injection detection through hands-on code review β no exploit development required.
Whether you're preparing for a Security Engineering interview, studying for the OSWE, or building AppSec skills, these exercises will train your eye to spot vulnerable code patterns across Python's most popular database libraries.
π― What You'll Learn
- Identify SQL injection vulnerabilities in sqlite3, psycopg2, mysql.connector, SQLAlchemy, and Django ORM
- Recognize when input validation IS sufficient vs. when parameterization is required
- Understand the difference between identifiers (column/table names) and values
- Write secure fixes using proper parameterization patterns
π Prerequisites
- Basic Python knowledge
- Basic SQL syntax understanding
- Familiarity with web application concepts
π Challenge Format
For each snippet, determine:
- Vulnerable? (Yes/No)
- If yes, write the fix
Scoring Guide:
- Correct identification: 1 point
- Correct fix (if vulnerable): 1 point
π Key Concepts Before You Start
Identifiers vs Values
| Type | Examples | Can Parameterize? | Protection |
|---|---|---|---|
| Identifier | Column names, table names, aliases, ASC/DESC
|
β No | Allowlist validation |
| Value | Data in WHERE, LIMIT, function arguments |
β Yes | Parameterization |
Placeholder Syntax by Library
| Library | Placeholder | Example |
|---|---|---|
| sqlite3 | ? |
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,)) |
| psycopg2 | %s |
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) |
| mysql.connector | %s |
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) |
| SQLAlchemy | :param |
conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": user_id}) |
Round 1: Fundamentals (Snippets 1-5)
Difficulty: Beginner
Snippet 1: Basic Authentication
import sqlite3
def authenticate_user(username, password):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
cursor.execute(query)
user = cursor.fetchone()
conn.close()
if user:
return {"status": "success", "user_id": user[0]}
else:
return {"status": "failed"}
Snippet 2: Product Search
import psycopg2
def search_products(search_term):
conn = psycopg2.connect("dbname=store")
cursor = conn.cursor()
query = "SELECT name, price FROM products WHERE name ILIKE %s"
cursor.execute(query, (f"%{search_term}%",))
results = cursor.fetchall()
conn.close()
return results
Snippet 3: Account Check
import mysql.connector
def check_account_exists(email):
conn = mysql.connector.connect(host="localhost", database="accounts")
cursor = conn.cursor()
query = "SELECT 1 FROM users WHERE email = '" + email + "' LIMIT 1"
cursor.execute(query)
exists = cursor.fetchone() is not None
conn.close()
return {"exists": exists}
Snippet 4: Order Status Lookup
from sqlalchemy import create_engine, text
def get_order_status(order_id):
engine = create_engine('postgresql://localhost/orders')
with engine.connect() as conn:
query = text("SELECT status FROM orders WHERE id = :order_id")
result = conn.execute(query, {"order_id": order_id})
row = result.fetchone()
if row:
return {"status": row[0]}
return {"status": "not_found"}
Snippet 5: Report Generator
import sqlite3
def generate_report(table_name, date_filter):
conn = sqlite3.connect('reports.db')
cursor = conn.cursor()
allowed_tables = ['sales', 'inventory', 'customers']
if table_name not in allowed_tables:
return {"error": "Invalid table"}
query = f"SELECT * FROM {table_name} WHERE created_at > '{date_filter}'"
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"data": results}
Round 2: Mixed Patterns (Snippets 6-14)
Difficulty: Intermediate
Snippet 6: User Profile with ID Check
import sqlite3
def get_user_profile(user_id):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
if not user_id.isdigit():
return {"error": "Invalid user ID"}
query = f"SELECT username, email, bio FROM users WHERE id = {user_id}"
cursor.execute(query)
profile = cursor.fetchone()
conn.close()
if profile:
return {"username": profile[0], "email": profile[1], "bio": profile[2]}
return {"error": "User not found"}
Snippet 7: Flask Search with Sort
from flask import Flask, request
import psycopg2
app = Flask(__name__)
@app.route('/search')
def search():
query_param = request.args.get('q', '')
sort_by = request.args.get('sort', 'name')
conn = psycopg2.connect("dbname=products")
cursor = conn.cursor()
allowed_sort = ['name', 'price', 'date_added']
if sort_by not in allowed_sort:
sort_by = 'name'
query = f"SELECT * FROM products WHERE name ILIKE %s ORDER BY {sort_by}"
cursor.execute(query, (f"%{query_param}%",))
results = cursor.fetchall()
conn.close()
return {"results": results}
Snippet 8: Bulk Delete
import mysql.connector
def delete_users(user_ids: list):
conn = mysql.connector.connect(host="localhost", database="app")
cursor = conn.cursor()
placeholders = ','.join(['%s'] * len(user_ids))
query = f"DELETE FROM users WHERE id IN ({placeholders})"
cursor.execute(query, tuple(user_ids))
conn.commit()
deleted_count = cursor.rowcount
conn.close()
return {"deleted": deleted_count}
Snippet 9: Login with Remember Token
import sqlite3
import hashlib
def login(username, password, remember_token=None):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
if remember_token:
query = f"SELECT * FROM users WHERE remember_token = '{remember_token}'"
cursor.execute(query)
else:
password_hash = hashlib.sha256(password.encode()).hexdigest()
query = "SELECT * FROM users WHERE username = ? AND password_hash = ?"
cursor.execute(query, (username, password_hash))
user = cursor.fetchone()
conn.close()
if user:
return {"status": "success", "user_id": user[0]}
return {"status": "failed"}
Snippet 10: Django Connection Query
from django.db import connection
def get_orders_by_status(status, customer_id):
with connection.cursor() as cursor:
cursor.execute(
"SELECT * FROM orders WHERE status = %s AND customer_id = %s",
[status, customer_id]
)
rows = cursor.fetchall()
return [{"id": row[0], "total": row[1], "status": row[2]} for row in rows]
Snippet 11: Audit Log Search
import psycopg2
from datetime import datetime
def search_audit_logs(action_type, start_date, end_date, limit=100):
conn = psycopg2.connect("dbname=audit")
cursor = conn.cursor()
query = """
SELECT timestamp, user_id, action, details
FROM audit_logs
WHERE action = %s
AND timestamp BETWEEN %s AND %s
ORDER BY timestamp DESC
LIMIT """ + str(limit)
cursor.execute(query, (action_type, start_date, end_date))
logs = cursor.fetchall()
conn.close()
return {"logs": logs}
Snippet 12: User Registration
import sqlite3
import hashlib
def register_user(username, email, password):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
cursor.execute("SELECT 1 FROM users WHERE username = ?", (username,))
if cursor.fetchone():
conn.close()
return {"error": "Username taken"}
cursor.execute("SELECT 1 FROM users WHERE email = ?", (email,))
if cursor.fetchone():
conn.close()
return {"error": "Email already registered"}
password_hash = hashlib.sha256(password.encode()).hexdigest()
cursor.execute(
"INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
(username, email, password_hash)
)
conn.commit()
user_id = cursor.lastrowid
conn.close()
return {"success": True, "user_id": user_id}
Snippet 13: Dynamic Report Builder
import psycopg2
def build_report(table, columns, filters):
"""
table: string - table name
columns: list - columns to select
filters: dict - {column: value} pairs for WHERE clause
"""
conn = psycopg2.connect("dbname=reports")
cursor = conn.cursor()
allowed_tables = ['sales', 'inventory', 'employees']
allowed_columns = ['id', 'name', 'amount', 'date', 'department']
if table not in allowed_tables:
return {"error": "Invalid table"}
safe_columns = [c for c in columns if c in allowed_columns]
if not safe_columns:
return {"error": "No valid columns"}
column_str = ', '.join(safe_columns)
where_clauses = []
values = []
for col, val in filters.items():
if col in allowed_columns:
where_clauses.append(f"{col} = %s")
values.append(val)
query = f"SELECT {column_str} FROM {table}"
if where_clauses:
query += " WHERE " + " AND ".join(where_clauses)
cursor.execute(query, tuple(values))
results = cursor.fetchall()
conn.close()
return {"data": results}
Snippet 14: Password Reset
import sqlite3
import secrets
def request_password_reset(email):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
cursor.execute("SELECT id FROM users WHERE email = ?", (email,))
user = cursor.fetchone()
if not user:
conn.close()
return {"message": "If email exists, reset link sent"}
reset_token = secrets.token_urlsafe(32)
cursor.execute(
"UPDATE users SET reset_token = ?, reset_expires = datetime('now', '+1 hour') WHERE id = ?",
(reset_token, user[0])
)
conn.commit()
conn.close()
return {"message": "If email exists, reset link sent"}
def reset_password(token, new_password):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = f"SELECT id FROM users WHERE reset_token = '{token}' AND reset_expires > datetime('now')"
cursor.execute(query)
user = cursor.fetchone()
if not user:
conn.close()
return {"error": "Invalid or expired token"}
cursor.execute(
"UPDATE users SET password_hash = ?, reset_token = NULL WHERE id = ?",
(new_password, user[0])
)
conn.commit()
conn.close()
return {"success": True}
Round 3: Identifier Patterns (Snippets 15-26)
Difficulty: Intermediate-Advanced
Snippet 15: Dynamic Column Filter
import psycopg2
def filter_products(column, value):
conn = psycopg2.connect("dbname=store")
cursor = conn.cursor()
allowed_columns = ['category', 'brand', 'status']
if column not in allowed_columns:
return {"error": "Invalid filter column"}
query = f"SELECT * FROM products WHERE {column} = %s"
cursor.execute(query, (value,))
results = cursor.fetchall()
conn.close()
return {"products": results}
Snippet 16: Sort Direction
import sqlite3
def get_users_sorted(sort_dir):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir}"
cursor.execute(query)
users = cursor.fetchall()
conn.close()
return {"users": users}
Snippet 17: Numeric ID Check
import mysql.connector
def get_order(order_id):
conn = mysql.connector.connect(host="localhost", database="shop")
cursor = conn.cursor()
try:
order_id = int(order_id)
except ValueError:
return {"error": "Invalid order ID"}
query = f"SELECT * FROM orders WHERE id = {order_id}"
cursor.execute(query)
order = cursor.fetchone()
conn.close()
return {"order": order}
Snippet 18: Multiple Column Sort
import psycopg2
def search_inventory(search_term, sort_columns: list):
conn = psycopg2.connect("dbname=warehouse")
cursor = conn.cursor()
allowed_sorts = ['name', 'quantity', 'price', 'updated_at']
safe_sorts = [col for col in sort_columns if col in allowed_sorts]
if not safe_sorts:
safe_sorts = ['name']
sort_str = ', '.join(safe_sorts)
query = f"SELECT * FROM inventory WHERE name ILIKE %s ORDER BY {sort_str}"
cursor.execute(query, (f"%{search_term}%",))
results = cursor.fetchall()
conn.close()
return {"items": results}
Snippet 19: Pagination
import sqlite3
def get_posts(page, per_page):
conn = sqlite3.connect('blog.db')
cursor = conn.cursor()
offset = (page - 1) * per_page
query = f"SELECT * FROM posts ORDER BY created_at DESC LIMIT {per_page} OFFSET {offset}"
cursor.execute(query)
posts = cursor.fetchall()
conn.close()
return {"posts": posts}
Snippet 20: Schema Browser
import psycopg2
def get_table_columns(table_name):
conn = psycopg2.connect("dbname=app")
cursor = conn.cursor()
query = f"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = '{table_name}'
"""
cursor.execute(query)
columns = cursor.fetchall()
conn.close()
return {"columns": columns}
Snippet 21: Aggregate Query
import mysql.connector
def get_sales_summary(group_by_col, year):
conn = mysql.connector.connect(host="localhost", database="sales")
cursor = conn.cursor()
allowed_groups = ['product_id', 'category', 'region', 'salesperson']
if group_by_col not in allowed_groups:
group_by_col = 'category'
if not isinstance(year, int) or year < 2000 or year > 2100:
return {"error": "Invalid year"}
query = f"""
SELECT {group_by_col}, SUM(amount) as total
FROM sales
WHERE YEAR(sale_date) = {year}
GROUP BY {group_by_col}
"""
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"summary": results}
Snippet 22: User Preferences Update
import sqlite3
def update_preference(user_id, pref_key, pref_value):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
allowed_prefs = ['theme', 'language', 'timezone', 'notifications']
if pref_key not in allowed_prefs:
return {"error": "Invalid preference"}
query = f"UPDATE user_preferences SET {pref_key} = ? WHERE user_id = ?"
cursor.execute(query, (pref_value, user_id))
conn.commit()
conn.close()
return {"success": True}
Snippet 23: Log Search with Date Range
import psycopg2
def search_logs(start_date, end_date, log_level):
conn = psycopg2.connect("dbname=logging")
cursor = conn.cursor()
query = """
SELECT timestamp, level, message
FROM logs
WHERE timestamp >= %s
AND timestamp <= %s
AND level = '""" + log_level + "' ORDER BY timestamp DESC"
cursor.execute(query, (start_date, end_date))
logs = cursor.fetchall()
conn.close()
return {"logs": logs}
Snippet 24: Dynamic IN Clause
import sqlite3
def get_users_by_role(roles: list):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
allowed_roles = ['admin', 'editor', 'viewer', 'guest']
safe_roles = [r for r in roles if r in allowed_roles]
if not safe_roles:
return {"error": "No valid roles provided"}
placeholders = ','.join(['?'] * len(safe_roles))
query = f"SELECT id, username, role FROM users WHERE role IN ({placeholders})"
cursor.execute(query, tuple(safe_roles))
users = cursor.fetchall()
conn.close()
return {"users": users}
Snippet 25: Conditional Column Selection
import mysql.connector
def export_users(include_email, include_phone):
conn = mysql.connector.connect(host="localhost", database="app")
cursor = conn.cursor()
columns = ['id', 'username']
if include_email:
columns.append('email')
if include_phone:
columns.append('phone')
column_str = ', '.join(columns)
query = f"SELECT {column_str} FROM users"
cursor.execute(query)
users = cursor.fetchall()
conn.close()
return {"users": users}
Snippet 26: Boolean Filter
import psycopg2
def get_active_items(category, is_active):
conn = psycopg2.connect("dbname=inventory")
cursor = conn.cursor()
if not isinstance(is_active, bool):
return {"error": "is_active must be boolean"}
query = f"SELECT * FROM items WHERE category = %s AND active = {is_active}"
cursor.execute(query, (category,))
items = cursor.fetchall()
conn.close()
return {"items": items}
Round 4: Complex Patterns (Snippets 27-38)
Difficulty: Advanced
Snippet 27: Search with Multiple Filters
import psycopg2
def advanced_search(filters: dict):
"""
filters: {"name": "widget", "min_price": 10, "max_price": 100}
"""
conn = psycopg2.connect("dbname=store")
cursor = conn.cursor()
allowed_filters = ['name', 'min_price', 'max_price', 'category', 'brand']
where_clauses = []
values = []
for key, val in filters.items():
if key not in allowed_filters:
continue
if key == 'name':
where_clauses.append("name ILIKE %s")
values.append(f"%{val}%")
elif key == 'min_price':
where_clauses.append("price >= %s")
values.append(val)
elif key == 'max_price':
where_clauses.append("price <= %s")
values.append(val)
elif key in ('category', 'brand'):
where_clauses.append(f"{key} = %s")
values.append(val)
query = "SELECT * FROM products"
if where_clauses:
query += " WHERE " + " AND ".join(where_clauses)
cursor.execute(query, tuple(values))
results = cursor.fetchall()
conn.close()
return {"products": results}
Snippet 28: JSON Field Query
import sqlite3
def search_metadata(field_name, field_value):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))
results = cursor.fetchall()
conn.close()
return {"documents": results}
Snippet 29: Bulk Status Update
import mysql.connector
def update_order_status(order_ids: list, new_status):
conn = mysql.connector.connect(host="localhost", database="shop")
cursor = conn.cursor()
allowed_statuses = ['pending', 'processing', 'shipped', 'delivered', 'cancelled']
if new_status not in allowed_statuses:
return {"error": "Invalid status"}
if not all(isinstance(oid, int) for oid in order_ids):
return {"error": "Invalid order IDs"}
placeholders = ','.join(['%s'] * len(order_ids))
query = f"UPDATE orders SET status = %s WHERE id IN ({placeholders})"
cursor.execute(query, (new_status, *order_ids))
conn.commit()
conn.close()
return {"updated": cursor.rowcount}
Snippet 30: Column Alias
import psycopg2
def get_report(value_column, label):
conn = psycopg2.connect("dbname=reports")
cursor = conn.cursor()
allowed_columns = ['revenue', 'expenses', 'profit', 'units_sold']
if value_column not in allowed_columns:
return {"error": "Invalid column"}
query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"report": results}
Snippet 31: SQLAlchemy Text Query
from sqlalchemy import create_engine, text
def find_user_by_field(field, value):
engine = create_engine('postgresql://localhost/app')
allowed_fields = ['username', 'email', 'phone']
if field not in allowed_fields:
return {"error": "Invalid field"}
with engine.connect() as conn:
query = text(f"SELECT * FROM users WHERE {field} = :value")
result = conn.execute(query, {"value": value})
user = result.fetchone()
return {"user": user}
Snippet 32: Date Range with Regex
import sqlite3
import re
def get_events(start_date, end_date):
conn = sqlite3.connect('events.db')
cursor = conn.cursor()
date_pattern = r'^\d{4}-\d{2}-\d{2}$'
if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date):
return {"error": "Invalid date format. Use YYYY-MM-DD"}
query = f"SELECT * FROM events WHERE event_date BETWEEN '{start_date}' AND '{end_date}'"
cursor.execute(query)
events = cursor.fetchall()
conn.close()
return {"events": events}
Snippet 33: Subquery Filter
import psycopg2
def get_top_customers(min_orders):
conn = psycopg2.connect("dbname=shop")
cursor = conn.cursor()
try:
min_orders = int(min_orders)
except (ValueError, TypeError):
return {"error": "Invalid minimum orders value"}
query = """
SELECT c.id, c.name, c.email
FROM customers c
WHERE (SELECT COUNT(*) FROM orders o WHERE o.customer_id = c.id) >= %s
"""
cursor.execute(query, (min_orders,))
customers = cursor.fetchall()
conn.close()
return {"customers": customers}
Snippet 34: LIKE with Match Types
import mysql.connector
def search_products(search_term, match_type):
conn = mysql.connector.connect(host="localhost", database="store")
cursor = conn.cursor()
if match_type == 'starts_with':
pattern = f"{search_term}%"
elif match_type == 'ends_with':
pattern = f"%{search_term}"
elif match_type == 'contains':
pattern = f"%{search_term}%"
else:
pattern = search_term
query = "SELECT * FROM products WHERE name LIKE %s"
cursor.execute(query, (pattern,))
results = cursor.fetchall()
conn.close()
return {"products": results}
Snippet 35: Django ORM
from django.contrib.auth.models import User
def search_users(search_term, order_by):
allowed_ordering = ['username', 'email', 'date_joined', '-username', '-email', '-date_joined']
if order_by not in allowed_ordering:
order_by = 'username'
users = User.objects.filter(
username__icontains=search_term
).order_by(order_by)
return list(users.values('id', 'username', 'email'))
Snippet 36: Table Join
import sqlite3
def get_order_details(order_id, include_customer):
conn = sqlite3.connect('shop.db')
cursor = conn.cursor()
if include_customer:
query = """
SELECT o.*, c.name, c.email
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.id = ?
"""
else:
query = "SELECT * FROM orders WHERE id = ?"
cursor.execute(query, (order_id,))
order = cursor.fetchone()
conn.close()
return {"order": order}
Snippet 37: Regex Column Validation
import psycopg2
import re
def get_column_stats(column_name):
conn = psycopg2.connect("dbname=analytics")
cursor = conn.cursor()
if not re.match(r'^[a-z_]+$', column_name):
return {"error": "Invalid column name"}
query = f"SELECT MIN({column_name}), MAX({column_name}), AVG({column_name}) FROM metrics"
cursor.execute(query)
stats = cursor.fetchone()
conn.close()
return {"min": stats[0], "max": stats[1], "avg": stats[2]}
Snippet 38: Cursor Pagination
import psycopg2
def get_paginated_items(cursor_id, limit, direction):
conn = psycopg2.connect("dbname=app")
cursor = conn.cursor()
if not isinstance(cursor_id, int):
cursor_id = 0
if not isinstance(limit, int) or limit < 1 or limit > 100:
limit = 20
if direction == 'next':
operator = '>'
order = 'ASC'
elif direction == 'prev':
operator = '<'
order = 'DESC'
else:
return {"error": "Invalid direction"}
query = f"SELECT * FROM items WHERE id {operator} %s ORDER BY id {order} LIMIT %s"
cursor.execute(query, (cursor_id, limit))
items = cursor.fetchall()
conn.close()
return {"items": items}
Round 5: Expert Patterns (Snippets 39-52)
Difficulty: Expert
Snippet 39: UUID Validation
import sqlite3
import re
def get_session(session_id):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
uuid_pattern = r'^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$'
if not re.match(uuid_pattern, session_id):
return {"error": "Invalid session ID"}
query = f"SELECT * FROM sessions WHERE id = '{session_id}'"
cursor.execute(query)
session = cursor.fetchone()
conn.close()
return {"session": session}
Snippet 40: PostgreSQL JSON Operator
import psycopg2
def search_by_tag(tag_name):
conn = psycopg2.connect("dbname=content")
cursor = conn.cursor()
query = f"SELECT * FROM articles WHERE metadata->>'tags' ILIKE '%{tag_name}%'"
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"articles": results}
Snippet 41: Numeric String Check
import mysql.connector
def get_product(product_id):
conn = mysql.connector.connect(host="localhost", database="store")
cursor = conn.cursor()
if not str(product_id).isnumeric():
return {"error": "Invalid product ID"}
query = f"SELECT * FROM products WHERE id = {product_id}"
cursor.execute(query)
product = cursor.fetchone()
conn.close()
return {"product": product}
Snippet 42: Window Function
import psycopg2
def get_ranked_sales(partition_col):
conn = psycopg2.connect("dbname=sales")
cursor = conn.cursor()
allowed_partitions = ['region', 'category', 'salesperson', 'quarter']
if partition_col not in allowed_partitions:
return {"error": "Invalid partition column"}
query = f"""
SELECT *, RANK() OVER (PARTITION BY {partition_col} ORDER BY amount DESC) as rank
FROM sales
"""
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"sales": results}
Snippet 43: COALESCE Function
import sqlite3
def get_user_display_name(user_id, default_name):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = f"SELECT COALESCE(display_name, '{default_name}') FROM users WHERE id = ?"
cursor.execute(query, (user_id,))
result = cursor.fetchone()
conn.close()
return {"display_name": result[0] if result else None}
Snippet 44: Offset Pagination with Validation
import psycopg2
def get_comments(post_id, page):
conn = psycopg2.connect("dbname=blog")
cursor = conn.cursor()
try:
page = int(page)
if page < 1:
page = 1
except (ValueError, TypeError):
page = 1
offset = (page - 1) * 20
query = f"SELECT * FROM comments WHERE post_id = %s ORDER BY created_at LIMIT 20 OFFSET {offset}"
cursor.execute(query, (post_id,))
comments = cursor.fetchall()
conn.close()
return {"comments": comments}
Snippet 45: CAST Function
import mysql.connector
def search_by_year(year_input):
conn = mysql.connector.connect(host="localhost", database="archive")
cursor = conn.cursor()
query = f"SELECT * FROM documents WHERE CAST(created_year AS CHAR) = '{year_input}'"
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"documents": results}
Snippet 46: PostgreSQL ANY Operator
import psycopg2
def find_users_with_role(role):
conn = psycopg2.connect("dbname=app")
cursor = conn.cursor()
allowed_roles = ['admin', 'editor', 'viewer', 'moderator']
if role not in allowed_roles:
return {"error": "Invalid role"}
query = f"SELECT * FROM users WHERE '{role}' = ANY(roles)"
cursor.execute(query)
users = cursor.fetchall()
conn.close()
return {"users": users}
Snippet 47: INTERVAL Expression
import psycopg2
def get_recent_activity(hours_ago):
conn = psycopg2.connect("dbname=activity")
cursor = conn.cursor()
if not isinstance(hours_ago, int) or hours_ago < 1 or hours_ago > 168:
hours_ago = 24
query = f"SELECT * FROM activity_log WHERE timestamp > NOW() - INTERVAL '{hours_ago} hours'"
cursor.execute(query)
activities = cursor.fetchall()
conn.close()
return {"activities": activities}
Snippet 48: Simple Parameterization
import sqlite3
def search_full_name(first_name, last_name):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = "SELECT * FROM users WHERE first_name = ? AND last_name = ?"
cursor.execute(query, (first_name, last_name))
users = cursor.fetchall()
conn.close()
return {"users": users}
Snippet 49: Table Function with isidentifier
import psycopg2
def get_table_size(table_name):
conn = psycopg2.connect("dbname=app")
cursor = conn.cursor()
if not table_name.isidentifier():
return {"error": "Invalid table name"}
query = f"SELECT pg_size_pretty(pg_total_relation_size('{table_name}'))"
cursor.execute(query)
size = cursor.fetchone()
conn.close()
return {"size": size[0]}
Snippet 50: Float Validation
import mysql.connector
def get_products_under_price(max_price):
conn = mysql.connector.connect(host="localhost", database="store")
cursor = conn.cursor()
try:
max_price = float(max_price)
except (ValueError, TypeError):
return {"error": "Invalid price"}
query = f"SELECT * FROM products WHERE price <= {max_price}"
cursor.execute(query)
products = cursor.fetchall()
conn.close()
return {"products": products}
Snippet 51: EXISTS Subquery
import sqlite3
def get_authors_with_posts(min_posts):
conn = sqlite3.connect('blog.db')
cursor = conn.cursor()
try:
min_posts = int(min_posts)
except ValueError:
return {"error": "Invalid minimum posts"}
query = """
SELECT * FROM authors a
WHERE EXISTS (
SELECT 1 FROM posts p
WHERE p.author_id = a.id
GROUP BY p.author_id
HAVING COUNT(*) >= ?
)
"""
cursor.execute(query, (min_posts,))
authors = cursor.fetchall()
conn.close()
return {"authors": authors}
Snippet 52: Schema-Qualified Table
import psycopg2
def get_data(schema_name, limit):
conn = psycopg2.connect("dbname=warehouse")
cursor = conn.cursor()
allowed_schemas = ['public', 'staging', 'archive']
if schema_name not in allowed_schemas:
return {"error": "Invalid schema"}
if not isinstance(limit, int) or limit < 1:
limit = 100
query = f"SELECT * FROM {schema_name}.reports ORDER BY created_at DESC LIMIT {limit}"
cursor.execute(query)
data = cursor.fetchall()
conn.close()
return {"data": data}
π Answer Key
Click to reveal answers
Round 1 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 1 | β Yes | f-string interpolation of username and password
|
| 2 | β No | Properly parameterized with %s
|
| 3 | β Yes | String concatenation of email
|
| 4 | β No | SQLAlchemy :param parameterization |
| 5 | β Yes |
date_filter interpolated via f-string (table is allowlisted) |
Snippet 1 Fix
# Vulnerable
query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
cursor.execute(query)
# Fixed
query = "SELECT * FROM users WHERE username = ? AND password = ?"
cursor.execute(query, (username, password))
Snippet 3 Fix
# Vulnerable
query = "SELECT 1 FROM users WHERE email = '" + email + "' LIMIT 1"
cursor.execute(query)
# Fixed
query = "SELECT 1 FROM users WHERE email = %s LIMIT 1"
cursor.execute(query, (email,))
Snippet 5 Fix
# Vulnerable
query = f"SELECT * FROM {table_name} WHERE created_at > '{date_filter}'"
cursor.execute(query)
# Fixed (table_name already allowlisted, just parameterize date_filter)
query = f"SELECT * FROM {table_name} WHERE created_at > ?"
cursor.execute(query, (date_filter,))
Round 2 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 6 | β No |
.isdigit() prevents non-numeric input |
| 7 | β No |
sort_by allowlisted, query_param parameterized |
| 8 | β No | Placeholders generated safely, values parameterized |
| 9 | β Yes |
remember_token branch uses f-string |
| 10 | β No | Django parameterization with %s
|
| 11 | β Yes |
limit concatenated via str()
|
| 12 | β No | All queries use ? parameterization |
| 13 | β No | All identifiers allowlisted, values parameterized |
| 14 | β Yes |
reset_password function uses f-string for token
|
Snippet 9 Fix
# Vulnerable
if remember_token:
query = f"SELECT * FROM users WHERE remember_token = '{remember_token}'"
cursor.execute(query)
# Fixed
if remember_token:
query = "SELECT * FROM users WHERE remember_token = ?"
cursor.execute(query, (remember_token,))
Snippet 11 Fix
# Vulnerable
query = """
SELECT timestamp, user_id, action, details
FROM audit_logs
WHERE action = %s
AND timestamp BETWEEN %s AND %s
ORDER BY timestamp DESC
LIMIT """ + str(limit)
cursor.execute(query, (action_type, start_date, end_date))
# Fixed
query = """
SELECT timestamp, user_id, action, details
FROM audit_logs
WHERE action = %s
AND timestamp BETWEEN %s AND %s
ORDER BY timestamp DESC
LIMIT %s
"""
cursor.execute(query, (action_type, start_date, end_date, limit))
Snippet 14 Fix (reset_password function)
# Vulnerable
query = f"SELECT id FROM users WHERE reset_token = '{token}' AND reset_expires > datetime('now')"
cursor.execute(query)
# Fixed
query = "SELECT id FROM users WHERE reset_token = ? AND reset_expires > datetime('now')"
cursor.execute(query, (token,))
Round 3 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 15 | β No |
column allowlisted, value parameterized |
| 16 | β Yes |
sort_dir not validated |
| 17 | β No |
int() conversion prevents injection |
| 18 | β No | Columns filtered against allowlist |
| 19 | β Yes |
page and per_page not validated |
| 20 | β Yes |
table_name interpolated without validation |
| 21 | β No |
group_by_col allowlisted, year validated with isinstance(int)
|
| 22 | β No |
pref_key allowlisted, values parameterized |
| 23 | β Yes |
log_level concatenated |
| 24 | β No | Roles filtered against allowlist, then parameterized |
| 25 | β No | Boolean flags control hardcoded columns |
| 26 | β No |
isinstance(bool) only allows True/False |
Snippet 16 Fix
# Vulnerable
query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir}"
cursor.execute(query)
# Fixed (allowlist validation β cannot parameterize ASC/DESC)
allowed_directions = ['ASC', 'DESC']
if sort_dir.upper() not in allowed_directions:
sort_dir = 'ASC'
query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir.upper()}"
cursor.execute(query)
Snippet 19 Fix
# Vulnerable
offset = (page - 1) * per_page
query = f"SELECT * FROM posts ORDER BY created_at DESC LIMIT {per_page} OFFSET {offset}"
cursor.execute(query)
# Fixed (add validation + parameterization)
try:
page = int(page)
per_page = int(per_page)
if page < 1:
page = 1
if per_page < 1 or per_page > 100:
per_page = 20
except (ValueError, TypeError):
page, per_page = 1, 20
offset = (page - 1) * per_page
query = "SELECT * FROM posts ORDER BY created_at DESC LIMIT ? OFFSET ?"
cursor.execute(query, (per_page, offset))
Snippet 20 Fix
# Vulnerable
query = f"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = '{table_name}'
"""
cursor.execute(query)
# Fixed
query = """
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = %s
"""
cursor.execute(query, (table_name,))
Snippet 23 Fix
# Vulnerable
query = """
SELECT timestamp, level, message
FROM logs
WHERE timestamp >= %s
AND timestamp <= %s
AND level = '""" + log_level + "' ORDER BY timestamp DESC"
cursor.execute(query, (start_date, end_date))
# Fixed
query = """
SELECT timestamp, level, message
FROM logs
WHERE timestamp >= %s
AND timestamp <= %s
AND level = %s
ORDER BY timestamp DESC
"""
cursor.execute(query, (start_date, end_date, log_level))
Round 4 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 27 | β No | Keys allowlisted, values parameterized |
| 28 | β Yes |
field_name in JSON path not validated |
| 29 | β No | Status allowlisted, IDs type-checked |
| 30 | β Yes |
label (alias) not validated |
| 31 | β No |
field allowlisted, value parameterized |
| 32 | β No | Strict regex allows only YYYY-MM-DD
|
| 33 | β No |
int() + parameterization |
| 34 | β No | Pattern built safely, parameterized |
| 35 | β No | Django ORM + allowlisted ordering |
| 36 | β No | Boolean selects hardcoded queries |
| 37 | β No | Strict regex ^[a-z_]+$
|
| 38 | β No | Type checks + hardcoded operators |
Snippet 28 Fix
# Vulnerable
query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))
# Fixed (allowlist validation β cannot parameterize JSON paths)
allowed_fields = ['author', 'category', 'status', 'priority']
if field_name not in allowed_fields:
return {"error": "Invalid field name"}
query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))
Snippet 30 Fix
# Vulnerable
query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)
# Fixed (allowlist validation β cannot parameterize aliases)
allowed_columns = ['revenue', 'expenses', 'profit', 'units_sold']
allowed_labels = ['total', 'amount', 'value', 'metric', 'result']
if value_column not in allowed_columns:
return {"error": "Invalid column"}
if label not in allowed_labels:
return {"error": "Invalid label"}
query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)
Round 5 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 39 | β No | UUID regex only allows hex + hyphens |
| 40 | β Yes |
tag_name in ILIKE not parameterized |
| 41 | β No |
.isnumeric() prevents non-numeric |
| 42 | β No |
partition_col allowlisted |
| 43 | β Yes |
default_name in COALESCE not parameterized |
| 44 | β No |
int() with error handling |
| 45 | β Yes |
year_input in CAST not parameterized |
| 46 | β No |
role allowlisted |
| 47 | β No |
isinstance(int) + range check |
| 48 | β No | Proper parameterization |
| 49 | β No |
.isidentifier() allows only valid identifiers |
| 50 | β No |
float() with error handling |
| 51 | β No |
int() + parameterization |
| 52 | β No | Schema allowlisted, limit type-checked |
Snippet 40 Fix
# Vulnerable
query = f"SELECT * FROM articles WHERE metadata->>'tags' ILIKE '%{tag_name}%'"
cursor.execute(query)
# Fixed (parameterize with wildcards in parameter value)
query = "SELECT * FROM articles WHERE metadata->>'tags' ILIKE %s"
cursor.execute(query, (f"%{tag_name}%",))
Snippet 43 Fix
# Vulnerable
query = f"SELECT COALESCE(display_name, '{default_name}') FROM users WHERE id = ?"
cursor.execute(query, (user_id,))
# Fixed (parameterize both values)
query = "SELECT COALESCE(display_name, ?) FROM users WHERE id = ?"
cursor.execute(query, (default_name, user_id))
Snippet 45 Fix
# Vulnerable
query = f"SELECT * FROM documents WHERE CAST(created_year AS CHAR) = '{year_input}'"
cursor.execute(query)
# Fixed
query = "SELECT * FROM documents WHERE CAST(created_year AS CHAR) = %s"
cursor.execute(query, (year_input,))
π Score Yourself (Rounds 1-5)
| Score | Level |
|---|---|
| 0-20 | Beginner β Review parameterization basics |
| 21-35 | Intermediate β Practice identifier vs value distinction |
| 36-45 | Advanced β Focus on subtle patterns |
| 46-52 | Expert β Ready for Round 6! |
Round 6: Advanced Exploitation Patterns (Snippets 53-66)
Difficulty: Expert
Snippet 53: Django Raw SQL Query
from django.contrib.auth.models import User
def get_users_by_role(role_name):
sql = "SELECT id, username FROM auth_user WHERE first_name = '%s'" % role_name
users = User.objects.raw(sql)
return [{"id": u.id, "username": u.username} for u in users]
Snippet 54: Second-Order SQL Injection Storage
import sqlite3
def update_user_preferences(user_id, display_name):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
# First query - stores user input
query1 = "UPDATE users SET display_name = ? WHERE id = ?"
cursor.execute(query1, (display_name, user_id))
conn.commit()
# Second query - retrieves and uses stored data
query2 = "SELECT display_name FROM users WHERE id = ?"
cursor.execute(query2, (user_id,))
name = cursor.fetchone()[0]
# Third query - uses retrieved data in dynamic SQL
query3 = f"INSERT INTO audit_log (action, details) VALUES ('name_change', 'New name: {name}')"
cursor.execute(query3)
conn.commit()
conn.close()
return {"status": "updated"}
Snippet 55: PostgreSQL Array Operations
import psycopg2
def search_by_tags(tag_list: list):
conn = psycopg2.connect("dbname=blog")
cursor = conn.cursor()
# Convert Python list to PostgreSQL array format
tags_array = '{' + ','.join(tag_list) + '}'
query = f"SELECT * FROM posts WHERE tags && ARRAY{tags_array}::text[]"
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return results
Snippet 56: Time-Based Attack Context
import mysql.connector
import time
def check_user_exists_timed(username):
conn = mysql.connector.connect(host="localhost", database="app")
cursor = conn.cursor()
# Validate username format first
if not username.replace('_', '').replace('-', '').isalnum():
return {"error": "Invalid username format"}
query = f"SELECT COUNT(*) FROM users WHERE username = '{username}'"
start = time.time()
cursor.execute(query)
result = cursor.fetchone()[0]
elapsed = time.time() - start
conn.close()
return {"exists": result > 0, "query_time": elapsed}
Snippet 57: Django Database Connection Query
from django.db import connection
def delete_expired_sessions(session_type, days_old):
sql = f"""DELETE FROM auth_session
WHERE session_type = %s
AND expire_date < DATE_SUB(NOW(), INTERVAL {days_old} DAY)"""
with connection.cursor() as cursor:
cursor.execute(sql, [session_type])
deleted = cursor.rowcount
return {"deleted": deleted}
Snippet 58: PostgreSQL JSONB Query
import psycopg2
def search_metadata(field_name, field_value):
conn = psycopg2.connect("dbname=documents")
cursor = conn.cursor()
allowed_fields = ['status', 'priority', 'category', 'author']
if field_name not in allowed_fields:
return {"error": "Invalid field"}
query = f"SELECT * FROM documents WHERE metadata->>'{field_name}' = %s"
cursor.execute(query, (field_value,))
results = cursor.fetchall()
conn.close()
return results
Snippet 59: Stored Procedure Call
import mysql.connector
def generate_report_via_sp(report_type, start_date, end_date):
conn = mysql.connector.connect(host="localhost", database="analytics")
cursor = conn.cursor()
allowed_types = ['sales', 'inventory', 'users', 'transactions']
if report_type not in allowed_types:
return {"error": "Invalid report type"}
proc_name = f"generate_{report_type}_report"
# Call stored procedure
cursor.callproc(proc_name, [start_date, end_date])
results = []
for result in cursor.stored_results():
results.extend(result.fetchall())
conn.close()
return {"data": results}
Snippet 60: Error-Based Blind SQLi Context
import sqlite3
def validate_license_key(license_key):
conn = sqlite3.connect('licenses.db')
cursor = conn.cursor()
# Basic format validation
if len(license_key) != 36 or license_key.count('-') != 4:
return {"valid": False, "error": "Invalid format"}
try:
query = f"SELECT 1 FROM licenses WHERE key = '{license_key}' AND status = 'active'"
cursor.execute(query)
result = cursor.fetchone()
conn.close()
if result:
return {"valid": True}
return {"valid": False, "error": "Invalid or inactive license"}
except sqlite3.Error as e:
conn.close()
return {"valid": False, "error": str(e)}
Snippet 61: Complex Subquery Pattern
import psycopg2
def get_top_customers(metric, limit):
conn = psycopg2.connect("dbname=sales")
cursor = conn.cursor()
allowed_metrics = ['revenue', 'orders', 'items_purchased']
if metric not in allowed_metrics:
metric = 'revenue'
try:
limit_val = int(limit)
if limit_val < 1 or limit_val > 100:
limit_val = 10
except (ValueError, TypeError):
limit_val = 10
query = f"""
SELECT customer_id, SUM({metric}) as total
FROM (
SELECT customer_id, {metric}
FROM orders
WHERE status = 'completed'
) subquery
GROUP BY customer_id
ORDER BY total DESC
LIMIT ?
"""
cursor.execute(query, (limit_val,))
results = cursor.fetchall()
conn.close()
return results
Snippet 62: Batch Update with Transaction
import mysql.connector
def batch_update_prices(price_updates: dict):
"""
price_updates: {"product_123": 29.99, "product_456": 49.99}
"""
conn = mysql.connector.connect(host="localhost", database="inventory")
cursor = conn.cursor()
try:
conn.start_transaction()
for product_id, new_price in price_updates.items():
# Validate product_id format
if not product_id.startswith('product_'):
continue
# Extract numeric ID
numeric_id = product_id.replace('product_', '')
if not numeric_id.isdigit():
continue
query = f"UPDATE products SET price = %s WHERE id = {numeric_id}"
cursor.execute(query, (new_price,))
conn.commit()
updated = cursor.rowcount
except Exception as e:
conn.rollback()
return {"error": str(e)}
finally:
conn.close()
return {"updated": updated}
Snippet 63: Dynamic Table Selection with CTE
import psycopg2
def get_monthly_summary(table_suffix, year, month):
conn = psycopg2.connect("dbname=analytics")
cursor = conn.cursor()
# Validate year and month
try:
year = int(year)
month = int(month)
if not (2020 <= year <= 2030) or not (1 <= month <= 12):
return {"error": "Invalid date range"}
except (ValueError, TypeError):
return {"error": "Invalid date format"}
# Validate table suffix
allowed_suffixes = ['revenue', 'expenses', 'profit']
if table_suffix not in allowed_suffixes:
return {"error": "Invalid table"}
table_name = f"monthly_{table_suffix}"
query = f"""
WITH monthly_data AS (
SELECT * FROM {table_name}
WHERE year = %s AND month = %s
)
SELECT SUM(amount) as total FROM monthly_data
"""
cursor.execute(query, (year, month))
result = cursor.fetchone()
conn.close()
return {"total": result[0] if result else 0}
Snippet 64: XML Path Context
import sqlite3
def query_xml_field(doc_id, xpath_expr):
conn = sqlite3.connect('documents.db')
cursor = conn.cursor()
# Validate doc_id is numeric
if not str(doc_id).isdigit():
return {"error": "Invalid document ID"}
# Allowlist common XPath expressions
allowed_paths = [
'//title',
'//author',
'//date',
'//content',
'/document/metadata/tags'
]
if xpath_expr not in allowed_paths:
return {"error": "Invalid XPath expression"}
query = f"SELECT xpath('{xpath_expr}', xml_content) FROM documents WHERE id = ?"
cursor.execute(query, (doc_id,))
result = cursor.fetchone()
conn.close()
return {"data": result[0] if result else None}
Snippet 65: NoSQL-Style Operators in SQL
import psycopg2
def find_users_by_criteria(filter_op, field, value):
conn = psycopg2.connect("dbname=users")
cursor = conn.cursor()
# Map NoSQL-style operators to SQL
operator_map = {
'$eq': '=',
'$ne': '!=',
'$gt': '>',
'$lt': '<',
'$gte': '>=',
'$lte': '<='
}
if filter_op not in operator_map:
return {"error": "Invalid operator"}
sql_op = operator_map[filter_op]
allowed_fields = ['age', 'created_at', 'last_login', 'points']
if field not in allowed_fields:
return {"error": "Invalid field"}
query = f"SELECT id, username FROM users WHERE {field} {sql_op} %s"
cursor.execute(query, (value,))
results = cursor.fetchall()
conn.close()
return results
Snippet 66: SQLAlchemy Text with Named Parameters
from sqlalchemy import create_engine, text
def advanced_search(filters: dict):
"""
filters: {
"status": "active",
"min_price": 100,
"sort_by": "price",
"sort_dir": "DESC"
}
"""
engine = create_engine('postgresql://localhost/products')
allowed_sort_cols = ['price', 'name', 'created_at', 'stock']
sort_col = filters.get('sort_by', 'name')
if sort_col not in allowed_sort_cols:
sort_col = 'name'
allowed_directions = ['ASC', 'DESC']
sort_dir = filters.get('sort_dir', 'ASC').upper()
if sort_dir not in allowed_directions:
sort_dir = 'ASC'
base_query = f"""
SELECT * FROM products
WHERE status = :status
AND price >= :min_price
ORDER BY {sort_col} {sort_dir}
"""
with engine.connect() as conn:
result = conn.execute(
text(base_query),
{
"status": filters.get("status", "active"),
"min_price": filters.get("min_price", 0)
}
)
rows = result.fetchall()
return [dict(row._mapping) for row in rows]
π‘ Round 6 Answers & Fixes (Click to Expand)
Round 6 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 53 | β Yes | Django raw() with string interpolation |
| 54 | β Yes | Second-order injection in audit log query |
| 55 | β Yes | Array literal not parameterized |
| 56 | β Yes | Time-based context with string concatenation |
| 57 | β Yes |
INTERVAL {days_old} DAY not parameterized |
| 58 | β No | JSONB key allowlisted, value parameterized |
| 59 | β Yes | Stored procedure name dynamically constructed |
| 60 | β Yes | Error messages leak SQL details enabling blind injection |
| 61 | β No | Metric allowlisted, limit validated and parameterized |
| 62 | β Yes |
numeric_id interpolated into WHERE clause |
| 63 | β No | Table allowlisted, parameters validated and parameterized |
| 64 | β No | XPath expression allowlisted, doc_id parameterized |
| 65 | β No | Operators mapped via allowlist, field allowlisted, value parameterized |
| 66 | β No | Sort columns/directions allowlisted, values use named parameters |
Snippet 53 Fix (Django raw SQL)
# Vulnerable
sql = "SELECT id, username FROM auth_user WHERE first_name = '%s'" % role_name
users = User.objects.raw(sql)
# Fixed (use parameterization with list)
sql = "SELECT id, username FROM auth_user WHERE first_name = %s"
users = User.objects.raw(sql, [role_name])
Source: Full Stack Python Security, pp. 205-207 β Django's raw() method requires manual parameterization by passing parameter values as a list. Never use string interpolation (% operator) with raw SQL.1
Snippet 54 Fix (Second-order SQL injection)
# Vulnerable (third query uses unescaped stored data)
query3 = f"INSERT INTO audit_log (action, details) VALUES ('name_change', 'New name: {name}')"
cursor.execute(query3)
# Fixed (parameterize all queries, even with previously stored data)
query3 = "INSERT INTO audit_log (action, details) VALUES ('name_change', ?)"
cursor.execute(query3, (f'New name: {name}',))
Critical Insight: Second-order SQL injection occurs when data stored safely (via parameterization) is later retrieved and used unsafely in a subsequent query. Always parameterize queries even when using data from your own database.2
Snippet 55 Fix (PostgreSQL array operations)
# Vulnerable
tags_array = '{' + ','.join(tag_list) + '}'
query = f"SELECT * FROM posts WHERE tags && ARRAY{tags_array}::text[]"
cursor.execute(query)
# Fixed (use ANY with parameterized array)
query = "SELECT * FROM posts WHERE tags && %s"
cursor.execute(query, (tag_list,))
# Alternative: Use psycopg2's array adaptation
from psycopg2.extensions import adapt
query = "SELECT * FROM posts WHERE tags && %s::text[]"
cursor.execute(query, (tag_list,))
Source: PostgreSQL array operations require special handling. Use psycopg2's built-in array adaptation or the ANY operator with parameterization.3
Snippet 56 Fix (Time-based attack context)
# Vulnerable (even with validation, string concatenation enables time-based attacks)
query = f"SELECT COUNT(*) FROM users WHERE username = '{username}'"
cursor.execute(query)
# Fixed
query = "SELECT COUNT(*) FROM users WHERE username = %s"
cursor.execute(query, (username,))
Attack Vector: An attacker could inject admin' AND SLEEP(5)-- - to determine if user 'admin' exists based on response time. Input validation doesn't prevent time-based blind SQL injection β only parameterization does.4
Snippet 57 Fix (Django database connection)
# Vulnerable
sql = f"""DELETE FROM auth_session
WHERE session_type = %s
AND expire_date < DATE_SUB(NOW(), INTERVAL {days_old} DAY)"""
cursor.execute(sql, [session_type])
# Fixed (parameterize ALL dynamic values)
sql = """DELETE FROM auth_session
WHERE session_type = %s
AND expire_date < DATE_SUB(NOW(), INTERVAL %s DAY)"""
cursor.execute(sql, [session_type, days_old])
Source: Full Stack Python Security, pp. 206-207 β Django's connection.cursor() requires the params argument for parameterization. Never mix f-strings with partial parameterization.1
Snippet 59 Fix (Stored procedure call)
# Vulnerable (procedure name dynamically constructed)
proc_name = f"generate_{report_type}_report"
cursor.callproc(proc_name, [start_date, end_date])
# Fixed (explicit procedure mapping)
procedure_map = {
'sales': 'generate_sales_report',
'inventory': 'generate_inventory_report',
'users': 'generate_users_report',
'transactions': 'generate_transactions_report'
}
if report_type not in procedure_map:
return {"error": "Invalid report type"}
proc_name = procedure_map[report_type]
cursor.callproc(proc_name, [start_date, end_date])
Critical: Stored procedure names cannot be parameterized. Use explicit string mapping from allowlist to procedure name. Never construct procedure names dynamically.5
Snippet 60 Fix (Error-based blind injection)
# Vulnerable (returns detailed SQL errors)
except sqlite3.Error as e:
conn.close()
return {"valid": False, "error": str(e)}
# Fixed (use generic error messages)
except sqlite3.Error as e:
conn.close()
# Log detailed error server-side only
logger.error(f"License validation error: {e}")
# Return generic message to client
return {"valid": False, "error": "License validation failed"}
Critical: Even with parameterized queries, verbose error messages enable blind SQL injection by revealing query structure. Always return generic errors to clients; log details server-side.6
Snippet 62 Fix (Batch update)
# Vulnerable
query = f"UPDATE products SET price = %s WHERE id = {numeric_id}"
cursor.execute(query, (new_price,))
# Fixed (parameterize both values)
query = "UPDATE products SET price = %s WHERE id = %s"
cursor.execute(query, (new_price, numeric_id))
Common Mistake: Developers often validate input (.isdigit()) and assume this makes direct interpolation safe. Validation is insufficient β always parameterize. An attacker could manipulate the request to bypass validation logic before it reaches this function.2
π Advanced Security Lessons from Round 6
Second-Order SQL Injection (Snippet 54)
Real-World Example: The 2014 eBay data breach involved second-order SQL injection where attackers stored malicious payloads in user profiles, which were later executed when administrators viewed audit logs.7
Defense: Treat ALL data as untrusted, even data from your own database. Previous parameterization doesn't make data "safe" for future queries.
Time-Based Blind SQL Injection (Snippet 56)
Real-World Example: In 2017, security researchers demonstrated time-based blind SQL injection in major telecom provider APIs, extracting entire customer databases using SLEEP() functions to map database structure.8
Defense: Parameterization prevents time-based attacks by ensuring user input can never be interpreted as SQL commands, even when validation appears sufficient.
Error-Based Blind SQL Injection (Snippet 60)
Real-World Example: The 2013 Yahoo! data breach began with error-based SQL injection where attackers used verbose SQL error messages to map database schema before exfiltration.9
Defense Strategy:
- β Always parameterize queries
- β Return generic error messages to clients
- β Log detailed errors server-side only
- β Implement rate limiting on error responses
- β Monitor for repeated error-triggering patterns
Stored Procedures (Snippet 59)
Critical Rule: Stored procedure names are SQL identifiers and cannot be parameterized. Any dynamic construction of procedure names requires strict allowlist validation.
PostgreSQL-Specific Patterns (Snippets 55, 58, 63)
PostgreSQL's advanced features (arrays, JSONB, CTEs) create unique injection vectors:
- Arrays: Use psycopg2's built-in array adaptation
-
JSONB operators:
->,->>keys must be allowlisted (cannot parameterize) - CTEs: Table names must be allowlisted; only data values can be parameterized
π Extended References
Primary Sources for Round 6:
-
Full Stack Python Security by Dennis Byrne (Manning, 2021)
- Chapter 13.7: "SQL Injection" (pp. 205-207)
- Django
raw()method parameterization - Database connection query safety
- Error handling best practices
-
API Security in Action by Neil Madden (Manning, 2020)
- Chapter 2.4: "Preventing Injection Attacks" (pp. 42-44)
- Prepared statements vs string escaping
- Second-order injection prevention
- Input validation limitations
-
Hacking APIs by Corey Ball (No Starch Press, 2022)
- Chapter 12: "Injection" (pp. 254-258)
- Blind SQL injection techniques
- Time-based and error-based attacks
- NoSQL injection patterns
- SQLmap exploitation methodology
-
Secure by Design by Johnsson, Deogun, Sawano (Manning, 2019)
- Chapter 5: "Domain Primitives"
- Validation vs parameterization trade-offs
- Secure-by-default API design
-
PortSwigger Web Security Academy
- SQL Injection Labs: Blind SQL injection module
- Time delays and information retrieval
- Out-of-band techniques
- Filter bypass via XML encoding
Additional Research:
-
OWASP: "Blind SQL Injection" (https://owasp.org/www-community/attacks/Blind_SQL_Injection)
- Time-based attack patterns
- Error-based exploitation
- Boolean-based techniques
-
CWE-89: "Improper Neutralization of Special Elements used in an SQL Command"
- Common Weakness Enumeration database
- Real-world vulnerability examples
- Mitigation strategies
-
PostgreSQL Documentation: "Arrays" (https://www.postgresql.org/docs/current/arrays.html)
- Array type handling in queries
- Safe array parameter binding
-
Django Documentation: "Performing raw SQL queries" (https://docs.djangoproject.com/en/stable/topics/db/sql/)
- Official guidance on
raw()method -
connection.cursor()parameterization
- Official guidance on
π― Final Mastery Check
If you scored 62-66 correctly (including fixes), you've mastered:
β
Second-order injection β Understanding that database-stored data isn't inherently safe
β
Blind SQL injection β Time-based and error-based attack patterns
β
Framework-specific patterns β Django raw SQL, PostgreSQL arrays/JSONB
β
Stored procedures β Identifier parameterization limitations
β
Defensive error handling β Preventing information leakage
β
Advanced SQL features β CTEs, subqueries, XML/XPath contexts
You're ready for:
- π’ Senior Security Engineer interviews
- π Production code review responsibilities
- π‘οΈ SAST/DAST tool customization
- π Security architecture design documents
- π OSWE certification exam
π What's Next?
Continue Your Training
Week 5-6: Cross-Site Scripting (XSS) Fundamentals
- Reflected XSS pattern detection
- Stored XSS vulnerability analysis
- DOM-based XSS in modern JavaScript
- Content Security Policy (CSP) implementation
Week 7-8: Authentication & Session Security
- JWT vulnerability patterns
- OAuth 2.0 misconfigurations
- Session fixation attacks
- CSRF token validation
Week 11-24: System Design for Security Interviews
- Designing secure authentication systems
- Rate limiting architectures
- API gateway security patterns
- Threat modeling with STRIDE methodology
Join the Community
β Star the repo: github.com/fosres/SecEng-Exercises
What's coming:
- π 100+ additional SQL injection exercises with real CVE patterns
- π‘οΈ XSS exercise suite (reflected, stored, DOM-based)
- π Authentication security challenges (JWT, OAuth, SAML)
- π‘ API security exercises (rate limiting, IDOR, BOLA)
- π§ͺ 100+ test cases per exercise with detailed explanations
- π Companion blog posts for every challenge
Share Your Progress
Did you complete all 66 exercises? Share your score on:
- π¦ Twitter: Tag @fosres with
#SQLInjectionMastery - πΌ LinkedIn: Share your completion badge
- π Dev.to: Write about what you learned
Building secure software one exercise at a time. Follow @fosres for weekly security challenges.

Top comments (0)