Merge pull request #435 from carpedm20/state-refactor

Move state handling into new State model
This commit is contained in:
Mads Marquart
2019-07-02 18:04:10 +02:00
committed by GitHub
2 changed files with 233 additions and 229 deletions

View File

@@ -11,6 +11,7 @@ from collections import OrderedDict
from ._util import *
from .models import *
from ._graphql import graphql_queries_to_json, graphql_response_to_json, GraphQL
from ._state import State
import time
import json
@@ -33,11 +34,19 @@ class Client(object):
See https://fbchat.readthedocs.io for complete documentation of the API.
"""
ssl_verify = True
"""Verify ssl certificate, set to False to allow debugging with a proxy"""
listening = False
"""Whether the client is listening. Used when creating an external event loop to determine when to stop listening"""
@property
def ssl_verify(self):
"""Verify ssl certificate, set to False to allow debugging with a proxy."""
# TODO: Deprecate this
return self._state._session.verify
@ssl_verify.setter
def ssl_verify(self, value):
self._state._session.verify = value
@property
def uid(self):
"""The ID of the client.
@@ -69,51 +78,32 @@ class Client(object):
:raises: FBchatException on failed login
"""
self._sticky, self._pool = (None, None)
self._session = requests.session()
self._req_counter = 1
self._seq = "0"
# See `createPoll` for the reason for using `OrderedDict` here
self._payload_default = OrderedDict()
self._client_id = hex(int(random() * 2 ** 31))[2:]
self._default_thread_id = None
self._default_thread_type = None
self._pull_channel = 0
self._markAlive = True
self._buddylist = dict()
if not user_agent:
user_agent = choice(USER_AGENTS)
self._header = {
"Content-Type": "application/x-www-form-urlencoded",
"Referer": "https://www.facebook.com",
"Origin": "https://www.facebook.com",
"User-Agent": user_agent,
"Connection": "keep-alive",
}
handler.setLevel(logging_level)
# If session cookies aren't set, not properly loaded or gives us an invalid session, then do the login
if (
not session_cookies
or not self.setSession(session_cookies)
or not self.setSession(session_cookies, user_agent=user_agent)
or not self.isLoggedIn()
):
self.login(email, password, max_tries)
self.login(email, password, max_tries, user_agent=user_agent)
"""
INTERNAL REQUEST METHODS
"""
def _generatePayload(self, query):
"""Adds the following defaults to the payload:
__rev, __user, __a, ttstamp, fb_dtsg, __req
"""
if not query:
query = {}
query.update(self._payload_default)
query["__req"] = str_base(self._req_counter, 36)
self._req_counter += 1
query.update(self._state.get_params())
return query
def _fix_fb_errors(self, error_code):
@@ -123,19 +113,14 @@ class Client(object):
It may be a bad idea to do this in an exception handler, if you have a better method, please suggest it!
"""
if error_code == "1357004":
log.warning("Got error #1357004. Doing a _postLogin, and resending request")
self._postLogin()
log.warning("Got error #1357004. Refreshing state and resending request")
self._state = State.from_session(session=self._state._session)
return True
return False
def _get(self, url, query=None, fix_request=False, as_json=False, error_retries=3):
payload = self._generatePayload(query)
r = self._session.get(
prefix_url(url),
headers=self._header,
params=payload,
verify=self.ssl_verify,
)
r = self._state._session.get(prefix_url(url), params=payload)
if not fix_request:
return r
try:
@@ -161,9 +146,7 @@ class Client(object):
error_retries=3,
):
payload = self._generatePayload(query)
r = self._session.post(
prefix_url(url), headers=self._header, data=payload, verify=self.ssl_verify
)
r = self._state._session.post(prefix_url(url), data=payload)
if not fix_request:
return r
try:
@@ -184,21 +167,6 @@ class Client(object):
)
raise e
def _cleanGet(self, url, query=None, allow_redirects=True):
return self._session.get(
prefix_url(url),
headers=self._header,
params=query,
verify=self.ssl_verify,
allow_redirects=allow_redirects,
)
def _cleanPost(self, url, query=None):
self._req_counter += 1
return self._session.post(
prefix_url(url), headers=self._header, data=query, verify=self.ssl_verify
)
def _postFile(
self,
url,
@@ -209,17 +177,7 @@ class Client(object):
error_retries=3,
):
payload = self._generatePayload(query)
# Removes 'Content-Type' from the header
headers = dict(
(i, self._header[i]) for i in self._header if i != "Content-Type"
)
r = self._session.post(
prefix_url(url),
headers=headers,
data=payload,
files=files,
verify=self.ssl_verify,
)
r = self._state._session.post(prefix_url(url), data=payload, files=files)
if not fix_request:
return r
try:
@@ -270,128 +228,6 @@ class Client(object):
LOGIN METHODS
"""
def _resetValues(self):
self._payload_default = OrderedDict()
self._session = requests.session()
self._req_counter = 1
self._seq = "0"
self._uid = None
def _postLogin(self):
self._payload_default = OrderedDict()
self._client_id = hex(int(random() * 2147483648))[2:]
self._uid = self._session.cookies.get_dict().get("c_user")
if self._uid is None:
raise FBchatException("Could not find c_user cookie")
self._uid = str(self._uid)
r = self._get("/")
soup = bs(r.text, "html.parser")
fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"})
if fb_dtsg_element:
fb_dtsg = fb_dtsg_element["value"]
else:
fb_dtsg = re.search(r'name="fb_dtsg" value="(.*?)"', r.text).group(1)
fb_h_element = soup.find("input", {"name": "h"})
if fb_h_element:
self._fb_h = fb_h_element["value"]
# Set default payload
self._payload_default["__rev"] = int(
r.text.split('"client_revision":', 1)[1].split(",", 1)[0]
)
self._payload_default["__a"] = "1"
self._payload_default["fb_dtsg"] = fb_dtsg
def _login(self, email, password):
soup = bs(self._get("https://m.facebook.com/").text, "html.parser")
data = dict(
(elem["name"], elem["value"])
for elem in soup.findAll("input")
if elem.has_attr("value") and elem.has_attr("name")
)
data["email"] = email
data["pass"] = password
data["login"] = "Log In"
r = self._cleanPost("https://m.facebook.com/login.php?login_attempt=1", data)
# Usually, 'Checkpoint' will refer to 2FA
if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()):
r = self._2FA(r)
# Sometimes Facebook tries to show the user a "Save Device" dialog
if "save-device" in r.url:
r = self._cleanGet("https://m.facebook.com/login/save-device/cancel/")
if "home" in r.url:
self._postLogin()
return True, r.url
else:
return False, r.url
def _2FA(self, r):
soup = bs(r.text, "html.parser")
data = dict()
s = self.on2FACode()
data["approvals_code"] = s
data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"]
data["nh"] = soup.find("input", {"name": "nh"})["value"]
data["submit[Submit Code]"] = "Submit Code"
data["codes_submitted"] = 0
log.info("Submitting 2FA code.")
r = self._cleanPost("https://m.facebook.com/login/checkpoint/", data)
if "home" in r.url:
return r
del data["approvals_code"]
del data["submit[Submit Code]"]
del data["codes_submitted"]
data["name_action_selected"] = "save_device"
data["submit[Continue]"] = "Continue"
log.info(
"Saving browser."
) # At this stage, we have dtsg, nh, name_action_selected, submit[Continue]
r = self._cleanPost("https://m.facebook.com/login/checkpoint/", data)
if "home" in r.url:
return r
del data["name_action_selected"]
log.info(
"Starting Facebook checkup flow."
) # At this stage, we have dtsg, nh, submit[Continue]
r = self._cleanPost("https://m.facebook.com/login/checkpoint/", data)
if "home" in r.url:
return r
del data["submit[Continue]"]
data["submit[This was me]"] = "This Was Me"
log.info(
"Verifying login attempt."
) # At this stage, we have dtsg, nh, submit[This was me]
r = self._cleanPost("https://m.facebook.com/login/checkpoint/", data)
if "home" in r.url:
return r
del data["submit[This was me]"]
data["submit[Continue]"] = "Continue"
data["name_action_selected"] = "save_device"
log.info(
"Saving device again."
) # At this stage, we have dtsg, nh, submit[Continue], name_action_selected
r = self._cleanPost("https://m.facebook.com/login/checkpoint/", data)
return r
def isLoggedIn(self):
"""
Sends a request to Facebook to check the login status
@@ -399,11 +235,7 @@ class Client(object):
:return: True if the client is still logged in
:rtype: bool
"""
# Send a request to the login url, to see if we're directed to the home page
r = self._cleanGet(
"https://m.facebook.com/login.php?login_attempt=1", allow_redirects=False
)
return "Location" in r.headers and "home" in r.headers["Location"]
return self._state.is_logged_in()
def getSession(self):
"""Retrieves session cookies
@@ -411,9 +243,9 @@ class Client(object):
:return: A dictionay containing session cookies
:rtype: dict
"""
return self._session.cookies.get_dict()
return self._state.get_cookies()
def setSession(self, session_cookies):
def setSession(self, session_cookies, user_agent=None):
"""Loads session cookies
:param session_cookies: A dictionay containing session cookies
@@ -421,23 +253,21 @@ class Client(object):
:return: False if `session_cookies` does not contain proper cookies
:rtype: bool
"""
# Quick check to see if session_cookies is formatted properly
if not session_cookies or "c_user" not in session_cookies:
return False
try:
# Load cookies into current session
self._session.cookies = requests.cookies.merge_cookies(
self._session.cookies, session_cookies
)
self._postLogin()
state = State.from_cookies(session_cookies, user_agent=user_agent)
except Exception as e:
log.exception("Failed loading session")
self._resetValues()
return False
uid = state.get_user_id()
if uid is None:
log.warning("Could not find c_user cookie")
return False
self._state = state
self._uid = uid
return True
def login(self, email, password, max_tries=5):
def login(self, email, password, max_tries=5, user_agent=None):
"""
Uses `email` and `password` to login the user (If the user is already logged in, this will do a re-login)
@@ -456,23 +286,21 @@ class Client(object):
raise FBchatUserError("Email and password not set")
for i in range(1, max_tries + 1):
login_successful, login_url = self._login(email, password)
if not login_successful:
log.warning(
"Attempt #{} failed{}".format(
i, {True: ", retrying"}.get(i < max_tries, "")
)
)
try:
state = State.login(email, password, user_agent=user_agent)
uid = state.get_user_id()
if uid is None:
raise FBchatException("Could not find user id")
except Exception:
if i >= max_tries:
raise
log.exception("Attempt #{} failed, retrying".format(i))
time.sleep(1)
continue
else:
self._state = state
self._uid = uid
self.onLoggedIn(email=email)
break
else:
raise FBchatUserError(
"Login failed. Check email/password. "
"(Failed on url: {})".format(login_url)
)
def logout(self):
"""
@@ -481,17 +309,11 @@ class Client(object):
:return: True if the action was successful
:rtype: bool
"""
if not hasattr(self, "_fb_h"):
h_r = self._post("/bluebar/modern_settings_menu/", {"pmid": "4"})
self._fb_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1)
data = {"ref": "mb", "h": self._fb_h}
r = self._get("/logout.php", data)
self._resetValues()
return r.ok
if self._state.logout():
self._state = None
self._uid = None
return True
return False
"""
END LOGIN METHODS
@@ -1299,7 +1121,7 @@ class Client(object):
# update JS token if received in response
fb_dtsg = get_jsmods_require(j, 2)
if fb_dtsg is not None:
self._payload_default["fb_dtsg"] = fb_dtsg
self._state.fb_dtsg = fb_dtsg
try:
message_ids = [
@@ -2092,9 +1914,6 @@ class Client(object):
# We're using ordered dicts, because the Facebook endpoint that parses the POST
# parameters is badly implemented, and deals with ordering the options wrongly.
# This also means we had to change `client._payload_default` to an ordered dict,
# since that's being copied in between this point and the `requests` call
#
# If you can find a way to fix this for the endpoint, or if you find another
# endpoint, please do suggest it ;)
data = OrderedDict([("question_text", poll.title), ("target_id", thread_id)])

185
fbchat/_state.py Normal file
View File

@@ -0,0 +1,185 @@
# -*- coding: UTF-8 -*-
from __future__ import unicode_literals
import attr
import bs4
import re
import requests
import random
from . import _util, _exception
FB_DTSG_REGEX = re.compile(r'name="fb_dtsg" value="(.*?)"')
def session_factory(user_agent=None):
session = requests.session()
session.headers["Referer"] = "https://www.facebook.com"
# TODO: Deprecate setting the user agent manually
session.headers["User-Agent"] = user_agent or random.choice(_util.USER_AGENTS)
return session
def _2fa_helper(session, code, r):
soup = bs4.BeautifulSoup(r.text, "html.parser")
data = dict()
url = "https://m.facebook.com/login/checkpoint/"
data["approvals_code"] = code
data["fb_dtsg"] = soup.find("input", {"name": "fb_dtsg"})["value"]
data["nh"] = soup.find("input", {"name": "nh"})["value"]
data["submit[Submit Code]"] = "Submit Code"
data["codes_submitted"] = 0
_util.log.info("Submitting 2FA code.")
r = session.post(url, data=data)
if "home" in r.url:
return r
del data["approvals_code"]
del data["submit[Submit Code]"]
del data["codes_submitted"]
data["name_action_selected"] = "save_device"
data["submit[Continue]"] = "Continue"
_util.log.info("Saving browser.")
# At this stage, we have dtsg, nh, name_action_selected, submit[Continue]
r = session.post(url, data=data)
if "home" in r.url:
return r
del data["name_action_selected"]
_util.log.info("Starting Facebook checkup flow.")
# At this stage, we have dtsg, nh, submit[Continue]
r = session.post(url, data=data)
if "home" in r.url:
return r
del data["submit[Continue]"]
data["submit[This was me]"] = "This Was Me"
_util.log.info("Verifying login attempt.")
# At this stage, we have dtsg, nh, submit[This was me]
r = session.post(url, data=data)
if "home" in r.url:
return r
del data["submit[This was me]"]
data["submit[Continue]"] = "Continue"
data["name_action_selected"] = "save_device"
_util.log.info("Saving device again.")
# At this stage, we have dtsg, nh, submit[Continue], name_action_selected
r = session.post(url, data=data)
return r
@attr.s(slots=True) # TODO i Python 3: Add kw_only=True
class State(object):
"""Stores and manages state required for most Facebook requests."""
fb_dtsg = attr.ib()
_revision = attr.ib()
_session = attr.ib(factory=session_factory)
_counter = attr.ib(0)
_logout_h = attr.ib(None)
def get_user_id(self):
rtn = self.get_cookies().get("c_user")
if rtn is None:
return None
return str(rtn)
def get_params(self):
self._counter += 1 # TODO: Make this operation atomic / thread-safe
return {
"__a": 1,
"__req": _util.str_base(self._counter, 36),
"__rev": self._revision,
"fb_dtsg": self.fb_dtsg,
}
@classmethod
def login(cls, email, password, user_agent=None):
session = session_factory(user_agent=user_agent)
soup = bs4.BeautifulSoup(
session.get("https://m.facebook.com/").text, "html.parser"
)
data = dict(
(elem["name"], elem["value"])
for elem in soup.findAll("input")
if elem.has_attr("value") and elem.has_attr("name")
)
data["email"] = email
data["pass"] = password
data["login"] = "Log In"
r = session.post("https://m.facebook.com/login.php?login_attempt=1", data=data)
# Usually, 'Checkpoint' will refer to 2FA
if "checkpoint" in r.url and ('id="approvals_code"' in r.text.lower()):
code = on_2fa_callback()
r = _2fa_helper(session, code, r)
# Sometimes Facebook tries to show the user a "Save Device" dialog
if "save-device" in r.url:
r = session.get("https://m.facebook.com/login/save-device/cancel/")
if "home" in r.url:
return cls.from_session(session=session)
else:
raise _exception.FBchatUserError(
"Login failed. Check email/password. "
"(Failed on url: {})".format(r.url)
)
def is_logged_in(self):
# Send a request to the login url, to see if we're directed to the home page
url = "https://m.facebook.com/login.php?login_attempt=1"
r = self._session.get(url, allow_redirects=False)
return "Location" in r.headers and "home" in r.headers["Location"]
def logout(self):
logout_h = self._logout_h
if not logout_h:
url = _util.prefix_url("/bluebar/modern_settings_menu/")
h_r = self._session.post(url, data={"pmid": "4"})
logout_h = re.search(r'name=\\"h\\" value=\\"(.*?)\\"', h_r.text).group(1)
url = _util.prefix_url("/logout.php")
return self._session.get(url, params={"ref": "mb", "h": logout_h}).ok
@classmethod
def from_session(cls, session):
r = session.get(_util.prefix_url("/"))
soup = bs4.BeautifulSoup(r.text, "html.parser")
fb_dtsg_element = soup.find("input", {"name": "fb_dtsg"})
if fb_dtsg_element:
fb_dtsg = fb_dtsg_element["value"]
else:
# Fall back to searching with a regex
fb_dtsg = FB_DTSG_REGEX.search(r.text).group(1)
revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0])
logout_h_element = soup.find("input", {"name": "h"})
logout_h = logout_h_element["value"] if logout_h_element else None
return cls(
fb_dtsg=fb_dtsg, revision=revision, session=session, logout_h=logout_h
)
def get_cookies(self):
return self._session.cookies.get_dict()
@classmethod
def from_cookies(cls, cookies, user_agent=None):
session = session_factory(user_agent=user_agent)
session.cookies = requests.cookies.merge_cookies(session.cookies, cookies)
return cls.from_session(session=session)