1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
|
from __future__ import annotations
try:
from platformdirs import user_config_dir
from undetected_chromedriver import Chrome, ChromeOptions
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import NoSuchElementException
has_requirements = True
except ImportError:
from typing import Type as WebDriver, Callable as user_config_dir
has_requirements = False
import time
from shutil import which
from os import path
from os import access, R_OK
from .typing import Cookies
from .errors import MissingRequirementsError
from . import debug
try:
from pyvirtualdisplay import Display
has_pyvirtualdisplay = True
except ImportError:
has_pyvirtualdisplay = False
# Optional selenium-wire integration: when it is importable, ``Chrome`` becomes an
# undetected_chromedriver browser whose traffic is routed through selenium-wire's
# local proxy; otherwise the plain undetected_chromedriver ``Chrome`` is used.
try:
    from undetected_chromedriver import Chrome as _Chrome, ChromeOptions
    from seleniumwire.webdriver import InspectRequestsMixin, DriverCommonMixin

    class Chrome(InspectRequestsMixin, DriverCommonMixin, _Chrome):
        """
        undetected_chromedriver Chrome combined with the selenium-wire mixins.

        Args:
            *args: Positional arguments forwarded to the undetected_chromedriver Chrome.
            options (ChromeOptions, optional): Browser options. A new ChromeOptions is created when None.
            seleniumwire_options (dict, optional): Options handed to selenium-wire's backend setup.
                Defaults to a fresh empty dict per instance.
            **kwargs: Keyword arguments forwarded to the undetected_chromedriver Chrome.
        """

        def __init__(self, *args, options=None, seleniumwire_options=None, **kwargs):
            if options is None:
                options = ChromeOptions()
            # A fresh dict per call: a shared ``{}`` default would be visible to every instance.
            if seleniumwire_options is None:
                seleniumwire_options = {}
            options.add_argument('--proxy-bypass-list=<-loopback>')
            # The backend config supplies the proxy address the browser must be pointed at.
            config = self._setup_backend(seleniumwire_options)
            options.add_argument(f"--proxy-server={config['proxy']['httpProxy']}")
            options.add_argument("--ignore-certificate-errors")
            super().__init__(*args, options=options, **kwargs)
    has_seleniumwire = True
except Exception:
    # Best effort: any failure while importing selenium-wire disables the integration.
    # ``Exception`` (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
    has_seleniumwire = False
    try:
        from undetected_chromedriver import Chrome, ChromeOptions
    except ImportError:
        # Keep the module importable without undetected_chromedriver; in that case
        # ``has_requirements`` is False and get_browser() raises MissingRequirementsError
        # before ``Chrome`` or ``ChromeOptions`` is ever used.
        pass
def get_browser(
    user_data_dir: str = None,
    headless: bool = False,
    proxy: str = None,
    options: ChromeOptions = None
) -> WebDriver:
    """
    Builds a Chrome WebDriver from the given settings.

    Args:
        user_data_dir (str, optional): Browser profile directory. When None, the "g4f" user config directory is used.
        headless (bool, optional): Passed through to Chrome as its headless flag. Defaults to False.
        proxy (str, optional): Proxy address added as a ``--proxy-server`` argument. Defaults to None.
        options (ChromeOptions, optional): Options to start from. A new ChromeOptions is created when not given.

    Returns:
        WebDriver: The newly created Chrome instance.

    Raises:
        MissingRequirementsError: If the required packages could not be imported.
    """
    if not has_requirements:
        raise MissingRequirementsError('Install "undetected_chromedriver" and "platformdirs" package')

    profile_dir = user_config_dir("g4f") if user_data_dir is None else user_data_dir
    if profile_dir and debug.logging:
        print("Open browser with config dir:", profile_dir)

    chrome_options = options if options else ChromeOptions()
    if proxy:
        chrome_options.add_argument(f'--proxy-server={proxy}')

    # Look for a system chromedriver (e.g. inside Docker): first on PATH, then at the
    # conventional location. Anything that is not a readable file is ignored.
    driver_path = which('chromedriver') or '/usr/bin/chromedriver'
    driver_usable = path.isfile(driver_path) and access(driver_path, R_OK)

    return Chrome(
        options=chrome_options,
        user_data_dir=profile_dir,
        driver_executable_path=driver_path if driver_usable else None,
        headless=headless,
        patcher_force_close=True
    )
def get_driver_cookies(driver: WebDriver) -> Cookies:
    """
    Collects the cookies currently held by a WebDriver into a plain mapping.

    Args:
        driver (WebDriver): The WebDriver instance to read cookies from.

    Returns:
        dict: Cookie names mapped to their values. When a name occurs more than once, the last entry wins.
    """
    cookies = {}
    for entry in driver.get_cookies():
        cookies[entry["name"]] = entry["value"]
    return cookies
def bypass_cloudflare(driver: WebDriver, url: str, timeout: int) -> None:
    """
    Attempts to bypass Cloudflare protection when accessing a URL using the provided WebDriver.

    The URL is loaded first. If the page's <body> class is exactly "no-js", the page is
    treated as a Cloudflare challenge: the URL is reopened in a new tab via a click on the
    challenge text, the old tab is closed, and the challenge input inside the Turnstile
    iframe is clicked if it can be found. In every case the function then waits until the
    <body> no longer carries the "no-js" class.

    Args:
        driver (WebDriver): The WebDriver to use for accessing the URL.
        url (str): The URL to access.
        timeout (int): Time in seconds to wait for the page to load.

    Raises:
        Exception: If there is an error while bypassing Cloudflare or loading the page
            (for example the challenge text element is missing, or the final wait times out).
    """
    driver.get(url)
    if driver.find_element(By.TAG_NAME, "body").get_attribute("class") == "no-js":
        if debug.logging:
            print("Cloudflare protection detected:", url)

        # Open website in a new tab: the click listener calls window.open from a
        # click on the challenge text element.
        element = driver.find_element(By.ID, "challenge-body-text")
        driver.execute_script("""
            arguments[0].addEventListener('click', () => {
                window.open(arguments[1]);
            });
        """, element, url)
        element.click()
        # Fixed pause before looking for the new tab.
        time.sleep(5)

        # Switch to the new tab and close the old tab. If no second tab appeared,
        # stay on the original window.
        original_window = driver.current_window_handle
        target_window = next(
            (handle for handle in driver.window_handles if handle != original_window),
            original_window,
        )
        if target_window != original_window:
            driver.close()
            driver.switch_to.window(target_window)

        # Click on the challenge button in the iframe
        try:
            driver.switch_to.frame(driver.find_element(By.CSS_SELECTOR, "#turnstile-wrapper iframe"))
            WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "#challenge-stage input"))
            ).click()
        except NoSuchElementException:
            # No Turnstile iframe on the page: nothing to click.
            pass
        except Exception as e:
            # Best effort: a failed click is only reported, the final wait decides the outcome.
            if debug.logging:
                # First line only; fall back to the type name when the message is empty.
                summary = (str(e).splitlines() or [type(e).__name__])[0]
                print(f"Error bypassing Cloudflare: {summary}")

        # Re-select the tab after the iframe interaction above.
        driver.switch_to.window(target_window)
        # NOTE(review): `document.href` is not a standard DOM property, so this looks like
        # a no-op; a reload would be `location.href = location.href`. Left unchanged because
        # a real reload could interfere with the challenge flow - confirm intent.
        driver.execute_script("document.href = document.href;")

    WebDriverWait(driver, timeout).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, "body:not(.no-js)"))
    )
class WebDriverSession:
    """
    Manages a Selenium WebDriver session, including handling of virtual displays and proxies.

    Intended for use as a context manager: entering returns either the WebDriver supplied
    by the caller or a newly created one; exiting closes only the driver the session
    created itself and stops the virtual display, if any.
    """

    def __init__(
        self,
        webdriver: WebDriver = None,
        user_data_dir: str = None,
        headless: bool = False,
        virtual_display: bool = False,
        proxy: str = None,
        options: ChromeOptions = None
    ):
        """
        Initializes a new instance of the WebDriverSession.

        Args:
            webdriver (WebDriver, optional): A WebDriver instance for the session. Defaults to None.
            user_data_dir (str, optional): Directory for user data. Defaults to None.
            headless (bool, optional): Whether to run the browser in headless mode. Defaults to False.
            virtual_display (bool, optional): Whether to use a virtual display. Ignored when
                pyvirtualdisplay is not installed. Defaults to False.
            proxy (str, optional): Proxy settings for the browser. Defaults to None.
            options (ChromeOptions, optional): ChromeOptions for the browser. Defaults to None.
        """
        self.webdriver = webdriver
        self.user_data_dir = user_data_dir
        self.headless = headless
        # The display object is only created here; it is started in __enter__.
        self.virtual_display = Display(size=(1920, 1080)) if has_pyvirtualdisplay and virtual_display else None
        self.proxy = proxy
        self.options = options
        # Driver created by this session (as opposed to a caller-supplied ``webdriver``).
        self.default_driver = None

    def reopen(
        self,
        user_data_dir: str = None,
        headless: bool = False,
        virtual_display: bool = False
    ) -> WebDriver:
        """
        Quits the session's own browser (if any) and opens a new one.

        Args:
            user_data_dir (str, optional): Directory for user data. Falls back to the
                session's ``user_data_dir`` when not given.
            headless (bool, optional): Whether to run the new browser in headless mode.
                Defaults to False; the session's stored ``headless`` is not consulted.
            virtual_display (bool, optional): Whether to keep the session's virtual display.
                When False (the default), an existing virtual display is stopped and discarded.

        Returns:
            WebDriver: The reopened WebDriver instance.
        """
        user_data_dir = user_data_dir or self.user_data_dir
        if self.default_driver:
            self.default_driver.quit()
        if not virtual_display and self.virtual_display:
            self.virtual_display.stop()
            self.virtual_display = None
        # NOTE(review): the session's ``options`` are not passed here - presumably because a
        # ChromeOptions object cannot be reused for a second browser; confirm.
        self.default_driver = get_browser(user_data_dir, headless, self.proxy)
        return self.default_driver

    def __enter__(self) -> WebDriver:
        """
        Context management method for entering a session.

        A caller-supplied WebDriver is returned as-is, without starting the virtual display.
        Otherwise the virtual display (if any) is started and a new browser is created.

        Returns:
            WebDriver: An instance of WebDriver for this session.
        """
        if self.webdriver:
            return self.webdriver
        if self.virtual_display:
            self.virtual_display.start()
        self.default_driver = get_browser(self.user_data_dir, self.headless, self.proxy, self.options)
        return self.default_driver

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Context management method for exiting a session. Closes and quits the WebDriver.

        Args:
            exc_type: Exception type.
            exc_val: Exception value.
            exc_tb: Exception traceback.

        Note:
            Only the driver created by this session is closed; a caller-supplied WebDriver
            is left untouched. The virtual display, if used, is stopped even when quitting
            the driver raises.
        """
        try:
            if self.default_driver:
                try:
                    self.default_driver.close()
                except Exception as e:
                    # Closing the window is best effort; quit() below still runs.
                    if debug.logging:
                        # First line only; fall back to the type name when the message is empty.
                        summary = (str(e).splitlines() or [type(e).__name__])[0]
                        print(f"Error closing WebDriver: {summary}")
                finally:
                    self.default_driver.quit()
        finally:
            # Runs even if quit() raised, so the display is not leaked.
            if self.virtual_display:
                self.virtual_display.stop()
def element_send_text(element: WebElement, text: str) -> None:
    """
    Writes text into an element via JavaScript and then presses ENTER on it.

    Args:
        element (WebElement): The element whose ``innerText`` is replaced.
        text (str): The text to place into the element.
    """
    # ``element.parent`` is the object used to run the script against the element.
    driver = element.parent
    driver.execute_script("arguments[0].innerText = arguments[1];", element, text)
    element.send_keys(Keys.ENTER)
|