Skip to content

Commit 383fc31

Browse files
committed
Added fast_executemany and commit_every params to improve upload speed
1 parent 7a6c287 commit 383fc31

1 file changed

Lines changed: 39 additions & 56 deletions

File tree

app/airflow/dags/libs/SR_processing/db_services.py

Lines changed: 39 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from libs.settings import AIRFLOW_DAGRUN_TIMEOUT, AIRFLOW_DEBUG_MODE
99
from libs.utils import update_job_status
1010
from openpyxl.worksheet.worksheet import Worksheet
11-
from psycopg2.extras import execute_values
1211

1312
# PostgreSQL connection hook
1413
pg_hook = PostgresHook(
@@ -106,35 +105,28 @@ def update_temp_data_dictionary_table(
106105
}
107106
)
108107

109-
# Insert records into the temporary table using execute_values for fast bulk inserts
108+
# Insert records into the temporary table
110109
if dictionary_records:
111-
conn = pg_hook.get_conn()
112-
cursor = conn.cursor()
113-
try:
114-
execute_values(
115-
cursor,
116-
f"""
117-
INSERT INTO temp_data_dictionary_{scan_report_id}
118-
(table_name, field_name, value, value_description)
119-
VALUES %s
120-
""",
121-
[
122-
(
123-
d["table_name"],
124-
d["field_name"],
125-
d["value"],
126-
d["value_description"],
127-
)
128-
for d in dictionary_records
129-
],
130-
)
131-
conn.commit()
132-
except Exception:
133-
conn.rollback()
134-
raise
135-
finally:
136-
cursor.close()
137-
conn.close()
110+
pg_hook.insert_rows(
111+
table=f"temp_data_dictionary_{scan_report_id}",
112+
rows=[
113+
(
114+
d["table_name"],
115+
d["field_name"],
116+
d["value"],
117+
d["value_description"],
118+
)
119+
for d in dictionary_records
120+
],
121+
target_fields=[
122+
"table_name",
123+
"field_name",
124+
"value",
125+
"value_description",
126+
],
127+
fast_executemany=True,
128+
commit_every=3000,
129+
)
138130

139131
logging.info(
140132
f"Created temporary data dictionary table with {len(dictionary_records)} records"
@@ -205,34 +197,25 @@ def create_temp_field_values_table(
205197
}
206198
)
207199

208-
# Using execute_values() for fast bulk inserts (see notes above)
209200
if field_values_data:
210-
conn = pg_hook.get_conn()
211-
cursor = conn.cursor()
212-
try:
213-
execute_values(
214-
cursor,
215-
f"""
216-
INSERT INTO temp_field_values_{table_id}
217-
(field_name, value, frequency)
218-
VALUES %s
219-
""",
220-
[
221-
(
222-
d["field_name"],
223-
d["value"],
224-
d["frequency"],
225-
)
226-
for d in field_values_data
227-
],
228-
)
229-
conn.commit()
230-
except Exception:
231-
conn.rollback()
232-
raise
233-
finally:
234-
cursor.close()
235-
conn.close()
201+
pg_hook.insert_rows(
202+
table=f"temp_field_values_{table_id}",
203+
rows=[
204+
(
205+
d["field_name"],
206+
d["value"],
207+
d["frequency"],
208+
)
209+
for d in field_values_data
210+
],
211+
target_fields=[
212+
"field_name",
213+
"value",
214+
"frequency",
215+
],
216+
fast_executemany=True,
217+
commit_every=3000,
218+
)
236219

237220
except Exception as e:
238221
logging.error(f"Error creating data dictionary table: {str(e)}")

0 commit comments

Comments (0)