diff --git a/infra/feast-operator/bundle/manifests/openlineage-secret_v1_secret.yaml b/infra/feast-operator/bundle/manifests/openlineage-secret_v1_secret.yaml index a66a41e8466..aae8a81d7f5 100644 --- a/infra/feast-operator/bundle/manifests/openlineage-secret_v1_secret.yaml +++ b/infra/feast-operator/bundle/manifests/openlineage-secret_v1_secret.yaml @@ -3,4 +3,4 @@ kind: Secret metadata: name: openlineage-secret stringData: - api_key: your-marquez-api-key # pragma: allowlist secret + api_key: your-marquez-api-key #pragma: allowlist secret diff --git a/sdk/python/tests/integration/online_store/conftest.py b/sdk/python/tests/integration/online_store/conftest.py new file mode 100644 index 00000000000..42b507231dc --- /dev/null +++ b/sdk/python/tests/integration/online_store/conftest.py @@ -0,0 +1,173 @@ +""" +Fixtures for online-store integration tests. +""" + +from typing import Dict + +import pytest +from testcontainers.core.container import DockerContainer +from testcontainers.core.waiting_utils import wait_for_logs + +from tests.universal.feature_repos.universal.online_store_creator import ( + OnlineStoreCreator, +) + + +class _SharedDbDynamoDBOnlineStoreCreator(OnlineStoreCreator): + """DynamoDB Local container started with ``-sharedDb -inMemory``. + + Why ``-sharedDb`` + ----------------- + DynamoDB Local 2.x namespaces tables by the **access key ID** in the + request signature. In CI, the sync ``boto3`` client and the async + ``aiobotocore`` client can resolve credentials from *different* sources + (env vars, credential file, ``credential_process``, container IAM role, + etc.) even after ``monkeypatch.setenv`` has set fake keys—because the + credential chain is evaluated lazily and various caches may hold stale + values. + + When the two clients end up using *different* access keys, the sync + client creates tables in namespace A while the async client queries + namespace B, which is empty → ``ResourceNotFoundException``. + + ``-sharedDb`` collapses all namespaces into a single in-memory database, + making table visibility completely independent of which credentials each + client uses. This is the correct setting for integration tests that want + to verify async read/write behaviour without caring about credential + isolation. + """ + + def __init__(self, project_name: str, **kwargs): + super().__init__(project_name) + self.container = ( + DockerContainer("amazon/dynamodb-local:latest") + .with_exposed_ports("8000") + .with_command("-jar DynamoDBLocal.jar -sharedDb -inMemory") + ) + + def create_online_store(self) -> Dict[str, str]: + self.container.start() + wait_for_logs( + container=self.container, + predicate="Initializing DynamoDB Local with the following configuration:", + timeout=10, + ) + exposed_port = self.container.get_exposed_port("8000") + return { + "type": "dynamodb", + "endpoint_url": f"http://localhost:{exposed_port}", + "region": "us-west-2", + } + + def teardown(self): + self.container.stop() + + +@pytest.fixture +async def dynamodb_local_environment(monkeypatch, worker_id): + """Isolated, self-contained Environment for DynamoDB async tests. + + Root cause of the async credential failures + ------------------------------------------- + DynamoDB Local 2.x isolates tables **per access key ID**. In CI, + ``boto3`` (sync, used to provision tables via ``store.apply()``) and + ``aiobotocore`` (async, used for reads/writes in the test body) may + resolve credentials from *different* sources even when ``monkeypatch`` + has set fake static keys—the credential chain is evaluated lazily and + caches may hold stale values from a real AWS session configured in the + runner environment. + + When the two clients end up using different access key IDs they land in + different DynamoDB Local namespaces: + + * sync client → namespace ``KEY_A`` → tables exist ✓ + * async client → namespace ``KEY_B`` → tables not found → ``ResourceNotFoundException`` + + Fix: ``_SharedDbDynamoDBOnlineStoreCreator`` + -------------------------------------------- + The isolated container is started with ``-sharedDb -inMemory``. In + shared-DB mode DynamoDB Local stores *all* tables in a single namespace + regardless of the access key, so sync and async clients always see the + same tables. + + Why async + ``await fs.initialize()`` before yielding + ----------------------------------------------------- + Calling ``await fs.initialize()`` eagerly creates the ``aiobotocore`` + client inside this fixture's event loop (the *same* loop the test will + run in). This pre-caches: + + 1. ``FeatureStore._provider`` so the identical ``DynamoDBOnlineStore`` + instance is reused for the entire test. + 2. The aiobotocore client, which is now unambiguously pointed at our + isolated container's ``endpoint_url``. + + Yields + ------ + tuple[Environment, TestData] + ``(environment, (entities, datasets, data_sources))`` + """ + from feast.infra.online_stores.dynamodb import DynamoDBOnlineStore + from tests.universal.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, + ) + from tests.universal.feature_repos.repo_configuration import ( + construct_test_environment, + construct_universal_test_data, + ) + from tests.universal.feature_repos.universal.data_sources.file import ( + FileDataSourceCreator, + ) + + # Set fake static credentials before any boto client is created. + # These are accepted by DynamoDB Local regardless of validity. + monkeypatch.setenv("AWS_ACCESS_KEY_ID", "fakeaccesskey000000") + monkeypatch.setenv( + "AWS_SECRET_ACCESS_KEY", "fakesecretkey0000000000000000000000000000" + ) + monkeypatch.delenv("AWS_SESSION_TOKEN", raising=False) + monkeypatch.delenv("AWS_SECURITY_TOKEN", raising=False) + # Prevent IMDS from injecting real session tokens on EC2-backed runners. + monkeypatch.setenv("AWS_EC2_METADATA_DISABLED", "true") + # Disable the container credentials provider (ECS/EKS IAM roles). + monkeypatch.delenv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI", raising=False) + monkeypatch.delenv("AWS_CONTAINER_CREDENTIALS_FULL_URI", raising=False) + # Ensure no profile redirects boto to a different credential source. + monkeypatch.delenv("AWS_PROFILE", raising=False) + monkeypatch.delenv("AWS_DEFAULT_PROFILE", raising=False) + + # Reset class-level boto3 client caches so that no stale client from a + # previous test in this worker bleeds into our isolated environment. + DynamoDBOnlineStore._dynamodb_client = None + DynamoDBOnlineStore._dynamodb_resource = None + + config = IntegrationTestRepoConfig( + provider="local", + offline_store_creator=FileDataSourceCreator, + online_store_creator=_SharedDbDynamoDBOnlineStoreCreator, + online_store=None, + ) + + environment = construct_test_environment( + config, + fixture_request=None, + worker_id=worker_id, + ) + environment.setup() + + # FileDataSourceCreator writes only local Parquet files — no AWS calls. + universal_test_data = construct_universal_test_data(environment) + + # Eagerly initialise the aiobotocore client in *this* event loop so it + # is guaranteed to point at our container and is reused throughout the + # test body without lazy-init surprises. + await environment.feature_store.initialize() + + yield environment, universal_test_data + + # Cleanly shut down the async client before the container disappears. + await environment.feature_store.close() + environment.teardown() + + # Flush class-level caches so the next test starts completely fresh. + DynamoDBOnlineStore._dynamodb_client = None + DynamoDBOnlineStore._dynamodb_resource = None diff --git a/sdk/python/tests/integration/online_store/test_push_features_to_online_store.py b/sdk/python/tests/integration/online_store/test_push_features_to_online_store.py index 536864ed97e..55a4eb12adc 100644 --- a/sdk/python/tests/integration/online_store/test_push_features_to_online_store.py +++ b/sdk/python/tests/integration/online_store/test_push_features_to_online_store.py @@ -47,7 +47,7 @@ def test_push_features_and_read(store): @pytest.mark.integration -@pytest.mark.universal_online_stores(only=["dynamodb", "mongodb"]) +@pytest.mark.universal_online_stores(only=["mongodb"]) async def test_push_features_and_read_async(store): await store.push_async("location_stats_push_source", _ingest_df()) @@ -56,3 +56,36 @@ async def test_push_features_and_read_async(store): entity_rows=[{"location_id": 1}], ) assert_response(online_resp) + + +@pytest.mark.asyncio +@pytest.mark.integration +@pytest.mark.universal_online_stores +async def test_push_features_and_read_async_dynamodb(dynamodb_local_environment): + """Async push + async read for DynamoDB with a credential-isolated environment. + + DynamoDB Local 2.x rejects requests that carry an expired AWS session + token. In CI, real (possibly expired) STS credentials exist in the + environment. The shared ``environment`` fixture resolves credentials + before the async client is created, so those bad credentials bleed in. + + This test uses ``dynamodb_local_environment``, which sets dummy + credentials *before* any boto client is instantiated, guaranteeing that + both the sync boto3 table-provisioning client and the async aiobotocore + client start with clean, token-free credentials. + """ + environment, universal_test_data = dynamodb_local_environment + store = environment.feature_store + _, _, data_sources = universal_test_data + + feature_views = construct_universal_feature_views(data_sources) + location_fv = feature_views.pushed_locations + store.apply([location(), location_fv]) + + await store.push_async("location_stats_push_source", _ingest_df()) + + online_resp = await store.get_online_features_async( + features=["pushable_location_stats:temperature"], + entity_rows=[{"location_id": 1}], + ) + assert_response(online_resp) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index f72ac64586c..109eea454c0 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -532,10 +532,19 @@ async def test_async_online_retrieval_with_event_timestamps( @pytest.mark.asyncio @pytest.mark.integration -@pytest.mark.universal_online_stores(only=["dynamodb"]) +@pytest.mark.universal_online_stores async def test_async_online_retrieval_with_event_timestamps_dynamo( - environment, universal_data_sources + dynamodb_local_environment, ): + """Async online retrieval for DynamoDB with a credential-isolated environment. + + Uses ``dynamodb_local_environment`` (its own DynamoDB Local container + + FileDataSourceCreator) so that dummy credentials are set before any boto + client is created. This avoids the expired-STS-token problem that + occurs when aiobotocore lazily resolves credentials from the shared + environment in CI. + """ + environment, universal_data_sources = dynamodb_local_environment await _do_async_retrieval_test(environment, universal_data_sources) diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py index b9b5dd3e97e..68eb28c4c11 100644 --- a/sdk/python/tests/unit/infra/online_store/test_redis.py +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -468,7 +468,7 @@ async def _run(): config, feature_view, data, progress=None ) - asyncio.get_event_loop().run_until_complete(_run()) + asyncio.run(_run()) assert mock_async_client.pipeline.call_count == 1 async_pipe.hmget.assert_not_called()