-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
190 lines (169 loc) · 6.2 KB
/
main.py
File metadata and controls
190 lines (169 loc) · 6.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import asyncio
import uuid
from datetime import date
from example.db.mydb import MYDB_POOL
from example.db.mydb import MydbTaskPriority
from example.db.mydb import MydbTaskStatus
from example.db.mydb import mydb_listen_session
from example.db.mydb import mydb_notify
from example.db.mydb import mydb_sql
from example.db.mydb import mydb_transaction
from example.models import ProjectSettings
from example.models import TaskMetadata
from iron_sql import NoRowsError
async def main() -> None:
    """Walk through the generated ``mydb`` helpers from start to finish.

    In order, the demo:

    1. inserts a user and a project inside one ``mydb_transaction()`` block;
    2. inserts two tasks (one with every optional column set, one with all of
       them ``None``) and sets the first one to ``IN_PROGRESS``;
    3. reads data back with ``query_all_rows`` / ``query_single_row``,
       including a lookup for a random id that handles ``NoRowsError``;
    4. iterates tasks through ``query_stream``;
    5. runs a count on a connection taken directly from ``MYDB_POOL`` while a
       transaction block is open;
    6. sends one ``task_updates`` notification and waits up to five seconds
       for the first item from the listen session.

    Results are printed to stdout; nothing is returned.

    NOTE(review): every SQL literal is kept character-for-character (hence the
    flush-left text inside the multi-line strings). Presumably iron_sql matches
    statements by their literal text -- confirm before reformatting them.
    """
    alice_id = uuid.uuid4()
    demo_project_id = uuid.uuid4()

    # --- Transaction: the user row and the project row share one block ---
    async with mydb_transaction():
        insert_user = mydb_sql(
            """
INSERT INTO users (id, username, email)
VALUES (@id, @username, @email)
"""
        )
        await insert_user.execute(
            id=alice_id,
            username="alice",
            email="alice@example.com",
        )

        insert_project = mydb_sql(
            """
INSERT INTO projects (id, name, owner_id, settings)
VALUES (@id, @name, @owner_id, @settings)
"""
        )
        project_settings = ProjectSettings(
            default_priority="high",
            enable_notifications=False,
        )
        await insert_project.execute(
            id=demo_project_id,
            name="iron_sql",
            owner_id=alice_id,
            settings=project_settings,
        )

    # --- Tasks: enum priority, optional (`?`) parameters, JSON metadata ---
    ci_task_id = uuid.uuid4()
    insert_full_task = mydb_sql(
        """
INSERT INTO tasks (id, project_id, title, priority, assignee_id, metadata, due_date)
VALUES (@id, @project_id, @title, @priority, @assignee_id?, @metadata?, @due_date?)
"""
    )
    ci_metadata = TaskMetadata(tags=["infra", "ci"], estimated_hours=4.0)
    await insert_full_task.execute(
        id=ci_task_id,
        project_id=demo_project_id,
        title="Set up CI",
        priority=MydbTaskPriority.HIGH,
        assignee_id=alice_id,
        metadata=ci_metadata,
        due_date=date(2026, 3, 1),
    )

    # Same statement again, this time with every optional parameter left as None.
    insert_bare_task = mydb_sql(
        """
INSERT INTO tasks (id, project_id, title, priority, assignee_id, metadata, due_date)
VALUES (@id, @project_id, @title, @priority, @assignee_id?, @metadata?, @due_date?)
"""
    )
    await insert_bare_task.execute(
        id=uuid.uuid4(),
        project_id=demo_project_id,
        title="Write README",
        priority=MydbTaskPriority.MEDIUM,
        assignee_id=None,
        metadata=None,
        due_date=None,
    )

    # --- Status update: the enum member is passed straight through ---
    set_status = mydb_sql("UPDATE tasks SET status = @status WHERE id = @task_id")
    await set_status.execute(
        task_id=ci_task_id,
        status=MydbTaskStatus.IN_PROGRESS,
    )

    # --- Every user, ordered by creation time ---
    list_users = mydb_sql(
        "SELECT id, username, email, created_at FROM users ORDER BY created_at"
    )
    user_rows = await list_users.query_all_rows()
    print(f"Users: {len(user_rows)}")

    # --- One user by primary key ---
    get_user = mydb_sql(
        "SELECT id, username, email, created_at FROM users WHERE id = @user_id"
    )
    alice = await get_user.query_single_row(user_id=alice_id)
    print(f"User: {alice.username} ({alice.email})")

    # --- Tasks of the project; status=None switches the filter off ---
    list_tasks = mydb_sql(
        """
SELECT id, project_id, assignee_id, title, status, priority, metadata, due_date, created_at
FROM tasks
WHERE project_id = @project_id AND (sqlc.narg('status')::task_status IS NULL OR status = @status?)
"""
    )
    every_task = await list_tasks.query_all_rows(
        project_id=demo_project_id,
        status=None,
    )
    print(f"All tasks: {len(every_task)}")

    # Same query, now restricted to OPEN tasks.
    list_open_tasks = mydb_sql(
        """
SELECT id, project_id, assignee_id, title, status, priority, metadata, due_date, created_at
FROM tasks
WHERE project_id = @project_id AND (sqlc.narg('status')::task_status IS NULL OR status = @status?)
"""
    )
    open_only = await list_open_tasks.query_all_rows(
        project_id=demo_project_id,
        status=MydbTaskStatus.OPEN,
    )
    print(f"Open tasks: {len(open_only)}")

    # --- Named row type: per-status task counts ---
    count_by_status = mydb_sql(
        """
SELECT status, count(*) AS task_count
FROM tasks WHERE project_id = @project_id
GROUP BY status ORDER BY status
""",
        row_type="TaskStatusCount",
    )
    status_counts = await count_by_status.query_all_rows(project_id=demo_project_id)
    for bucket in status_counts:
        print(f" {bucket.status}: {bucket.task_count}")

    # --- Single-column query: look a task id up by its title ---
    find_task_id = mydb_sql(
        "SELECT id FROM tasks WHERE project_id = @project_id AND title = @title"
    )
    located_id = await find_task_id.query_single_row(
        project_id=demo_project_id,
        title="Set up CI",
    )
    print(f"Found task ID: {located_id}")

    # --- Missing row: a fresh random id is looked up and NoRowsError handled ---
    get_unknown_user = mydb_sql(
        "SELECT id, username, email, created_at FROM users WHERE id = @user_id"
    )
    try:
        await get_unknown_user.query_single_row(user_id=uuid.uuid4())
    except NoRowsError:
        print("User not found (expected)")

    # --- Streaming: rows are consumed one at a time from the stream ---
    stream_tasks = mydb_sql(
        """
SELECT id, project_id, assignee_id, title, status, priority, metadata, due_date, created_at
FROM tasks
WHERE project_id = @project_id AND (sqlc.narg('status')::task_status IS NULL OR status = @status?)
"""
    )
    async with stream_tasks.query_stream(
        project_id=demo_project_id,
        status=None,
    ) as task_stream:
        async for streamed in task_stream:
            print(f"Streamed task: {streamed.title} ({streamed.status})")

    # --- Explicit connection: count via a connection taken from the pool ---
    async with mydb_transaction():
        touch_status = mydb_sql("UPDATE tasks SET status = @status WHERE id = @task_id")
        await touch_status.execute(
            task_id=ci_task_id,
            status=MydbTaskStatus.IN_PROGRESS,
        )

        async with MYDB_POOL.connection() as side_conn:
            count_in_progress = mydb_sql(
                "SELECT count(*) FROM tasks WHERE status = @status"
            )
            on_side_conn = count_in_progress.with_connection(side_conn)
            in_progress_total = await on_side_conn.query_single_row(
                status=MydbTaskStatus.IN_PROGRESS
            )
        print(f"Audit (separate conn): {in_progress_total} in-progress tasks")

    # --- LISTEN/NOTIFY: start listening first, then publish, then read one item ---
    async with mydb_listen_session("task_updates") as notifications:
        await mydb_notify("task_updates", str(ci_task_id))
        # Give up after five seconds instead of waiting indefinitely.
        async with asyncio.timeout(5):
            async for payload in notifications:
                print(f"Received: {payload}")
                break
# Start the demo only when this file is executed as a script, so that merely
# importing the module does not run main().
if __name__ == "__main__":
    asyncio.run(main())