44import os
55import random
66import string
7- from datetime import datetime , timezone
7+ from datetime import datetime , timedelta , timezone
88from typing import Any
99
1010import boto3
11+ import jwt
1112import requests
1213
1314
@@ -18,11 +19,12 @@ class ProductSearchApiDriver:
1819 matching the pattern used by the activity-service integration tests.
1920 """
2021
21- def __init__ (self , search_api_endpoint : str , product_api_endpoint : str , event_bus_name : str ) -> None :
22+ def __init__ (self , search_api_endpoint : str , product_api_endpoint : str , event_bus_name : str , jwt_secret : str = "" ) -> None :
2223 self .search_api_endpoint = search_api_endpoint .rstrip ("/" )
2324 self .product_api_endpoint = product_api_endpoint .rstrip ("/" )
2425 self .event_bus_name = event_bus_name
2526 self .environment = os .environ .get ("ENV" , "dev" )
27+ self ._jwt_secret = jwt_secret
2628 self ._events_client = boto3 .client ("events" )
2729 self ._products_client = requests .Session ()
2830
@@ -52,20 +54,46 @@ def search_raw(self, query: str) -> requests.Response:
5254 # Product Management Service API (used to seed test products)
5355 # ------------------------------------------------------------------
5456
57+ def _generate_admin_token (self ) -> str :
58+ """Generate a short-lived ADMIN JWT signed with the shared secret.
59+
60+ The Product Management Service validates:
61+ - HS256 signing algorithm
62+ - `user_type` claim must equal "ADMIN"
63+ - Token must not be expired
64+ See: src/product-management-service/src/product-api/internal/adapters/authentication.go
65+ """
66+ now = datetime .now (timezone .utc )
67+ payload = {
68+ "sub" : "admin@serverless-sample.com" ,
69+ "user_type" : "ADMIN" ,
70+ "iat" : now ,
71+ "exp" : now + timedelta (hours = 1 ),
72+ }
73+ return jwt .encode (payload , self ._jwt_secret , algorithm = "HS256" )
74+
5575 def create_product (self , name : str , price : float ) -> dict [str , Any ]:
5676 """Create a product via the Product Management Service API."""
77+ headers = {}
78+ if self ._jwt_secret :
79+ headers ["Authorization" ] = f"Bearer { self ._generate_admin_token ()} "
5780 response = self ._products_client .post (
5881 f"{ self .product_api_endpoint } /product" ,
5982 json = {"name" : name , "price" : price },
83+ headers = headers ,
6084 timeout = 10 ,
6185 )
6286 response .raise_for_status ()
6387 return response .json () # type: ignore[no-any-return]
6488
6589 def delete_product (self , product_id : str ) -> None :
6690 """Delete a product via the Product Management Service API."""
91+ headers = {}
92+ if self ._jwt_secret :
93+ headers ["Authorization" ] = f"Bearer { self ._generate_admin_token ()} "
6794 response = self ._products_client .delete (
6895 f"{ self .product_api_endpoint } /product/{ product_id } " ,
96+ headers = headers ,
6997 timeout = 10 ,
7098 )
7199 # 404 on cleanup is acceptable
@@ -153,9 +181,10 @@ def initialize_driver() -> ProductSearchApiDriver:
153181 search_endpoint = os .environ .get ("SEARCH_API_ENDPOINT" )
154182 product_endpoint = os .environ .get ("PRODUCT_API_ENDPOINT" )
155183 event_bus_name = os .environ .get ("EVENT_BUS_NAME" )
184+ jwt_secret = os .environ .get ("JWT_SECRET_KEY" , "" )
156185
157186 if search_endpoint and product_endpoint and event_bus_name :
158- return ProductSearchApiDriver (search_endpoint , product_endpoint , event_bus_name )
187+ return ProductSearchApiDriver (search_endpoint , product_endpoint , event_bus_name , jwt_secret )
159188
160189 env = os .environ .get ("ENV" , "dev" )
161190 ssm = boto3 .client ("ssm" )
@@ -179,10 +208,17 @@ def initialize_driver() -> ProductSearchApiDriver:
179208 )["Parameter" ]["Value" ]
180209 except ssm .exceptions .ParameterNotFound :
181210 event_bus_name = "default"
211+ try :
212+ jwt_secret = ssm .get_parameter (
213+ Name = f"/{ env } /shared/secret-access-key" ,
214+ WithDecryption = True ,
215+ )["Parameter" ]["Value" ]
216+ except ssm .exceptions .ParameterNotFound :
217+ jwt_secret = ""
182218 else :
183219 # Product Management Service and shared event bus are not deployed in ephemeral envs.
184220 # Pipeline tests are skipped; smoke tests only need the search endpoint.
185221 product_endpoint = ""
186222 event_bus_name = "default"
187223
188- return ProductSearchApiDriver (search_endpoint , product_endpoint , event_bus_name )
224+ return ProductSearchApiDriver (search_endpoint , product_endpoint , event_bus_name , jwt_secret )
0 commit comments