Use messenger.com URLs instead of facebook.com

This should allow people who have only created a messenger account to
log in.

Also parse required `fb_dtsg` and `client_revision` values better.

The 2-fa flow is removed for now, I'll re-add it later.
This commit is contained in:
Mads Marquart
2020-05-06 21:57:24 +02:00
parent cce947b18c
commit 079d4093c4
3 changed files with 129 additions and 172 deletions

View File

@@ -524,7 +524,9 @@ class Client:
data = {"voice_clip": voice_clip} data = {"voice_clip": voice_clip}
j = self.session._payload_post( j = self.session._payload_post(
"https://upload.facebook.com/ajax/mercury/upload.php", data, files=file_dict "https://upload.messenger.com/ajax/mercury/upload.php",
data,
files=file_dict,
) )
if len(j["metadata"]) != len(file_dict): if len(j["metadata"]) != len(file_dict):

View File

@@ -8,7 +8,7 @@ from . import _util, _exception, _session, _graphql, _events
from typing import Iterable, Optional, Mapping, List from typing import Iterable, Optional, Mapping, List
HOST = "edge-chat.facebook.com" HOST = "edge-chat.messenger.com"
TOPICS = [ TOPICS = [
# Things that happen in chats (e.g. messages) # Things that happen in chats (e.g. messages)
@@ -271,10 +271,10 @@ class Listener:
headers = { headers = {
"Cookie": get_cookie_header( "Cookie": get_cookie_header(
self.session._session, "https://edge-chat.facebook.com/chat" self.session._session, "https://edge-chat.messenger.com/chat"
), ),
"User-Agent": self.session._session.headers["User-Agent"], "User-Agent": self.session._session.headers["User-Agent"],
"Origin": "https://www.facebook.com", "Origin": "https://www.messenger.com",
"Host": HOST, "Host": HOST,
} }

View File

@@ -1,17 +1,43 @@
import attr import attr
import bs4
import datetime import datetime
import re
import requests import requests
import random import random
import urllib.parse import re
import json
from ._common import log, kw_only from ._common import log, kw_only
from . import _graphql, _util, _exception from . import _graphql, _util, _exception
from typing import Optional, Tuple, Mapping, Callable from typing import Optional, Mapping, Callable, Any
FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"')
SERVER_JS_DEFINE_REGEX = re.compile(r'require\("ServerJSDefine"\)\)?\.handleDefines\(')
SERVER_JS_DEFINE_JSON_DECODER = json.JSONDecoder()
def parse_server_js_define(html: str) -> Mapping[str, Any]:
"""Parse ``ServerJSDefine`` entries from a HTML document."""
# Find points where we should start parsing
define_splits = SERVER_JS_DEFINE_REGEX.split(html)
# Skip leading entry
_, *define_splits = define_splits
rtn = []
if not define_splits:
raise _exception.ParseError("Could not find any ServerJSDefine", data=html)
# Parse entries (should be two)
for entry in define_splits:
try:
parsed, _ = SERVER_JS_DEFINE_JSON_DECODER.raw_decode(entry, idx=0)
except json.JSONDecodeError as e:
raise _exception.ParseError("Invalid ServerJSDefine", data=entry) from e
if not isinstance(parsed, list):
raise _exception.ParseError("Invalid ServerJSDefine", data=parsed)
rtn.extend(parsed)
# Convert to a dict
return _util.get_jsmods_define(rtn)
def base36encode(number: int) -> str: def base36encode(number: int) -> str:
@@ -32,7 +58,7 @@ def base36encode(number: int) -> str:
def prefix_url(url: str) -> str: def prefix_url(url: str) -> str:
if url.startswith("/"): if url.startswith("/"):
return "https://www.facebook.com" + url return "https://www.messenger.com" + url
return url return url
@@ -51,15 +77,11 @@ def get_user_id(session: requests.Session) -> str:
return str(rtn) return str(rtn)
def find_input_fields(html: str):
return bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("input"))
def session_factory() -> requests.Session: def session_factory() -> requests.Session:
from . import __version__ from . import __version__
session = requests.session() session = requests.session()
session.headers["Referer"] = "https://www.facebook.com" session.headers["Referer"] = "https://www.messenger.com/"
# We won't try to set a fake user agent to mask our presence! # We won't try to set a fake user agent to mask our presence!
# Facebook allows us access anyhow, and it makes our motives clearer: # Facebook allows us access anyhow, and it makes our motives clearer:
# We're not trying to cheat Facebook, we simply want to access their service # We're not trying to cheat Facebook, we simply want to access their service
@@ -71,81 +93,25 @@ def client_id_factory() -> str:
return hex(int(random.random() * 2 ** 31))[2:] return hex(int(random.random() * 2 ** 31))[2:]
def is_home(url: str) -> bool: def get_error_data(html: str) -> Optional[str]:
parts = urllib.parse.urlparse(url) """Get error message from a request."""
# Check the urls `/home.php` and `/` # Only import when required
return "home" in parts.path or "/" == parts.path import bs4
def _2fa_helper(session: requests.Session, code: int, r):
soup = find_input_fields(r.text)
data = dict()
url = "https://m.facebook.com/login/checkpoint/"
data["approvals_code"] = str(code)
data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"]
data["nh"] = soup.find("input", {"name": "nh"})["value"]
data["submit[Submit Code]"] = "Submit Code"
data["codes_submitted"] = "0"
log.info("Submitting 2FA code.")
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["approvals_code"]
del data["submit[Submit Code]"]
del data["codes_submitted"]
data["name_action_selected"] = "save_device"
data["submit[Continue]"] = "Continue"
log.info("Saving browser.")
# At this stage, we have dtsg, nh, name_action_selected, submit[Continue]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["name_action_selected"]
log.info("Starting Facebook checkup flow.")
# At this stage, we have dtsg, nh, submit[Continue]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["submit[Continue]"]
data["submit[This was me]"] = "This Was Me"
log.info("Verifying login attempt.")
# At this stage, we have dtsg, nh, submit[This was me]
r = session.post(url, data=data)
if is_home(r.url):
return r
del data["submit[This was me]"]
data["submit[Continue]"] = "Continue"
data["name_action_selected"] = "save_device"
log.info("Saving device again.")
# At this stage, we have dtsg, nh, submit[Continue], name_action_selected
r = session.post(url, data=data)
return r
def get_error_data(html: str, url: str) -> Tuple[Optional[int], Optional[str]]:
"""Get error code and message from a request."""
code = None
try:
code = int(_util.get_url_parameter(url, "e"))
except (TypeError, ValueError):
pass
soup = bs4.BeautifulSoup( soup = bs4.BeautifulSoup(
html, "html.parser", parse_only=bs4.SoupStrainer("div", id="login_error") html, "html.parser", parse_only=bs4.SoupStrainer("form", id="login_form")
) )
return code, soup.get_text() or None # Attempt to extract and format the error string
# The error message is in the user's own language!
return ". ".join(list(soup.stripped_strings)[:2]) or None
def get_fb_dtsg(define) -> Optional[str]:
if "DTSGInitData" in define:
return define["DTSGInitData"]["token"]
elif "DTSGInitialData" in define:
return define["DTSGInitialData"]["token"]
return None
@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False) @attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
@@ -161,7 +127,6 @@ class Session:
_session = attr.ib(factory=session_factory, type=requests.Session) _session = attr.ib(factory=session_factory, type=requests.Session)
_counter = attr.ib(0, type=int) _counter = attr.ib(0, type=int)
_client_id = attr.ib(factory=client_id_factory, type=str) _client_id = attr.ib(factory=client_id_factory, type=str)
_logout_h = attr.ib(None, type=Optional[str])
@property @property
def user(self): def user(self):
@@ -206,53 +171,49 @@ class Session:
""" """
session = session_factory() session = session_factory()
try: data = {
r = session.get("https://m.facebook.com/") # "jazoest": "2754",
except requests.RequestException as e: # "lsd": "AVqqqRUa",
_exception.handle_requests_error(e) "initial_request_id": "x", # any, just has to be present
soup = find_input_fields(r.text) # "timezone": "-120",
# "lgndim": "eyJ3IjoxNDQwLCJoIjo5MDAsImF3IjoxNDQwLCJhaCI6ODc3LCJjIjoyNH0=",
data = dict( # "lgnrnd": "044039_RGm9",
(elem["name"], elem["value"]) # "lgnjs": "n",
for elem in soup "email": email,
if elem.has_attr("value") and elem.has_attr("name") "pass": password,
) # "login": "1",
data["email"] = email # "persistent": "1",
data["pass"] = password # "default_persistent": "0",
data["login"] = "Log In" }
try: try:
url = "https://m.facebook.com/login.php?login_attempt=1" # Should hit a redirect to https://www.messenger.com/
r = session.post(url, data=data) # If this does happen, the session is logged in!
except requests.RequestException as e: r = session.post(
_exception.handle_requests_error(e) "https://www.messenger.com/login/password/",
data=data,
# Usually, 'Checkpoint' will refer to 2FA allow_redirects=False,
if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()):
if not on_2fa_callback:
raise ValueError(
"2FA code required, please add `on_2fa_callback` to .login"
)
code = on_2fa_callback()
try:
r = _2fa_helper(session, code, r)
except requests.RequestException as e:
_exception.handle_requests_error(e)
# Sometimes Facebook tries to show the user a "Save Device" dialog
if "save-device" in r.url:
try:
r = session.get("https://m.facebook.com/login/save-device/cancel/")
except requests.RequestException as e:
_exception.handle_requests_error(e)
if is_home(r.url):
return cls._from_session(session=session)
else:
code, msg = get_error_data(r.text, r.url)
raise _exception.ExternalError(
"Login failed at url {!r}".format(r.url), msg, code=code
) )
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
# TODO: Re-add 2FA
if False:
if not on_2fa_callback:
raise _exception.NotLoggedIn(
"2FA code required! Please supply `on_2fa_callback` to .login"
)
_ = on_2fa_callback()
if r.headers.get("Location") != "https://www.messenger.com/":
error = get_error_data(r.content.decode("utf-8"))
raise _exception.NotLoggedIn("Failed logging in: {}".format(error or r.url))
try:
return cls._from_session(session=session)
except _exception.NotLoggedIn as e:
raise _exception.ParseError("Failed loading session", data=r) from e
def is_logged_in(self) -> bool: def is_logged_in(self) -> bool:
"""Send a request to Facebook to check the login status. """Send a request to Facebook to check the login status.
@@ -264,12 +225,12 @@ class Session:
>>> assert session.is_logged_in() >>> assert session.is_logged_in()
""" """
# Send a request to the login url, to see if we're directed to the home page # Send a request to the login url, to see if we're directed to the home page
url = "https://m.facebook.com/login.php?login_attempt=1"
try: try:
r = self._session.get(url, allow_redirects=False) r = self._session.get(prefix_url("/login/"), allow_redirects=False)
except requests.RequestException as e: except requests.RequestException as e:
_exception.handle_requests_error(e) _exception.handle_requests_error(e)
return "Location" in r.headers and is_home(r.headers["Location"]) _exception.handle_http_error(r.status_code)
return "https://www.messenger.com/" == r.headers.get("Location")
def logout(self) -> None: def logout(self) -> None:
"""Safely log out the user. """Safely log out the user.
@@ -279,56 +240,51 @@ class Session:
Example: Example:
>>> session.logout() >>> session.logout()
""" """
logout_h = self._logout_h data = {"fb_dtsg": self._fb_dtsg}
if not logout_h:
url = prefix_url("/bluebar/modern_settings_menu/")
try:
h_r = self._session.post(url, data={"pmid": "4"})
except requests.RequestException as e:
_exception.handle_requests_error(e)
logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1)
url = prefix_url("/logout.php")
try: try:
r = self._session.get(url, params={"ref": "mb", "h": logout_h}) r = self._session.post(
prefix_url("/logout/"), data=data, allow_redirects=False
)
except requests.RequestException as e: except requests.RequestException as e:
_exception.handle_requests_error(e) _exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code) _exception.handle_http_error(r.status_code)
if "Location" not in r.headers:
raise _exception.FacebookError("Failed logging out, was not redirected!")
if "https://www.messenger.com/login/" != r.headers["Location"]:
raise _exception.FacebookError(
"Failed logging out, got bad redirect: {}".format(r.headers["Location"])
)
@classmethod @classmethod
def _from_session(cls, session): def _from_session(cls, session):
# TODO: Automatically set user_id when the cookie changes in the session # TODO: Automatically set user_id when the cookie changes in the session
user_id = get_user_id(session) user_id = get_user_id(session)
# Make a request to the main page to retrieve ServerJSDefine entries
try: try:
r = session.get(prefix_url("/")) r = session.get(prefix_url("/"), allow_redirects=False)
except requests.RequestException as e: except requests.RequestException as e:
_exception.handle_requests_error(e) _exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
soup = find_input_fields(r.text) define = parse_server_js_define(r.content.decode("utf-8"))
fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"}) fb_dtsg = get_fb_dtsg(define)
if fb_dtsg_element: if fb_dtsg is None:
fb_dtsg = fb_dtsg_element["value"] raise _exception.ParseError("Could not find fb_dtsg", data=define)
else: if not fb_dtsg:
# Fall back to searching with a regex # Happens when the client is not actually logged in
res = FB_DTSG_REGEX.search(r.text) raise _exception.NotLoggedIn(
if not res: "Found empty fb_dtsg, the session was probably invalid."
raise _exception.NotLoggedIn("Could not find fb_dtsg") )
fb_dtsg = res.group(1)
revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0]) try:
revision = int(define["SiteData"]["client_revision"])
except TypeError:
raise _exception.ParseError("Could not find client revision", data=define)
logout_h_element = soup.find("input", {"name": "h"}) return cls(user_id=user_id, fb_dtsg=fb_dtsg, revision=revision, session=session)
logout_h = logout_h_element["value"] if logout_h_element else None
return cls(
user_id=user_id,
fb_dtsg=fb_dtsg,
revision=revision,
session=session,
logout_h=logout_h,
)
def get_cookies(self) -> Mapping[str, str]: def get_cookies(self) -> Mapping[str, str]:
"""Retrieve session cookies, that can later be used in `from_cookies`. """Retrieve session cookies, that can later be used in `from_cookies`.
@@ -383,10 +339,9 @@ class Session:
# update fb_dtsg token if received in response # update fb_dtsg token if received in response
if "jsmods" in j: if "jsmods" in j:
define = _util.get_jsmods_define(j["jsmods"]["define"]) define = _util.get_jsmods_define(j["jsmods"]["define"])
if "DTSGInitData" in define: fb_dtsg = get_fb_dtsg(define)
self._fb_dtsg = define["DTSGInitData"]["token"] if fb_dtsg:
elif "DTSGInitialData" in define: self._fb_dtsg = fb_dtsg
self._fb_dtsg = define["DTSGInitialData"]["token"]
try: try:
return j["payload"] return j["payload"]