Merge pull request #140 into development branch from Dainius14/dev

This commit is contained in:
Mads T Marquart
2017-05-10 11:10:16 +02:00
committed by GitHub
4 changed files with 266 additions and 301 deletions

View File

@@ -79,7 +79,6 @@ class Client(object):
:param session_cookies: Cookie dict from a previous session (Will default to login if these are invalid) :param session_cookies: Cookie dict from a previous session (Will default to login if these are invalid)
""" """
self.is_def_recipient_set = False
self.sticky, self.pool = (None, None) self.sticky, self.pool = (None, None)
self._session = requests.session() self._session = requests.session()
self.req_counter = 1 self.req_counter = 1
@@ -87,6 +86,9 @@ class Client(object):
self.payloadDefault = {} self.payloadDefault = {}
self.client = 'mercury' self.client = 'mercury'
self.listening = False self.listening = False
self.is_def_thread_set = False
self.def_thread_id = None
self.def_thread_type = None
self.threads = [] self.threads = []
if not user_agent: if not user_agent:
@@ -117,29 +119,6 @@ class Client(object):
if not session_cookies or not self.setSession(session_cookies) or not self.is_logged_in(): if not session_cookies or not self.setSession(session_cookies) or not self.is_logged_in():
self.login(email, password, max_retries) self.login(email, password, max_retries)
def _console(self, msg):
"""Assumes an INFO level and log it.
This method shouldn't be used anymore.
Use the log itself:
>>> import logging
>>> from fbchat.client import log
>>> log.setLevel(logging.DEBUG)
You can do the same thing by adding the 'debug' argument:
>>> from fbchat import Client
>>> client = Client("...", "...", debug=True)
"""
warnings.warn(
"Client._console shouldn't be used. Use 'log.<level>'",
DeprecationWarning)
log.debug(msg)
def _setttstamp(self):
for i in self.fb_dtsg:
self.ttstamp += str(ord(i))
self.ttstamp += '2'
def _generatePayload(self, query): def _generatePayload(self, query):
"""Adds the following defaults to the payload: """Adds the following defaults to the payload:
__rev, __user, __a, ttstamp, fb_dtsg, __req __rev, __user, __a, ttstamp, fb_dtsg, __req
@@ -175,7 +154,7 @@ class Client(object):
self.payloadDefault = {} self.payloadDefault = {}
self.client_id = hex(int(random()*2147483648))[2:] self.client_id = hex(int(random()*2147483648))[2:]
self.start_time = now() self.start_time = now()
self.uid = int(self._session.cookies['c_user']) self.uid = str(self._session.cookies['c_user'])
self.user_channel = "p_" + str(self.uid) self.user_channel = "p_" + str(self.uid)
self.ttstamp = '' self.ttstamp = ''
@@ -315,8 +294,7 @@ class Client(object):
return True return True
def login(self, email, password, max_retries=5): def login(self, email, password, max_retries=5):
# Logging in self.onLoggingIn(email=email)
log.info("Logging in {}...".format(email))
if not (email and password): if not (email and password):
raise Exception("Email and password not set.") raise Exception("Email and password not set.")
@@ -330,7 +308,7 @@ class Client(object):
time.sleep(1) time.sleep(1)
continue continue
else: else:
log.info("Login of {} successful.".format(email)) self.onLoggedIn(email=email)
break break
else: else:
raise Exception("Login failed. Check email/password.") raise Exception("Login failed. Check email/password.")
@@ -350,15 +328,15 @@ class Client(object):
self.seq = "0" self.seq = "0"
return r return r
def setDefaultRecipient(self, recipient_id, is_user=True): def setDefaultThreadId(self, thread_id=str, thread_type=ThreadType):
"""Sets default recipient to send messages and images to. """Sets default recipient to send messages and images to.
:param recipient_id: the user id or thread id that you want to send a message to :param thread_id: user/group ID to default to
:param is_user: determines if the recipient_id is for user or thread :param thread_type: type of thread_id
""" """
self.def_recipient_id = recipient_id self.def_thread_id = thread_id
self.def_is_user = is_user self.def_thread_type = thread_type
self.is_def_recipient_set = True self.is_def_thread_set = True
def _adapt_user_in_chat_to_user_model(self, user_in_chat): def _adapt_user_in_chat_to_user_model(self, user_in_chat):
""" Adapts user info from chat to User model acceptable initial dict """ Adapts user info from chat to User model acceptable initial dict
@@ -442,24 +420,27 @@ class Client(object):
users.append(User(entry)) users.append(User(entry))
return users # have bug TypeError: __repr__ returned non-string (type bytes) return users # have bug TypeError: __repr__ returned non-string (type bytes)
def send(self, recipient_id=None, message=None, is_user=True, like=None, image_id=None, add_user_ids=None): """
SEND METHODS
"""
def _send(self, thread_id=None, message=None, thread_type=None, emoji_size=None, image_id=None, add_user_ids=None, new_title=None):
"""Send a message with given thread id """Send a message with given thread id
:param recipient_id: the user id or thread id that you want to send a message to :param thread_id: the user id or thread id that you want to send a message to
:param message: a text that you want to send :param message: a text that you want to send
:param is_user: determines if the recipient_id is for user or thread :param thread_type: determines if the recipient_id is for user or thread
:param like: size of the like sticker you want to send :param emoji_size: size of the like sticker you want to send
:param image_id: id for the image to send, gotten from the UploadURL :param image_id: id for the image to send, gotten from the UploadURL
:param add_user_ids: a list of user ids to add to a chat :param add_user_ids: a list of user ids to add to a chat
:return: a list of message ids of the sent message(s)
returns a list of message ids of the sent message(s)
""" """
if self.is_def_recipient_set: if thread_id is None and self.is_def_thread_set:
recipient_id = self.def_recipient_id thread_id = self.def_thread_id
is_user = self.def_is_user thread_type = self.def_thread_type
elif recipient_id is None: elif thread_id is None and not self.is_def_thread_set:
raise Exception('Recipient ID is not set.') raise ValueError('Default Thread ID is not set.')
messageAndOTID = generateOfflineThreadingID() messageAndOTID = generateOfflineThreadingID()
timestamp = now() timestamp = now()
@@ -487,40 +468,47 @@ class Client(object):
'status' : '0', 'status' : '0',
'offline_threading_id':messageAndOTID, 'offline_threading_id':messageAndOTID,
'message_id' : messageAndOTID, 'message_id' : messageAndOTID,
'threading_id':generateMessageID(self.client_id), 'threading_id': generateMessageID(self.client_id),
'ephemeral_ttl_mode:': '0', 'ephemeral_ttl_mode:': '0',
'manual_retry_cnt' : '0', 'manual_retry_cnt' : '0',
'signatureID' : getSignatureID() 'signatureID' : getSignatureID()
} }
if is_user: # Set recipient
data["other_user_fbid"] = recipient_id if thread_type == ThreadType.USER:
else: data["other_user_fbid"] = thread_id
data["thread_fbid"] = recipient_id elif thread_type == ThreadType.GROUP:
data["thread_fbid"] = thread_id
# Set title
if new_title:
data['action_type'] = 'ma-type:log-message'
data['log_message_data[name]'] = new_title
data['log_message_type'] = 'log:thread-name'
# Set users to add
if add_user_ids: if add_user_ids:
data['action_type'] = 'ma-type:log-message' data['action_type'] = 'ma-type:log-message'
# It's possible to add multiple users # It's possible to add multiple users
for i, add_user_id in enumerate(add_user_ids): for i, add_user_id in enumerate(add_user_ids):
data['log_message_data[added_participants][' + str(i) + ']'] = "fbid:" + str(add_user_id) data['log_message_data[added_participants][' + str(i) + ']'] = "fbid:" + str(add_user_id)
data['log_message_type'] = 'log:subscribe' data['log_message_type'] = 'log:subscribe'
else:
# Sending a simple message
if not add_user_ids and not new_title:
data['action_type'] = 'ma-type:user-generated-message' data['action_type'] = 'ma-type:user-generated-message'
data['body'] = message data['body'] = message or ''
data['has_attachment'] = image_id is not None data['has_attachment'] = image_id is not None
data['specific_to_list[0]'] = 'fbid:' + str(recipient_id) data['specific_to_list[0]'] = 'fbid:' + str(thread_id)
data['specific_to_list[1]'] = 'fbid:' + str(self.uid) data['specific_to_list[1]'] = 'fbid:' + str(self.uid)
# Set image to send
if image_id: if image_id:
data['image_ids[0]'] = image_id data['image_ids[0]'] = image_id
if like: # Set emoji to send
try: if emoji_size:
sticker = LIKES[like.lower()] data["sticker_id"] = emoji_size.value
except KeyError:
# if user doesn't enter l or m or s, then use the large one
sticker = LIKES['l']
data["sticker_id"] = sticker
r = self._post(SendURL, data) r = self._post(SendURL, data)
@@ -528,9 +516,10 @@ class Client(object):
log.warning('Error when sending message: Got {} response'.format(r.status_code)) log.warning('Error when sending message: Got {} response'.format(r.status_code))
return False return False
if isinstance(r._content, str) is False: response_content = {}
r._content = r._content.decode(facebookEncoding) if isinstance(r.content, str) is False:
j = get_json(r._content) response_content = r.content.decode(facebookEncoding)
j = get_json(response_content)
if 'error' in j: if 'error' in j:
# 'errorDescription' is in the users own language! # 'errorDescription' is in the users own language!
log.warning('Error #{} when sending message: {}'.format(j['error'], j['errorDescription'])) log.warning('Error #{} when sending message: {}'.format(j['error'], j['errorDescription']))
@@ -549,68 +538,140 @@ class Client(object):
log.debug("With data {}".format(data)) log.debug("With data {}".format(data))
return message_ids return message_ids
def sendMessage(self, message: str, thread_id: str = None, thread_type: ThreadType = None):
def sendRemoteImage(self, recipient_id=None, message=None, is_user=True, image=''):
"""Send an image from a URL
:param recipient_id: the user id or thread id that you want to send a message to
:param message: a text that you want to send
:param is_user: determines if the recipient_id is for user or thread
:param image: URL for an image to download and send
""" """
mimetype = guess_type(image)[0] Sends a message to given (or default, if not) thread with an additional image.
remote_image = requests.get(image).content :param message: message to send
image_id = self.uploadImage({'file': (image, remote_image, mimetype)}) :param thread_id: user/group chat ID
return self.send(recipient_id, message, is_user, None, image_id) :param thread_type: specify whether thread_id is user or group chat
:return: a list of message ids of the sent message(s)
def sendLocalImage(self, recipient_id=None, message=None, is_user=True, image=''):
"""Send an image from a file path
:param recipient_id: the user id or thread id that you want to send a message to
:param message: a text that you want to send
:param is_user: determines if the recipient_id is for user or thread
:param image: path to a local image to send
""" """
mimetype = guess_type(image)[0] return self._send(thread_id, message, thread_type, None, None, None, None)
image_id = self.uploadImage({'file': (image, open(image, 'rb'), mimetype)})
return self.send(recipient_id, message, is_user, None, image_id)
def sendEmoji(self, emoji_size: EmojiSize, thread_id: str = None, thread_type: ThreadType = None):
"""
Sends an emoji to given (or default, if not) thread.
:param emoji_size: size of emoji to send
:param thread_id: user/group chat ID
:param thread_type: specify whether thread_id is user or group chat
:return: a list of message ids of the sent message(s)
"""
return self._send(thread_id, None, thread_type, emoji_size, None, None, None)
def uploadImage(self, image): def sendRemoteImage(self, image_url: str, message: str = None, thread_id: str = None, thread_type: ThreadType = None):
"""
Sends an image from given URL to given (or default, if not) thread.
:param image_url: URL of an image to upload and send
:param message: additional message
:param thread_id: user/group chat ID
:param thread_type: specify whether thread_id is user or group chat
:return: a list of message ids of the sent message(s)
"""
mimetype = guess_type(image_url)[0]
remote_image = requests.get(image_url).content
image_id = self._uploadImage({'file': (image_url, remote_image, mimetype)})
return self._send(thread_id, message, thread_type, None, image_id, None, None)
# Doesn't upload properly
def sendLocalImage(self, image_path: str, message: str = None, thread_id: str = None, thread_type: ThreadType = None):
"""
Sends an image from given URL to given (or default, if not) thread.
:param image_path: path of an image to upload and send
:param message: additional message
:param thread_id: user/group chat ID
:param thread_type: specify whether thread_id is user or group chat
:return: a list of message ids of the sent message(s)
"""
mimetype = guess_type(image_path)[0]
image_id = self._uploadImage({'file': (image_path, open(image_path, 'rb'), mimetype)})
return self._send(thread_id, message, thread_type, None, image_id, None, None)
def addUsersToChat(self, user_list: list, thread_id: str = None):
"""
Adds users to given (or default, if not) thread.
:param user_list: list of users to add
:param thread_id: group chat ID
:return: a list of message ids of the sent message(s)
"""
return self._send(thread_id, None, ThreadType.GROUP, None, None, user_list, None)
def removeUserFromChat(self, user_id: str, thread_id: str = None):
"""
Adds users to given (or default, if not) thread.
:param user_id: user ID to remove
:param thread_id: group chat ID
:return: true if user was removed
"""
if thread_id is None and self.def_thread_type == ThreadType.GROUP:
thread_id = self.def_thread_id
elif thread_id is None:
raise ValueError('Default Thread ID is not set.')
data = {
"uid": user_id,
"tid": thread_id
}
r = self._post(RemoveUserURL, data)
return r.ok
def changeThreadTitle(self, new_title: str, thread_id: str = None):
"""
Change title of a group conversation.
:param new_title: new group chat title
:param thread_id: group chat ID
:return: a list of message ids of the sent message(s)
"""
if thread_id is None and self.def_thread_type == ThreadType.GROUP:
thread_id = self.def_thread_id
elif thread_id is None:
raise ValueError('Default Thread ID is not set.')
return self._send(thread_id, None, ThreadType.GROUP, None, None, None, new_title)
"""
END SEND METHODS
"""
def _uploadImage(self, image):
"""Upload an image and get the image_id for sending in a message """Upload an image and get the image_id for sending in a message
:param image: a tuple of (file name, data, mime type) to upload to facebook :param image: a tuple of (file name, data, mime type) to upload to facebook
""" """
r = self._postFile(UploadURL, image) r = self._postFile(UploadURL, image)
if isinstance(r._content, str) is False: response_content = {}
r._content = r._content.decode(facebookEncoding) if isinstance(r.content, str) is False:
response_content = r.content.decode(facebookEncoding)
# Strip the start and parse out the returned image_id # Strip the start and parse out the returned image_id
return json.loads(r._content[9:])['payload']['metadata'][0]['image_id'] return json.loads(response_content[9:])['payload']['metadata'][0]['image_id']
def getThreadInfo(self, last_n=20, thread_id: str = None, thread_type: ThreadType = None):
def getThreadInfo(self, userID, last_n=20, start=None, is_user=True):
"""Get the info of one Thread """Get the info of one Thread
:param userID: ID of the user you want the messages from :param last_n: number of retrieved messages from start (default 20)
:param last_n: (optional) number of retrieved messages from start :param thread_id: user/group chat ID
:param start: (optional) the start index of a thread (Deprecated) :param thread_type: specify whether thread_id is user or group chat
:param is_user: (optional) determines if the userID is for user or thread :return: a list of messages
""" """
if thread_id is None and self.is_def_thread_set:
thread_id = self.def_thread_id
thread_type = self.def_thread_type
elif thread_id is None and not self.is_def_thread_set:
raise ValueError('Default Thread ID is not set.')
assert last_n > 0, 'length must be positive integer, got %d' % last_n assert last_n > 0, 'length must be positive integer, got %d' % last_n
assert start is None, '`start` is deprecated, always 0 offset querry is returned'
if is_user: if thread_type == ThreadType.USER:
key = 'user_ids' key = 'user_ids'
else: elif thread_type == ThreadType.GROUP:
key = 'thread_fbids' key = 'thread_fbids'
# deprecated data = {'messages[{}][{}][offset]'.format(key, thread_id): 0,
# `start` doesn't matter, always returns from the last 'messages[{}][{}][limit]'.format(key, thread_id): last_n - 1,
# data['messages[{}][{}][offset]'.format(key, userID)] = start 'messages[{}][{}][timestamp]'.format(key, thread_id): now()}
data = {'messages[{}][{}][offset]'.format(key, userID): 0,
'messages[{}][{}][limit]'.format(key, userID): last_n - 1,
'messages[{}][{}][timestamp]'.format(key, userID): now()}
r = self._post(MessagesURL, query=data) r = self._post(MessagesURL, query=data)
if not r.ok or len(r.text) == 0: if not r.ok or len(r.text) == 0:
@@ -621,7 +682,7 @@ class Client(object):
return None return None
messages = [] messages = []
for message in j['payload']['actions']: for message in j['payload'].get('actions'):
messages.append(Message(**message)) messages.append(Message(**message))
return list(reversed(messages)) return list(reversed(messages))
@@ -908,132 +969,3 @@ class Client(object):
if len(full_data)==1: if len(full_data)==1:
full_data=full_data[0] full_data=full_data[0]
return full_data return full_data
def remove_user_from_chat(self, threadID, userID):
"""Remove user (userID) from group chat (threadID)
:param threadID: group chat id
:param userID: user id to remove from chat
"""
data = {
"uid" : userID,
"tid" : threadID
}
r = self._post(RemoveUserURL, data)
return r.ok
def add_users_to_chat(self, threadID, userID):
"""Add user (userID) to group chat (threadID)
:param threadID: group chat id
:param userID: user id to add to chat
"""
return self.send(threadID, is_user=False, add_user_ids=[userID])
def changeThreadTitle(self, threadID, newTitle):
"""Change title of a group conversation
:param threadID: group chat id
:param newTitle: new group chat title
"""
messageAndOTID = generateOfflineThreadingID()
timestamp = now()
date = datetime.now()
data = {
'client' : self.client,
'action_type' : 'ma-type:log-message',
'author' : 'fbid:' + str(self.uid),
'thread_id' : '',
'author_email' : '',
'coordinates' : '',
'timestamp' : timestamp,
'timestamp_absolute' : 'Today',
'timestamp_relative' : str(date.hour) + ":" + str(date.minute).zfill(2),
'timestamp_time_passed' : '0',
'is_unread' : False,
'is_cleared' : False,
'is_forward' : False,
'is_filtered_content' : False,
'is_spoof_warning' : False,
'source' : 'source:chat:web',
'source_tags[0]' : 'source:chat',
'status' : '0',
'offline_threading_id' : messageAndOTID,
'message_id' : messageAndOTID,
'threading_id': generateMessageID(self.client_id),
'manual_retry_cnt' : '0',
'thread_fbid' : threadID,
'log_message_data[name]' : newTitle,
'log_message_type' : 'log:thread-name'
}
r = self._post(SendURL, data)
return r.ok
def on_message_new(self, mid, author_id, message, metadata, recipient_id, thread_type):
"""subclass Client and override this method to add custom behavior on event
This version of on_message recieves recipient_id and thread_type.
For backwards compatability, this data is sent directly to the old on_message.
"""
self.on_message(mid, author_id, None, message, metadata)
def on_message(self, mid, author_id, author_name, message, metadata):
"""subclass Client and override this method to add custom behavior on event"""
self.markAsDelivered(author_id, mid)
self.markAsRead(author_id)
log.info("%s said: %s" % (author_name, message))
def on_friend_request(self, from_id):
"""subclass Client and override this method to add custom behavior on event"""
log.info("Friend request from %s." % from_id)
def on_typing(self, author_id):
"""subclass Client and override this method to add custom behavior on event"""
pass
def on_read(self, author, reader, time):
"""subclass Client and override this method to add custom behavior on event"""
pass
def on_people_added(self, user_ids, actor_id, thread_id):
"""subclass Client and override this method to add custom behavior on event"""
log.info("User(s) {} was added to {} by {}".format(repr(user_ids), thread_id, actor_id))
def on_person_removed(self, user_id, actor_id, thread_id):
"""subclass Client and override this method to add custom behavior on event"""
log.info("User {} was removed from {} by {}".format(user_id, thread_id, actor_id))
def on_inbox(self, viewer, unseen, unread, other_unseen, other_unread, timestamp):
"""subclass Client and override this method to add custom behavior on event"""
pass
def on_message_error(self, exception, message):
"""subclass Client and override this method to add custom behavior on event"""
log.warning("Exception:\n{}".format(exception))
def on_qprimer(self, timestamp):
pass
def on_unknown_type(self, m):
"""subclass Client and override this method to add custom behavior on event"""
log.debug("Unknown type {}".format(m))

View File

@@ -19,7 +19,6 @@ class User(Base):
self.url = data['path'] self.url = data['path']
self.name = data['text'] self.name = data['text']
self.score = data['score'] self.score = data['score']
self.data = data self.data = data
class Thread(): class Thread():
@@ -29,3 +28,16 @@ class Thread():
class Message(): class Message():
def __init__(self, **entries): def __init__(self, **entries):
self.__dict__.update(entries) self.__dict__.update(entries)
class ThreadType(Enum):
USER = 1
GROUP = 2
class TypingStatus(Enum):
DELETED = 0
TYPING = 1
class EmojiSize(Enum):
LARGE = '369239383222810'
MEDIUM = '369239343222814'
SMALL = '369239263222822'

6
test_data.js Normal file
View File

@@ -0,0 +1,6 @@
{
"email": "",
"password": "",
"user_thread_id": "",
"group_thread_id": ""
}

145
tests.py
View File

@@ -1,7 +1,10 @@
#!/usr/bin/env python #!/usr/bin/env python
import time
import json
import logging import logging
import fbchat import fbchat
from fbchat.models import *
import getpass import getpass
import unittest import unittest
import sys import sys
@@ -20,8 +23,8 @@ To use these tests, put:
- email - email
- password - password
- a group_uid - a group_uid
- a user_uid (the user will be kicked from the group and then added again) - a user_uid (the user will be kicked from the group and then added again) `test_data.js`,
(seperated these by a newline) in a file called `tests.data`, or type them manually in the terminal prompts or type them manually in the terminal prompts
Please remember to test both python v. 2.7 and python v. 3.6! Please remember to test both python v. 2.7 and python v. 3.6!
@@ -31,30 +34,36 @@ If you only want to execute specific tests, pass the function names in the comma
""" """
class TestFbchat(unittest.TestCase): class TestFbchat(unittest.TestCase):
def test_login_functions(self): def setUp(self):
self.assertTrue(client.is_logged_in()) pass
def tearDown(self):
time.sleep(3)
def test_loginFunctions(self):
self.assertTrue(client.isLoggedIn())
client.logout() client.logout()
self.assertFalse(client.is_logged_in()) self.assertFalse(client.isLoggedIn())
with self.assertRaises(Exception): with self.assertRaises(Exception):
client.login("not@email.com", "not_password", max_retries=1) client.login("not@email.com", "not_password", max_retries=1)
client.login(email, password) client.login(email, password)
self.assertTrue(client.is_logged_in()) self.assertTrue(client.isLoggedIn())
def test_sessions(self): def test_sessions(self):
global client global client
session_cookies = client.getSession() session_cookies = client.getSession()
client = fbchat.Client(email, password, session_cookies=session_cookies) client = fbchat.Client(email, password, session_cookies=session_cookies)
self.assertTrue(client.is_logged_in())
def test_setDefaultRecipient(self): self.assertTrue(client.isLoggedIn())
client.setDefaultRecipient(client.uid, is_user=True)
self.assertTrue(client.send(message="test_default_recipient")) def test_setDefaultThreadId(self):
client.setDefaultThreadId(client.uid, ThreadType.USER)
self.assertTrue(client.sendMessage("test_default_recipient"))
def test_getAllUsers(self): def test_getAllUsers(self):
users = client.getAllUsers() users = client.getAllUsers()
@@ -63,9 +72,9 @@ class TestFbchat(unittest.TestCase):
def test_getUsers(self): def test_getUsers(self):
users = client.getUsers("Mark Zuckerberg") users = client.getUsers("Mark Zuckerberg")
self.assertGreater(len(users), 0) self.assertGreater(len(users), 0)
u = users[0] u = users[0]
# Test if values are set correctly # Test if values are set correctly
self.assertIsInstance(u.uid, int) self.assertIsInstance(u.uid, int)
self.assertEquals(u.type, 'user') self.assertEquals(u.type, 'user')
@@ -73,73 +82,80 @@ class TestFbchat(unittest.TestCase):
self.assertEquals(u.url[:4], 'http') self.assertEquals(u.url[:4], 'http')
self.assertEquals(u.name, 'Mark Zuckerberg') self.assertEquals(u.name, 'Mark Zuckerberg')
self.assertGreater(u.score, 0) self.assertGreater(u.score, 0)
def test_send_likes(self): def test_sendEmoji(self):
self.assertTrue(client.send(client.uid, like='s')) self.assertTrue(client.sendEmoji(EmojiSize.SMALL, user_uid, ThreadType.USER))
self.assertTrue(client.send(client.uid, like='m')) self.assertTrue(client.sendEmoji(EmojiSize.MEDIUM, user_uid, ThreadType.USER))
self.assertTrue(client.send(client.uid, like='l')) self.assertTrue(client.sendEmoji(EmojiSize.LARGE, user_uid, ThreadType.USER))
self.assertTrue(client.send(group_uid, like='s', is_user=False)) self.assertTrue(client.sendEmoji(EmojiSize.SMALL, group_uid, ThreadType.GROUP))
self.assertTrue(client.send(group_uid, like='m', is_user=False)) self.assertTrue(client.sendEmoji(EmojiSize.MEDIUM, group_uid, ThreadType.GROUP))
self.assertTrue(client.send(group_uid, like='l', is_user=False)) self.assertTrue(client.sendEmoji(EmojiSize.LARGE, group_uid, ThreadType.GROUP))
def test_send(self): def test_sendMessage(self):
self.assertTrue(client.send(client.uid, message='test_send_user')) self.assertTrue(client.sendMessage('test_send_user', user_uid, ThreadType.USER))
self.assertTrue(client.send(group_uid, message='test_send_group', is_user=False)) self.assertTrue(client.sendMessage('test_send_group', group_uid, ThreadType.GROUP))
def test_send_images(self): def test_sendImages(self):
image_url = 'https://cdn4.iconfinder.com/data/icons/ionicons/512/icon-image-128.png' image_url = 'https://cdn4.iconfinder.com/data/icons/ionicons/512/icon-image-128.png'
image_local_url = path.join(path.dirname(__file__), 'test_image.png') image_local_url = path.join(path.dirname(__file__), 'test_image.png')
self.assertTrue(client.sendRemoteImage(client.uid, message='test_send_user_images_remote', image=image_url)) self.assertTrue(client.sendRemoteImage(image_url, 'test_send_user_images_remote', user_uid, ThreadType.USER))
self.assertTrue(client.sendLocalImage(client.uid, message='test_send_user_images_local', image=image_local_url)) self.assertTrue(client.sendRemoteImage(image_url, 'test_send_group_images_remote', group_uid, ThreadType.GROUP))
self.assertTrue(client.sendRemoteImage(group_uid, message='test_send_group_images_remote', is_user=False, image=image_url)) # Idk why but doesnt work, payload is null
self.assertTrue(client.sendLocalImage(group_uid, message='test_send_group_images_local', is_user=False, image=image_local_url)) # self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local', user_uid, ThreadType.USER))
# self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local', group_uid, ThreadType.GROUP))
def test_getThreadInfo(self): def test_getThreadInfo(self):
info = client.getThreadInfo(client.uid, last_n=1) client.sendMessage('test_user_getThreadInfo', user_uid, ThreadType.USER)
self.assertEquals(info[0].author, 'fbid:' + str(client.uid)) time.sleep(3)
client.send(group_uid, message='test_getThreadInfo', is_user=False) info = client.getThreadInfo(20, user_uid, ThreadType.USER)
info = client.getThreadInfo(group_uid, last_n=1, is_user=False) self.assertEquals(info[0].author, 'fbid:' + client.uid)
self.assertEquals(info[0].author, 'fbid:' + str(client.uid)) self.assertEquals(info[0].body, 'test_user_getThreadInfo')
self.assertEquals(info[0].body, 'test_getThreadInfo')
def test_markAs(self): client.sendMessage('test_group_getThreadInfo', group_uid, ThreadType.GROUP)
# To be implemented (requires some form of manual watching) time.sleep(3)
pass info = client.getThreadInfo(20, group_uid, ThreadType.GROUP)
self.assertEquals(info[0].author, 'fbid:' + client.uid)
self.assertEquals(info[0].body, 'test_group_getThreadInfo')
def test_listen(self): # def test_markAs(self):
client.do_one_listen() # # To be implemented (requires some form of manual watching)
# pass
# def test_listen(self):
# client.doOneListen()
def test_getUserInfo(self): def test_getUserInfo(self):
info = client.getUserInfo(4) info = client.getUserInfo(4)
self.assertEquals(info['name'], 'Mark Zuckerberg') self.assertEquals(info['name'], 'Mark Zuckerberg')
def test_remove_add_from_chat(self): def test_removeAddFromChat(self):
self.assertTrue(client.remove_user_from_chat(group_uid, user_uid)) self.assertTrue(client.removeUserFromChat(user_uid, group_uid))
self.assertTrue(client.add_users_to_chat(group_uid, user_uid)) self.assertTrue(client.addUsersToChat([user_uid], group_uid))
def test_changeThreadTitle(self): def test_changeThreadTitle(self):
self.assertTrue(client.changeThreadTitle(group_uid, 'test_changeThreadTitle')) self.assertTrue(client.changeThreadTitle('test_changeThreadTitle', group_uid))
def start_test(param_client, param_group_uid, param_user_uid, tests=[]): def start_test(param_client, param_group_uid, param_user_uid, tests=[]):
global client global client
global group_uid global group_uid
global user_uid global user_uid
client = param_client client = param_client
group_uid = param_group_uid group_uid = param_group_uid
user_uid = param_user_uid user_uid = param_user_uid
if len(tests) == 0: if len(tests) == 0:
suite = unittest.TestLoader().loadTestsFromTestCase(TestFbchat) suite = unittest.TestLoader().loadTestsFromTestCase(TestFbchat)
else: else:
suite = unittest.TestSuite(map(TestFbchat, tests)) suite = unittest.TestSuite(map(TestFbchat, tests))
print ('Starting test(s)') print('Starting test(s)')
unittest.TextTestRunner(verbosity=2).run(suite) unittest.TextTestRunner(verbosity=2).run(suite)
client = None
if __name__ == '__main__': if __name__ == 'tests':
# Python 3 does not use raw_input, whereas Python 2 does # Python 3 does not use raw_input, whereas Python 2 does
try: try:
input = raw_input input = raw_input
@@ -147,20 +163,19 @@ if __name__ == '__main__':
pass pass
try: try:
with open(path.join(path.dirname(__file__), 'tests.data'), 'r') as f: with open(path.join(path.dirname(__file__), 'test_data.js'), 'r') as f:
content = f.readlines() json = json.load(f)
content = [x.strip() for x in content if len(x.strip()) != 0] email = json["email"]
email = content[0] password = json["password"]
password = content[1] user_uid = json["user_thread_id"]
group_uid = content[2] group_uid = json["group_thread_id"]
user_uid = content[3]
except (IOError, IndexError) as e: except (IOError, IndexError) as e:
email = input('Email: ') email = input('Email: ')
password = getpass.getpass() password = getpass.getpass()
group_uid = input('Please enter a group uid (To test group functionality): ') group_uid = input('Please enter a group uid (To test group functionality): ')
user_uid = input('Please enter a user uid (To test kicking/adding functionality): ') user_uid = input('Please enter a user uid (To test kicking/adding functionality): ')
print ('Logging in') print('Logging in...')
client = fbchat.Client(email, password) client = fbchat.Client(email, password)
# Warning! Taking user input directly like this could be dangerous! Use only for testing purposes! # Warning! Taking user input directly like this could be dangerous! Use only for testing purposes!