"""Solve an Argon2id proof-of-work captcha.

Flow: fetch the captcha page (scraping the CSRF token), request a new
challenge, brute-force a nonce whose Argon2id hash is <= the challenge
target, then submit the solution and print the resulting token.
"""

import argparse
import base64
import concurrent.futures
import ssl
import sys
import threading

import cloudscraper
import urllib3
from argon2 import Type, low_level
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.poolmanager import PoolManager

# Certificate verification is deliberately disabled below; silence the
# resulting InsecureRequestWarning spam.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Seconds to wait for any single HTTP request before giving up.
# NOTE(review): the original code had no timeout and could hang forever.
REQUEST_TIMEOUT = 30


class SSLAdapter(HTTPAdapter):
    """HTTPAdapter that injects a custom ``ssl.SSLContext``.

    Used to mount a context with certificate verification disabled onto
    the cloudscraper session (both for direct and proxied connections).
    """

    def __init__(self, ssl_context=None, **kwargs):
        self.ssl_context = ssl_context
        super().__init__(**kwargs)

    def init_poolmanager(self, *args, **kwargs):
        kwargs['ssl_context'] = self.ssl_context
        return super().init_poolmanager(*args, **kwargs)

    def proxy_manager_for(self, *args, **kwargs):
        kwargs['ssl_context'] = self.ssl_context
        return super().proxy_manager_for(*args, **kwargs)


def hash_nonce(nonce, salt, target, t, m, p, hl, verbose, lock):
    """Hash one candidate nonce with Argon2id and test it against the target.

    Args:
        nonce: Candidate nonce (string; hashed as UTF-8).
        salt: Raw salt bytes from the challenge.
        target: Integer threshold; a hash value <= target wins.
        t, m, p, hl: Argon2 time cost, memory cost, parallelism, hash length.
        verbose: When True, print every attempt.
        lock: Lock serializing print() across worker threads.

    Returns:
        (True, nonce) on success, (False, None) otherwise (including on error).
    """
    try:
        hash_bytes = low_level.hash_secret_raw(
            secret=nonce.encode('utf-8'),
            salt=salt,
            time_cost=t,
            memory_cost=m,
            parallelism=p,
            hash_len=hl,
            type=Type.ID,
        )
        # The raw hash is interpreted as a big-endian integer for the
        # proof-of-work comparison.
        hash_value = int.from_bytes(hash_bytes, byteorder='big')
        if verbose:
            with lock:
                print(f"Testing nonce: {nonce}, Hash Value: {hash_value}")
        if hash_value <= target:
            return True, nonce
    except Exception as e:
        with lock:
            print(f"Error hashing nonce {nonce}: {e}")
    return False, None


def find_valid_nonce(challenge_data, max_attempts=100000, verbose=False):
    """Brute-force a nonce satisfying the Argon2id challenge.

    Args:
        challenge_data: Parsed JSON with a "challenge" dict containing the
            Argon2 parameters (t, m, p, hl), the target "d" (decimal string,
            possibly with a trailing JavaScript BigInt 'n'), and the hex
            salt "s".
        max_attempts: Number of sequential integer nonces to try.
        verbose: Forwarded to hash_nonce for per-attempt logging.

    Returns:
        The winning nonce string, or None if no solution was found or the
        challenge data was malformed.
    """
    print("Starting to find solution...")
    challenge = challenge_data.get("challenge", {})
    t = challenge.get("t")
    m = challenge.get("m")
    p = challenge.get("p")
    hl = challenge.get("hl")
    d = challenge.get("d")
    s = challenge.get("s")
    # Presence check: use `is None` rather than truthiness so a legitimate
    # falsy parameter value (e.g. 0) is not rejected as "missing".
    if any(v is None for v in (t, m, p, hl, d, s)):
        print("Incomplete challenge data.")
        return None
    try:
        # "d" may carry a trailing 'n' (JavaScript BigInt literal suffix).
        target = int(d.rstrip('n'))
    except ValueError:
        print("Invalid target value in challenge data.")
        return None
    try:
        salt = bytes.fromhex(s)
    except ValueError:
        print("Invalid salt format in challenge data.")
        return None

    lock = threading.Lock()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {
            executor.submit(
                hash_nonce, str(nonce), salt, target, t, m, p, hl, verbose, lock
            ): nonce
            for nonce in range(max_attempts)
        }
        for future in concurrent.futures.as_completed(futures):
            result, valid_nonce = future.result()
            if result:
                print(f"Solution found: {valid_nonce}")
                # Cancel the remaining queued attempts; already-running
                # ones are allowed to finish when the `with` block exits.
                executor.shutdown(wait=False, cancel_futures=True)
                return valid_nonce
    print("No solution found within the max_try limit.")
    return None


def solve_captcha(url, key, verbose=False):
    """Run the full captcha flow: fetch page, solve challenge, verify.

    Args:
        url: The captcha page URL.
        key: Optional proxy key sent with the challenge/verify requests.
        verbose: Enable per-nonce logging during the search.

    Side effects: network requests and progress printed to stdout; the
    obtained token (if any) is printed, not returned.
    """
    # The captcha endpoint may use a self-signed certificate, so build a
    # context that skips verification and mount it on the session.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    adapter = SSLAdapter(ssl_context=ssl_context)
    scraper = cloudscraper.create_scraper()
    scraper.mount("https://", adapter)
    scraper.mount("http://", adapter)

    print(f"Accessing captcha page at {url}")
    try:
        response = scraper.get(url, timeout=REQUEST_TIMEOUT)
    except Exception as e:
        print(f"Exception occurred while accessing captcha page: {e}")
        return
    if not response.ok:
        print(f"Failed to access captcha page, status code: {response.status_code}")
        return

    soup = BeautifulSoup(response.content, "html.parser")
    csrf_input = soup.find("input", {"name": "_csrf"})
    if not csrf_input:
        print("CSRF token not found on the captcha page.")
        return
    csrf = csrf_input.get("value")
    print(f"CSRF token obtained: {csrf}")

    print("Requesting new captcha challenge...")
    challenge_payload = {"action": "new", "_csrf": csrf}
    if key:
        challenge_payload["proxyKey"] = key
    try:
        challenge_response = scraper.post(
            f"{url}/challenge",
            headers={
                "Accept": "*/*",
                "Referer": url,
                # Origin is the scheme+host part of the URL.
                "Origin": '/'.join(url.split('/')[:3]),
            },
            json=challenge_payload,
            timeout=REQUEST_TIMEOUT,
        )
    except Exception as e:
        print(f"Exception occurred while requesting captcha challenge: {e}")
        return
    if not challenge_response.ok:
        print(f"Failed to get a captcha challenge, status code: {challenge_response.status_code}")
        return
    try:
        data = challenge_response.json()
    except ValueError:
        print("Failed to parse JSON from challenge response.")
        return
    print(f"Captcha challenge data received: {data}")

    solution = find_valid_nonce(data, verbose=verbose)
    if solution is None:
        print("Failed to compute a solution.")
        return
    print(f"Found a solution: {solution}")

    # The verify endpoint expects the original challenge data echoed back
    # alongside the solution and CSRF token.
    data["solution"] = solution
    data["_csrf"] = csrf
    if key:
        data["proxyKey"] = key
    print("Submitting solution...")
    try:
        verify_response = scraper.post(
            f"{url}/verify",
            headers={
                "Accept": "*/*",
                "Referer": url,
                "Origin": '/'.join(url.split('/')[:3]),
            },
            json=data,
            timeout=REQUEST_TIMEOUT,
        )
    except Exception as e:
        print(f"Exception occurred while submitting solution: {e}")
        return
    if not verify_response.ok:
        print(f"Failed to submit solution, status code: {verify_response.status_code}")
        return
    try:
        verification = verify_response.json()
    except ValueError:
        print("Failed to parse JSON from verification response.")
        return
    print(f"Verification response: {verification}")

    if not verification.get("success"):
        print("Solution was incorrect.")
        return
    token = verification.get("token")
    if token:
        print(f"Generated token: {token}")
    else:
        print("Token not found in the verification response.")


def main():
    """Parse CLI arguments and run the captcha solver."""
    parser = argparse.ArgumentParser(description="Captcha Solver Script")
    parser.add_argument("--url", required=True, help="The URL of the captcha page (e.g., https://example.com/user/captcha)")
    parser.add_argument("--key", required=True, help="Proxy key/password required by the captcha challenge")
    parser.add_argument("--verbose", action='store_true', help="Enable verbose debugging output")
    args = parser.parse_args()
    solve_captcha(args.url, args.key, args.verbose)


if __name__ == "__main__":
    main()