Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions src/strands_tools/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
Key Features:
-------------
1. Advanced Task Management:
• Parallel execution with dynamic thread pooling
• Parallel execution with a fixed-size thread pool
• Priority-based scheduling (1-5 levels)
• Complex dependency resolution with validation
• Timeout and resource controls per task
Expand All @@ -25,11 +25,8 @@
• Automatic tool filtering and validation
• Support for any combination of tools per task

4. Resource Optimization:
• Automatic thread pool scaling (2-8 threads)
• Rate limiting with exponential backoff
• Resource-aware task distribution
• CPU usage monitoring and optimization
4. Rate Limiting:
• Exponential backoff on retries

5. Reliability Features:
• Persistent state storage with real-time monitoring
Expand Down Expand Up @@ -130,9 +127,7 @@
os.makedirs(WORKFLOW_DIR, exist_ok=True)

# Default thread pool settings
MIN_THREADS = int(os.getenv("STRANDS_WORKFLOW_MIN_THREADS", "2"))
MAX_THREADS = int(os.getenv("STRANDS_WORKFLOW_MAX_THREADS", "8"))
CPU_THRESHOLD = int(os.getenv("STRANDS_WORKFLOW_CPU_THRESHOLD", "80")) # CPU usage threshold for scaling down

# Rate limiting configuration
_rate_limit_lock = RLock()
Expand All @@ -157,10 +152,9 @@ def on_modified(self, event):


class TaskExecutor:
"""Advanced task executor with dynamic scaling and resource monitoring."""
"""Task executor that runs workflow tasks in a fixed-size thread pool."""

def __init__(self, min_workers=MIN_THREADS, max_workers=MAX_THREADS):
self.min_workers = min_workers
def __init__(self, max_workers=MAX_THREADS):
self.max_workers = max_workers
self._executor = ThreadPoolExecutor(max_workers=max_workers)
self.task_queue = Queue()
Expand Down
Loading