Initial re-add of 2FA
This commit is contained in:
@@ -4,6 +4,7 @@ import requests
|
||||
import random
|
||||
import re
|
||||
import json
|
||||
import urllib.parse
|
||||
|
||||
from ._common import log, kw_only
|
||||
from . import _graphql, _util, _exception
|
||||
@@ -93,6 +94,78 @@ def client_id_factory() -> str:
|
||||
return hex(int(random.random() * 2 ** 31))[2:]
|
||||
|
||||
|
||||
def get_next_url(url: str) -> Optional[str]:
|
||||
parsed_url = urllib.parse.urlparse(url)
|
||||
query = urllib.parse.parse_qs(parsed_url.query)
|
||||
return query.get("next", [None])[0]
|
||||
|
||||
|
||||
def find_form_request(html: str):
|
||||
# Only import when required
|
||||
import bs4
|
||||
|
||||
soup = bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("form"))
|
||||
|
||||
form = soup.form
|
||||
if not form:
|
||||
raise _exception.ParseError("Could not find form to submit", data=soup)
|
||||
|
||||
url = form.get("action")
|
||||
if not url:
|
||||
raise _exception.ParseError("Could not find url to submit to", data=form)
|
||||
|
||||
if url.startswith("/"):
|
||||
url = "https://www.facebook.com" + url
|
||||
|
||||
# It's okay to set missing values to something crap, the values are localized, and
|
||||
# hence are not available in the raw HTML
|
||||
data = {
|
||||
x["name"]: x.get("value", "[missing]")
|
||||
for x in form.find_all(["input", "button"])
|
||||
}
|
||||
return url, data
|
||||
|
||||
|
||||
def two_factor_helper(session: requests.Session, r, on_2fa_callback):
|
||||
url, data = find_form_request(r.content.decode("utf-8"))
|
||||
|
||||
# You don't have to type a code if your device is already saved
|
||||
# Repeats if you get the code wrong
|
||||
while "approvals_code" not in data:
|
||||
data["approvals_code"] = on_2fa_callback()
|
||||
log.info("Submitting 2FA code")
|
||||
r = session.post(url, data=data, allow_redirects=False)
|
||||
url, data = find_form_request(r.content.decode("utf-8"))
|
||||
|
||||
# TODO: Can be missing if checkup flow was done on another device in the meantime?
|
||||
if "name_action_selected" in data:
|
||||
data["name_action_selected"] = "save_device"
|
||||
log.info("Saving browser")
|
||||
r = session.post(url, data=data, allow_redirects=False)
|
||||
url, data = find_form_request(r.content.decode("utf-8"))
|
||||
|
||||
log.info("Starting Facebook checkup flow")
|
||||
r = session.post(url, data=data, allow_redirects=False)
|
||||
|
||||
url, data = find_form_request(r.content.decode("utf-8"))
|
||||
if "submit[This was me]" not in data or "submit[This wasn't me]" not in data:
|
||||
raise _exception.ParseError("Could not fill out form properly (2)", data=data)
|
||||
data["submit[This was me]"] = "[any value]"
|
||||
del data["submit[This wasn't me]"]
|
||||
log.info("Verifying login attempt")
|
||||
r = session.post(url, data=data, allow_redirects=False)
|
||||
|
||||
url, data = find_form_request(r.content.decode("utf-8"))
|
||||
if "name_action_selected" not in data:
|
||||
raise _exception.ParseError("Could not fill out form properly (3)", data=data)
|
||||
data["name_action_selected"] = "save_device"
|
||||
log.info("Saving device again")
|
||||
r = session.post(url, data=data, allow_redirects=False)
|
||||
|
||||
print(r.status_code, r.url, r.headers)
|
||||
return r.headers.get("Location")
|
||||
|
||||
|
||||
def get_error_data(html: str) -> Optional[str]:
|
||||
"""Get error message from a request."""
|
||||
# Only import when required
|
||||
@@ -150,6 +223,7 @@ class Session:
|
||||
"fb_dtsg": self._fb_dtsg,
|
||||
}
|
||||
|
||||
# TODO: Add ability to load previous cookies in here, to avoid 2fa flow
|
||||
@classmethod
|
||||
def login(
|
||||
cls, email: str, password: str, on_2fa_callback: Callable[[], int] = None
|
||||
@@ -159,13 +233,25 @@ class Session:
|
||||
Args:
|
||||
email: Facebook ``email``, ``id`` or ``phone number``
|
||||
password: Facebook account password
|
||||
on_2fa_callback: Function that will be called, in case a 2FA code is needed.
|
||||
This should return the requested 2FA code.
|
||||
on_2fa_callback: Function that will be called, in case a two factor
|
||||
authentication code is needed. This should return the requested code.
|
||||
|
||||
Only tested with SMS codes, might not work with authentication apps.
|
||||
|
||||
Note: Facebook limits the amount of codes they will give you, so if you
|
||||
don't receive a code, be patient, and try again later!
|
||||
|
||||
Example:
|
||||
>>> import getpass
|
||||
>>> import fbchat
|
||||
>>> session = fbchat.Session.login("<email or phone>", getpass.getpass())
|
||||
>>> import getpass
|
||||
>>> session = fbchat.Session.login(
|
||||
... input("Email: "),
|
||||
... getpass.getpass(),
|
||||
... on_2fa_callback=lambda: input("2FA Code: ")
|
||||
... )
|
||||
Email: abc@gmail.com
|
||||
Password: ****
|
||||
2FA Code: 123456
|
||||
>>> session.user.id
|
||||
"1234"
|
||||
"""
|
||||
@@ -198,17 +284,34 @@ class Session:
|
||||
_exception.handle_requests_error(e)
|
||||
_exception.handle_http_error(r.status_code)
|
||||
|
||||
# TODO: Re-add 2FA
|
||||
if False:
|
||||
url = r.headers.get("Location")
|
||||
|
||||
# We weren't redirected, hence the email or password was wrong
|
||||
if not url:
|
||||
error = get_error_data(r.content.decode("utf-8"))
|
||||
raise _exception.NotLoggedIn(error)
|
||||
|
||||
if "checkpoint" in url:
|
||||
if not on_2fa_callback:
|
||||
raise _exception.NotLoggedIn(
|
||||
"2FA code required! Please supply `on_2fa_callback` to .login"
|
||||
)
|
||||
_ = on_2fa_callback()
|
||||
# Get a facebook.com url that handles the 2FA flow
|
||||
# This probably works differently for Messenger-only accounts
|
||||
url = get_next_url(url)
|
||||
# Explicitly allow redirects
|
||||
r = session.get(url, allow_redirects=True)
|
||||
url = two_factor_helper(session, r, on_2fa_callback)
|
||||
|
||||
if r.headers.get("Location") != "https://www.messenger.com/":
|
||||
if not url.startswith("https://www.messenger.com/login/auth_token/"):
|
||||
raise _exception.ParseError("Failed 2fa flow", data=url)
|
||||
|
||||
r = session.get(url, allow_redirects=False)
|
||||
url = r.headers.get("Location")
|
||||
|
||||
if url != "https://www.messenger.com/":
|
||||
error = get_error_data(r.content.decode("utf-8"))
|
||||
raise _exception.NotLoggedIn("Failed logging in: {}".format(error or r.url))
|
||||
raise _exception.NotLoggedIn("Failed logging in: {}, {}".format(url, error))
|
||||
|
||||
try:
|
||||
return cls._from_session(session=session)
|
||||
|
Reference in New Issue
Block a user