Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ kind: Secret
metadata:
name: openlineage-secret
stringData:
api_key: your-marquez-api-key # pragma: allowlist secret
api_key: your-marquez-api-key #pragma: allowlist secret
173 changes: 173 additions & 0 deletions sdk/python/tests/integration/online_store/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""
Fixtures for online-store integration tests.
"""

from typing import Dict

import pytest
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

from tests.universal.feature_repos.universal.online_store_creator import (
OnlineStoreCreator,
)


class _SharedDbDynamoDBOnlineStoreCreator(OnlineStoreCreator):
    """DynamoDB Local container booted in ``-sharedDb -inMemory`` mode.

    Rationale for ``-sharedDb``:
    DynamoDB Local 2.x partitions tables by the access key ID carried in each
    request signature.  In CI, the sync ``boto3`` client (used to provision
    tables) and the async ``aiobotocore`` client (used for reads/writes) can
    resolve credentials from *different* sources — env vars, credential
    files, ``credential_process``, container IAM roles — even after
    ``monkeypatch.setenv`` installed fake keys, because the credential chain
    is evaluated lazily and various caches may still hold stale values.

    When the two clients end up signing with different access keys, the sync
    client creates tables in one namespace while the async client queries a
    different, empty namespace and fails with ``ResourceNotFoundException``.

    ``-sharedDb`` merges every namespace into a single in-memory database,
    so table visibility no longer depends on which credentials each client
    happens to use — exactly what integration tests of async read/write
    behaviour need when credential isolation is not under test.
    """

    def __init__(self, project_name: str, **kwargs):
        super().__init__(project_name)
        # Build the container spec step by step rather than as one chained
        # expression; each with_* call returns the (updated) container.
        container = DockerContainer("amazon/dynamodb-local:latest")
        container = container.with_exposed_ports("8000")
        container = container.with_command(
            "-jar DynamoDBLocal.jar -sharedDb -inMemory"
        )
        self.container = container

    def create_online_store(self) -> Dict[str, str]:
        # Boot the container, then block until DynamoDB Local logs that it
        # has finished initialising (up to 10 seconds).
        self.container.start()
        ready_marker = "Initializing DynamoDB Local with the following configuration:"
        wait_for_logs(container=self.container, predicate=ready_marker, timeout=10)
        port = self.container.get_exposed_port("8000")
        return {
            "type": "dynamodb",
            "endpoint_url": f"http://localhost:{port}",
            "region": "us-west-2",
        }

    def teardown(self):
        self.container.stop()


@pytest.fixture
async def dynamodb_local_environment(monkeypatch, worker_id):
    """Self-contained, credential-isolated Environment for DynamoDB async tests.

    Why this fixture exists
    -----------------------
    DynamoDB Local 2.x namespaces tables per access key ID.  In CI, the sync
    ``boto3`` client (table provisioning via ``store.apply()``) and the async
    ``aiobotocore`` client (reads/writes in the test body) may lazily resolve
    credentials from *different* sources even after ``monkeypatch`` set fake
    static keys — stale values can survive in caches from a real AWS session
    configured on the runner.  Different access key IDs mean different
    DynamoDB Local namespaces:

    * sync client → namespace ``KEY_A`` → tables exist ✓
    * async client → namespace ``KEY_B`` → tables not found → ``ResourceNotFoundException``

    The container from ``_SharedDbDynamoDBOnlineStoreCreator`` runs with
    ``-sharedDb -inMemory``, collapsing all namespaces into one so both
    clients always see the same tables regardless of credentials.

    Why ``await fs.initialize()`` before yielding
    ---------------------------------------------
    Initialising eagerly creates the ``aiobotocore`` client inside this
    fixture's event loop — the same loop the test body runs in.  That
    pre-caches ``FeatureStore._provider`` (so one ``DynamoDBOnlineStore``
    instance serves the whole test) and pins the aiobotocore client to the
    isolated container's ``endpoint_url``.

    Yields
    ------
    tuple[Environment, TestData]
        ``(environment, (entities, datasets, data_sources))``
    """
    from feast.infra.online_stores.dynamodb import DynamoDBOnlineStore
    from tests.universal.feature_repos.integration_test_repo_config import (
        IntegrationTestRepoConfig,
    )
    from tests.universal.feature_repos.repo_configuration import (
        construct_test_environment,
        construct_universal_test_data,
    )
    from tests.universal.feature_repos.universal.data_sources.file import (
        FileDataSourceCreator,
    )

    # Install fake static credentials before any boto client is created;
    # DynamoDB Local accepts any key material.
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", "fakeaccesskey000000")
    monkeypatch.setenv(
        "AWS_SECRET_ACCESS_KEY", "fakesecretkey0000000000000000000000000000"
    )
    monkeypatch.delenv("AWS_SESSION_TOKEN", raising=False)
    monkeypatch.delenv("AWS_SECURITY_TOKEN", raising=False)
    # Stop IMDS on EC2-backed runners from injecting real session tokens.
    monkeypatch.setenv("AWS_EC2_METADATA_DISABLED", "true")
    # Strip every remaining alternative credential source: container IAM
    # roles (ECS/EKS) and profile indirection.
    for env_var in (
        "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI",
        "AWS_CONTAINER_CREDENTIALS_FULL_URI",
        "AWS_PROFILE",
        "AWS_DEFAULT_PROFILE",
    ):
        monkeypatch.delenv(env_var, raising=False)

    # Drop any boto3 clients cached at class level by an earlier test in
    # this worker so nothing stale bleeds into the isolated environment.
    DynamoDBOnlineStore._dynamodb_client = None
    DynamoDBOnlineStore._dynamodb_resource = None

    repo_config = IntegrationTestRepoConfig(
        provider="local",
        offline_store_creator=FileDataSourceCreator,
        online_store_creator=_SharedDbDynamoDBOnlineStoreCreator,
        online_store=None,
    )

    test_environment = construct_test_environment(
        repo_config,
        fixture_request=None,
        worker_id=worker_id,
    )
    test_environment.setup()

    # FileDataSourceCreator writes only local Parquet files — no AWS calls.
    test_data = construct_universal_test_data(test_environment)

    # Eagerly create the aiobotocore client in *this* event loop so it is
    # guaranteed to point at our container and is reused throughout the
    # test body without lazy-init surprises.
    await test_environment.feature_store.initialize()

    yield test_environment, test_data

    # Shut the async client down while the container is still reachable.
    await test_environment.feature_store.close()
    test_environment.teardown()

    # Clear class-level caches again so the next test starts fresh.
    DynamoDBOnlineStore._dynamodb_client = None
    DynamoDBOnlineStore._dynamodb_resource = None
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def test_push_features_and_read(store):


@pytest.mark.integration
@pytest.mark.universal_online_stores(only=["dynamodb", "mongodb"])
@pytest.mark.universal_online_stores(only=["mongodb"])
async def test_push_features_and_read_async(store):
await store.push_async("location_stats_push_source", _ingest_df())

Expand All @@ -56,3 +56,36 @@ async def test_push_features_and_read_async(store):
entity_rows=[{"location_id": 1}],
)
assert_response(online_resp)


@pytest.mark.asyncio
@pytest.mark.integration
@pytest.mark.universal_online_stores
async def test_push_features_and_read_async_dynamodb(dynamodb_local_environment):
    """Async push followed by async read against an isolated DynamoDB Local.

    DynamoDB Local 2.x rejects requests carrying an expired AWS session
    token, and CI runners may hold real (possibly expired) STS credentials.
    The shared ``environment`` fixture resolves credentials before the async
    client exists, so those bad credentials can leak in.  The
    ``dynamodb_local_environment`` fixture instead installs dummy
    credentials *before* any boto client is instantiated, so both the sync
    boto3 provisioning client and the async aiobotocore client start with
    clean, token-free credentials.
    """
    env, test_data = dynamodb_local_environment
    fs = env.feature_store
    data_sources = test_data[2]

    # Register the pushable location feature view before pushing to it.
    views = construct_universal_feature_views(data_sources)
    pushed_fv = views.pushed_locations
    fs.apply([location(), pushed_fv])

    await fs.push_async("location_stats_push_source", _ingest_df())

    response = await fs.get_online_features_async(
        features=["pushable_location_stats:temperature"],
        entity_rows=[{"location_id": 1}],
    )
    assert_response(response)
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,19 @@ async def test_async_online_retrieval_with_event_timestamps(

@pytest.mark.asyncio
@pytest.mark.integration
@pytest.mark.universal_online_stores
async def test_async_online_retrieval_with_event_timestamps_dynamo(
    dynamodb_local_environment,
):
    """Async online retrieval for DynamoDB in a credential-isolated setup.

    ``dynamodb_local_environment`` brings its own DynamoDB Local container
    plus a FileDataSourceCreator, and installs dummy AWS credentials before
    any boto client is created.  That sidesteps the expired-STS-token
    failures seen when aiobotocore lazily resolves credentials from the
    shared CI environment.
    """
    env, data = dynamodb_local_environment
    await _do_async_retrieval_test(env, data)


Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/unit/infra/online_store/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ async def _run():
config, feature_view, data, progress=None
)

asyncio.get_event_loop().run_until_complete(_run())
asyncio.run(_run())

assert mock_async_client.pipeline.call_count == 1
async_pipe.hmget.assert_not_called()
Expand Down
Loading