# Thread Safety

The Clio SDK is designed to handle multiple Playwright contexts concurrently. This guide covers thread safety features and best practices for parallel testing scenarios.

The Clio SDK includes several thread safety mechanisms:

import asyncio
import os

from clio import ClioMonitor
from playwright.async_api import async_playwright
# A single ClioMonitor instance can safely track many contexts at once.
monitor = ClioMonitor(api_key="clio_your_key")


async def concurrent_tests():
    """Monitor three Playwright contexts simultaneously with one monitor."""
    async with async_playwright() as p:
        browser = await p.chromium.launch()

        # Give every context its own video directory to avoid file clashes.
        contexts = [
            await browser.new_context(record_video_dir=f"./videos{i}")
            for i in (1, 2, 3)
        ]

        # Start monitoring all contexts concurrently.
        await asyncio.gather(*(
            monitor.start_run(ctx, f"Test {i}")
            for i, ctx in enumerate(contexts, start=1)
        ))

        # Drive each scenario in parallel.
        # NOTE(review): run_test_scenario is assumed to be defined elsewhere.
        await asyncio.gather(*(
            run_test_scenario(ctx, f"Scenario {label}")
            for ctx, label in zip(contexts, "ABC")
        ))

        # Closing a context triggers its upload; do them concurrently too.
        await asyncio.gather(*(ctx.close() for ctx in contexts))

The SDK uses several techniques to ensure thread safety:

  1. Threading Locks: Protects shared state during concurrent operations
  2. WeakKeyDictionary: Automatically cleans up when contexts are garbage collected
  3. Collision Detection: Prevents multiple monitors on the same context
  4. Atomic Operations: Thread-safe updates to internal state
async def parallel_test_scenarios():
    """Run three independent test scenarios concurrently.

    Each scenario launches its own browser so failures stay isolated,
    while one shared ClioMonitor instance tracks every run.
    """
    monitor = ClioMonitor(api_key=os.getenv("CLIO_API_KEY"))

    async def _run_scenario(video_dir, automation_name, success_criteria, steps):
        # Shared scaffolding: launch, monitor, drive the page, tear down.
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            context = await browser.new_context(record_video_dir=video_dir)
            await monitor.start_run(
                context=context,
                automation_name=automation_name,
                success_criteria=success_criteria,
            )
            page = await context.new_page()
            await steps(page)
            await context.close()  # closing the context triggers the upload
            await browser.close()

    async def test_login_flow():
        async def steps(page):
            await page.goto("https://app.example.com/login")
            await page.fill("#username", "testuser")
            await page.fill("#password", "testpass")
            await page.click("#login-button")

        await _run_scenario(
            "./videos/login",
            "Login Flow Test",
            "User successfully logs in",
            steps,
        )

    async def test_checkout_flow():
        async def steps(page):
            await page.goto("https://app.example.com/store")
            await page.click(".add-to-cart")
            await page.click(".checkout-button")

        await _run_scenario(
            "./videos/checkout",
            "Checkout Flow Test",
            "Order completed successfully",
            steps,
        )

    async def test_profile_flow():
        async def steps(page):
            await page.goto("https://app.example.com/profile")
            await page.fill("#email", "newemail@example.com")
            await page.click("#save-button")

        await _run_scenario(
            "./videos/profile",
            "Profile Update Test",
            "Profile updated successfully",
            steps,
        )

    # Run all tests concurrently.
    await asyncio.gather(
        test_login_flow(),
        test_checkout_flow(),
        test_profile_flow(),
    )
    print("✅ All parallel tests completed")
import asyncio
from contextlib import asynccontextmanager


class BrowserPool:
    """Manage a pool of browsers for concurrent testing.

    BUG FIX: the original ``__aenter__`` entered ``async with
    async_playwright()`` and returned from *inside* that block, so
    Playwright shut down (closing every pooled browser) the moment
    ``__aenter__`` returned.  Playwright is now started explicitly and
    stopped in ``__aexit__``.
    """

    def __init__(self, size=3):
        self.size = size
        self.browsers = []
        self._cursor = 0  # round-robin index into self.browsers
        self.monitor = ClioMonitor(api_key=os.getenv("CLIO_API_KEY"))

    async def __aenter__(self):
        # Start Playwright manually so it outlives this method.
        self.playwright = await async_playwright().start()
        for _ in range(self.size):
            self.browsers.append(await self.playwright.chromium.launch())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Close every browser (tolerating individual failures), then stop
        # the Playwright driver we started in __aenter__.
        await asyncio.gather(
            *(browser.close() for browser in self.browsers),
            return_exceptions=True,
        )
        await self.playwright.stop()

    @asynccontextmanager
    async def get_monitored_context(self, test_name, video_dir):
        """Yield a monitored context drawn from the pool (round-robin).

        The original always used ``browsers[0]``; rotating through the
        pool actually distributes load, matching the stated intent.
        """
        browser = self.browsers[self._cursor % len(self.browsers)]
        self._cursor += 1
        context = await browser.new_context(record_video_dir=video_dir)
        await self.monitor.start_run(
            context=context,
            automation_name=test_name,
        )
        try:
            yield context
        finally:
            await context.close()  # closing triggers the upload
# Usage
async def test_with_pool():
    """Run ten tests concurrently against a three-browser pool."""
    async with BrowserPool(size=3) as pool:

        async def run_test(test_id):
            # Borrow a monitored context; it is closed automatically on exit.
            ctx_manager = pool.get_monitored_context(
                f"Pool Test {test_id}",
                f"./videos/test_{test_id}",
            )
            async with ctx_manager as context:
                page = await context.new_page()
                await page.goto(f"https://example.com/test/{test_id}")
                # Test logic here

        # Fan the ten tests out over the shared pool.
        await asyncio.gather(*(run_test(i) for i in range(10)))
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class TestTask:
    """A single queued test: what to call it, where to go, and what success looks like."""

    name: str               # human-readable label shown in monitoring output
    url: str                # page the worker navigates to
    success_criteria: str   # success description passed to the monitor
async def queue_based_testing():
    """Process test tasks from a queue with a fixed pool of workers.

    Fixes two defects in the original:

    * A test step that raised ``asyncio.TimeoutError`` was caught by the
      "queue is empty" handler, exiting the worker WITHOUT calling
      ``queue.task_done()`` — which deadlocked ``queue.join()`` forever.
      The timeout now guards only the dequeue.
    * The cancellation loop variable shadowed the ``worker`` coroutine.
    """
    monitor = ClioMonitor(api_key=os.getenv("CLIO_API_KEY"))

    # Define test tasks.
    tasks = [
        TestTask("Homepage Test", "https://example.com", "Page loads successfully"),
        TestTask("About Page Test", "https://example.com/about", "About content visible"),
        TestTask("Contact Test", "https://example.com/contact", "Contact form accessible"),
        TestTask("Products Test", "https://example.com/products", "Product list displays"),
        TestTask("Services Test", "https://example.com/services", "Services listed"),
    ]

    # Create task queue.
    queue = asyncio.Queue()
    for task in tasks:
        await queue.put(task)

    async def worker(worker_id):
        """Pull and execute tasks until the queue stays empty for one second."""
        while True:
            try:
                # Only the dequeue is guarded by the timeout, so a timeout
                # raised by test code is never mistaken for an empty queue.
                task = await asyncio.wait_for(queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                break  # no more tasks; this worker can exit
            try:
                async with async_playwright() as p:
                    browser = await p.chromium.launch()
                    context = await browser.new_context(
                        record_video_dir=f"./videos/worker_{worker_id}"
                    )
                    await monitor.start_run(
                        context=context,
                        automation_name=f"Worker {worker_id}: {task.name}",
                        success_criteria=task.success_criteria,
                    )
                    # Execute test.
                    page = await context.new_page()
                    await page.goto(task.url)
                    await page.wait_for_load_state("networkidle")
                    print(f"Worker {worker_id} completed: {task.name}")
                    await context.close()
                    await browser.close()
            except Exception as e:
                # Isolate failures so one bad task doesn't kill the worker.
                print(f"Worker {worker_id} error: {e}")
            finally:
                queue.task_done()  # always balance the matching get()

    # Start workers.
    num_workers = 3
    worker_tasks = [asyncio.create_task(worker(i)) for i in range(num_workers)]

    # Wait for every queued task to be marked done.
    await queue.join()

    # Workers are idling in their dequeue wait; cancel them.
    for worker_task in worker_tasks:
        worker_task.cancel()
    print("✅ All queued tests completed")
async def shared_browser_testing():
    """Run several monitored contexts that share one browser instance."""
    monitor = ClioMonitor(api_key=os.getenv("CLIO_API_KEY"))

    async with async_playwright() as p:
        # One browser, many isolated contexts.
        browser = await p.chromium.launch()

        async def create_monitored_context(test_name, video_subdir):
            """Create a context on the shared browser and start monitoring it."""
            ctx = await browser.new_context(
                record_video_dir=f"./videos/{video_subdir}"
            )
            await monitor.start_run(context=ctx, automation_name=test_name)
            return ctx

        # Spin up all three contexts from the same browser, concurrently.
        specs = [
            ("Multi-Context Test 1", "ctx1"),
            ("Multi-Context Test 2", "ctx2"),
            ("Multi-Context Test 3", "ctx3"),
        ]
        contexts = await asyncio.gather(*(
            create_monitored_context(name, subdir) for name, subdir in specs
        ))

        async def run_test_in_context(context, test_id):
            """Drive a page inside the given context, then close it."""
            page = await context.new_page()
            await page.goto(f"https://httpbin.org/delay/{test_id}")
            await page.wait_for_load_state("networkidle")
            await context.close()

        # Exercise every context concurrently.
        await asyncio.gather(*(
            run_test_in_context(ctx, i) for i, ctx in enumerate(contexts, 1)
        ))

        await browser.close()
import asyncio
from asyncio import Semaphore


async def rate_limited_testing():
    """Run many tests concurrently while capping concurrent uploads.

    BUG FIX: the original acquired ``upload_semaphore`` around the ENTIRE
    test, capping total test concurrency at 2 — contradicting both the
    semaphore's name and the stated goal ("uploads are rate-limited").
    The semaphore now guards only ``context.close()``, where the upload
    actually happens, so all ten tests run in parallel.
    """
    monitor = ClioMonitor(api_key=os.getenv("CLIO_API_KEY"))

    # Limit concurrent uploads to respect API rate limits.
    upload_semaphore = Semaphore(2)  # max 2 concurrent uploads

    async def rate_limited_test(test_id):
        """Run the test freely; only the upload phase is rate-limited."""
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            context = await browser.new_context(
                record_video_dir=f"./videos/rate_limited_{test_id}"
            )
            await monitor.start_run(
                context=context,
                automation_name=f"Rate Limited Test {test_id}",
            )
            page = await context.new_page()
            await page.goto("https://example.com")
            await page.wait_for_timeout(2000)  # simulate test work

            # Only the upload (triggered by context.close) needs the limit.
            async with upload_semaphore:
                print(f"Starting upload for test {test_id}")
                await context.close()  # upload happens here
            # Semaphore released automatically.
            await browser.close()
            print(f"Completed test {test_id}")

    # Start many tests; only their uploads are rate-limited.
    await asyncio.gather(*(rate_limited_test(i) for i in range(10)))
async def isolated_concurrent_tests():
    """Run tests side by side so one failure never takes down the rest."""
    # Monitoring errors are reported rather than raised, so the SDK itself
    # cannot sink an otherwise-healthy test.
    monitor = ClioMonitor(
        api_key=os.getenv("CLIO_API_KEY"),
        raise_on_error=False,
    )

    async def safe_test(test_name, should_fail=False):
        """Run one browser test; report success/failure instead of raising."""
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch()
                context = await browser.new_context(record_video_dir="./videos")
                await monitor.start_run(context=context, automation_name=test_name)
                page = await context.new_page()
                # A bogus domain simulates a test failure when requested.
                target = (
                    "https://nonexistent-site-12345.com"
                    if should_fail
                    else "https://example.com"
                )
                await page.goto(target)
                await context.close()
                await browser.close()
                print(f"✅ {test_name} completed")
                return True
        except Exception as e:
            print(f"❌ {test_name} failed: {e}")
            return False

    # Run tests with some failures - others should continue.
    results = await asyncio.gather(
        safe_test("Good Test 1"),
        safe_test("Failing Test", should_fail=True),  # expected to fail
        safe_test("Good Test 2"),
        safe_test("Good Test 3"),
        return_exceptions=True,  # keep going past exceptions
    )

    # Tally how many came back clean.
    successful = sum(1 for outcome in results if outcome is True)
    print(f"✅ {successful}/{len(results)} tests passed")
# NOTE(review): illustrative fragments — context1/context2/context3 and
# browser are placeholders assumed to exist in the surrounding test code.

# Good - reuse monitor instance
# One ClioMonitor can track many contexts; create it once and share it.
monitor = ClioMonitor(api_key="clio_key")

async def test1():
    await monitor.start_run(context1, "Test 1")

async def test2():
    await monitor.start_run(context2, "Test 2")

# Avoid - creating multiple monitors unnecessarily
async def test3():
    monitor3 = ClioMonitor(api_key="clio_key")  # Unnecessary
    await monitor3.start_run(context3, "Test 3")

# Good - separate directories prevent file conflicts
context1 = await browser.new_context(record_video_dir="./videos/test1")
context2 = await browser.new_context(record_video_dir="./videos/test2")

# Avoid - same directory can cause conflicts
context1 = await browser.new_context(record_video_dir="./videos")  # Same dir
context2 = await browser.new_context(record_video_dir="./videos")  # Same dir
async def proper_cleanup():
    """Guarantee contexts and browsers are released even when a test fails.

    NOTE(review): relies on a module-level ``monitor`` and a
    ``run_test_in_context`` helper defined elsewhere in the suite.
    """
    contexts, browsers = [], []
    try:
        async with async_playwright() as p:
            # Launch three isolated browser/context pairs, each monitored.
            for idx in range(3):
                browser = await p.chromium.launch()
                browsers.append(browser)
                context = await browser.new_context(record_video_dir=f"./videos/{idx}")
                contexts.append(context)
                await monitor.start_run(context, f"Cleanup Test {idx}")
            # Drive every context concurrently.
            await asyncio.gather(*(run_test_in_context(ctx) for ctx in contexts))
    finally:
        # Cleanup in reverse order: contexts first (triggers uploads), then
        # browsers; return_exceptions=True keeps one failed close from
        # aborting the rest of the teardown.
        await asyncio.gather(*(ctx.close() for ctx in contexts), return_exceptions=True)
        await asyncio.gather(*(b.close() for b in browsers), return_exceptions=True)
import psutil
import asyncio


async def monitor_resource_usage():
    """Log this process's memory and CPU every 5 s while tests run."""
    process = psutil.Process()

    async def log_resources():
        # Periodic snapshot of the current process footprint.
        while True:
            rss_mb = process.memory_info().rss / (1024 * 1024)
            print(f"Memory: {rss_mb:.1f} MB, CPU: {process.cpu_percent():.1f}%")
            await asyncio.sleep(5)

    # Sample in the background for as long as the tests take.
    monitor_task = asyncio.create_task(log_resources())
    try:
        # Run your concurrent tests.
        await isolated_concurrent_tests()
    finally:
        monitor_task.cancel()

The Clio SDK’s thread safety features allow you to scale your automation testing while maintaining reliable monitoring across all your test scenarios.