
Commit f8d69f1

Analytics container creates connection string from injected env vars (#46)
* **Analytics container creates connection string from injected env vars.** Previously, the db connection string was passed in by the client. This is now optional behaviour: the default, when no string is supplied, is to construct the string *on the container* using the injected credentials. Default env vars renamed to match 5STES defaults. Minor updates to readme and examples.
* **Fixed tests.** Fixed tests to pass correctly, taking into account the renamed variables and the environment variables.
* **Fixed more tests.** Fixed more tests; added tests and validation for env vars.
* **Fixed broken import.** Importing files from the docker directory in tests failed without changes to pyproject.toml. The file is now imported at runtime (but this is only for a unit test).
* **Parsed URL in tests rather than checking substrings.** Previous tests didn't check that the URL was correctly formed and failed the automatic checks.
* **For testing new container.**
* **Remove build on push.** Removed the GitHub Action that builds the container on push.
* **Tidy up.** General tidy-up, removing comments, etc.
* **Tidy test output messages.**
* **Corrected tests and added docker readme.** Moved some block comments out of the query_resolver and used them to build a small (AI-generated) readme for the docker container instead. Removed AI-generated scripts from the docker tests and used pytest integration decorators instead.
1 parent 03c4189 commit f8d69f1

17 files changed

Lines changed: 492 additions & 395 deletions

README.md

Lines changed: 3 additions & 9 deletions
````diff
@@ -29,19 +29,13 @@ All variables in `env.example` are **required**. Here's what you need to configure:
 TES_BASE_URL=http://your-tes-endpoint:5034/v1/tasks
 TES_DOCKER_IMAGE=harbor.your-registry.com/your-image:tag
 
-# Database Configuration
-DB_HOST=your-database-host
-DB_PORT=5432
-DB_USERNAME=your-database-username
-DB_PASSWORD=your-database-password
-DB_NAME=your-database-name
-
 # MinIO Configuration
 MINIO_STS_ENDPOINT=http://your-minio-endpoint:9000/sts
 MINIO_ENDPOINT=your-minio-endpoint:9000
 MINIO_OUTPUT_BUCKET=your-output-bucket-name
 ```
 
+
 ### 3. Installation
 
 ```bash
@@ -66,13 +60,13 @@ import os
 analytics_tes = AnalyticsTES()
 orchestrator = AnalysisOrchestrator(tes_client=analytics_tes)
 analysis_runner = AnalysisRunner(tes_client=analytics_tes)
-sql_schema = os.getenv("SQL_SCHEMA", "public")
+sql_schema = os.getenv("postgresSchema", "public")
 
 
 
 # Define your own SQL query
 query_template = Template("""WITH user_query AS (
-    SELECT value_as_number FROM $schema.measurement
+    SELECT value_as_number FROM $sql_schema.measurement
     WHERE measurement_concept_id = 21490742
     AND value_as_number IS NOT NULL
 )
````

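Worth noting why the `$schema` to `$sql_schema` rename in these examples matters: `string.Template.safe_substitute` silently leaves unknown placeholders in place rather than raising, so the old mismatched keyword produced SQL that still contained the literal placeholder. A minimal sketch of the failure mode:

```python
from string import Template

template = Template("SELECT value_as_number FROM $sql_schema.measurement")

# Mismatched keyword: safe_substitute leaves the placeholder untouched
# instead of raising, so the broken SQL goes to the database as-is.
broken = template.safe_substitute(schema="public")
assert broken == "SELECT value_as_number FROM $sql_schema.measurement"

# Matching keyword: the placeholder is filled as intended.
fixed = template.safe_substitute(sql_schema="public")
assert fixed == "SELECT value_as_number FROM public.measurement"
```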
analysis_orchestrator.py

Lines changed: 23 additions & 0 deletions
````diff
@@ -41,8 +41,11 @@ def __init__(
         self.token_session = token_session
         self.project = project
         self.tes_client = tes_client
+        ## set to None here to be explicitly set later, either by passing args or environment variables
+        self.tres = None
         self.minio_client = MinIOClient(token_session=token_session)
 
+
     def parse_tres(self, tres: str) -> List[str]:
         """
         Parse the TREs from the environment variable.
@@ -114,13 +117,33 @@ def _submit_and_collect_results(self,
 
         task_id = result['id']
         print(f"Task ID: {task_id}")
+
         results_paths = [f"{int(task_id) + i + 1}/output.{output_format}" for i in range(n_results)]
 
         # Use polling engine to collect results
         polling_engine = polling.Polling(self.tes_client, self.minio_client, task_id)
         data = polling_engine.poll_results(results_paths, bucket, n_results, polling_interval=10)
 
         return task_id, data
+
+    def collect_results(self, task_id: str, token: str = None, bucket: str = None, output_format: str = "json"):
+        if token is None:
+            token = os.getenv('5STES_TOKEN')
+            if not token:
+                raise ValueError("5STES_TOKEN environment variable is required when token parameter is not provided")
+        self.token = token
+        if self.tres is None:
+            tres = os.getenv('5STES_TRES')
+            if not tres:
+                raise ValueError("5STES_TRES environment variable is required when tres parameter is not provided")
+            self.tres = self.parse_tres(tres)
+        if bucket is None:
+            bucket = os.getenv('MINIO_OUTPUT_BUCKET')
+            if not bucket:
+                raise ValueError("MINIO_OUTPUT_BUCKET environment variable is required when bucket parameter is not provided")
+        n_results = len(self.tres)
+        results_paths = [f"{int(task_id) + i + 1}/output.{output_format}" for i in range(n_results)]
+        return self._collect_results(results_paths, bucket, n_results)
 
     def _collect_results(self, results_paths: List[str], bucket: str, n_results: int) -> List[str]:
         """
````

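A hypothetical usage sketch for the new `collect_results` method, re-attaching to an already-submitted task. The import paths and the comma-separated `5STES_TRES` format are assumptions; the real format is whatever `parse_tres` expects:

```python
import os

# Import paths assumed for illustration.
from analytics_tes import AnalyticsTES
from analysis_orchestrator import AnalysisOrchestrator

# collect_results falls back to these when the corresponding
# keyword arguments are omitted, raising ValueError if unset.
os.environ["5STES_TOKEN"] = "example-token"
os.environ["5STES_TRES"] = "tre-a,tre-b"  # assumed comma-separated
os.environ["MINIO_OUTPUT_BUCKET"] = "example-output-bucket"

orchestrator = AnalysisOrchestrator(tes_client=AnalyticsTES())

# Re-attach to a previously submitted task and fetch its outputs.
data = orchestrator.collect_results(task_id="1234", output_format="json")
```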
analysis_runner.py

Lines changed: 4 additions & 4 deletions
````diff
@@ -261,14 +261,14 @@ def get_supported_analysis_types(self) -> List[str]:
     analysis_runner = AnalysisRunner()
 
     ## need this to populate the query template
-    sql_schema = os.getenv("SQL_SCHEMA", "public")
+    sql_schema = os.getenv("postgresSchema", "public")
 
     # Example: Run variance analysis first, then mean analysis on the same data
-    query_template = Template("""SELECT value_as_number FROM $schema.measurement
-                                 WHERE measurement_concept_id = 21490742
+    query_template = Template("""SELECT value_as_number FROM $sql_schema.measurement
+                                 WHERE measurement_concept_id = 43055141
                                  AND value_as_number IS NOT NULL""")
 
-    user_query = query_template.safe_substitute(schema=sql_schema)
+    user_query = query_template.safe_substitute(sql_schema=sql_schema)
 
     print("Running mean analysis...")
     mean_result = analysis_runner.run_analysis(
````

analytics_tes.py

Lines changed: 8 additions & 6 deletions
````diff
@@ -25,10 +25,10 @@ def _set_env(self) -> None:
         Set the environment variables for a TES task.
         """
         self.env = {
-            "DATASOURCE_DB_DATABASE": self.default_db_config['name'],
-            "DATASOURCE_DB_HOST": self.default_db_config['host'],
-            "DATASOURCE_DB_PASSWORD": self.default_db_config['password'],
-            "DATASOURCE_DB_USERNAME": self.default_db_config['username']
+            "postgresDatabase": self.default_db_config['name'],
+            "postgresServer": self.default_db_config['host'],
+            "postgresPassword": self.default_db_config['password'],
+            "postgresUsername": self.default_db_config['username']
         }
         return None
 
@@ -37,14 +37,16 @@ def _set_command(self, query: str, analysis_type: str, output_path: str, output_
         Set the command for a TES task.
         """
 
-        connection_string = f"postgresql://postgres:{self.default_db_config['password']}@{self.default_db_config['host']}:{self.default_db_config['port']}/{self.default_db_config['name']}"
+
         self.command = [
             f"--user-query={query}",
             f"--analysis={analysis_type}",
-            f"--db-connection={connection_string}",
             f"--output-filename={output_path}/output",
             f"--output-format={output_format}"
         ]
+        ## add this one if you want to override the injected connection vars with a command line argument.
+        # connection_string = f"postgresql://postgres:{self.default_db_config['password']}@{self.default_db_config['host']}:{self.default_db_config['port']}/{self.default_db_config['name']}"
+        # f"--db-connection={connection_string}",
        return None
 
     def set_executors(self, query, analysis_type, workdir = "/app", output_path="/outputs", output_format="json") -> None:
````

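With the `--db-connection` argument removed from the default command, the container is now responsible for assembling the connection string from the injected `postgres*` variables (see `docker/README.md` below). A rough sketch of that construction; the real logic lives in `parse_connection_string()` in `docker/query_resolver.py`:

```python
import os

REQUIRED_VARS = ["postgresUsername", "postgresPassword", "postgresServer",
                 "postgresPort", "postgresDatabase"]

def build_connection_string() -> str:
    """Illustrative only: assemble a postgresql:// URL from injected env vars."""
    missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
    if missing:
        raise ValueError(f"Missing required environment variables: {missing}")
    return (
        f"postgresql://{os.environ['postgresUsername']}:{os.environ['postgresPassword']}"
        f"@{os.environ['postgresServer']}:{os.environ['postgresPort']}"
        f"/{os.environ['postgresDatabase']}"
    )
```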
bunny_tes.py

Lines changed: 16 additions & 9 deletions
````diff
@@ -19,9 +19,9 @@ def __init__(self, *args, **kwargs):
         self.task_api_username = os.getenv('TASK_API_USERNAME')
         self.task_api_password = os.getenv('TASK_API_PASSWORD')
 
-        # Add schema to default_db_config if not already present
-        if 'schema' not in self.default_db_config:
-            self.default_db_config['schema'] = os.getenv('SQL_SCHEMA')  # None if not set - will fail naturally if needed
+        # Schema: use postgresSchema throughout; entrypoint passes it to bunny as DATASOURCE_DB_SCHEMA
+        if not self.default_db_config.get('schema'):
+            self.default_db_config['schema'] = os.getenv('postgresSchema')
 
     #### this section will be implemented for each type of task using the pytes classes. Note that many of these fields are set in the submission layer after submission.
     def set_inputs(self) -> None:
@@ -43,14 +43,21 @@ def set_outputs(self, name: str, output_path: str, output_type: str = "DIRECTORY
     def _set_env(self) -> None:
         """
         Set the environment variables for a TES task.
+        Container entrypoint reads postgres* and exports DATASOURCE_DB_* for bunny; we set both.
         """
+        db = self.default_db_config
+        schema = db.get('schema') or 'public'
+        port = str(db.get('port') or '5432')
         self.env = {
-            "DATASOURCE_DB_DATABASE": self.default_db_config['name'],
-            "DATASOURCE_DB_HOST": self.default_db_config['host'],
-            "DATASOURCE_DB_PASSWORD": self.default_db_config['password'],
-            "DATASOURCE_DB_USERNAME": self.default_db_config['username'],
-            "DATASOURCE_DB_PORT": self.default_db_config['port'],
-            "DATASOURCE_DB_SCHEMA": self.default_db_config['schema'],
+            # Names the bunny-wrapper entrypoint reads (postgres* → DATASOURCE_DB_*)
+            "postgresDatabase": db['name'],
+            "postgresServer": db['host'],
+            "postgresPort": port,
+            "postgresSchema": schema,
+            "postgresUsername": db['username'],
+            "postgresPassword": db['password'],
+
+            # Bunny / task API
             "TASK_API_BASE_URL": self.task_api_base_url,
             "TASK_API_USERNAME": self.task_api_username,
             "TASK_API_PASSWORD": self.task_api_password,
````

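The comments in `_set_env` describe a translation the bunny-wrapper entrypoint performs from the injected `postgres*` names to the `DATASOURCE_DB_*` names bunny reads. The entrypoint itself is not part of this diff; a Python sketch of the mapping implied by the old and new env dicts:

```python
import os

# Mapping implied by the deleted DATASOURCE_DB_* keys and the
# added postgres* keys in this diff (both carry the same db config values).
POSTGRES_TO_DATASOURCE = {
    "postgresDatabase": "DATASOURCE_DB_DATABASE",
    "postgresServer": "DATASOURCE_DB_HOST",
    "postgresPort": "DATASOURCE_DB_PORT",
    "postgresSchema": "DATASOURCE_DB_SCHEMA",
    "postgresUsername": "DATASOURCE_DB_USERNAME",
    "postgresPassword": "DATASOURCE_DB_PASSWORD",
}

def export_datasource_vars() -> None:
    """Re-export postgres* values under the DATASOURCE_DB_* names bunny reads."""
    for src, dst in POSTGRES_TO_DATASOURCE.items():
        value = os.getenv(src)
        if value is not None:
            os.environ[dst] = value
```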
docker/README.md

Lines changed: 42 additions & 0 deletions
````diff
@@ -0,0 +1,42 @@
+# Docker: Node-side query and analysis
+
+This folder contains the code that runs **inside the TES container** on each TRE node. It executes the user’s analysis (SQL plus optional Python) against the node’s database and writes the result (e.g. JSON) for the client to aggregate.
+
+## Purpose
+
+- The **user query** and **analysis type** are passed in as CLI arguments.
+- The **analysis type** is looked up in the `LOCAL_PROCESSING_CLASSES` registry in `local_processing.py`.
+- Each analysis class is responsible for:
+  - Building the SQL query (from the user query + analysis-specific logic),
+  - Running it against the node DB,
+  - Optional Python-side analysis on the result.
+- Results are written to file (e.g. JSON) and later collected and aggregated on the client side.
+
+So this code does the **per-node, partial** work; aggregation across TREs happens elsewhere (orchestrator / client).
+
+## Flow
+
+1. **Entrypoint** — Container runs `python query_resolver.py` with CLI args (`--user-query`, `--analysis`, `--db-connection` or env, `--output-filename`, `--output-format`).
+2. **query_resolver.py** — Parses the connection string (from env or `--db-connection`), then calls `process_query()`.
+3. **process_query()** — Resolves the DB connection, looks up the analysis in `LOCAL_PROCESSING_CLASSES`, instantiates the processor, builds and runs the query, runs optional Python analysis, and writes the result to disk.
+4. **local_processing.py** — Defines the registry and analysis classes (e.g. Mean, Variance, PMCC, ContingencyTable). Each class extends `BaseLocalProcessing` (from `local_processing_base.py`) and implements query building and optional Python analysis.
+
+## Main modules
+
+| File | Role |
+|------|------|
+| `query_resolver.py` | Click CLI, connection string parsing (`parse_connection_string`), and `process_query()` (orchestrates DB connection, registry lookup, execution, output). |
+| `local_processing.py` | `LOCAL_PROCESSING_CLASSES` registry and concrete analysis classes (Mean, Variance, etc.). |
+| `local_processing_base.py` | `BaseLocalProcessing` abstract base class (query building, optional Python analysis hook). |
+| `Dockerfile` | Builds the image that runs this code (Python 3.12, dependencies, entrypoint `query_resolver.py`). |
+
+## Database connection
+
+- If `--db-connection` is **not** provided, the connection string is built from environment variables: `postgresUsername`, `postgresPassword`, `postgresServer`, `postgresPort`, `postgresDatabase` (see `validate_environment()` and `parse_connection_string(None)` in `query_resolver.py`). This is the normal case when the container is launched by TES with env set by the task.
+- If `--db-connection` is provided, it can be a SQLAlchemy-style URL (`postgresql://...`) or a semicolon-separated key=value string (`Host=...;Username=...;...`).
+
+## Building and running
+
+From the repo root or this directory, build the image (see project docs or `tests/` for the exact image name and test usage). The container expects either postgres* env vars or `--db-connection`, plus `--user-query`, `--analysis`, and optional output options.
+
+For the **bunny**-based workflow (different image and entrypoint), see `bunny-wrapper/`.
````

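The two `--db-connection` forms the docker README describes could be normalised along these lines. A hedged sketch, not the actual `parse_connection_string` implementation; key names other than `Host` and `Username` in the key=value form are assumptions:

```python
def normalise_db_connection(raw: str) -> str:
    """Return a postgresql:// URL from either accepted --db-connection form.

    Sketch only; real behaviour is defined by parse_connection_string
    in docker/query_resolver.py.
    """
    if raw.startswith("postgresql://"):
        return raw  # already a SQLAlchemy-style URL
    # Semicolon-separated form, e.g. "Host=db;Port=5432;Username=u;Password=p;Database=d"
    parts = dict(item.split("=", 1) for item in raw.split(";") if item.strip())
    return (
        f"postgresql://{parts['Username']}:{parts['Password']}"
        f"@{parts['Host']}:{parts.get('Port', '5432')}/{parts['Database']}"
    )
```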