1 | from playwright.sync_api import sync_playwright |
2 | |
class BrowserToolParameters(BaseModel):
    """Input schema for one browser command.

    ``command`` selects the operation to run. Every other field is optional
    at the schema level; which of them a given command needs is spelled out
    in the field descriptions below.
    """

    command: Literal[
        "go_to",  # Navigate to a URL
        "get_html",  # Get current page HTML
        "evaluate",  # Run JavaScript code
        "click",  # Click on an element
        "type",  # Type into an element
        "screenshot",  # Take a screenshot
        "get_text",  # Get text content of element
        "get_attribute",  # Get attribute of element
    ] = Field(
        description=(
            "The browser command to execute. Required parameters per command:\n"
            "- go_to: requires 'url'\n"
            "- evaluate: requires 'code'\n"
            "- click: requires 'selector'\n"
            "- type: requires 'selector' and 'text'\n"
            "- get_text: requires 'selector'\n"
            "- get_attribute: requires 'selector' and 'attribute'\n"
            "- get_html: no additional parameters\n"
            "- screenshot: no additional parameters"
        ),
    )

    # Navigation target; only consulted by go_to.
    url: Optional[str] = Field(
        default=None,
        description="URL for go_to command (required for go_to)",
    )

    # Element locator shared by all element-level commands.
    selector: Optional[str] = Field(
        default=None,
        description="CSS selector for element operations (required for click, type, get_text, get_attribute)",
    )

    # JavaScript source; only consulted by evaluate.
    code: Optional[str] = Field(
        default=None,
        description="JavaScript code for evaluate command (required for evaluate)",
    )

    # Keystrokes to send; only consulted by type.
    text: Optional[str] = Field(
        default=None,
        description="Text to type for type command (required for type)",
    )

    # Per-operation timeout in milliseconds; defaults to 30000.
    timeout: Optional[int] = Field(
        default=30000,
        description="Timeout in milliseconds for operations",
    )

    # Attribute to read; only consulted by get_attribute.
    attribute: Optional[str] = Field(
        default=None,
        description="Attribute name for get_attribute command (required for get_attribute)",
    )
46 | |
47 | |
class BrowserTool(Tool):
    """A browser interaction tool that allows the agent to interact with a browser.

    Every call attaches to the instance's browser over CDP, runs a single
    command against the first page of the first browser context (opening a
    page if that context has none), and closes the Playwright browser handle
    again before returning.
    """

    _instance: Union[UbuntuInstance, BrowserInstance]

    def __init__(self, instance: Union[UbuntuInstance, BrowserInstance]) -> None:
        """Register the tool as ``"browser"`` and remember *instance*.

        Args:
            instance: Object whose ``browser.get_cdp_url()`` provides the CDP
                endpoint that each call connects to.
        """
        super().__init__(
            name="browser",
            description="Interact with a browser for web scraping and automation",
            parameters=BrowserToolParameters,
        )
        self._instance = instance

    def __call__(self, **kwargs: Any) -> Any:
        """Validate *kwargs* and execute one browser command.

        Args:
            **kwargs: Fields of ``BrowserToolParameters`` (``command`` plus
                whichever of ``url``, ``selector``, ``code``, ``text``,
                ``timeout``, ``attribute`` the command needs).

        Returns:
            * ``go_to``, ``click``, ``type``: ``True`` on success.
            * ``get_html``: the result of evaluating
              ``document.documentElement.outerHTML`` in the page (falling
              back to ``innerHTML`` if that evaluation raises).
            * ``evaluate``: whatever ``page.evaluate(code)`` returns.
            * ``screenshot``: ``image_result(...)`` of the base64-encoded PNG.
            * ``get_text``: the matched element's ``text_content()``.
            * ``get_attribute``: the matched element's attribute value.

        Raises:
            ValueError: If no CDP URL is available, if a parameter required
                by the command is missing, or if the command itself fails.
                Failures that occur after the connection is established are
                re-raised as ``ValueError("Browser command failed: ...")``
                with the original exception attached as ``__cause__``.
            Exception: Whatever ``BrowserToolParameters.model_validate``
                raises when *kwargs* do not match the schema.
        """
        params = BrowserToolParameters.model_validate(kwargs)
        command = params.command
        url = params.url
        selector = params.selector
        code = params.code
        text = params.text
        # `or` makes both None and 0 fall back to the 30000 ms default.
        timeout = params.timeout or 30000
        attribute = params.attribute

        cdp_url = self._instance.browser.get_cdp_url().cdp_url
        if cdp_url is None:
            raise ValueError("CDP URL is not available, start the browser first")

        with sync_playwright() as playwright:
            browser = playwright.chromium.connect_over_cdp(cdp_url)

            # Context/page lookup lives inside the try block so that a failure
            # here still closes the connection and surfaces as ValueError,
            # consistent with every other failure mode of this tool.
            try:
                if not browser.contexts:
                    raise ValueError("No browser context is available")
                context = browser.contexts[0]
                page = context.pages[0] if context.pages else context.new_page()

                if command == "go_to":
                    if not url:
                        raise ValueError("URL is required for go_to command")
                    page.goto(url, timeout=timeout)
                    return True

                elif command == "get_html":
                    try:
                        return page.evaluate("() => document.documentElement.outerHTML")
                    except Exception:
                        # If page is navigating, just return what we can get
                        return page.evaluate("() => document.documentElement.innerHTML")

                elif command == "evaluate":
                    if not code:
                        raise ValueError("Code is required for evaluate command")
                    return page.evaluate(code)

                elif command == "click":
                    if not selector:
                        raise ValueError("Selector is required for click command")
                    page.click(selector, timeout=timeout)
                    return True

                elif command == "type":
                    if not selector:
                        raise ValueError("Selector is required for type command")
                    if not text:
                        raise ValueError("Text is required for type command")
                    page.type(selector, text, timeout=timeout)
                    return True

                elif command == "screenshot":
                    return image_result(
                        base64.b64encode(page.screenshot(type="png")).decode("utf-8")
                    )

                elif command == "get_text":
                    if not selector:
                        raise ValueError("Selector is required for get_text command")
                    element = page.wait_for_selector(selector, timeout=timeout)
                    if element is None:
                        raise ValueError(f"Element not found: {selector}")
                    return element.text_content()

                elif command == "get_attribute":
                    if not selector:
                        raise ValueError(
                            "Selector is required for get_attribute command"
                        )
                    if not attribute:
                        raise ValueError(
                            "Attribute is required for get_attribute command"
                        )
                    element = page.wait_for_selector(selector, timeout=timeout)
                    if element is None:
                        raise ValueError(f"Element not found: {selector}")
                    return element.get_attribute(attribute)

                else:
                    # Defensive: the Literal type on `command` should already
                    # have rejected anything not handled above.
                    raise ValueError(f"Unknown command: {command}")

            except Exception as e:
                # Chain explicitly so the original traceback stays reachable
                # through __cause__ for debugging.
                raise ValueError(f"Browser command failed: {e}") from e

            finally:
                browser.close()