diff --git a/fbchat/_session.py b/fbchat/_session.py index f705fc5..6bf8f7d 100644 --- a/fbchat/_session.py +++ b/fbchat/_session.py @@ -4,6 +4,7 @@ import requests import random import re import json +import urllib.parse from ._common import log, kw_only from . import _graphql, _util, _exception @@ -93,6 +94,78 @@ def client_id_factory() -> str: return hex(int(random.random() * 2 ** 31))[2:] +def get_next_url(url: str) -> Optional[str]: + parsed_url = urllib.parse.urlparse(url) + query = urllib.parse.parse_qs(parsed_url.query) + return query.get("next", [None])[0] + + +def find_form_request(html: str): + # Only import when required + import bs4 + + soup = bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("form")) + + form = soup.form + if not form: + raise _exception.ParseError("Could not find form to submit", data=soup) + + url = form.get("action") + if not url: + raise _exception.ParseError("Could not find url to submit to", data=form) + + if url.startswith("/"): + url = "https://www.facebook.com" + url + + # It's okay to set missing values to something crap, the values are localized, and + # hence are not available in the raw HTML + data = { + x["name"]: x.get("value", "[missing]") + for x in form.find_all(["input", "button"]) + } + return url, data + + +def two_factor_helper(session: requests.Session, r, on_2fa_callback): + url, data = find_form_request(r.content.decode("utf-8")) + + # You don't have to type a code if your device is already saved + # Repeats if you get the code wrong + while "approvals_code" not in data: + data["approvals_code"] = on_2fa_callback() + log.info("Submitting 2FA code") + r = session.post(url, data=data, allow_redirects=False) + url, data = find_form_request(r.content.decode("utf-8")) + + # TODO: Can be missing if checkup flow was done on another device in the meantime? + if "name_action_selected" in data: + data["name_action_selected"] = "save_device" + log.info("Saving browser") + r = session.post(url, data=data, allow_redirects=False) + url, data = find_form_request(r.content.decode("utf-8")) + + log.info("Starting Facebook checkup flow") + r = session.post(url, data=data, allow_redirects=False) + + url, data = find_form_request(r.content.decode("utf-8")) + if "submit[This was me]" not in data or "submit[This wasn't me]" not in data: + raise _exception.ParseError("Could not fill out form properly (2)", data=data) + data["submit[This was me]"] = "[any value]" + del data["submit[This wasn't me]"] + log.info("Verifying login attempt") + r = session.post(url, data=data, allow_redirects=False) + + url, data = find_form_request(r.content.decode("utf-8")) + if "name_action_selected" not in data: + raise _exception.ParseError("Could not fill out form properly (3)", data=data) + data["name_action_selected"] = "save_device" + log.info("Saving device again") + r = session.post(url, data=data, allow_redirects=False) + + print(r.status_code, r.url, r.headers) + return r.headers.get("Location") + + def get_error_data(html: str) -> Optional[str]: """Get error message from a request.""" # Only import when required @@ -150,6 +223,7 @@ class Session: "fb_dtsg": self._fb_dtsg, } + # TODO: Add ability to load previous cookies in here, to avoid 2fa flow @classmethod def login( cls, email: str, password: str, on_2fa_callback: Callable[[], int] = None @@ -159,13 +233,25 @@ class Session: Args: email: Facebook ``email``, ``id`` or ``phone number`` password: Facebook account password - on_2fa_callback: Function that will be called, in case a 2FA code is needed. - This should return the requested 2FA code. + on_2fa_callback: Function that will be called, in case a two factor + authentication code is needed. This should return the requested code. + + Only tested with SMS codes, might not work with authentication apps. + + Note: Facebook limits the amount of codes they will give you, so if you + don't receive a code, be patient, and try again later! Example: - >>> import getpass >>> import fbchat - >>> session = fbchat.Session.login("", getpass.getpass()) + >>> import getpass + >>> session = fbchat.Session.login( + ... input("Email: "), + ... getpass.getpass(), + ... on_2fa_callback=lambda: input("2FA Code: ") + ... ) + Email: abc@gmail.com + Password: **** + 2FA Code: 123456 >>> session.user.id "1234" """ @@ -198,17 +284,34 @@ class Session: _exception.handle_requests_error(e) _exception.handle_http_error(r.status_code) - # TODO: Re-add 2FA - if False: + url = r.headers.get("Location") + + # We weren't redirected, hence the email or password was wrong + if not url: + error = get_error_data(r.content.decode("utf-8")) + raise _exception.NotLoggedIn(error) + + if "checkpoint" in url: if not on_2fa_callback: raise _exception.NotLoggedIn( "2FA code required! Please supply `on_2fa_callback` to .login" ) - _ = on_2fa_callback() + # Get a facebook.com url that handles the 2FA flow + # This probably works differently for Messenger-only accounts + url = get_next_url(url) + # Explicitly allow redirects + r = session.get(url, allow_redirects=True) + url = two_factor_helper(session, r, on_2fa_callback) - if r.headers.get("Location") != "https://www.messenger.com/": + if not url.startswith("https://www.messenger.com/login/auth_token/"): + raise _exception.ParseError("Failed 2fa flow", data=url) + + r = session.get(url, allow_redirects=False) + url = r.headers.get("Location") + + if url != "https://www.messenger.com/": error = get_error_data(r.content.decode("utf-8")) - raise _exception.NotLoggedIn("Failed logging in: {}".format(error or r.url)) + raise _exception.NotLoggedIn("Failed logging in: {}, {}".format(url, error)) try: return cls._from_session(session=session)