Added python 2.7 support, reworked events

- Reworked events, so now they support python 2.7 (I had to remove some
functionality though, but that was a little unnecessary anyway)
- Events now support the old style of writing, for people who's more
comfortable with that: ```python
class EchoBot(fbchat.Client):
    def onMessage(self, *args, **kwargs):
        self.something(*args, **kwargs)
```
While still supporting the new method:
```python
class EchoBot(fbchat.Client):
    def __init__(self, *args, **kwargs):
         super(EchoBot, self).__init__(*args, **kwargs)
         self.onMessage += lamda *args, **kwargs: self.something(*args,
**kwargs)
```
- Included `msg` as a parameter in every event function, since it's
useful if you want to extract some of the other data
- Moved test data to the folder `tests`
- Fixed various other functions, and improved stability
This commit is contained in:
Mads Marquart
2017-05-22 20:33:00 +02:00
parent 83a45ebc03
commit a76ebbb22a
8 changed files with 169 additions and 157 deletions

1
.gitignore vendored
View File

@@ -26,4 +26,5 @@ docs/_build/
# Data for tests
my_test_data.json
my_data.json
tests.data

View File

@@ -11,9 +11,10 @@
:license: BSD, see LICENSE for more details.
"""
from urllib import parse
from __future__ import unicode_literals
import requests
import logging
import urllib
from uuid import uuid1
from random import choice
from datetime import datetime
@@ -49,7 +50,7 @@ class Client(object):
"""
def __init__(self, email, password, debug=False, info_log=False, user_agent=None, max_retries=5,
session_cookies=None, logging_level=logging.INFO):
session_cookies=None, logging_level=logging.INFO, set_default_events=True):
"""A client for the Facebook Chat (Messenger).
:param email: Facebook `email` or `id` or `phone number`
@@ -60,6 +61,7 @@ class Client(object):
:param max_retries: Maximum number of times to retry login
:param session_cookies: Cookie dict from a previous session (Will default to login if these are invalid)
:param logging_level: Configures the logger to logging_level
:param set_default_events: Specifies whether the default logging.info events should be initialized
"""
self.sticky, self.pool = (None, None)
@@ -72,6 +74,7 @@ class Client(object):
self.default_thread_id = None
self.default_thread_type = None
self.threads = []
self.set_default_events = set_default_events
self._setupEventHooks()
self._setupOldEventHooks()
@@ -101,105 +104,106 @@ class Client(object):
if not session_cookies or not self.setSession(session_cookies) or not self.isLoggedIn():
self.login(email, password, max_retries)
def _setEventHook(self, event_name, *functions):
if not hasattr(type(self), event_name):
if self.set_default_events:
eventhook = EventHook(*functions)
else:
eventhook = EventHook()
setattr(self, event_name, eventhook)
def _setupEventHooks(self):
# Setup event hooks
self.onLoggingIn = EventHook(email=str)
self.onLoggedIn = EventHook(email=str)
self.onListening = EventHook()
self._setEventHook('onLoggingIn', lambda email: log.info("Logging in {}...".format(email)))
self.onMessage = EventHook(mid=str, author_id=str, message=str, thread_id=int, thread_type=ThreadType, ts=str, metadata=dict)
self.onColorChange = EventHook(mid=str, author_id=str, new_color=str, thread_id=str, thread_type=ThreadType, ts=str, metadata=dict)
self.onEmojiChange = EventHook(mid=str, author_id=str, new_emoji=str, thread_id=str, thread_type=ThreadType, ts=str, metadata=dict)
self.onTitleChange = EventHook(mid=str, author_id=str, new_title=str, thread_id=str, thread_type=ThreadType, ts=str, metadata=dict)
self.onNicknameChange = EventHook(mid=str, author_id=str, changed_for=str, new_title=str, thread_id=str, thread_type=ThreadType, ts=str, metadata=dict)
self._setEventHook('onLoggedIn', lambda email: log.info("Login of {} successful.".format(email)))
self._setEventHook('onListening', lambda: log.info("Listening..."))
self._setEventHook('onListenError', lambda exception: raise_exception(exception))
self.onMessageSeen = EventHook(seen_by=str, thread_id=str, thread_type=ThreadType, seen_ts=int, delivered_ts=int, metadata=dict)
self.onMessageDelivered = EventHook(msg_ids=list, delivered_for=str, thread_id=str, thread_type=ThreadType, ts=int, metadata=dict)
self.onMarkedSeen = EventHook(threads=list, seen_ts=int, delivered_ts=int, metadata=dict)
self._setEventHook('onMessage', lambda mid, author_id, message, thread_id, thread_type, ts, metadata, msg:\
log.info("Message from {} in {} ({}): {}".format(author_id, thread_id, thread_type.name, message)))
self.onInbox = EventHook(unseen=int, unread=int, recent_unread=int)
self.onPeopleAdded = EventHook(added_ids=list, author_id=str, thread_id=str)
self.onPersonRemoved = EventHook(removed_id=str, author_id=str, thread_id=str)
self.onFriendRequest = EventHook(from_id=str)
self._setEventHook('onColorChange', lambda mid, author_id, new_color, thread_id, thread_type, ts, metadata, msg:\
log.info("Color change from {} in {} ({}): {}".format(author_id, thread_id, thread_type.name, new_color)))
self.onUnknownMesssageType = EventHook(msg=dict)
self.onMessageError = EventHook(exception=Exception, msg=dict)
self._setEventHook('onEmojiChange', lambda mid, author_id, new_emoji, thread_id, thread_type, ts, metadata, msg:\
log.info("Emoji change from {} in {} ({}): {}".format(author_id, thread_id, thread_type.name, new_emoji)))
# Setup event handlers
self.onLoggingIn += lambda email: log.info("Logging in %s..." % email)
self.onLoggedIn += lambda email: log.info("Login of %s successful." % email)
self.onListening += lambda: log.info("Listening...")
self._setEventHook('onTitleChange', lambda mid, author_id, new_title, thread_id, thread_type, ts, metadata, msg:\
log.info("Title change from {} in {} ({}): {}".format(author_id, thread_id, thread_type.name, new_title)))
self.onMessage += lambda mid, author_id, message, thread_id, thread_type, ts, metadata:\
log.info("Message from %s in %s (%s): %s" % (author_id, thread_id, thread_type.name, message))
self.onColorChange += lambda mid, author_id, new_color, thread_id, thread_type, ts, metadata:\
log.info("Color change from %s in %s (%s): %s" % (author_id, thread_id, thread_type.name, new_color))
self.onEmojiChange += lambda mid, author_id, new_emoji, thread_id, thread_type, ts, metadata:\
log.info("Emoji change from %s in %s (%s): %s" % (author_id, thread_id, thread_type.name, new_emoji))
self.onTitleChange += lambda mid, author_id, new_title, thread_id, thread_type, ts, metadata:\
log.info("Title change from %s in %s (%s): %s" % (author_id, thread_id, thread_type.name, new_title))
self.onNicknameChange += lambda mid, author_id, new_title, changed_for, thread_id, thread_type, ts, metadata:\
log.info("Nickname change from %s in %s (%s) for %s: %s" % (author_id, thread_id, thread_type.name, changed_for, new_title))
self.onPeopleAdded += lambda added_ids, author_id, thread_id:\
log.info("%s added: %s" % (author_id, [x for x in added_ids]))
self.onPersonRemoved += lambda removed_id, author_id, thread_id:\
log.info("%s removed: %s" % (author_id, removed_id))
self._setEventHook('onNicknameChange', lambda mid, author_id, new_title, changed_for, thread_id, thread_type, ts, metadata, msg:\
log.info("Nickname change from {} in {} ({}) for {}: {}".format(author_id, thread_id, thread_type.name, changed_for, new_title)))
self.onMessageSeen += lambda seen_by, thread_id, thread_type, seen_ts, delivered_ts, metadata:\
log.info("Messages seen by %s in %s (%s) at %ss", seen_by, thread_id, thread_type.name, seen_ts/1000)
self.onMessageDelivered += lambda msg_ids, delivered_for, thread_id, thread_type, ts, metadata:\
log.info("Messages %s delivered to %s in %s (%s) at %ss", msg_ids, delivered_for, thread_id, thread_type.name, ts/1000)
self.onMarkedSeen += lambda threads, seen_ts, delivered_ts, metadata:\
log.info("Marked messages as seen in threads %s at %ss", [(x[0], x[1].name) for x in threads], seen_ts/1000)
self._setEventHook('onMessageSeen', lambda seen_by, thread_id, thread_type, seen_ts, delivered_ts, metadata, msg:\
log.info("Messages seen by {} in {} ({}) at {}s".format(seen_by, thread_id, thread_type.name, seen_ts/1000)))
self.onUnknownMesssageType += lambda msg: log.info("Unknown message type received: %s" % msg)
self.onMessageError += lambda exception, msg: log.exception(exception)
self._setEventHook('onMessageDelivered', lambda msg_ids, delivered_for, thread_id, thread_type, ts, metadata, msg:\
log.info("Messages {} delivered to {} in {} ({}) at {}s".format(msg_ids, delivered_for, thread_id, thread_type.name, ts/1000)))
def _checkOldEventHook(self, old_event, deprecated_in='0.10.3', removed_in='0.15.0'):
self._setEventHook('onMarkedSeen', lambda threads, seen_ts, delivered_ts, metadata, msg:\
log.info("Marked messages as seen in threads {} at {}s".format([(x[0], x[1].name) for x in threads], seen_ts/1000)))
self._setEventHook('onPeopleAdded', lambda added_ids, author_id, thread_id, msg:\
log.info("{} added: {}".format(author_id, ', '.join(added_ids))))
self._setEventHook('onPersonRemoved', lambda removed_id, author_id, thread_id, msg:\
log.info("{} removed: {}".format(author_id, removed_id)))
self._setEventHook('onFriendRequest', lambda from_id, msg: log.info("Friend request from {}".format(from_id)))
self._setEventHook('onInbox', lambda unseen, unread, recent_unread, msg: log.info('Inbox event: {}, {}, {}'.format(unseen, unread, recent_unread)))
self._setEventHook('onUnknownMesssageType', lambda msg: log.debug('Unknown message received: {}'.format(msg)))
self._setEventHook('onMessageError', lambda exception, msg: log.exception('Exception in parsing of {}'.format(msg)))
def _checkOldEventHook(self, old_event, new_event, deprecated_in='0.10.3', removed_in='0.15.0'):
if hasattr(type(self), old_event):
deprecation('Client.{}'.format(old_event), deprecated_in=deprecated_in, removed_in=removed_in, details='Use new event system instead')
return True
else:
return False
deprecation('Client.{}'.format(old_event), deprecated_in=deprecated_in, removed_in=removed_in, details='Use new event system instead (specifically Client.{})'.format(new_event))
if not hasattr(type(self), new_event):
return True
return False
def _setupOldEventHooks(self):
if self._checkOldEventHook('on_message', deprecated_in='0.7.0', removed_in='0.12.0'):
self.onMessage += lambda mid, author_id, message, thread_id, thread_type, ts, metadata:\
self.on_message(mid, author_id, None, message, metadata)
if self._checkOldEventHook('on_message', 'onMessage', deprecated_in='0.7.0', removed_in='0.12.0'):
self.onMessage += lambda mid, author_id, message, thread_id, thread_type, ts, metadata, msg:\
self.on_message(mid, author_id, None, message, metadata)
if self._checkOldEventHook('on_message_new'):
self.onMessage += lambda mid, author_id, message, thread_id, thread_type, ts, metadata:\
self.on_message_new(mid, author_id, message, metadata, thread_id, True if thread_type is ThreadType.USER else False)
if self._checkOldEventHook('on_message_new', 'onMessage'):
self.onMessage += lambda mid, author_id, message, thread_id, thread_type, ts, metadata, msg:\
self.on_message_new(mid, author_id, message, metadata, thread_id, True if thread_type is ThreadType.USER else False)
if self._checkOldEventHook('on_friend_request'):
self.onFriendRequest += lambda from_id: self.on_friend_request(from_id)
if self._checkOldEventHook('on_friend_request', 'onFriendRequest'):
self.onFriendRequest += lambda from_id, msg: self.on_friend_request(from_id)
if self._checkOldEventHook('on_typing'):
self.onTyping += lambda author_id, typing_status: self.on_typing(author_id)
if self._checkOldEventHook('on_typing', 'onTyping'):
self.onTyping += lambda author_id, typing_status, msg: self.on_typing(author_id)
if self._checkOldEventHook('on_read'):
self.onSeen += lambda seen_by, thread_id, timestamp: self.on_read(seen_by, thread_id, timestamp)
if self._checkOldEventHook('on_read', 'onSeen'):
self.onSeen += lambda seen_by, thread_id, timestamp, msg: self.on_read(seen_by, thread_id, timestamp)
if self._checkOldEventHook('on_people_added'):
self.onPeopleAdded += lambda added_ids, author_id, thread_id: self.on_people_added(added_ids, author_id, thread_id)
if self._checkOldEventHook('on_people_added', 'onPeopleAdded'):
self.onPeopleAdded += lambda added_ids, author_id, thread_id, msg: self.on_people_added(added_ids, author_id, thread_id)
if self._checkOldEventHook('on_person_removed'):
self.onPersonRemoved += lambda removed_id, author_id, thread_id: self.on_person_removed(removed_id, author_id, thread_id)
if self._checkOldEventHook('on_person_removed', 'onPersonRemoved'):
self.onPersonRemoved += lambda removed_id, author_id, thread_id, msg: self.on_person_removed(removed_id, author_id, thread_id)
if self._checkOldEventHook('on_inbox'):
self.onInbox += lambda unseen, unread, recent_unread: self.on_inbox(None, unseen, unread, None, recent_unread, None)
if self._checkOldEventHook('on_inbox', 'onInbox'):
self.onInbox += lambda unseen, unread, recent_unread, msg: self.on_inbox(None, unseen, unread, None, recent_unread, None)
if self._checkOldEventHook('on_qprimer'):
if self._checkOldEventHook('on_qprimer', ''):
pass
if self._checkOldEventHook('on_message_error'):
if self._checkOldEventHook('on_message_error', 'onMessageError'):
self.onMessageError += lambda exception, msg: self.on_message_error(exception, msg)
if self._checkOldEventHook('on_unknown_type'):
if self._checkOldEventHook('on_unknown_type', 'onUnknownMesssageType'):
self.onUnknownMesssageType += lambda msg: self.on_unknown_type(msg)
@deprecated(deprecated_in='0.6.0', removed_in='0.11.0', details='Use log.<level> instead')
@@ -836,13 +840,20 @@ class Client(object):
"action": "ADD_REACTION",
"client_mutation_id": "1",
"actor_id": self.uid,
"message_id": message_id,
"message_id": str(message_id),
"reaction": reaction.value
}
}
}
}
try:
url_part = urllib.parse.urlencode(full_data)
except AttributeError:
# This is a very hacky solution, please suggest a better one ;)
url_part = urllib.urlencode(full_data)\
.replace('u%27', '%27')\
.replace('%5CU{}'.format(MessageReactionFix[reaction.value][0]), MessageReactionFix[reaction.value][1])
j = self._checkRequest(self._post(ReqUrl.MESSAGE_REACTION + "/?" + parse.urlencode(full_data)))
j = self._checkRequest(self._post('{}/?{}'.format(ReqUrl.MESSAGE_REACTION, url_part)))
return False if j is None else True
@@ -851,8 +862,9 @@ class Client(object):
"""
Sets users typing status.
:param status: typing or not typing
:param status: specify whether the status is typing or not (TypingStatus)
:param thread_id: user/group chat ID
:param thread_type: specify whether thread_id is user or group chat
:return: True if status changed
"""
thread_id, thread_type = self._getThread(thread_id, None)
@@ -953,8 +965,8 @@ class Client(object):
try:
for participant in j['payload']['participants']:
participants[participant["fbid"]] = participant["name"]
except Exception as e:
log.exception(e)
except Exception:
log.exception('Exception while getting names for people in getThreadList. {}'.format(j))
# Prevent duplicates in self.threads
threadIDs = [getattr(x, "thread_id") for x in self.threads]
@@ -1113,7 +1125,7 @@ class Client(object):
added_ids = [str(x['userFbId']) for x in delta['addedParticipants']]
thread_id = str(metadata['threadKey']['threadFbId'])
self.onPeopleAdded(mid=mid, added_ids=added_ids, author_id=author_id, thread_id=thread_id,
ts=ts)
ts=ts, msg=m)
continue
# Left/removed participants
@@ -1121,7 +1133,7 @@ class Client(object):
removed_id = str(delta['leftParticipantFbId'])
thread_id = str(metadata['threadKey']['threadFbId'])
self.onPersonRemoved(mid=mid, removed_id=removed_id, author_id=author_id, thread_id=thread_id,
ts=ts)
ts=ts, msg=m)
continue
# Color change
@@ -1129,7 +1141,7 @@ class Client(object):
new_color = delta["untypedData"]["theme_color"]
thread_id, thread_type = getThreadIdAndThreadType(metadata)
self.onColorChange(mid=mid, author_id=author_id, new_color=new_color, thread_id=thread_id,
thread_type=thread_type, ts=ts, metadata=metadata)
thread_type=thread_type, ts=ts, metadata=metadata, msg=m)
continue
# Emoji change
@@ -1137,7 +1149,7 @@ class Client(object):
new_emoji = delta["untypedData"]["thread_icon"]
thread_id, thread_type = getThreadIdAndThreadType(metadata)
self.onEmojiChange(mid=mid, author_id=author_id, new_emoji=new_emoji, thread_id=thread_id,
thread_type=thread_type, ts=ts, metadata=metadata)
thread_type=thread_type, ts=ts, metadata=metadata, msg=m)
continue
# Thread title change
@@ -1145,7 +1157,7 @@ class Client(object):
new_title = delta["name"]
thread_id, thread_type = getThreadIdAndThreadType(metadata)
self.onTitleChange(mid=mid, author_id=author_id, new_title=new_title, thread_id=thread_id,
thread_type=thread_type, ts=ts, metadata=metadata)
thread_type=thread_type, ts=ts, metadata=metadata, msg=m)
continue
# Nickname change
@@ -1161,11 +1173,11 @@ class Client(object):
# Message delivered
elif delta.get("class") == "DeliveryReceipt":
message_ids = delta["messageIds"]
delivered_for = str(delta["actorFbId"])
delivered_for = str(delta.get("actorFbId") or delta["threadKey"]["otherUserFbId"])
ts = int(delta["deliveredWatermarkTimestampMs"])
thread_id, thread_type = getThreadIdAndThreadType(delta)
self.onMessageDelivered(msg_ids=message_ids, delivered_for=delivered_for,
thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata)
thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m)
continue
# Message seen
@@ -1175,7 +1187,7 @@ class Client(object):
delivered_ts = int(delta["watermarkTimestampMs"])
thread_id, thread_type = getThreadIdAndThreadType(delta)
self.onMessageSeen(seen_by=seen_by, thread_id=thread_id, thread_type=thread_type,
seen_ts=seen_ts, delivered_ts=delivered_ts, metadata=metadata)
seen_ts=seen_ts, delivered_ts=delivered_ts, metadata=metadata, msg=m)
continue
# Messages marked as seen
@@ -1188,7 +1200,7 @@ class Client(object):
threads = [getThreadIdAndThreadType({"threadKey": thr}) for thr in delta.get("threadKeys")]
# thread_id, thread_type = getThreadIdAndThreadType(delta)
self.onMarkedSeen(threads=threads, seen_ts=seen_ts, delivered_ts=delivered_ts, metadata=delta)
self.onMarkedSeen(threads=threads, seen_ts=seen_ts, delivered_ts=delivered_ts, metadata=delta, msg=m)
continue
# New message
@@ -1196,12 +1208,12 @@ class Client(object):
message = delta.get('body', '')
thread_id, thread_type = getThreadIdAndThreadType(metadata)
self.onMessage(mid=mid, author_id=author_id, message=message,
thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=m)
thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=m, msg=m)
continue
# Inbox
if mtype == "inbox":
self.onInbox(unseen=m["unseen"], unread=m["unread"], recent_unread=m["recent_unread"])
self.onInbox(unseen=m["unseen"], unread=m["unread"], recent_unread=m["recent_unread"], msg=m)
# Typing
# elif mtype == "typ":
@@ -1266,6 +1278,8 @@ class Client(object):
self.listening = False
except requests.exceptions.Timeout:
pass
except Exception as e:
self.onListenError(e)
@deprecated(deprecated_in='0.10.2', removed_in='0.15.0', details='Use stopListening() instead')

View File

@@ -1,43 +1,21 @@
import inspect
# -*- coding: UTF-8 -*-
class EventHook(object):
"""
A simple implementation of the Observer-Pattern.
The user can specify an event signature upon inizializazion,
defined by kwargs in the form of argumentname=class (e.g. id=int).
The arguments' types are not checked in this implementation though.
Callables with a fitting signature can be added with += or removed with -=.
All listeners can be notified by calling the EventHook class with fitting
arguments.
Thanks http://stackoverflow.com/a/35957226/5556222
All listeners added to this will be called, regardless of parameters
"""
def __init__(self, **signature):
self._signature = signature
self._argnames = set(signature.keys())
self._handlers = []
def __init__(self, *args):
self._handlers = list(args)
def _kwargs_str(self):
return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())
def add(self, handler):
return self.__iadd__(handler)
def remove(self, handler):
return self.__isub__(handler)
def __iadd__(self, handler):
params = inspect.signature(handler).parameters
valid = True
argnames = set(n for n in params.keys())
if argnames != self._argnames:
valid = False
for p in params.values():
if p.kind == p.VAR_KEYWORD:
valid = True
break
if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
valid = False
break
if not valid:
raise ValueError("Listener must have these arguments: (%s)"
% self._kwargs_str())
self._handlers.append(handler)
return self
@@ -46,12 +24,5 @@ class EventHook(object):
return self
def __call__(self, *args, **kwargs):
if args or set(kwargs.keys()) != self._argnames:
raise ValueError("This EventHook must be called with these " +
"keyword arguments: (%s)" % self._kwargs_str() +
", but was called with: (%s)" %self._signature)
for handler in self._handlers[:]:
handler(**kwargs)
def __repr__(self):
return "EventHook(%s)" % self._kwargs_str()
for handler in self._handlers:
handler(*args, **kwargs)

View File

@@ -1,3 +1,5 @@
# -*- coding: UTF-8 -*-
from __future__ import unicode_literals
import sys
from enum import Enum
@@ -111,3 +113,13 @@ class MessageReaction(Enum):
ANGRY = '😠'
YES = '👍'
NO = '👎'
MessageReactionFix = {
'😍': ('0001f60d', '%F0%9F%98%8D'),
'😆': ('0001f606', '%F0%9F%98%86'),
'😮': ('0001f62e', '%F0%9F%98%AE'),
'😢': ('0001f622', '%F0%9F%98%A2'),
'😠': ('0001f620', '%F0%9F%98%A0'),
'👍': ('0001f44d', '%F0%9F%91%8D'),
'👎': ('0001f44e', '%F0%9F%91%8E')
}

View File

@@ -1,3 +1,6 @@
# -*- coding: UTF-8 -*-
from __future__ import unicode_literals
import re
import json
from time import time
@@ -98,18 +101,21 @@ def getSignatureID():
def generateOfflineThreadingID():
ret = now()
value = int(random() * 4294967295)
string = ("0000000000000000000000" + bin(value))[-22:]
msgs = bin(ret) + string
string = ("0000000000000000000000" + format(value, 'b'))[-22:]
msgs = format(ret, 'b') + string
return str(int(msgs, 2))
def isUserToThreadType(is_user):
return ThreadType.USER if is_user else ThreadType.GROUP
def raise_exception(e):
raise e
def deprecation(name, deprecated_in=None, removed_in=None, details='', stacklevel=3):
"""This is a function which should be used to mark parameters as deprecated.
It will result in a warning being emmitted when the parameter is used.
"""
warning = "{} is deprecated".format(name)
warning = "Client.{} is deprecated".format(name)
if deprecated_in:
warning += ' in v. {}'.format(deprecated_in)
if removed_in:
@@ -127,7 +133,7 @@ def deprecated(deprecated_in=None, removed_in=None, details=''):
"""
def wrap(func, *args, **kwargs):
def wrapped_func(*args, **kwargs):
deprecation(func.__qualname__, deprecated_in=deprecated_in, removed_in=removed_in, details=details, stacklevel=3)
deprecation(func.__name__, deprecated_in=deprecated_in, removed_in=removed_in, details=details, stacklevel=3)
return func(*args, **kwargs)
return wrapped_func
return wrap

View File

@@ -1,5 +1,7 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from __future__ import unicode_literals
import json
import logging
import unittest
@@ -16,7 +18,7 @@ logging_level = logging.ERROR
Tests for fbchat
~~~~~~~~~~~~~~~~
To use these tests copy test_data.json to my_test_data.json or type this information manually in the terminal prompts.
To use these tests copy `tests/data.json` to `tests/my_data.json` or type this information manually in the terminal prompts.
- email: Your (or a test user's) email / phone number
- password: Your (or a test user's) password
@@ -49,7 +51,7 @@ class TestFbchat(unittest.TestCase):
self.assertFalse(client.isLoggedIn())
with self.assertRaises(Exception):
client.login("not@email.com", "not_password", max_retries=1)
client.login('not@email.com', 'not_password', max_retries=1)
client.login(email, password)
@@ -63,14 +65,14 @@ class TestFbchat(unittest.TestCase):
self.assertTrue(client.isLoggedIn())
def test_defaultThread(self):
# setDefaultThread
client.setDefaultThread(client.uid, ThreadType.USER)
self.assertTrue(client.sendMessage('test_default_recipient★'))
# resetDefaultThread
client.resetDefaultThread()
with self.assertRaises(ValueError):
client.sendMessage("should_not_send")
# setDefaultThread
client.setDefaultThread(client.uid, ThreadType.USER)
self.assertTrue(client.sendMessage("test_default_recipient"))
client.sendMessage('should_not_send')
def test_getAllUsers(self):
users = client.getAllUsers()
@@ -100,32 +102,32 @@ class TestFbchat(unittest.TestCase):
self.assertTrue(client.sendEmoji("😆", EmojiSize.LARGE, group_uid, ThreadType.GROUP))
def test_sendMessage(self):
self.assertIsNotNone(client.sendMessage('test_send_user', user_uid, ThreadType.USER))
self.assertIsNotNone(client.sendMessage('test_send_group', group_uid, ThreadType.GROUP))
self.assertIsNone(client.sendMessage('test_send_user_should_fail', user_uid, ThreadType.GROUP))
self.assertIsNone(client.sendMessage('test_send_group_should_fail', group_uid, ThreadType.USER))
self.assertIsNotNone(client.sendMessage('test_send_user', user_uid, ThreadType.USER))
self.assertIsNotNone(client.sendMessage('test_send_group', group_uid, ThreadType.GROUP))
self.assertIsNone(client.sendMessage('test_send_user_should_fail', user_uid, ThreadType.GROUP))
self.assertIsNone(client.sendMessage('test_send_group_should_fail', group_uid, ThreadType.USER))
def test_sendImages(self):
image_url = 'https://cdn4.iconfinder.com/data/icons/ionicons/512/icon-image-128.png'
image_local_url = path.join(path.dirname(__file__), 'test_image.png')
image_local_url = path.join(path.dirname(__file__), 'tests/image.png')
#self.assertTrue(client.sendRemoteImage(image_url, 'test_send_user_images_remote', user_uid, ThreadType.USER))
self.assertTrue(client.sendRemoteImage(image_url, 'test_send_group_images_remote', group_uid, ThreadType.GROUP))
self.assertTrue(client.sendRemoteImage(image_url, 'test_send_group_images_remote', group_uid, ThreadType.GROUP))
# Idk why but doesnt work, payload is null
#self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local', user_uid, ThreadType.USER))
self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local', group_uid, ThreadType.GROUP))
self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local', group_uid, ThreadType.GROUP))
def test_getThreadInfo(self):
client.sendMessage('test_user_getThreadInfo', user_uid, ThreadType.USER)
client.sendMessage('test_user_getThreadInfo', user_uid, ThreadType.USER)
info = client.getThreadInfo(20, user_uid, ThreadType.USER)
self.assertEqual(info[0].author, 'fbid:' + client.uid)
self.assertEqual(info[0].body, 'test_user_getThreadInfo')
self.assertEqual(info[0].body, 'test_user_getThreadInfo')
client.sendMessage('test_group_getThreadInfo', group_uid, ThreadType.GROUP)
client.sendMessage('test_group_getThreadInfo', group_uid, ThreadType.GROUP)
info = client.getThreadInfo(20, group_uid, ThreadType.GROUP)
self.assertEqual(info[0].author, 'fbid:' + client.uid)
self.assertEqual(info[0].body, 'test_group_getThreadInfo')
self.assertEqual(info[0].body, 'test_group_getThreadInfo')
def test_markAs(self):
# To be implemented (requires some form of manual watching)
@@ -143,7 +145,7 @@ class TestFbchat(unittest.TestCase):
self.assertTrue(client.addUsersToGroup([user_uid], group_uid))
def test_changeGroupTitle(self):
self.assertTrue(client.changeGroupTitle('test_changeGroupTitle', group_uid))
self.assertTrue(client.changeGroupTitle('test_changeGroupTitle', group_uid))
def test_changeThreadColor(self):
self.assertTrue(client.changeThreadColor(ThreadColor.BRILLIANT_ROSE, group_uid))
@@ -152,9 +154,15 @@ class TestFbchat(unittest.TestCase):
self.assertTrue(client.changeThreadColor(ThreadColor.MESSENGER_BLUE, user_uid))
def test_reactToMessage(self):
mid = client.sendMessage("react_to_message", user_uid, ThreadType.USER)
mid = client.sendMessage('react_to_message', user_uid, ThreadType.USER)
self.assertTrue(client.reactToMessage(mid, MessageReaction.LOVE))
def test_setTypingStatus(self):
self.assertTrue(client.setTypingStatus(TypingStatus.TYPING, thread_id=user_uid, thread_type=ThreadType.USER))
self.assertTrue(client.setTypingStatus(TypingStatus.STOPPED, thread_id=user_uid, thread_type=ThreadType.USER))
self.assertTrue(client.setTypingStatus(TypingStatus.TYPING, thread_id=group_uid, thread_type=ThreadType.GROUP))
self.assertTrue(client.setTypingStatus(TypingStatus.STOPPED, thread_id=group_uid, thread_type=ThreadType.GROUP))
def start_test(param_client, param_group_uid, param_user_uid, tests=[]):
global client
@@ -185,7 +193,7 @@ if __name__ == '__main__':
pass
try:
with open(path.join(path.dirname(__file__), 'my_test_data.json'), 'r') as f:
with open(path.join(path.dirname(__file__), 'tests/my_data.json'), 'r') as f:
json = json.load(f)
email = json['email']
password = json['password']

View File

Before

Width:  |  Height:  |  Size: 1.6 KiB

After

Width:  |  Height:  |  Size: 1.6 KiB