diff --git a/fbchat/_client.py b/fbchat/_client.py index 50c842e..3b9db45 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -524,7 +524,9 @@ class Client: data = {"voice_clip": voice_clip} j = self.session._payload_post( - "https://upload.facebook.com/ajax/mercury/upload.php", data, files=file_dict + "https://upload.messenger.com/ajax/mercury/upload.php", + data, + files=file_dict, ) if len(j["metadata"]) != len(file_dict): diff --git a/fbchat/_listen.py b/fbchat/_listen.py index 003c78d..e94650f 100644 --- a/fbchat/_listen.py +++ b/fbchat/_listen.py @@ -8,7 +8,7 @@ from . import _util, _exception, _session, _graphql, _events from typing import Iterable, Optional, Mapping, List -HOST = "edge-chat.facebook.com" +HOST = "edge-chat.messenger.com" TOPICS = [ # Things that happen in chats (e.g. messages) @@ -271,10 +271,10 @@ class Listener: headers = { "Cookie": get_cookie_header( - self.session._session, "https://edge-chat.facebook.com/chat" + self.session._session, "https://edge-chat.messenger.com/chat" ), "User-Agent": self.session._session.headers["User-Agent"], - "Origin": "https://www.facebook.com", + "Origin": "https://www.messenger.com", "Host": HOST, } diff --git a/fbchat/_session.py b/fbchat/_session.py index 024d56f..f705fc5 100644 --- a/fbchat/_session.py +++ b/fbchat/_session.py @@ -1,17 +1,43 @@ import attr -import bs4 import datetime -import re import requests import random -import urllib.parse +import re +import json from ._common import log, kw_only from . import _graphql, _util, _exception -from typing import Optional, Tuple, Mapping, Callable +from typing import Optional, Mapping, Callable, Any -FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"') + +SERVER_JS_DEFINE_REGEX = re.compile(r'require\("ServerJSDefine"\)\)?\.handleDefines\(') +SERVER_JS_DEFINE_JSON_DECODER = json.JSONDecoder() + + +def parse_server_js_define(html: str) -> Mapping[str, Any]: + """Parse ``ServerJSDefine`` entries from a HTML document.""" + # Find points where we should start parsing + define_splits = SERVER_JS_DEFINE_REGEX.split(html) + + # Skip leading entry + _, *define_splits = define_splits + + rtn = [] + if not define_splits: + raise _exception.ParseError("Could not find any ServerJSDefine", data=html) + # Parse entries (should be two) + for entry in define_splits: + try: + parsed, _ = SERVER_JS_DEFINE_JSON_DECODER.raw_decode(entry, idx=0) + except json.JSONDecodeError as e: + raise _exception.ParseError("Invalid ServerJSDefine", data=entry) from e + if not isinstance(parsed, list): + raise _exception.ParseError("Invalid ServerJSDefine", data=parsed) + rtn.extend(parsed) + + # Convert to a dict + return _util.get_jsmods_define(rtn) def base36encode(number: int) -> str: @@ -32,7 +58,7 @@ def base36encode(number: int) -> str: def prefix_url(url: str) -> str: if url.startswith("/"): - return "https://www.facebook.com" + url + return "https://www.messenger.com" + url return url @@ -51,15 +77,11 @@ def get_user_id(session: requests.Session) -> str: return str(rtn) -def find_input_fields(html: str): - return bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("input")) - - def session_factory() -> requests.Session: from . import __version__ session = requests.session() - session.headers["Referer"] = "https://www.facebook.com" + session.headers["Referer"] = "https://www.messenger.com/" # We won't try to set a fake user agent to mask our presence! # Facebook allows us access anyhow, and it makes our motives clearer: # We're not trying to cheat Facebook, we simply want to access their service @@ -71,81 +93,25 @@ def client_id_factory() -> str: return hex(int(random.random() * 2 ** 31))[2:] -def is_home(url: str) -> bool: - parts = urllib.parse.urlparse(url) - # Check the urls `/home.php` and `/` - return "home" in parts.path or "/" == parts.path - - -def _2fa_helper(session: requests.Session, code: int, r): - soup = find_input_fields(r.text) - data = dict() - - url = "https://m.facebook.com/login/checkpoint/" - - data["approvals_code"] = str(code) - data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"] - data["nh"] = soup.find("input", {"name": "nh"})["value"] - data["submit[Submit Code]"] = "Submit Code" - data["codes_submitted"] = "0" - log.info("Submitting 2FA code.") - - r = session.post(url, data=data) - - if is_home(r.url): - return r - - del data["approvals_code"] - del data["submit[Submit Code]"] - del data["codes_submitted"] - - data["name_action_selected"] = "save_device" - data["submit[Continue]"] = "Continue" - log.info("Saving browser.") - # At this stage, we have dtsg, nh, name_action_selected, submit[Continue] - r = session.post(url, data=data) - - if is_home(r.url): - return r - - del data["name_action_selected"] - log.info("Starting Facebook checkup flow.") - # At this stage, we have dtsg, nh, submit[Continue] - r = session.post(url, data=data) - - if is_home(r.url): - return r - - del data["submit[Continue]"] - data["submit[This was me]"] = "This Was Me" - log.info("Verifying login attempt.") - # At this stage, we have dtsg, nh, submit[This was me] - r = session.post(url, data=data) - - if is_home(r.url): - return r - - del data["submit[This was me]"] - data["submit[Continue]"] = "Continue" - data["name_action_selected"] = "save_device" - log.info("Saving device again.") - # At this stage, we have dtsg, nh, submit[Continue], name_action_selected - r = session.post(url, data=data) - return r - - -def get_error_data(html: str, url: str) -> Tuple[Optional[int], Optional[str]]: - """Get error code and message from a request.""" - code = None - try: - code = int(_util.get_url_parameter(url, "e")) - except (TypeError, ValueError): - pass +def get_error_data(html: str) -> Optional[str]: + """Get error message from a request.""" + # Only import when required + import bs4 soup = bs4.BeautifulSoup( - html, "html.parser", parse_only=bs4.SoupStrainer("div", id="login_error") + html, "html.parser", parse_only=bs4.SoupStrainer("form", id="login_form") ) - return code, soup.get_text() or None + # Attempt to extract and format the error string + # The error message is in the user's own language! + return ". ".join(list(soup.stripped_strings)[:2]) or None + + +def get_fb_dtsg(define) -> Optional[str]: + if "DTSGInitData" in define: + return define["DTSGInitData"]["token"] + elif "DTSGInitialData" in define: + return define["DTSGInitialData"]["token"] + return None @attr.s(slots=True, kw_only=kw_only, repr=False, eq=False) @@ -161,7 +127,6 @@ class Session: _session = attr.ib(factory=session_factory, type=requests.Session) _counter = attr.ib(0, type=int) _client_id = attr.ib(factory=client_id_factory, type=str) - _logout_h = attr.ib(None, type=Optional[str]) @property def user(self): @@ -206,53 +171,49 @@ class Session: """ session = session_factory() - try: - r = session.get("https://m.facebook.com/") - except requests.RequestException as e: - _exception.handle_requests_error(e) - soup = find_input_fields(r.text) - - data = dict( - (elem["name"], elem["value"]) - for elem in soup - if elem.has_attr("value") and elem.has_attr("name") - ) - data["email"] = email - data["pass"] = password - data["login"] = "Log In" + data = { + # "jazoest": "2754", + # "lsd": "AVqqqRUa", + "initial_request_id": "x", # any, just has to be present + # "timezone": "-120", + # "lgndim": "eyJ3IjoxNDQwLCJoIjo5MDAsImF3IjoxNDQwLCJhaCI6ODc3LCJjIjoyNH0=", + # "lgnrnd": "044039_RGm9", + # "lgnjs": "n", + "email": email, + "pass": password, + # "login": "1", + # "persistent": "1", + # "default_persistent": "0", + } try: - url = "https://m.facebook.com/login.php?login_attempt=1" - r = session.post(url, data=data) - except requests.RequestException as e: - _exception.handle_requests_error(e) - - # Usually, 'Checkpoint' will refer to 2FA - if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()): - if not on_2fa_callback: - raise ValueError( - "2FA code required, please add `on_2fa_callback` to .login" - ) - code = on_2fa_callback() - try: - r = _2fa_helper(session, code, r) - except requests.RequestException as e: - _exception.handle_requests_error(e) - - # Sometimes Facebook tries to show the user a "Save Device" dialog - if "save-device" in r.url: - try: - r = session.get("https://m.facebook.com/login/save-device/cancel/") - except requests.RequestException as e: - _exception.handle_requests_error(e) - - if is_home(r.url): - return cls._from_session(session=session) - else: - code, msg = get_error_data(r.text, r.url) - raise _exception.ExternalError( - "Login failed at url {!r}".format(r.url), msg, code=code + # Should hit a redirect to https://www.messenger.com/ + # If this does happen, the session is logged in! + r = session.post( + "https://www.messenger.com/login/password/", + data=data, + allow_redirects=False, ) + except requests.RequestException as e: + _exception.handle_requests_error(e) + _exception.handle_http_error(r.status_code) + + # TODO: Re-add 2FA + if False: + if not on_2fa_callback: + raise _exception.NotLoggedIn( + "2FA code required! Please supply `on_2fa_callback` to .login" + ) + _ = on_2fa_callback() + + if r.headers.get("Location") != "https://www.messenger.com/": + error = get_error_data(r.content.decode("utf-8")) + raise _exception.NotLoggedIn("Failed logging in: {}".format(error or r.url)) + + try: + return cls._from_session(session=session) + except _exception.NotLoggedIn as e: + raise _exception.ParseError("Failed loading session", data=r) from e def is_logged_in(self) -> bool: """Send a request to Facebook to check the login status. @@ -264,12 +225,12 @@ class Session: >>> assert session.is_logged_in() """ # Send a request to the login url, to see if we're directed to the home page - url = "https://m.facebook.com/login.php?login_attempt=1" try: - r = self._session.get(url, allow_redirects=False) + r = self._session.get(prefix_url("/login/"), allow_redirects=False) except requests.RequestException as e: _exception.handle_requests_error(e) - return "Location" in r.headers and is_home(r.headers["Location"]) + _exception.handle_http_error(r.status_code) + return "https://www.messenger.com/" == r.headers.get("Location") def logout(self) -> None: """Safely log out the user. @@ -279,56 +240,51 @@ class Session: Example: >>> session.logout() """ - logout_h = self._logout_h - if not logout_h: - url = prefix_url("/bluebar/modern_settings_menu/") - try: - h_r = self._session.post(url, data={"pmid": "4"}) - except requests.RequestException as e: - _exception.handle_requests_error(e) - logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1) - - url = prefix_url("/logout.php") + data = {"fb_dtsg": self._fb_dtsg} try: - r = self._session.get(url, params={"ref": "mb", "h": logout_h}) + r = self._session.post( + prefix_url("/logout/"), data=data, allow_redirects=False + ) except requests.RequestException as e: _exception.handle_requests_error(e) _exception.handle_http_error(r.status_code) + if "Location" not in r.headers: + raise _exception.FacebookError("Failed logging out, was not redirected!") + if "https://www.messenger.com/login/" != r.headers["Location"]: + raise _exception.FacebookError( + "Failed logging out, got bad redirect: {}".format(r.headers["Location"]) + ) + @classmethod def _from_session(cls, session): # TODO: Automatically set user_id when the cookie changes in the session user_id = get_user_id(session) + # Make a request to the main page to retrieve ServerJSDefine entries try: - r = session.get(prefix_url("/")) + r = session.get(prefix_url("/"), allow_redirects=False) except requests.RequestException as e: _exception.handle_requests_error(e) + _exception.handle_http_error(r.status_code) - soup = find_input_fields(r.text) + define = parse_server_js_define(r.content.decode("utf-8")) - fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"}) - if fb_dtsg_element: - fb_dtsg = fb_dtsg_element["value"] - else: - # Fall back to searching with a regex - res = FB_DTSG_REGEX.search(r.text) - if not res: - raise _exception.NotLoggedIn("Could not find fb_dtsg") - fb_dtsg = res.group(1) + fb_dtsg = get_fb_dtsg(define) + if fb_dtsg is None: + raise _exception.ParseError("Could not find fb_dtsg", data=define) + if not fb_dtsg: + # Happens when the client is not actually logged in + raise _exception.NotLoggedIn( + "Found empty fb_dtsg, the session was probably invalid." + ) - revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0]) + try: + revision = int(define["SiteData"]["client_revision"]) + except TypeError: + raise _exception.ParseError("Could not find client revision", data=define) - logout_h_element = soup.find("input", {"name": "h"}) - logout_h = logout_h_element["value"] if logout_h_element else None - - return cls( - user_id=user_id, - fb_dtsg=fb_dtsg, - revision=revision, - session=session, - logout_h=logout_h, - ) + return cls(user_id=user_id, fb_dtsg=fb_dtsg, revision=revision, session=session) def get_cookies(self) -> Mapping[str, str]: """Retrieve session cookies, that can later be used in `from_cookies`. @@ -383,10 +339,9 @@ class Session: # update fb_dtsg token if received in response if "jsmods" in j: define = _util.get_jsmods_define(j["jsmods"]["define"]) - if "DTSGInitData" in define: - self._fb_dtsg = define["DTSGInitData"]["token"] - elif "DTSGInitialData" in define: - self._fb_dtsg = define["DTSGInitialData"]["token"] + fb_dtsg = get_fb_dtsg(define) + if fb_dtsg: + self._fb_dtsg = fb_dtsg try: return j["payload"]