Use Selenium WebDriver with Orchestrator for cross-platform web automation
pip install selenium requests
import requests
import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
def connect_to_orchestrator():
api_key = 'orch_your_api_key_here'
# Create a new browser session
session_response = requests.post(
'https://api.orchestratorhq.com/api/sessions',
headers={
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
},
json={
'session_name': 'Selenium Automation',
'duration': '1h'
}
)
session = session_response.json()
session_id = session['id']
print(f'Session created: {session_id}')
# Extract CDP URL and convert to Selenium format (session is active when API responds)
cdp_url = session['cdp_url']
# Convert ws://host:port/devtools/browser/id to http://host:port
server_url = cdp_url.replace('ws://', 'http://').split('/devtools')[0]
# Configure Chrome options for remote debugging
chrome_options = Options()
chrome_options.add_experimental_option("debuggerAddress", server_url.replace('http://', ''))
# Connect to the remote browser
driver = webdriver.Chrome(options=chrome_options)
return driver, session_id, api_key
# Usage example
def main():
driver, session_id, api_key = connect_to_orchestrator()
try:
# Your automation code here
driver.get('https://example.com')
print(f'Page title: {driver.title}')
# Take a screenshot
driver.save_screenshot('example.png')
finally:
# Clean up
driver.quit()
# Stop the session
requests.delete(
f'https://api.orchestratorhq.com/api/sessions/{session_id}',
headers={'Authorization': f'Bearer {api_key}'}
)
if __name__ == '__main__':
main()
import requests
import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
def scrape_products():
# Connect to Orchestrator session
orchestrator = OrchestratorSelenium()
driver = orchestrator.connect_driver()
try:
# Navigate to product listing page
driver.get('https://books.toscrape.com/')
# Wait for products to load
wait = WebDriverWait(driver, 10)
products = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.product_pod')))
scraped_products = []
for product in products[:10]: # Scrape first 10 products
try:
title_element = product.find_element(By.CSS_SELECTOR, 'h3 a')
title = title_element.get_attribute('title')
price_element = product.find_element(By.CSS_SELECTOR, '.price_color')
price = price_element.text
rating_element = product.find_element(By.CSS_SELECTOR, '.star-rating')
rating = rating_element.get_attribute('class').split()[-1]
availability_element = product.find_element(By.CSS_SELECTOR, '.availability')
availability = availability_element.text.strip()
scraped_products.append({
'title': title,
'price': price,
'rating': rating,
'availability': availability
})
except Exception as e:
print(f'Error scraping product: {e}')
continue
print(f'Scraped {len(scraped_products)} products:')
for product in scraped_products:
print(f"- {product['title']}: {product['price']} ({product['rating']} stars)")
# Take screenshot of the page
driver.save_screenshot('books_page.png')
return scraped_products
finally:
orchestrator.cleanup()
if __name__ == '__main__':
products = scrape_products()
from selenium.webdriver.support.ui import Select
def automate_contact_form():
orchestrator = OrchestratorSelenium()
driver = orchestrator.connect_driver()
try:
# Navigate to contact form
driver.get('https://httpbin.org/forms/post')
# Fill text inputs
name_field = driver.find_element(By.NAME, 'custname')
name_field.send_keys('John Doe')
phone_field = driver.find_element(By.NAME, 'custtel')
phone_field.send_keys('+1-555-123-4567')
email_field = driver.find_element(By.NAME, 'custemail')
email_field.send_keys('john.doe@example.com')
# Select dropdown option
size_dropdown = Select(driver.find_element(By.NAME, 'size'))
size_dropdown.select_by_value('medium')
# Select radio button
topping_radio = driver.find_element(By.CSS_SELECTOR, 'input[name="topping"][value="bacon"]')
topping_radio.click()
# Fill textarea
comments_field = driver.find_element(By.NAME, 'comments')
comments_field.send_keys('This is an automated test using Selenium and Orchestrator.')
# Take screenshot before submission
driver.save_screenshot('form_filled.png')
# Submit form
submit_button = driver.find_element(By.CSS_SELECTOR, 'input[type="submit"]')
submit_button.click()
# Wait for response page
wait = WebDriverWait(driver, 10)
response_element = wait.until(EC.presence_of_element_located((By.TAG_NAME, 'pre')))
response_text = response_element.text
print('Form submission response:', response_text)
# Take screenshot of result
driver.save_screenshot('form_result.png')
finally:
orchestrator.cleanup()
if __name__ == '__main__':
automate_contact_form()
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.chrome.ChromeDriver;
import org.openqa.selenium.chrome.ChromeOptions;
import org.openqa.selenium.By;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.support.ui.WebDriverWait;
import org.openqa.selenium.support.ui.ExpectedConditions;
import java.time.Duration;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
public class OrchestratorSelenium {
private static final String API_KEY = "orch_your_api_key_here";
private static final String BASE_URL = "https://api.orchestratorhq.com";
private WebDriver driver;
private String sessionId;
private HttpClient httpClient;
private ObjectMapper objectMapper;
public OrchestratorSelenium() {
this.httpClient = HttpClient.newHttpClient();
this.objectMapper = new ObjectMapper();
}
public WebDriver connectToOrchestrator() throws Exception {
// Create session
String sessionPayload = "{\"session_name\":\"Java Selenium\",\"duration\":\"1h\"}";
HttpRequest createRequest = HttpRequest.newBuilder()
.uri(URI.create(BASE_URL + "/api/sessions"))
.header("Authorization", "Bearer " + API_KEY)
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(sessionPayload))
.build();
HttpResponse<String> createResponse = httpClient.send(createRequest,
HttpResponse.BodyHandlers.ofString());
JsonNode sessionJson = objectMapper.readTree(createResponse.body());
this.sessionId = sessionJson.get("id").asText();
System.out.println("Session created: " + sessionId);
// Configure Chrome options (session is active when API responds)
String cdpUrl = sessionJson.get("cdp_url").asText();
String serverUrl = cdpUrl.replace("ws://", "http://").split("/devtools")[0];
ChromeOptions options = new ChromeOptions();
options.setExperimentalOption("debuggerAddress", serverUrl.replace("http://", ""));
this.driver = new ChromeDriver(options);
return driver;
}
public void cleanup() {
if (driver != null) {
try {
driver.quit();
} catch (Exception e) {
System.err.println("Error quitting driver: " + e.getMessage());
}
}
if (sessionId != null) {
try {
HttpRequest deleteRequest = HttpRequest.newBuilder()
.uri(URI.create(BASE_URL + "/api/sessions/" + sessionId))
.header("Authorization", "Bearer " + API_KEY)
.DELETE()
.build();
httpClient.send(deleteRequest, HttpResponse.BodyHandlers.ofString());
} catch (Exception e) {
System.err.println("Error stopping session: " + e.getMessage());
}
}
}
public static void main(String[] args) {
OrchestratorSelenium orchestrator = new OrchestratorSelenium();
try {
WebDriver driver = orchestrator.connectToOrchestrator();
// Your automation code
driver.get("https://example.com");
System.out.println("Page title: " + driver.getTitle());
// Find and interact with elements
WebDriverWait wait = new WebDriverWait(driver, Duration.ofSeconds(10));
WebElement heading = wait.until(ExpectedConditions.presenceOfElementLocated(By.tagName("h1")));
System.out.println("Heading text: " + heading.getText());
} catch (Exception e) {
e.printStackTrace();
} finally {
orchestrator.cleanup();
}
}
}
import pytest
import requests
import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class TestWithOrchestrator:
@pytest.fixture(scope="class")
def orchestrator_driver(self):
"""Fixture to create and manage Orchestrator session"""
api_key = 'orch_your_api_key_here'
# Create session
session_response = requests.post(
'https://api.orchestratorhq.com/api/sessions',
headers={
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
},
json={
'session_name': 'pytest Session',
'duration': '30m'
}
)
session = session_response.json()
session_id = session['id']
# Connect driver (session is active when API responds)
cdp_url = session['cdp_url']
server_url = cdp_url.replace('ws://', 'http://').split('/devtools')[0]
chrome_options = Options()
chrome_options.add_experimental_option("debuggerAddress", server_url.replace('http://', ''))
driver = webdriver.Chrome(options=chrome_options)
yield driver
# Cleanup
driver.quit()
requests.delete(
f'https://api.orchestratorhq.com/api/sessions/{session_id}',
headers={'Authorization': f'Bearer {api_key}'}
)
def test_homepage_loads(self, orchestrator_driver):
"""Test that homepage loads correctly"""
driver = orchestrator_driver
driver.get('https://example.com')
assert 'Example Domain' in driver.title
heading = driver.find_element(By.TAG_NAME, 'h1')
assert 'Example Domain' in heading.text
def test_navigation_works(self, orchestrator_driver):
"""Test navigation functionality"""
driver = orchestrator_driver
driver.get('https://example.com')
# Check for "More information" link
wait = WebDriverWait(driver, 10)
more_info_link = wait.until(
EC.presence_of_element_located((By.PARTIAL_LINK_TEXT, 'More information'))
)
assert more_info_link.is_displayed()
# Take screenshot for visual verification
driver.save_screenshot('navigation_test.png')
def test_form_submission(self, orchestrator_driver):
"""Test form submission functionality"""
driver = orchestrator_driver
driver.get('https://httpbin.org/forms/post')
# Fill form
driver.find_element(By.NAME, 'custname').send_keys('Test User')
driver.find_element(By.NAME, 'custemail').send_keys('test@example.com')
# Submit
driver.find_element(By.CSS_SELECTOR, 'input[type="submit"]').click()
# Verify submission
wait = WebDriverWait(driver, 10)
response = wait.until(EC.presence_of_element_located((By.TAG_NAME, 'pre')))
assert 'Test User' in response.text
assert 'test@example.com' in response.text
if __name__ == '__main__':
pytest.main([__file__, '-v'])
Session Management
import atexit
class SafeOrchestratorSelenium:
def __init__(self):
self.driver = None
self.session_id = None
self.api_key = os.getenv('ORCHESTRATOR_API_KEY')
# Register cleanup function
atexit.register(self.cleanup)
def cleanup(self):
if self.driver:
try:
self.driver.quit()
except:
pass
if self.session_id:
try:
requests.delete(
f'https://api.orchestratorhq.com/api/sessions/{self.session_id}',
headers={'Authorization': f'Bearer {self.api_key}'}
)
except:
pass
Wait Strategies
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Good - explicit wait
wait = WebDriverWait(driver, 10)
element = wait.until(EC.element_to_be_clickable((By.ID, 'submit-button')))
# Bad - arbitrary sleep
time.sleep(5)
element = driver.find_element(By.ID, 'submit-button')
Error Recovery
def retry_operation(operation, max_retries=3, delay=1):
for attempt in range(max_retries):
try:
return operation()
except Exception as e:
if attempt == max_retries - 1:
raise e
print(f'Attempt {attempt + 1} failed: {e}')
time.sleep(delay * (attempt + 1))
# Usage
retry_operation(lambda: driver.find_element(By.ID, 'dynamic-element').click())
Use VNC for Visual Debugging
Take Screenshots
driver.save_screenshot('debug.png')
.Log Page Source
with open('page.html', 'w') as f: f.write(driver.page_source)
.Check Console Logs
logs = driver.get_log('browser')
.