Merge branch 'development' into master

This commit is contained in:
Mads T Marquart
2017-05-15 15:49:14 +02:00
committed by GitHub
9 changed files with 804 additions and 462 deletions

3
.gitignore vendored
View File

@@ -1,5 +1,7 @@
*py[co]
.idea/
# Test scripts
*.sh
@@ -23,4 +25,5 @@ develop-eggs
docs/_build/
# Data for tests
test_data.json
tests.data

View File

@@ -15,7 +15,7 @@ from .client import *
__copyright__ = 'Copyright 2015 by Taehoon Kim'
__version__ = '0.10.1'
__version__ = '0.10.3'
__license__ = 'BSD'
__author__ = 'Taehoon Kim; Moreels Pieter-Jan'
__email__ = 'carpedm20@gmail.com'

File diff suppressed because it is too large Load Diff

57
fbchat/event_hook.py Normal file
View File

@@ -0,0 +1,57 @@
import inspect
class EventHook(object):
"""
A simple implementation of the Observer-Pattern.
The user can specify an event signature upon inizializazion,
defined by kwargs in the form of argumentname=class (e.g. id=int).
The arguments' types are not checked in this implementation though.
Callables with a fitting signature can be added with += or removed with -=.
All listeners can be notified by calling the EventHook class with fitting
arguments.
Thanks http://stackoverflow.com/a/35957226/5556222
"""
def __init__(self, **signature):
self._signature = signature
self._argnames = set(signature.keys())
self._handlers = []
def _kwargs_str(self):
return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())
def __iadd__(self, handler):
params = inspect.signature(handler).parameters
valid = True
argnames = set(n for n in params.keys())
if argnames != self._argnames:
valid = False
for p in params.values():
if p.kind == p.VAR_KEYWORD:
valid = True
break
if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
valid = False
break
if not valid:
raise ValueError("Listener must have these arguments: (%s)"
% self._kwargs_str())
self._handlers.append(handler)
return self
def __isub__(self, handler):
self._handlers.remove(handler)
return self
def __call__(self, *args, **kwargs):
if args or set(kwargs.keys()) != self._argnames:
raise ValueError("This EventHook must be called with these " +
"keyword arguments: (%s)" % self._kwargs_str() +
", but was called with: (%s)" %self._signature)
for handler in self._handlers[:]:
handler(**kwargs)
def __repr__(self):
return "EventHook(%s)" % self._kwargs_str()

View File

@@ -1,15 +1,8 @@
from __future__ import unicode_literals
import sys
from enum import Enum
class Base():
def __repr__(self):
uni = self.__unicode__()
return uni.encode('utf-8') if sys.version_info < (3, 0) else uni
def __unicode__(self):
return u'<%s %s (%s)>' % (self.type.upper(), self.name, self.url)
class User(Base):
class User:
def __init__(self, data):
if data['type'] != 'user':
raise Exception("[!] %s <%s> is not a user" % (data['text'], data['path']))
@@ -19,13 +12,85 @@ class User(Base):
self.url = data['path']
self.name = data['text']
self.score = data['score']
self.data = data
class Thread():
def __repr__(self):
uni = self.__unicode__()
return uni.encode('utf-8') if sys.version_info < (3, 0) else uni
def __unicode__(self):
return u'<%s %s (%s)>' % (self.type.upper(), self.name, self.url)
@staticmethod
def adaptFromChat(user_in_chat):
""" Adapts user info from chat to User model acceptable initial dict
:param user_in_chat: user info from chat
'dir': None,
'mThumbSrcSmall': None,
'is_friend': False,
'is_nonfriend_messenger_contact': True,
'alternateName': '',
'i18nGender': 16777216,
'vanity': '',
'type': 'friend',
'searchTokens': ['Voznesenskij', 'Sergej'],
'thumbSrc': 'https://fb-s-b-a.akamaihd.net/h-ak-xfa1/v/t1.0-1/c9.0.32.32/p32x32/10354686_10150004552801856_220367501106153455_n.jpg?oh=71a87d76d4e4d17615a20c43fb8dbb47&oe=59118CE4&__gda__=1493753268_ae75cef40e9785398e744259ccffd7ff',
'mThumbSrcLarge': None,
'firstName': 'Sergej',
'name': 'Sergej Voznesenskij',
'uri': 'https://www.facebook.com/profile.php?id=100014812758264',
'id': '100014812758264',
'gender': 2
"""
return {
'type': 'user',
'uid': user_in_chat['id'],
'photo': user_in_chat['thumbSrc'],
'path': user_in_chat['uri'],
'text': user_in_chat['name'],
'score': '',
'data': user_in_chat,
}
class Thread:
def __init__(self, **entries):
self.__dict__.update(entries)
class Message():
class Message:
def __init__(self, **entries):
self.__dict__.update(entries)
class ThreadType(Enum):
USER = 1
GROUP = 2
class TypingStatus(Enum):
DELETED = 0
TYPING = 1
class EmojiSize(Enum):
LARGE = {
'value': '369239383222810',
'name': 'large'
}
MEDIUM = {
'value': '369239343222814',
'name': 'medium'
}
SMALL = {
'value': '369239263222822',
'name': 'small'
}
LIKES = {
'l': EmojiSize.LARGE,
'm': EmojiSize.MEDIUM,
's': EmojiSize.SMALL
}
LIKES['large'] = LIKES['l']
LIKES['medium'] =LIKES['m']
LIKES['small'] = LIKES['s']

View File

@@ -1,8 +0,0 @@
LIKES={
'l': '369239383222810',
'm': '369239343222814',
's': '369239263222822'
}
LIKES['large'] = LIKES['l']
LIKES['medium'] =LIKES['m']
LIKES['small'] = LIKES['s']

View File

@@ -2,6 +2,7 @@ import re
import json
from time import time
from random import random
import warnings
USER_AGENTS = [
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/601.1.10 (KHTML, like Gecko) Version/8.0.5 Safari/601.1.10",
@@ -25,6 +26,7 @@ GENDERS = {
11: 'unknown_plural',
}
def now():
return int(time()*1000)
@@ -39,7 +41,7 @@ def digit_to_char(digit):
return str(digit)
return chr(ord('a') + digit - 10)
def str_base(number,base):
def str_base(number, base):
if number < 0:
return '-' + str_base(-number, base)
(d, m) = divmod(number, base)
@@ -50,15 +52,42 @@ def str_base(number,base):
def generateMessageID(client_id=None):
k = now()
l = int(random() * 4294967295)
return ("<%s:%s-%s@mail.projektitan.com>" % (k, l, client_id));
return "<%s:%s-%s@mail.projektitan.com>" % (k, l, client_id)
def getSignatureID():
return hex(int(random() * 2147483648))
def generateOfflineThreadingID() :
def generateOfflineThreadingID():
ret = now()
value = int(random() * 4294967295);
value = int(random() * 4294967295)
string = ("0000000000000000000000" + bin(value))[-22:]
msgs = bin(ret) + string
return str(int(msgs,2))
return str(int(msgs, 2))
def isUserToThreadType(is_user):
return ThreadType.USER if is_user else ThreadType.GROUP
def deprecation(name, deprecated_in=None, details='', stacklevel=3):
"""This is a function which should be used to mark parameters as deprecated.
It will result in a warning being emmitted when the parameter is used.
"""
warning = "{} is deprecated".format(name)
if deprecated_in:
warning += ' in v. {}'.format(deprecated_in)
if details:
warning += '. {}'.format(details)
warnings.simplefilter('always', DeprecationWarning)
warnings.warn(warning, category=DeprecationWarning, stacklevel=stacklevel)
warnings.simplefilter('default', DeprecationWarning)
def deprecated(deprecated_in=None, details=''):
"""This is a decorator which can be used to mark functions as deprecated.
It will result in a warning being emmitted when the decorated function is used.
"""
def wrap(func, *args, **kwargs):
def wrapped_func(*args, **kwargs):
deprecation(func.__qualname__, deprecated_in, details, stacklevel=2)
return func(*args, **kwargs)
return wrapped_func
return wrap

6
test_data.json Normal file
View File

@@ -0,0 +1,6 @@
{
"email": "",
"password": "",
"user_thread_id": "",
"group_thread_id": ""
}

131
tests.py
View File

@@ -1,27 +1,36 @@
#!/usr/bin/env python
import time
import json
import logging
import fbchat
from fbchat.models import *
import getpass
import unittest
import sys
from os import path
# Disable logging
logging.basicConfig(level=100)
fbchat.log.setLevel(100)
#Setup logging
logging.basicConfig(level=logging.INFO)
"""
Tests for fbchat
~~~~~~~~~~~~~~~~
To use these tests, put:
- email
- password
- a group_uid
- a user_uid (the user will be kicked from the group and then added again)
(seperated these by a newline) in a file called `tests.data`, or type them manually in the terminal prompts
To use these tests, make a json file called test_data.json, put this example in it, and fill in the gaps:
{
"email": "example@email.com",
"password": "example_password",
"group_thread_id": 0,
"user_thread_id": 0
}
or type this information manually in the terminal prompts.
- email: Your (or a test user's) email / phone number
- password: Your (or a test user's) password
- group_thread_id: A test group that will be used to test group functionality
- user_thread_id: A person that will be used to test kick/add functionality (This user should be in the group)
Please remember to test both python v. 2.7 and python v. 3.6!
@@ -31,30 +40,36 @@ If you only want to execute specific tests, pass the function names in the comma
"""
class TestFbchat(unittest.TestCase):
def test_login_functions(self):
self.assertTrue(client.is_logged_in())
def setUp(self):
pass
def tearDown(self):
time.sleep(3)
def test_loginFunctions(self):
self.assertTrue(client.isLoggedIn())
client.logout()
self.assertFalse(client.is_logged_in())
self.assertFalse(client.isLoggedIn())
with self.assertRaises(Exception):
client.login("not@email.com", "not_password", max_retries=1)
client.login(email, password)
self.assertTrue(client.is_logged_in())
self.assertTrue(client.isLoggedIn())
def test_sessions(self):
global client
session_cookies = client.getSession()
client = fbchat.Client(email, password, session_cookies=session_cookies)
self.assertTrue(client.is_logged_in())
self.assertTrue(client.isLoggedIn())
def test_setDefaultRecipient(self):
client.setDefaultRecipient(client.uid, is_user=True)
self.assertTrue(client.send(message="test_default_recipient"))
def test_setDefaultThreadId(self):
client.setDefaultThreadId(client.uid, ThreadType.USER)
self.assertTrue(client.sendMessage("test_default_recipient"))
def test_getAllUsers(self):
users = client.getAllUsers()
@@ -74,51 +89,57 @@ class TestFbchat(unittest.TestCase):
self.assertEquals(u.name, 'Mark Zuckerberg')
self.assertGreater(u.score, 0)
def test_send_likes(self):
self.assertTrue(client.send(client.uid, like='s'))
self.assertTrue(client.send(client.uid, like='m'))
self.assertTrue(client.send(client.uid, like='l'))
self.assertTrue(client.send(group_uid, like='s', is_user=False))
self.assertTrue(client.send(group_uid, like='m', is_user=False))
self.assertTrue(client.send(group_uid, like='l', is_user=False))
def test_sendEmoji(self):
self.assertTrue(client.sendEmoji(EmojiSize.SMALL, user_uid, ThreadType.USER))
self.assertTrue(client.sendEmoji(EmojiSize.MEDIUM, user_uid, ThreadType.USER))
self.assertTrue(client.sendEmoji(EmojiSize.LARGE, user_uid, ThreadType.USER))
self.assertTrue(client.sendEmoji(EmojiSize.SMALL, group_uid, ThreadType.GROUP))
self.assertTrue(client.sendEmoji(EmojiSize.MEDIUM, group_uid, ThreadType.GROUP))
self.assertTrue(client.sendEmoji(EmojiSize.LARGE, group_uid, ThreadType.GROUP))
def test_send(self):
self.assertTrue(client.send(client.uid, message='test_send_user'))
self.assertTrue(client.send(group_uid, message='test_send_group', is_user=False))
def test_sendMessage(self):
self.assertTrue(client.sendMessage('test_send_user', user_uid, ThreadType.USER))
self.assertTrue(client.sendMessage('test_send_group', group_uid, ThreadType.GROUP))
def test_send_images(self):
def test_sendImages(self):
image_url = 'https://cdn4.iconfinder.com/data/icons/ionicons/512/icon-image-128.png'
image_local_url = path.join(path.dirname(__file__), 'test_image.png')
self.assertTrue(client.sendRemoteImage(client.uid, message='test_send_user_images_remote', image=image_url))
self.assertTrue(client.sendLocalImage(client.uid, message='test_send_user_images_local', image=image_local_url))
self.assertTrue(client.sendRemoteImage(group_uid, message='test_send_group_images_remote', is_user=False, image=image_url))
self.assertTrue(client.sendLocalImage(group_uid, message='test_send_group_images_local', is_user=False, image=image_local_url))
self.assertTrue(client.sendRemoteImage(image_url, 'test_send_user_images_remote', user_uid, ThreadType.USER))
self.assertTrue(client.sendRemoteImage(image_url, 'test_send_group_images_remote', group_uid, ThreadType.GROUP))
# Idk why but doesnt work, payload is null
self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local', user_uid, ThreadType.USER))
self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local', group_uid, ThreadType.GROUP))
def test_getThreadInfo(self):
info = client.getThreadInfo(client.uid, last_n=1)
self.assertEquals(info[0].author, 'fbid:' + str(client.uid))
client.send(group_uid, message='test_getThreadInfo', is_user=False)
info = client.getThreadInfo(group_uid, last_n=1, is_user=False)
self.assertEquals(info[0].author, 'fbid:' + str(client.uid))
self.assertEquals(info[0].body, 'test_getThreadInfo')
client.sendMessage('test_user_getThreadInfo', user_uid, ThreadType.USER)
time.sleep(3)
info = client.getThreadInfo(20, user_uid, ThreadType.USER)
self.assertEquals(info[0].author, 'fbid:' + client.uid)
self.assertEquals(info[0].body, 'test_user_getThreadInfo')
client.sendMessage('test_group_getThreadInfo', group_uid, ThreadType.GROUP)
time.sleep(3)
info = client.getThreadInfo(20, group_uid, ThreadType.GROUP)
self.assertEquals(info[0].author, 'fbid:' + client.uid)
self.assertEquals(info[0].body, 'test_group_getThreadInfo')
def test_markAs(self):
# To be implemented (requires some form of manual watching)
pass
def test_listen(self):
client.do_one_listen()
client.doOneListen()
def test_getUserInfo(self):
info = client.getUserInfo(4)
self.assertEquals(info['name'], 'Mark Zuckerberg')
def test_remove_add_from_chat(self):
self.assertTrue(client.remove_user_from_chat(group_uid, user_uid))
self.assertTrue(client.add_users_to_chat(group_uid, user_uid))
def test_removeAddFromChat(self):
self.assertTrue(client.removeUserFromChat(user_uid, group_uid))
self.assertTrue(client.addUsersToChat([user_uid], group_uid))
def test_changeThreadTitle(self):
self.assertTrue(client.changeThreadTitle(group_uid, 'test_changeThreadTitle'))
self.assertTrue(client.changeThreadTitle('test_changeThreadTitle', group_uid))
def start_test(param_client, param_group_uid, param_user_uid, tests=[]):
@@ -134,12 +155,13 @@ def start_test(param_client, param_group_uid, param_user_uid, tests=[]):
suite = unittest.TestLoader().loadTestsFromTestCase(TestFbchat)
else:
suite = unittest.TestSuite(map(TestFbchat, tests))
print ('Starting test(s)')
print('Starting test(s)')
unittest.TextTestRunner(verbosity=2).run(suite)
client = None
if __name__ == '__main__':
if __name__ == 'tests':
# Python 3 does not use raw_input, whereas Python 2 does
try:
input = raw_input
@@ -147,20 +169,19 @@ if __name__ == '__main__':
pass
try:
with open(path.join(path.dirname(__file__), 'tests.data'), 'r') as f:
content = f.readlines()
content = [x.strip() for x in content if len(x.strip()) != 0]
email = content[0]
password = content[1]
group_uid = content[2]
user_uid = content[3]
with open(path.join(path.dirname(__file__), 'test_data.json'), 'r') as f:
json = json.load(f)
email = json['email']
password = json['password']
user_uid = json['user_thread_id']
group_uid = json['group_thread_id']
except (IOError, IndexError) as e:
email = input('Email: ')
password = getpass.getpass()
group_uid = input('Please enter a group uid (To test group functionality): ')
user_uid = input('Please enter a user uid (To test kicking/adding functionality): ')
group_uid = input('Please enter a group thread id (To test group functionality): ')
user_uid = input('Please enter a user thread id (To test kicking/adding functionality): ')
print ('Logging in')
print('Logging in...')
client = fbchat.Client(email, password)
# Warning! Taking user input directly like this could be dangerous! Use only for testing purposes!