Thread Safety
The Clio SDK is designed to handle multiple Playwright contexts concurrently. This guide covers thread safety features and best practices for parallel testing scenarios.
Thread Safety Features
Section titled “Thread Safety Features”Built-in Concurrency Support
Section titled “Built-in Concurrency Support”The Clio SDK includes several thread safety mechanisms:
from clio import ClioMonitorimport asynciofrom playwright.async_api import async_playwright
# Single monitor instance can safely handle multiple contextsmonitor = ClioMonitor(api_key="clio_your_key")
async def concurrent_tests(): """Multiple contexts can be monitored simultaneously""" async with async_playwright() as p: browser = await p.chromium.launch()
# Create multiple contexts context1 = await browser.new_context(record_video_dir="./videos1") context2 = await browser.new_context(record_video_dir="./videos2") context3 = await browser.new_context(record_video_dir="./videos3")
# All can be monitored concurrently await asyncio.gather( monitor.start_run(context1, "Test 1"), monitor.start_run(context2, "Test 2"), monitor.start_run(context3, "Test 3") )
# Run tests in parallel await asyncio.gather( run_test_scenario(context1, "Scenario A"), run_test_scenario(context2, "Scenario B"), run_test_scenario(context3, "Scenario C") )
# Close all contexts (uploads happen concurrently) await asyncio.gather( context1.close(), context2.close(), context3.close() )Internal Thread Safety Mechanisms
Section titled “Internal Thread Safety Mechanisms”The SDK uses several techniques to ensure thread safety:
- Threading Locks: Protects shared state during concurrent operations
- WeakKeyDictionary: Automatically cleans up when contexts are garbage collected
- Collision Detection: Prevents multiple monitors on the same context
- Atomic Operations: Thread-safe updates to internal state
Safe Concurrent Patterns
Section titled “Safe Concurrent Patterns”Pattern 1: Parallel Test Scenarios
Section titled “Pattern 1: Parallel Test Scenarios”async def parallel_test_scenarios(): """Run multiple test scenarios in parallel""" monitor = ClioMonitor(api_key=os.getenv("CLIO_API_KEY"))
async def test_login_flow(): async with async_playwright() as p: browser = await p.chromium.launch() context = await browser.new_context(record_video_dir="./videos/login")
await monitor.start_run( context=context, automation_name="Login Flow Test", success_criteria="User successfully logs in" )
page = await context.new_page() await page.goto("https://app.example.com/login") await page.fill("#username", "testuser") await page.fill("#password", "testpass") await page.click("#login-button")
await context.close() await browser.close()
async def test_checkout_flow(): async with async_playwright() as p: browser = await p.chromium.launch() context = await browser.new_context(record_video_dir="./videos/checkout")
await monitor.start_run( context=context, automation_name="Checkout Flow Test", success_criteria="Order completed successfully" )
page = await context.new_page() await page.goto("https://app.example.com/store") await page.click(".add-to-cart") await page.click(".checkout-button")
await context.close() await browser.close()
async def test_profile_flow(): async with async_playwright() as p: browser = await p.chromium.launch() context = await browser.new_context(record_video_dir="./videos/profile")
await monitor.start_run( context=context, automation_name="Profile Update Test", success_criteria="Profile updated successfully" )
page = await context.new_page() await page.goto("https://app.example.com/profile") await page.fill("#email", "newemail@example.com") await page.click("#save-button")
await context.close() await browser.close()
# Run all tests concurrently await asyncio.gather( test_login_flow(), test_checkout_flow(), test_profile_flow() )
print("✅ All parallel tests completed")Pattern 2: Browser Pool Management
Section titled “Pattern 2: Browser Pool Management”import asynciofrom contextlib import asynccontextmanager
class BrowserPool: """Manage a pool of browsers for concurrent testing"""
def __init__(self, size=3): self.size = size self.browsers = [] self.monitor = ClioMonitor(api_key=os.getenv("CLIO_API_KEY"))
async def __aenter__(self): async with async_playwright() as p: self.playwright = p # Create browser pool for i in range(self.size): browser = await p.chromium.launch() self.browsers.append(browser) return self
async def __aexit__(self, exc_type, exc_val, exc_tb): # Close all browsers await asyncio.gather(*[browser.close() for browser in self.browsers])
@asynccontextmanager async def get_monitored_context(self, test_name, video_dir): """Get a monitored context from the pool""" # Get least busy browser (simplified) browser = self.browsers[0]
context = await browser.new_context(record_video_dir=video_dir)
await self.monitor.start_run( context=context, automation_name=test_name )
try: yield context finally: await context.close()
# Usageasync def test_with_pool(): async with BrowserPool(size=3) as pool: async def run_test(test_id): async with pool.get_monitored_context( f"Pool Test {test_id}", f"./videos/test_{test_id}" ) as context: page = await context.new_page() await page.goto(f"https://example.com/test/{test_id}") # Test logic here
# Run multiple tests concurrently using the pool await asyncio.gather(*[run_test(i) for i in range(10)])Pattern 3: Queue-Based Processing
Section titled “Pattern 3: Queue-Based Processing”import asynciofrom dataclasses import dataclassfrom typing import List
@dataclassclass TestTask: name: str url: str success_criteria: str
async def queue_based_testing(): """Process test tasks from a queue with concurrency control""" monitor = ClioMonitor(api_key=os.getenv("CLIO_API_KEY"))
# Define test tasks tasks = [ TestTask("Homepage Test", "https://example.com", "Page loads successfully"), TestTask("About Page Test", "https://example.com/about", "About content visible"), TestTask("Contact Test", "https://example.com/contact", "Contact form accessible"), TestTask("Products Test", "https://example.com/products", "Product list displays"), TestTask("Services Test", "https://example.com/services", "Services listed"), ]
# Create task queue queue = asyncio.Queue() for task in tasks: await queue.put(task)
async def worker(worker_id): """Worker function to process tasks""" while True: try: # Get task from queue task = await asyncio.wait_for(queue.get(), timeout=1.0)
async with async_playwright() as p: browser = await p.chromium.launch() context = await browser.new_context( record_video_dir=f"./videos/worker_{worker_id}" )
await monitor.start_run( context=context, automation_name=f"Worker {worker_id}: {task.name}", success_criteria=task.success_criteria )
# Execute test page = await context.new_page() await page.goto(task.url) await page.wait_for_load_state("networkidle")
print(f"Worker {worker_id} completed: {task.name}")
await context.close() await browser.close()
# Mark task as done queue.task_done()
except asyncio.TimeoutError: # No more tasks, worker can exit break except Exception as e: print(f"Worker {worker_id} error: {e}") queue.task_done()
# Start workers num_workers = 3 workers = [asyncio.create_task(worker(i)) for i in range(num_workers)]
# Wait for all tasks to complete await queue.join()
# Cancel workers for worker in workers: worker.cancel()
print("✅ All queued tests completed")Advanced Concurrency Scenarios
Section titled “Advanced Concurrency Scenarios”Shared Browser, Multiple Contexts
Section titled “Shared Browser, Multiple Contexts”async def shared_browser_testing(): """Multiple contexts sharing a single browser instance""" monitor = ClioMonitor(api_key=os.getenv("CLIO_API_KEY"))
async with async_playwright() as p: # Single browser instance browser = await p.chromium.launch()
async def create_monitored_context(test_name, video_subdir): """Helper to create monitored context""" context = await browser.new_context( record_video_dir=f"./videos/{video_subdir}" )
await monitor.start_run( context=context, automation_name=test_name )
return context
# Create multiple contexts from same browser contexts = await asyncio.gather( create_monitored_context("Multi-Context Test 1", "ctx1"), create_monitored_context("Multi-Context Test 2", "ctx2"), create_monitored_context("Multi-Context Test 3", "ctx3") )
async def run_test_in_context(context, test_id): """Run test in specific context""" page = await context.new_page() await page.goto(f"https://httpbin.org/delay/{test_id}") await page.wait_for_load_state("networkidle") await context.close()
# Run tests concurrently in different contexts await asyncio.gather(*[ run_test_in_context(ctx, i) for i, ctx in enumerate(contexts, 1) ])
await browser.close()Rate-Limited Concurrent Processing
Section titled “Rate-Limited Concurrent Processing”import asynciofrom asyncio import Semaphore
async def rate_limited_testing(): """Control concurrency to respect rate limits""" monitor = ClioMonitor(api_key=os.getenv("CLIO_API_KEY"))
# Limit concurrent uploads to respect API rate limits upload_semaphore = Semaphore(2) # Max 2 concurrent uploads
async def rate_limited_test(test_id): """Test with rate limiting""" async with upload_semaphore: # Acquire semaphore async with async_playwright() as p: browser = await p.chromium.launch() context = await browser.new_context( record_video_dir=f"./videos/rate_limited_{test_id}" )
await monitor.start_run( context=context, automation_name=f"Rate Limited Test {test_id}" )
page = await context.new_page() await page.goto("https://example.com") await page.wait_for_timeout(2000) # Simulate test work
print(f"Starting upload for test {test_id}") await context.close() # Upload happens here await browser.close() print(f"Completed test {test_id}") # Semaphore released automatically
# Start many tests, but uploads are rate-limited await asyncio.gather(*[ rate_limited_test(i) for i in range(10) ])Error Handling in Concurrent Scenarios
Section titled “Error Handling in Concurrent Scenarios”Isolate Failures
Section titled “Isolate Failures”async def isolated_concurrent_tests(): """Ensure one test failure doesn't affect others""" monitor = ClioMonitor( api_key=os.getenv("CLIO_API_KEY"), raise_on_error=False # Don't let monitoring errors propagate )
async def safe_test(test_name, should_fail=False): """Test with isolated error handling""" try: async with async_playwright() as p: browser = await p.chromium.launch() context = await browser.new_context(record_video_dir="./videos")
await monitor.start_run(context=context, automation_name=test_name)
page = await context.new_page()
if should_fail: # Simulate a test failure await page.goto("https://nonexistent-site-12345.com") else: await page.goto("https://example.com")
await context.close() await browser.close()
print(f"✅ {test_name} completed") return True
except Exception as e: print(f"❌ {test_name} failed: {e}") return False
# Run tests with some failures - others should continue results = await asyncio.gather( safe_test("Good Test 1"), safe_test("Failing Test", should_fail=True), # This will fail safe_test("Good Test 2"), safe_test("Good Test 3"), return_exceptions=True # Don't stop on exceptions )
# Check results successful = sum(1 for result in results if result is True) print(f"✅ {successful}/{len(results)} tests passed")Best Practices for Thread Safety
Section titled “Best Practices for Thread Safety”1. Use One Monitor Instance
Section titled “1. Use One Monitor Instance”# Good - reuse monitor instancemonitor = ClioMonitor(api_key="clio_key")
async def test1(): await monitor.start_run(context1, "Test 1")
async def test2(): await monitor.start_run(context2, "Test 2")
# Avoid - creating multiple monitors unnecessarilyasync def test3(): monitor3 = ClioMonitor(api_key="clio_key") # Unnecessary await monitor3.start_run(context3, "Test 3")2. Separate Video Directories
Section titled “2. Separate Video Directories”# Good - separate directories prevent file conflictscontext1 = await browser.new_context(record_video_dir="./videos/test1")context2 = await browser.new_context(record_video_dir="./videos/test2")
# Avoid - same directory can cause conflictscontext1 = await browser.new_context(record_video_dir="./videos") # Same dircontext2 = await browser.new_context(record_video_dir="./videos") # Same dir3. Handle Resource Cleanup
Section titled “3. Handle Resource Cleanup”async def proper_cleanup(): """Ensure resources are cleaned up even with failures""" contexts = [] browsers = []
try: async with async_playwright() as p: # Create resources for i in range(3): browser = await p.chromium.launch() browsers.append(browser)
context = await browser.new_context(record_video_dir=f"./videos/{i}") contexts.append(context)
await monitor.start_run(context, f"Cleanup Test {i}")
# Run tests concurrently await asyncio.gather(*[ run_test_in_context(ctx) for ctx in contexts ])
finally: # Cleanup in reverse order await asyncio.gather(*[ctx.close() for ctx in contexts], return_exceptions=True) await asyncio.gather(*[browser.close() for browser in browsers], return_exceptions=True)4. Monitor Resource Usage
Section titled “4. Monitor Resource Usage”import psutilimport asyncio
async def monitor_resource_usage(): """Monitor system resources during concurrent testing""" process = psutil.Process()
async def log_resources(): while True: memory_mb = process.memory_info().rss / 1024 / 1024 cpu_percent = process.cpu_percent() print(f"Memory: {memory_mb:.1f} MB, CPU: {cpu_percent:.1f}%") await asyncio.sleep(5)
# Start resource monitoring monitor_task = asyncio.create_task(log_resources())
try: # Run your concurrent tests await isolated_concurrent_tests() finally: monitor_task.cancel()The Clio SDK’s thread safety features allow you to scale your automation testing while maintaining reliable monitoring across all your test scenarios.