diff --git a/fbchat/_client.py b/fbchat/_client.py index daebf3f..9c99d81 100644 --- a/fbchat/_client.py +++ b/fbchat/_client.py @@ -11,6 +11,7 @@ from collections import OrderedDict from ._util import * from .models import * from ._graphql import graphql_queries_to_json, graphql_response_to_json, GraphQL +from ._state import State import time import json @@ -33,11 +34,19 @@ class Client(object): See https://fbchat.readthedocs.io for complete documentation of the API. """ - ssl_verify = True - """Verify ssl certificate, set to False to allow debugging with a proxy""" listening = False """Whether the client is listening. Used when creating an external event loop to determine when to stop listening""" + @property + def ssl_verify(self): + """Verify ssl certificate, set to False to allow debugging with a proxy.""" + # TODO: Deprecate this + return self._state._session.verify + + @ssl_verify.setter + def ssl_verify(self, value): + self._state._session.verify = value + @property def uid(self): """The ID of the client. @@ -69,51 +78,32 @@ class Client(object): :raises: FBchatException on failed login """ self._sticky, self._pool = (None, None) - self._session = requests.session() - self._req_counter = 1 self._seq = "0" - # See `createPoll` for the reason for using `OrderedDict` here - self._payload_default = OrderedDict() + self._client_id = hex(int(random() * 2 ** 31))[2:] self._default_thread_id = None self._default_thread_type = None self._pull_channel = 0 self._markAlive = True self._buddylist = dict() - if not user_agent: - user_agent = choice(USER_AGENTS) - - self._header = { - "Content-Type": "application/x-www-form-urlencoded", - "Referer": "https://www.facebook.com", - "Origin": "https://www.facebook.com", - "User-Agent": user_agent, - "Connection": "keep-alive", - } - handler.setLevel(logging_level) # If session cookies aren't set, not properly loaded or gives us an invalid session, then do the login if ( not session_cookies - or not self.setSession(session_cookies) + or not self.setSession(session_cookies, user_agent=user_agent) or not self.isLoggedIn() ): - self.login(email, password, max_tries) + self.login(email, password, max_tries, user_agent=user_agent) """ INTERNAL REQUEST METHODS """ def _generatePayload(self, query): - """Adds the following defaults to the payload: - __rev, __user, __a, ttstamp, fb_dtsg, __req - """ if not query: query = {} - query.update(self._payload_default) - query["__req"] = str_base(self._req_counter, 36) - self._req_counter += 1 + query.update(self._state.get_params()) return query def _fix_fb_errors(self, error_code): @@ -123,19 +113,14 @@ class Client(object): It may be a bad idea to do this in an exception handler, if you have a better method, please suggest it! """ if error_code == "1357004": - log.warning("Got error #1357004. Doing a _postLogin, and resending request") - self._postLogin() + log.warning("Got error #1357004. Refreshing state and resending request") + self._state = State.from_session(session=self._state._session) return True return False def _get(self, url, query=None, fix_request=False, as_json=False, error_retries=3): payload = self._generatePayload(query) - r = self._session.get( - prefix_url(url), - headers=self._header, - params=payload, - verify=self.ssl_verify, - ) + r = self._state._session.get(prefix_url(url), params=payload) if not fix_request: return r try: @@ -161,9 +146,7 @@ class Client(object): error_retries=3, ): payload = self._generatePayload(query) - r = self._session.post( - prefix_url(url), headers=self._header, data=payload, verify=self.ssl_verify - ) + r = self._state._session.post(prefix_url(url), data=payload) if not fix_request: return r try: @@ -184,21 +167,6 @@ class Client(object): ) raise e - def _cleanGet(self, url, query=None, allow_redirects=True): - return self._session.get( - prefix_url(url), - headers=self._header, - params=query, - verify=self.ssl_verify, - allow_redirects=allow_redirects, - ) - - def _cleanPost(self, url, query=None): - self._req_counter += 1 - return self._session.post( - prefix_url(url), headers=self._header, data=query, verify=self.ssl_verify - ) - def _postFile( self, url, @@ -209,17 +177,7 @@ class Client(object): error_retries=3, ): payload = self._generatePayload(query) - # Removes 'Content-Type' from the header - headers = dict( - (i, self._header[i]) for i in self._header if i != "Content-Type" - ) - r = self._session.post( - prefix_url(url), - headers=headers, - data=payload, - files=files, - verify=self.ssl_verify, - ) + r = self._state._session.post(prefix_url(url), data=payload, files=files) if not fix_request: return r try: @@ -270,128 +228,6 @@ class Client(object): LOGIN METHODS """ - def _resetValues(self): - self._payload_default = OrderedDict() - self._session = requests.session() - self._req_counter = 1 - self._seq = "0" - self._uid = None - - def _postLogin(self): - self._payload_default = OrderedDict() - self._client_id = hex(int(random() * 2147483648))[2:] - self._uid = self._session.cookies.get_dict().get("c_user") - if self._uid is None: - raise FBchatException("Could not find c_user cookie") - self._uid = str(self._uid) - - r = self._get("/") - soup = bs(r.text, "html.parser") - - fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"}) - if fb_dtsg_element: - fb_dtsg = fb_dtsg_element["value"] - else: - fb_dtsg = re.search(r'name="fb_dtsg" value="(.*?)"', r.text).group(1) - - fb_h_element = soup.find("input", {"name": "h"}) - if fb_h_element: - self._fb_h = fb_h_element["value"] - - # Set default payload - self._payload_default["__rev"] = int( - r.text.split('"client_revision":', 1)[1].split(",", 1)[0] - ) - self._payload_default["__a"] = "1" - self._payload_default["fb_dtsg"] = fb_dtsg - - def _login(self, email, password): - soup = bs(self._get("https://m.facebook.com/").text, "html.parser") - data = dict( - (elem["name"], elem["value"]) - for elem in soup.findAll("input") - if elem.has_attr("value") and elem.has_attr("name") - ) - data["email"] = email - data["pass"] = password - data["login"] = "Log In" - - r = self._cleanPost("https://m.facebook.com/login.php?login_attempt=1", data) - - # Usually, 'Checkpoint' will refer to 2FA - if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()): - r = self._2FA(r) - - # Sometimes Facebook tries to show the user a "Save Device" dialog - if "save-device" in r.url: - r = self._cleanGet("https://m.facebook.com/login/save-device/cancel/") - - if "home" in r.url: - self._postLogin() - return True, r.url - else: - return False, r.url - - def _2FA(self, r): - soup = bs(r.text, "html.parser") - data = dict() - - s = self.on2FACode() - - data["approvals_code"] = s - data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"] - data["nh"] = soup.find("input", {"name": "nh"})["value"] - data["submit[Submit Code]"] = "Submit Code" - data["codes_submitted"] = 0 - log.info("Submitting 2FA code.") - - r = self._cleanPost("https://m.facebook.com/login/checkpoint/", data) - - if "home" in r.url: - return r - - del data["approvals_code"] - del data["submit[Submit Code]"] - del data["codes_submitted"] - - data["name_action_selected"] = "save_device" - data["submit[Continue]"] = "Continue" - log.info( - "Saving browser." - ) # At this stage, we have dtsg, nh, name_action_selected, submit[Continue] - r = self._cleanPost("https://m.facebook.com/login/checkpoint/", data) - - if "home" in r.url: - return r - - del data["name_action_selected"] - log.info( - "Starting Facebook checkup flow." - ) # At this stage, we have dtsg, nh, submit[Continue] - r = self._cleanPost("https://m.facebook.com/login/checkpoint/", data) - - if "home" in r.url: - return r - - del data["submit[Continue]"] - data["submit[This was me]"] = "This Was Me" - log.info( - "Verifying login attempt." - ) # At this stage, we have dtsg, nh, submit[This was me] - r = self._cleanPost("https://m.facebook.com/login/checkpoint/", data) - - if "home" in r.url: - return r - - del data["submit[This was me]"] - data["submit[Continue]"] = "Continue" - data["name_action_selected"] = "save_device" - log.info( - "Saving device again." - ) # At this stage, we have dtsg, nh, submit[Continue], name_action_selected - r = self._cleanPost("https://m.facebook.com/login/checkpoint/", data) - return r - def isLoggedIn(self): """ Sends a request to Facebook to check the login status @@ -399,11 +235,7 @@ class Client(object): :return: True if the client is still logged in :rtype: bool """ - # Send a request to the login url, to see if we're directed to the home page - r = self._cleanGet( - "https://m.facebook.com/login.php?login_attempt=1", allow_redirects=False - ) - return "Location" in r.headers and "home" in r.headers["Location"] + return self._state.is_logged_in() def getSession(self): """Retrieves session cookies @@ -411,9 +243,9 @@ class Client(object): :return: A dictionay containing session cookies :rtype: dict """ - return self._session.cookies.get_dict() + return self._state.get_cookies() - def setSession(self, session_cookies): + def setSession(self, session_cookies, user_agent=None): """Loads session cookies :param session_cookies: A dictionay containing session cookies @@ -421,23 +253,21 @@ class Client(object): :return: False if `session_cookies` does not contain proper cookies :rtype: bool """ - # Quick check to see if session_cookies is formatted properly - if not session_cookies or "c_user" not in session_cookies: - return False - try: # Load cookies into current session - self._session.cookies = requests.cookies.merge_cookies( - self._session.cookies, session_cookies - ) - self._postLogin() + state = State.from_cookies(session_cookies, user_agent=user_agent) except Exception as e: log.exception("Failed loading session") - self._resetValues() return False + uid = state.get_user_id() + if uid is None: + log.warning("Could not find c_user cookie") + return False + self._state = state + self._uid = uid return True - def login(self, email, password, max_tries=5): + def login(self, email, password, max_tries=5, user_agent=None): """ Uses `email` and `password` to login the user (If the user is already logged in, this will do a re-login) @@ -456,23 +286,21 @@ class Client(object): raise FBchatUserError("Email and password not set") for i in range(1, max_tries + 1): - login_successful, login_url = self._login(email, password) - if not login_successful: - log.warning( - "Attempt #{} failed{}".format( - i, {True: ", retrying"}.get(i < max_tries, "") - ) - ) + try: + state = State.login(email, password, user_agent=user_agent) + uid = state.get_user_id() + if uid is None: + raise FBchatException("Could not find user id") + except Exception: + if i >= max_tries: + raise + log.exception("Attempt #{} failed, retrying".format(i)) time.sleep(1) - continue else: + self._state = state + self._uid = uid self.onLoggedIn(email=email) break - else: - raise FBchatUserError( - "Login failed. Check email/password. " - "(Failed on url: {})".format(login_url) - ) def logout(self): """ @@ -481,17 +309,11 @@ class Client(object): :return: True if the action was successful :rtype: bool """ - if not hasattr(self, "_fb_h"): - h_r = self._post("/bluebar/modern_settings_menu/", {"pmid": "4"}) - self._fb_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1) - - data = {"ref": "mb", "h": self._fb_h} - - r = self._get("/logout.php", data) - - self._resetValues() - - return r.ok + if self._state.logout(): + self._state = None + self._uid = None + return True + return False """ END LOGIN METHODS @@ -1299,7 +1121,7 @@ class Client(object): # update JS token if received in response fb_dtsg = get_jsmods_require(j, 2) if fb_dtsg is not None: - self._payload_default["fb_dtsg"] = fb_dtsg + self._state.fb_dtsg = fb_dtsg try: message_ids = [ @@ -2092,9 +1914,6 @@ class Client(object): # We're using ordered dicts, because the Facebook endpoint that parses the POST # parameters is badly implemented, and deals with ordering the options wrongly. - # This also means we had to change `client._payload_default` to an ordered dict, - # since that's being copied in between this point and the `requests` call - # # If you can find a way to fix this for the endpoint, or if you find another # endpoint, please do suggest it ;) data = OrderedDict([("question_text", poll.title), ("target_id", thread_id)]) diff --git a/fbchat/_state.py b/fbchat/_state.py new file mode 100644 index 0000000..574015c --- /dev/null +++ b/fbchat/_state.py @@ -0,0 +1,185 @@ +# -*- coding: UTF-8 -*- +from __future__ import unicode_literals + +import attr +import bs4 +import re +import requests +import random + +from . import _util, _exception + +FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"') + + +def session_factory(user_agent=None): + session = requests.session() + session.headers["Referer"] = "https://www.facebook.com" + # TODO: Deprecate setting the user agent manually + session.headers["User-Agent"] = user_agent or random.choice(_util.USER_AGENTS) + return session + + +def _2fa_helper(session, code, r): + soup = bs4.BeautifulSoup(r.text, "html.parser") + data = dict() + + url = "https://m.facebook.com/login/checkpoint/" + + data["approvals_code"] = code + data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"] + data["nh"] = soup.find("input", {"name": "nh"})["value"] + data["submit[Submit Code]"] = "Submit Code" + data["codes_submitted"] = 0 + _util.log.info("Submitting 2FA code.") + + r = session.post(url, data=data) + + if "home" in r.url: + return r + + del data["approvals_code"] + del data["submit[Submit Code]"] + del data["codes_submitted"] + + data["name_action_selected"] = "save_device" + data["submit[Continue]"] = "Continue" + _util.log.info("Saving browser.") + # At this stage, we have dtsg, nh, name_action_selected, submit[Continue] + r = session.post(url, data=data) + + if "home" in r.url: + return r + + del data["name_action_selected"] + _util.log.info("Starting Facebook checkup flow.") + # At this stage, we have dtsg, nh, submit[Continue] + r = session.post(url, data=data) + + if "home" in r.url: + return r + + del data["submit[Continue]"] + data["submit[This was me]"] = "This Was Me" + _util.log.info("Verifying login attempt.") + # At this stage, we have dtsg, nh, submit[This was me] + r = session.post(url, data=data) + + if "home" in r.url: + return r + + del data["submit[This was me]"] + data["submit[Continue]"] = "Continue" + data["name_action_selected"] = "save_device" + _util.log.info("Saving device again.") + # At this stage, we have dtsg, nh, submit[Continue], name_action_selected + r = session.post(url, data=data) + return r + + +@attr.s(slots=True) # TODO i Python 3: Add kw_only=True +class State(object): + """Stores and manages state required for most Facebook requests.""" + + fb_dtsg = attr.ib() + _revision = attr.ib() + _session = attr.ib(factory=session_factory) + _counter = attr.ib(0) + _logout_h = attr.ib(None) + + def get_user_id(self): + rtn = self.get_cookies().get("c_user") + if rtn is None: + return None + return str(rtn) + + def get_params(self): + self._counter += 1 # TODO: Make this operation atomic / thread-safe + return { + "__a": 1, + "__req": _util.str_base(self._counter, 36), + "__rev": self._revision, + "fb_dtsg": self.fb_dtsg, + } + + @classmethod + def login(cls, email, password, user_agent=None): + session = session_factory(user_agent=user_agent) + + soup = bs4.BeautifulSoup( + session.get("https://m.facebook.com/").text, "html.parser" + ) + data = dict( + (elem["name"], elem["value"]) + for elem in soup.findAll("input") + if elem.has_attr("value") and elem.has_attr("name") + ) + data["email"] = email + data["pass"] = password + data["login"] = "Log In" + + r = session.post("https://m.facebook.com/login.php?login_attempt=1", data=data) + + # Usually, 'Checkpoint' will refer to 2FA + if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()): + code = on_2fa_callback() + r = _2fa_helper(session, code, r) + + # Sometimes Facebook tries to show the user a "Save Device" dialog + if "save-device" in r.url: + r = session.get("https://m.facebook.com/login/save-device/cancel/") + + if "home" in r.url: + return cls.from_session(session=session) + else: + raise _exception.FBchatUserError( + "Login failed. Check email/password. " + "(Failed on url: {})".format(r.url) + ) + + def is_logged_in(self): + # Send a request to the login url, to see if we're directed to the home page + url = "https://m.facebook.com/login.php?login_attempt=1" + r = self._session.get(url, allow_redirects=False) + return "Location" in r.headers and "home" in r.headers["Location"] + + def logout(self): + logout_h = self._logout_h + if not logout_h: + url = _util.prefix_url("/bluebar/modern_settings_menu/") + h_r = self._session.post(url, data={"pmid": "4"}) + logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1) + + url = _util.prefix_url("/logout.php") + return self._session.get(url, params={"ref": "mb", "h": logout_h}).ok + + @classmethod + def from_session(cls, session): + r = session.get(_util.prefix_url("/")) + + soup = bs4.BeautifulSoup(r.text, "html.parser") + + fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"}) + if fb_dtsg_element: + fb_dtsg = fb_dtsg_element["value"] + else: + # Fall back to searching with a regex + fb_dtsg = FB_DTSG_REGEX.search(r.text).group(1) + + revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0]) + + logout_h_element = soup.find("input", {"name": "h"}) + logout_h = logout_h_element["value"] if logout_h_element else None + + return cls( + fb_dtsg=fb_dtsg, revision=revision, session=session, logout_h=logout_h + ) + + def get_cookies(self): + return self._session.cookies.get_dict() + + @classmethod + def from_cookies(cls, cookies, user_agent=None): + session = session_factory(user_agent=user_agent) + session.cookies = requests.cookies.merge_cookies(session.cookies, cookies) + return cls.from_session(session=session)