Merge branch 'master' into master

This commit is contained in:
Dainius
2017-05-08 16:54:32 +03:00
committed by GitHub
7 changed files with 254 additions and 71 deletions

5
.gitignore vendored
View File

@@ -1,6 +1,6 @@
*py[co]
#test
# Test scripts
*.sh
# Packages
@@ -21,3 +21,6 @@ develop-eggs
# Sphinx documentation
docs/_build/
# Data for tests
tests.data

View File

@@ -61,7 +61,7 @@ Getting last messages sent
.. code-block:: python
last_messages = client.getThreadInfo(friend.uid,0)
last_messages = client.getThreadInfo(friend.uid, last_n=20)
last_messages.reverse() # messages come in reversed order
for message in last_messages:
@@ -99,7 +99,8 @@ Saving session
.. code-block:: python
client.saveSession(sessionfile)
session_cookies = client.setSession()
# save session_cookies
Loading session
@@ -107,8 +108,9 @@ Loading session
.. code-block:: python
client = fbchat.Client(None, None, do_login=False)
client.loadSession(sessionfile)
client = fbchat.Client(None, None, session_cookies=session_cookies)
# OR
client.setSession(session_cookies)
Authors

View File

@@ -15,7 +15,7 @@ from .client import *
__copyright__ = 'Copyright 2015 by Taehoon Kim'
__version__ = '0.8.2'
__version__ = '0.9.0'
__license__ = 'BSD'
__author__ = 'Taehoon Kim; Moreels Pieter-Jan'
__email__ = 'carpedm20@gmail.com'

View File

@@ -57,6 +57,7 @@ facebookEncoding = 'UTF-8'
# Log settings
log = logging.getLogger("client")
log.setLevel(logging.DEBUG)
class Client(object):
@@ -66,39 +67,27 @@ class Client(object):
documentation for the API.
"""
def __init__(self, email, password, debug=True, info_log=True, user_agent=None, max_retries=5, do_login=True):
def __init__(self, email, password, debug=True, info_log=True, user_agent=None, max_retries=5, session_cookies=None):
"""A client for the Facebook Chat (Messenger).
:param email: Facebook `email` or `id` or `phone number`
:param password: Facebook account password
import fbchat
chat = fbchat.Client(email, password)
:param debug: Configures the logger to `debug` logging_level
:param info_log: Configures the logger to `info` logging_level
:param user_agent: Custom user agent to use when sending requests. If `None`, user agent will be chosen from a premade list (see utils.py)
:param max_retries: Maximum number of times to retry login
:param session_cookies: Cookie dict from a previous session (Will default to login if these are invalid)
"""
self.is_def_recipient_set = False
self.debug = debug
self.sticky, self.pool = (None, None)
self._session = requests.session()
self.req_counter = 1
self.seq = "0"
self.payloadDefault={}
self.payloadDefault = {}
self.client = 'mercury'
self.listening = False
self.GENDERS = {
0: 'unknown',
1: 'female_singular',
2: 'male_singular',
3: 'female_singular_guess',
4: 'male_singular_guess',
5: 'mixed',
6: 'neuter_singular',
7: 'unknown_singular',
8: 'female_plural',
9: 'male_plural',
10: 'neuter_plural',
11: 'unknown_plural',
}
self.threads = []
# Setup event hooks
self.onLoggingIn = EventHook()
@@ -156,7 +145,7 @@ class Client(object):
'Connection' : 'keep-alive',
}
# Configure the logger differently based on the 'debug' parameter
# Configure the logger differently based on the 'debug' and 'info_log' parameters
if debug:
logging_level = logging.DEBUG
elif info_log:
@@ -168,13 +157,11 @@ class Client(object):
handler = logging.StreamHandler()
handler.setLevel(logging_level)
log.addHandler(handler)
log.setLevel(logging.DEBUG)
if do_login:
# If session cookies aren't set, not properly loaded or gives us an invalid session, then do the login
if not session_cookies or not self.setSession(session_cookies) or not self.isLoggedIn():
self.login(email, password, max_retries)
self.threads = []
def _console(self, msg):
"""Assumes an INFO level and log it.
@@ -206,11 +193,11 @@ class Client(object):
return payload
def _get(self, url, query=None, timeout=30):
payload=self._generatePayload(query)
payload = self._generatePayload(query)
return self._session.get(url, headers=self._header, params=payload, timeout=timeout)
def _post(self, url, query=None, timeout=30):
payload=self._generatePayload(query)
payload = self._generatePayload(query)
return self._session.post(url, headers=self._header, data=payload, timeout=timeout)
def _cleanGet(self, url, query=None, timeout=30):
@@ -343,50 +330,46 @@ class Client(object):
r = self._cleanPost(CheckpointURL, data)
return r
def saveSession(self, sessionfile):
"""Dumps the session cookies to (sessionfile).
WILL OVERWRITE ANY EXISTING FILE
def isLoggedIn(self):
# Send a request to the login url, to see if we're directed to the home page.
r = self._cleanGet(LoginURL)
return 'home' in r.url:
:param sessionfile: location of saved session file
def getSession(self):
"""Returns the session cookies"""
return self._session.cookies.get_dict()
def setSession(self, session_cookies):
"""Loads session cookies
:param session_cookies: dictionary containing session cookies
Return false if session_cookies does not contain proper cookies
"""
log.info('Saving session')
with open(sessionfile, 'w') as f:
# Grab cookies from current session, and save them as JSON
f.write(json.dumps(self._session.cookies.get_dict(), ensure_ascii=False))
# Quick check to see if session_cookies is formatted properly
if not session_cookies or 'c_user' not in session_cookies:
return False
def loadSession(self, sessionfile):
"""Loads session cookies from (sessionfile)
:param sessionfile: location of saved session file
"""
log.info('Loading session')
with open(sessionfile, 'r') as f:
try:
j = json.load(f)
if not j or 'c_user' not in j:
return False
# Load cookies into current session
self._session.cookies = requests.cookies.merge_cookies(self._session.cookies, j)
self._postLogin()
return True
except Exception as e:
raise Exception('Invalid json in {}, or bad merging of cookies'.format(sessionfile))
# Load cookies into current session
self._session.cookies = requests.cookies.merge_cookies(self._session.cookies, session_cookies)
self._postLogin()
return True
def login(self, email, password, max_retries=5):
self.onLoggingIn()
self.onLoggingIn(email)
if not (email and password):
raise Exception("Email and password not set.")
self.email = email
self.password = password
for i in range(1, max_retries+1):
if not self._login():
log.warning("Attempt #{} failed{}".format(i,{True:', retrying'}.get(i<5,'')))
log.warning("Attempt #{} failed{}".format(i, {True: ', retrying'}.get(i < 5, '')))
time.sleep(1)
continue
else:
self.onLoggedIn()
self.onLoggedIn(email)
break
else:
raise Exception("Login failed. Check email/password.")
@@ -474,6 +457,8 @@ class Client(object):
:param like: size of the like sticker you want to send
:param image_id: id for the image to send, gotten from the UploadURL
:param add_user_ids: a list of user ids to add to a chat
returns a list of message ids of the sent message(s)
"""
if self.is_def_recipient_set:
@@ -542,10 +527,9 @@ class Client(object):
r = self._post(SendURL, data)
if r.ok:
log.info('Message sent.')
else:
log.info('Message not sent.')
if not r.ok:
log.warning('Error when sending message: Got {} response'.format(r.status_code))
return False
if isinstance(r._content, str) is False:
r._content = r._content.decode(facebookEncoding)
@@ -555,9 +539,18 @@ class Client(object):
log.warning('Error #{} when sending message: {}'.format(j['error'], j['errorDescription']))
return False
message_ids = []
try:
message_ids += [action['message_id'] for action in j['payload']['actions'] if 'message_id' in action]
message_ids[0] # Try accessing element
except (KeyError, IndexError) as e:
log.warning('Error when sending message: No message ids could be found')
return False
log.info('Message sent.')
log.debug("Sending {}".format(r))
log.debug("With data {}".format(data))
return True
return message_ids
def sendRemoteImage(self, recipient_id=None, message=None, is_user=True, image=''):
"""Send an image from a URL
@@ -616,7 +609,7 @@ class Client(object):
# `start` doesn't matter, always returns from the last
# data['messages[{}][{}][offset]'.format(key, userID)] = start
data = {'messages[{}][{}][offset]'.format(key, userID): 0,
'messages[{}][{}][limit]'.format(key, userID): last_n,
'messages[{}][{}][limit]'.format(key, userID): last_n - 1,
'messages[{}][{}][timestamp]'.format(key, userID): now()}
r = self._post(MessagesURL, query=data)
@@ -958,7 +951,7 @@ class Client(object):
if type(_fbid) == int:
return _fbid
if type(_fbid) == str and 'fbid:' in _fbid:
if type(_fbid) in [str, unicode] and 'fbid:' in _fbid:
return int(_fbid[5:])
user_ids = [fbidStrip(uid) for uid in user_ids]

View File

@@ -11,12 +11,29 @@ USER_AGENTS = [
"Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6"
]
GENDERS = {
0: 'unknown',
1: 'female_singular',
2: 'male_singular',
3: 'female_singular_guess',
4: 'male_singular_guess',
5: 'mixed',
6: 'neuter_singular',
7: 'unknown_singular',
8: 'female_plural',
9: 'male_plural',
10: 'neuter_plural',
11: 'unknown_plural',
}
def now():
return int(time()*1000)
def strip_to_json(text):
return text[text.index('{'):]
def get_json(text):
return json.loads(re.sub(r"^[^{]*", '', text, 1))
return json.loads(strip_to_json(text))
def digit_to_char(digit):
if digit < 10:

BIN
test_image.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.6 KiB

168
tests.py Normal file
View File

@@ -0,0 +1,168 @@
#!/usr/bin/env python
import logging
import fbchat
import getpass
import unittest
import sys
from os import path
# Disable logging
logging.basicConfig(level=100)
fbchat.log.setLevel(100)
"""
Tests for fbchat
~~~~~~~~~~~~~~~~
To use these tests, put:
- email
- password
- a group_uid
- a user_uid (the user will be kicked from the group and then added again)
(seperated these by a newline) in a file called `tests.data`, or type them manually in the terminal prompts
Please remember to test both python v. 2.7 and python v. 3.6!
If you've made any changes to the 2FA functionality, test it with a 2FA enabled account
If you only want to execute specific tests, pass the function names in the commandline
"""
class TestFbchat(unittest.TestCase):
def test_login_functions(self):
self.assertTrue(client.is_logged_in())
client.logout()
self.assertFalse(client.is_logged_in())
with self.assertRaises(Exception):
client.login("not@email.com", "not_password", max_retries=1)
client.login(email, password)
self.assertTrue(client.is_logged_in())
def test_sessions(self):
global client
session_cookies = client.getSession()
client = fbchat.Client(email, password, session_cookies=session_cookies)
self.assertTrue(client.is_logged_in())
def test_setDefaultRecipient(self):
client.setDefaultRecipient(client.uid, is_user=True)
self.assertTrue(client.send(message="test_default_recipient"))
def test_getAllUsers(self):
users = client.getAllUsers()
self.assertGreater(len(users), 0)
def test_getUsers(self):
users = client.getUsers("Mark Zuckerberg")
self.assertGreater(len(users), 0)
u = users[0]
# Test if values are set correctly
self.assertIsInstance(u.uid, int)
self.assertEquals(u.type, 'user')
self.assertEquals(u.photo[:4], 'http')
self.assertEquals(u.url[:4], 'http')
self.assertEquals(u.name, 'Mark Zuckerberg')
self.assertGreater(u.score, 0)
def test_send_likes(self):
self.assertTrue(client.send(client.uid, like='s'))
self.assertTrue(client.send(client.uid, like='m'))
self.assertTrue(client.send(client.uid, like='l'))
self.assertTrue(client.send(group_uid, like='s', is_user=False))
self.assertTrue(client.send(group_uid, like='m', is_user=False))
self.assertTrue(client.send(group_uid, like='l', is_user=False))
def test_send(self):
self.assertTrue(client.send(client.uid, message='test_send_user'))
self.assertTrue(client.send(group_uid, message='test_send_group', is_user=False))
def test_send_images(self):
image_url = 'https://cdn4.iconfinder.com/data/icons/ionicons/512/icon-image-128.png'
image_local_url = path.join(path.dirname(__file__), 'test_image.png')
self.assertTrue(client.sendRemoteImage(client.uid, message='test_send_user_images_remote', image=image_url))
self.assertTrue(client.sendLocalImage(client.uid, message='test_send_user_images_local', image=image_local_url))
self.assertTrue(client.sendRemoteImage(group_uid, message='test_send_group_images_remote', is_user=False, image=image_url))
self.assertTrue(client.sendLocalImage(group_uid, message='test_send_group_images_local', is_user=False, image=image_local_url))
def test_getThreadInfo(self):
info = client.getThreadInfo(client.uid, last_n=1)
self.assertEquals(info[0].author, 'fbid:' + str(client.uid))
client.send(group_uid, message='test_getThreadInfo', is_user=False)
info = client.getThreadInfo(group_uid, last_n=1, is_user=False)
self.assertEquals(info[0].author, 'fbid:' + str(client.uid))
self.assertEquals(info[0].body, 'test_getThreadInfo')
def test_markAs(self):
# To be implemented (requires some form of manual watching)
pass
def test_listen(self):
client.do_one_listen()
def test_getUserInfo(self):
info = client.getUserInfo(4)
self.assertEquals(info['name'], 'Mark Zuckerberg')
def test_remove_add_from_chat(self):
self.assertTrue(client.remove_user_from_chat(group_uid, user_uid))
self.assertTrue(client.add_users_to_chat(group_uid, user_uid))
def test_changeThreadTitle(self):
self.assertTrue(client.changeThreadTitle(group_uid, 'test_changeThreadTitle'))
def start_test(param_client, param_group_uid, param_user_uid, tests=[]):
global client
global group_uid
global user_uid
client = param_client
group_uid = param_group_uid
user_uid = param_user_uid
if len(tests) == 0:
suite = unittest.TestLoader().loadTestsFromTestCase(TestFbchat)
else:
suite = unittest.TestSuite(map(TestFbchat, tests))
print ('Starting test(s)')
unittest.TextTestRunner(verbosity=2).run(suite)
if __name__ == '__main__':
# Python 3 does not use raw_input, whereas Python 2 does
try:
input = raw_input
except Exception as e:
pass
try:
with open(path.join(path.dirname(__file__), 'tests.data'), 'r') as f:
content = f.readlines()
content = [x.strip() for x in content if len(x.strip()) != 0]
email = content[0]
password = content[1]
group_uid = content[2]
user_uid = content[3]
except (IOError, IndexError) as e:
email = input('Email: ')
password = getpass.getpass()
group_uid = input('Please enter a group uid (To test group functionality): ')
user_uid = input('Please enter a user uid (To test kicking/adding functionality): ')
print ('Logging in')
client = fbchat.Client(email, password)
# Warning! Taking user input directly like this could be dangerous! Use only for testing purposes!
start_test(client, group_uid, user_uid, sys.argv[1:])