Compare commits
159 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
6116bc9ca4 | ||
|
7bf6a9fadc | ||
|
4490360e11 | ||
|
a4dfe0d279 | ||
|
47679d1d3b | ||
|
62e17daf78 | ||
|
1f359f2a72 | ||
|
cebe7a28c0 | ||
|
e614800d5f | ||
|
151a114235 | ||
|
38f66147cb | ||
|
ffa26c20b5 | ||
|
430ada7f84 | ||
|
988e37eb42 | ||
|
1938b90bce | ||
|
f61d1403f3 | ||
|
d228f34f64 | ||
|
97049556ed | ||
|
b64c6a94cc | ||
|
edc655bae7 | ||
|
884af48270 | ||
|
95f018fad3 | ||
|
b44758a195 | ||
|
f1c20d490e | ||
|
04372d498e | ||
|
63ea899605 | ||
|
4fdd145d1e | ||
|
57ee68b0e0 | ||
|
99c6884681 | ||
|
1c1438e9bc | ||
|
22f1b3e489 | ||
|
fb1ad5800c | ||
|
4dd15b05ef | ||
|
d7cdb644c4 | ||
|
bfcf4950b3 | ||
|
6612c97f05 | ||
|
b92cf62726 | ||
|
a53ba33a81 | ||
|
c04d38cf63 | ||
|
a051adcbc0 | ||
|
900a9cdf72 | ||
|
611b329934 | ||
|
2642788bc1 | ||
|
8268445f0b | ||
|
c12dcd9263 | ||
|
3142524809 | ||
|
4c9d3bd9d7 | ||
|
ba103066b8 | ||
|
0b0d6179a2 | ||
|
e8806d4ef8 | ||
|
c96e5f174c | ||
|
315242e069 | ||
|
a94fa5fbe3 | ||
|
90203afdd0 | ||
|
2c0d098852 | ||
|
e4290cd465 | ||
|
46b85dec5c | ||
|
bbc34bd009 | ||
|
c495317e65 | ||
|
a946050228 | ||
|
83789dcefa | ||
|
4f1f9bf1ce | ||
|
32c72c2f35 | ||
|
42ae0035af | ||
|
96e28fdbe6 | ||
|
0f889f50cf | ||
|
478eaebdec | ||
|
7ecf229db5 | ||
|
dda75c6099 | ||
|
28d5ac9f90 | ||
|
52acfb4636 | ||
|
2a64bad385 | ||
|
1a73699f1a | ||
|
1b5a7a0063 | ||
|
4b3eb440cf | ||
|
d1f457866b | ||
|
6f29aa82cb | ||
|
b1a2ff7d84 | ||
|
883b16e251 | ||
|
116b39cf6a | ||
|
eae1db9c7d | ||
|
730bab5d40 | ||
|
d52dac233e | ||
|
1f37277a8d | ||
|
15014d7055 | ||
|
7a35ca05b1 | ||
|
be6b6909d9 | ||
|
42c1d26b2e | ||
|
d38f8ad2ec | ||
|
023fd58f05 | ||
|
ad10a8f07f | ||
|
7d6cf039d4 | ||
|
f0271e17b0 | ||
|
57954816b2 | ||
|
3e4e1f9bb9 | ||
|
7340918209 | ||
|
707df4f941 | ||
|
8eb6b83411 | ||
|
e0aedd617b | ||
|
ee81620c14 | ||
|
2d027af71a | ||
|
9d5f06b810 | ||
|
b8fdcda2fb | ||
|
0dac7b7b81 | ||
|
b750e753d6 | ||
|
ee33e92bed | ||
|
7413a643f6 | ||
|
34452f9220 | ||
|
24831b2462 | ||
|
cd4a18cb5a | ||
|
c00b3df8b2 | ||
|
1beb821b2c | ||
|
a58791048a | ||
|
f0c6e8612f | ||
|
1cebbf92e6 | ||
|
a64982583b | ||
|
cb8b0915de | ||
|
1d2576b06d | ||
|
ead9a3c0e9 | ||
|
59ba418faa | ||
|
c51a332560 | ||
|
a73d2feed6 | ||
|
6929193e9d | ||
|
fea4ad9e89 | ||
|
68099049d4 | ||
|
44cf08bdfd | ||
|
9e32cf17a4 | ||
|
0661367ebb | ||
|
3c07e42ba2 | ||
|
2cd6376818 | ||
|
5e7f7750de | ||
|
2a223ec6db | ||
|
a99108fff6 | ||
|
8de4698cc4 | ||
|
637319ec2c | ||
|
f9398564cd | ||
|
b57f423eb4 | ||
|
3093f1f2b6 | ||
|
961777e0c1 | ||
|
d7139701f7 | ||
|
c6bac17d48 | ||
|
3638fc5356 | ||
|
aca9176f7f | ||
|
0d5e4f6d3f | ||
|
92a5ffdef8 | ||
|
b3359fccdb | ||
|
d8f7366d1f | ||
|
ff94dc20af | ||
|
a8df0a548f | ||
|
13d0dc4ba4 | ||
|
64125a1aca | ||
|
4feae03092 | ||
|
5f993c2bf8 | ||
|
35bbcbffba | ||
|
5faca54d67 | ||
|
82496b8e04 | ||
|
2d74ec7823 | ||
|
1d42c4d3a6 | ||
|
4a8ef00442 |
9
.gitignore
vendored
9
.gitignore
vendored
@@ -24,7 +24,12 @@ develop-eggs
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# Data for tests
|
||||
# Scripts and data for tests
|
||||
my_tests.py
|
||||
my_test_data.json
|
||||
my_data.json
|
||||
tests.data
|
||||
tests.data
|
||||
.pytest_cache
|
||||
|
||||
# Virtual environment
|
||||
venv/
|
||||
|
35
.travis.yml
Normal file
35
.travis.yml
Normal file
@@ -0,0 +1,35 @@
|
||||
sudo: false
|
||||
language: python
|
||||
python:
|
||||
- 2.7
|
||||
- 3.4
|
||||
- 3.5
|
||||
- 3.6
|
||||
|
||||
env:
|
||||
global:
|
||||
# These two accounts are made specifically for this purpose, and the passwords are really only encrypted for obscurity
|
||||
# In reality, you could probably still login with the accounts by looking at log output from travis-ci.org
|
||||
- client1_email=travis.fbchat1@gmail.com # Facebook ID: 100023782141139
|
||||
- secure: "W1NON6qaLnvYIOVoC93MXkmbAIkUkHcGREBwN0BSVM3cLuMduk4VVkz6PY2T8bnntGYVwicXwcn5aNJ6pDue17TBZqGPk/tdpws8mnAZUtBYhpkIFTTlyh5kJSZejx9fd5s4nceGpH6ofCCnNxPp2PdHKU8piqnQYZVQ4cFNNDE=" # client1_password
|
||||
- client2_email=fbchat.travis2@gmail.com # Facebook ID: 100026538491708
|
||||
- secure: "V7RB3go2Tc/DdW1x9DkMI+vCfnOgiS3ygmFCABs/GjfPZjZL7VLMJgYGlx0cjeeeN+Oxa2GrhczRAKeMdGB6Ss2lGGAVs6cjJ56ODuBHWT6/FNzLjtDkTnjD+Kfh0l8ZOdxTF3MQ6M/9hU6z5ek+XYGr7u+/7wOYZ5L2cK5MaQ0=" # client2_password
|
||||
- group_id=1463789480385605
|
||||
|
||||
before_script:
|
||||
- if [[ "$TRAVIS_PYTHON_VERSION" = "2.7" ]]; then export PYTEST_ADDOPTS='-m ""'; fi; # expensive tests (otherwise disabled in pytest.ini)
|
||||
- if [[ "$TRAVIS_PULL_REQUEST" != false ]]; then export PYTEST_ADDOPTS='-m offline'; fi; # offline tests only
|
||||
|
||||
script: python -m pytest || python -m pytest --lf; # Run failed tests twice
|
||||
|
||||
cache: pip
|
||||
|
||||
deploy:
|
||||
provider: pypi
|
||||
user: madsmtm
|
||||
password:
|
||||
secure: "VA0MLSrwIW/T2KjMwjLZCzrLHw8pJT6tAvb48t7qpBdm8x192hax61pz1TaBZoJvlzyBPFKvluftuclTc7yEFwzXe7Gjqgd/ODKZl/wXDr36hQ7BBOLPZujdwmWLvTzMh3eJZlvkgcLCzrvK3j2oW8cM/+FZeVi/5/FhVuJ4ofs="
|
||||
on:
|
||||
python: 3.6
|
||||
branch: master
|
||||
tags: true
|
@@ -27,4 +27,8 @@ Installation:
|
||||
|
||||
$ pip install fbchat
|
||||
|
||||
© Copyright 2015 - 2017 by Taehoon Kim / `@carpedm20 <http://carpedm20.github.io/about/>`__
|
||||
Maintainer
|
||||
----------
|
||||
|
||||
- Mads Marquart / `@madsmtm <https://github.com/madsmtm>`__
|
||||
- Taehoon Kim / `@carpedm20 <http://carpedm20.github.io/about/>`__
|
||||
|
@@ -52,12 +52,12 @@ A thread can refer to two things: A Messenger group chat or a single Facebook us
|
||||
These will specify whether the thread is a single user chat or a group chat.
|
||||
This is required for many of `fbchat`'s functions, since Facebook differetiates between these two internally
|
||||
|
||||
Searching for group chats and finding their ID is not yet possible with `fbchat`,
|
||||
but searching for users is possible via. :func:`Client.searchForUsers`. See :ref:`intro_fetching`
|
||||
Searching for group chats and finding their ID can be done via. :func:`Client.searchForGroups`,
|
||||
and searching for users is possible via. :func:`Client.searchForUsers`. See :ref:`intro_fetching`
|
||||
|
||||
You can get your own user ID by using :any:`Client.uid`
|
||||
|
||||
Getting the ID of a group chat is fairly trivial though, since you only need to navigate to `<https://www.facebook.com/messages/>`_,
|
||||
Getting the ID of a group chat is fairly trivial otherwise, since you only need to navigate to `<https://www.facebook.com/messages/>`_,
|
||||
click on the group you want to find the ID of, and then read the id from the address bar.
|
||||
The URL will look something like this: ``https://www.facebook.com/messages/t/1234567890``, where ``1234567890`` would be the ID of the group.
|
||||
An image to illustrate this is shown below:
|
||||
@@ -70,8 +70,8 @@ The same method can be applied to some user accounts, though if they've set a cu
|
||||
Here's an snippet showing the usage of thread IDs and thread types, where ``<user id>`` and ``<group id>``
|
||||
corresponds to the ID of a single user, and the ID of a group respectively::
|
||||
|
||||
client.sendMessage('<message>', thread_id='<user id>', thread_type=ThreadType.USER)
|
||||
client.sendMessage('<message>', thread_id='<group id>', thread_type=ThreadType.GROUP)
|
||||
client.send(Message(text='<message>'), thread_id='<user id>', thread_type=ThreadType.USER)
|
||||
client.send(Message(text='<message>'), thread_id='<group id>', thread_type=ThreadType.GROUP)
|
||||
|
||||
Some functions (e.g. :func:`Client.changeThreadColor`) don't require a thread type, so in these cases you just provide the thread ID::
|
||||
|
||||
@@ -91,7 +91,7 @@ Some of `fbchat`'s functions require these ID's, like :func:`Client.reactToMessa
|
||||
and some of then provide this ID, like :func:`Client.sendMessage`.
|
||||
This snippet shows how to send a message, and then use the returned ID to react to that message with a 😍 emoji::
|
||||
|
||||
message_id = client.sendMessage('message', thread_id=thread_id, thread_type=thread_type)
|
||||
message_id = client.send(Message(text='message'), thread_id=thread_id, thread_type=thread_type)
|
||||
client.reactToMessage(message_id, MessageReaction.LOVE)
|
||||
|
||||
|
||||
@@ -108,7 +108,7 @@ like adding users to and removing users from a group chat, logically only works
|
||||
The simplest way of using `fbchat` is to send a message.
|
||||
The following snippet will, as you've probably already figured out, send the message `test message` to your account::
|
||||
|
||||
message_id = client.sendMessage('test message', thread_id=client.uid, thread_type=ThreadType.USER)
|
||||
message_id = client.send(Message(text='test message'), thread_id=client.uid, thread_type=ThreadType.USER)
|
||||
|
||||
You can see a full example showing all the possible thread interactions with `fbchat` by going to :ref:`examples`
|
||||
|
||||
@@ -175,8 +175,8 @@ meaning it will simply print information to the console when an event happens
|
||||
The event actions can be changed by subclassing the :class:`Client`, and then overwriting the event methods::
|
||||
|
||||
class CustomClient(Client):
|
||||
def onMessage(self, mid, author_id, message, thread_id, thread_type, ts, metadata, msg, **kwargs):
|
||||
# Do something with the message here
|
||||
def onMessage(self, mid, author_id, message_object, thread_id, thread_type, ts, metadata, msg, **kwargs):
|
||||
# Do something with message_object here
|
||||
pass
|
||||
|
||||
client = CustomClient('<email>', '<password>')
|
||||
@@ -184,13 +184,13 @@ The event actions can be changed by subclassing the :class:`Client`, and then ov
|
||||
**Notice:** The following snippet is as equally valid as the previous one::
|
||||
|
||||
class CustomClient(Client):
|
||||
def onMessage(self, message, author_id, thread_id, thread_type, **kwargs):
|
||||
# Do something with the message here
|
||||
def onMessage(self, message_object, author_id, thread_id, thread_type, **kwargs):
|
||||
# Do something with message_object here
|
||||
pass
|
||||
|
||||
client = CustomClient('<email>', '<password>')
|
||||
|
||||
The change was in the parameters that our `onMessage` method took: ``message`` and ``author_id`` got swapped,
|
||||
The change was in the parameters that our `onMessage` method took: ``message_object`` and ``author_id`` got swapped,
|
||||
and ``mid``, ``ts``, ``metadata`` and ``msg`` got removed, but the function still works, since we included ``**kwargs``
|
||||
|
||||
.. note::
|
||||
|
@@ -7,6 +7,6 @@ client = Client('<email>', '<password>')
|
||||
|
||||
print('Own id: {}'.format(client.uid))
|
||||
|
||||
client.sendMessage('Hi me!', thread_id=client.uid, thread_type=ThreadType.USER)
|
||||
client.send(Message(text='Hi me!'), thread_id=client.uid, thread_type=ThreadType.USER)
|
||||
|
||||
client.logout()
|
||||
|
@@ -4,15 +4,15 @@ from fbchat import log, Client
|
||||
|
||||
# Subclass fbchat.Client and override required methods
|
||||
class EchoBot(Client):
|
||||
def onMessage(self, author_id, message, thread_id, thread_type, **kwargs):
|
||||
self.markAsDelivered(author_id, thread_id)
|
||||
self.markAsRead(author_id)
|
||||
def onMessage(self, author_id, message_object, thread_id, thread_type, **kwargs):
|
||||
self.markAsDelivered(thread_id, message_object.uid)
|
||||
self.markAsRead(thread_id)
|
||||
|
||||
log.info("Message from {} in {} ({}): {}".format(author_id, thread_id, thread_type.name, message))
|
||||
log.info("{} from {} in {}".format(message_object, thread_id, thread_type.name))
|
||||
|
||||
# If you're not the author, echo
|
||||
if author_id != self.uid:
|
||||
self.sendMessage(message, thread_id=thread_id, thread_type=thread_type)
|
||||
self.send(message_object, thread_id=thread_id, thread_type=thread_type)
|
||||
|
||||
client = EchoBot("<email>", "<password>")
|
||||
client.listen()
|
||||
|
@@ -12,7 +12,7 @@ print("users' IDs: {}".format(user.uid for user in users))
|
||||
print("users' names: {}".format(user.name for user in users))
|
||||
|
||||
|
||||
# If we have a user id, we can use `getUserInfo` to fetch a `User` object
|
||||
# If we have a user id, we can use `fetchUserInfo` to fetch a `User` object
|
||||
user = client.fetchUserInfo('<user id>')['<user id>']
|
||||
# We can also query both mutiple users together, which returns list of `User` objects
|
||||
users = client.fetchUserInfo('<1st user id>', '<2nd user id>', '<3rd user id>')
|
||||
@@ -49,4 +49,16 @@ for message in messages:
|
||||
print(message.text)
|
||||
|
||||
|
||||
# If we have a thread id, we can use `fetchThreadInfo` to fetch a `Thread` object
|
||||
thread = client.fetchThreadInfo('<thread id>')['<thread id>']
|
||||
print("thread's name: {}".format(thread.name))
|
||||
print("thread's type: {}".format(thread.type))
|
||||
|
||||
|
||||
# `searchForThreads` searches works like `searchForUsers`, but gives us a list of threads instead
|
||||
thread = client.searchForThreads('<name of thread>')[0]
|
||||
print("thread's name: {}".format(thread.name))
|
||||
print("thread's type: {}".format(thread.type))
|
||||
|
||||
|
||||
# Here should be an example of `getUnread`
|
||||
|
@@ -9,19 +9,25 @@ thread_id = '1234567890'
|
||||
thread_type = ThreadType.GROUP
|
||||
|
||||
# Will send a message to the thread
|
||||
client.sendMessage('<message>', thread_id=thread_id, thread_type=thread_type)
|
||||
client.send(Message(text='<message>'), thread_id=thread_id, thread_type=thread_type)
|
||||
|
||||
# Will send the default `like` emoji
|
||||
client.sendEmoji(emoji=None, size=EmojiSize.LARGE, thread_id=thread_id, thread_type=thread_type)
|
||||
client.send(Message(emoji_size=EmojiSize.LARGE), thread_id=thread_id, thread_type=thread_type)
|
||||
|
||||
# Will send the emoji `👍`
|
||||
client.sendEmoji(emoji='👍', size=EmojiSize.LARGE, thread_id=thread_id, thread_type=thread_type)
|
||||
client.send(Message(text='👍', emoji_size=EmojiSize.LARGE), thread_id=thread_id, thread_type=thread_type)
|
||||
|
||||
# Will send the sticker with ID `767334476626295`
|
||||
client.send(Message(sticker=Sticker('767334476626295')), thread_id=thread_id, thread_type=thread_type)
|
||||
|
||||
# Will send a message with a mention
|
||||
client.send(Message(text='This is a @mention', mentions=[Mention(thread_id, offset=10, length=8)]), thread_id=thread_id, thread_type=thread_type)
|
||||
|
||||
# Will send the image located at `<image path>`
|
||||
client.sendLocalImage('<image path>', message='This is a local image', thread_id=thread_id, thread_type=thread_type)
|
||||
client.sendLocalImage('<image path>', message=Message(text='This is a local image'), thread_id=thread_id, thread_type=thread_type)
|
||||
|
||||
# Will download the image at the url `<image url>`, and then send it
|
||||
client.sendRemoteImage('<image url>', message='This is a remote image', thread_id=thread_id, thread_type=thread_type)
|
||||
client.sendRemoteImage('<image url>', message=Message(text='This is a remote image'), thread_id=thread_id, thread_type=thread_type)
|
||||
|
||||
|
||||
# Only do these actions if the thread is a group
|
||||
|
@@ -4,14 +4,14 @@ from fbchat import log, Client
|
||||
from fbchat.models import *
|
||||
|
||||
class RemoveBot(Client):
|
||||
def onMessage(self, author_id, message, thread_id, thread_type, **kwargs):
|
||||
def onMessage(self, author_id, message_object, thread_id, thread_type, **kwargs):
|
||||
# We can only kick people from group chats, so no need to try if it's a user chat
|
||||
if message == 'Remove me!' and thread_type == ThreadType.GROUP:
|
||||
if message_object.text == 'Remove me!' and thread_type == ThreadType.GROUP:
|
||||
log.info('{} will be removed from {}'.format(author_id, thread_id))
|
||||
self.removeUserFromGroup(author_id, thread_id=thread_id)
|
||||
else:
|
||||
# Sends the data to the inherited onMessage, so that we can still see when a message is recieved
|
||||
super(type(self), self).onMessage(author_id=author_id, message=message, thread_id=thread_id, thread_type=thread_type, **kwargs)
|
||||
super(RemoveBot, self).onMessage(author_id=author_id, message_object=message_object, thread_id=thread_id, thread_type=thread_type, **kwargs)
|
||||
|
||||
client = RemoveBot("<email>", "<password>")
|
||||
client.listen()
|
||||
|
@@ -1,5 +1,10 @@
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
from datetime import datetime
|
||||
from .client import *
|
||||
|
||||
|
||||
"""
|
||||
fbchat
|
||||
~~~~~~
|
||||
@@ -10,11 +15,9 @@
|
||||
:license: BSD, see LICENSE for more details.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from .client import *
|
||||
|
||||
__copyright__ = 'Copyright 2015 - {} by Taehoon Kim'.format(datetime.now().year)
|
||||
__version__ = '1.0.1'
|
||||
__version__ = '1.3.8'
|
||||
__license__ = 'BSD'
|
||||
__author__ = 'Taehoon Kim; Moreels Pieter-Jan; Mads Marquart'
|
||||
__email__ = 'carpedm20@gmail.com'
|
||||
|
923
fbchat/client.py
923
fbchat/client.py
File diff suppressed because it is too large
Load Diff
@@ -23,49 +23,225 @@ class ConcatJSONDecoder(json.JSONDecoder):
|
||||
return objs
|
||||
# End shameless copy
|
||||
|
||||
def graphql_color_to_enum(color):
|
||||
if color is None:
|
||||
return None
|
||||
if len(color) == 0:
|
||||
return ThreadColor.MESSENGER_BLUE
|
||||
try:
|
||||
return ThreadColor('#{}'.format(color[2:].lower()))
|
||||
except ValueError:
|
||||
raise FBchatException('Could not get ThreadColor from color: {}'.format(color))
|
||||
|
||||
def get_customization_info(thread):
|
||||
if thread is None or thread.get('customization_info') is None:
|
||||
return {}
|
||||
info = thread['customization_info']
|
||||
|
||||
rtn = {
|
||||
'emoji': info.get('emoji'),
|
||||
'color': graphql_color_to_enum(info.get('outgoing_bubble_color'))
|
||||
}
|
||||
if thread.get('thread_type') in ('GROUP', 'ROOM') or thread.get('is_group_thread') or thread.get('thread_key', {}).get('thread_fbid'):
|
||||
rtn['nicknames'] = {}
|
||||
for k in info.get('participant_customizations', []):
|
||||
rtn['nicknames'][k['participant_id']] = k.get('nickname')
|
||||
elif info.get('participant_customizations'):
|
||||
uid = thread.get('thread_key', {}).get('other_user_id') or thread.get('id')
|
||||
pc = info['participant_customizations']
|
||||
if len(pc) > 0:
|
||||
if pc[0].get('participant_id') == uid:
|
||||
rtn['nickname'] = pc[0].get('nickname')
|
||||
else:
|
||||
rtn['own_nickname'] = pc[0].get('nickname')
|
||||
if len(pc) > 1:
|
||||
if pc[1].get('participant_id') == uid:
|
||||
rtn['nickname'] = pc[1].get('nickname')
|
||||
else:
|
||||
rtn['own_nickname'] = pc[1].get('nickname')
|
||||
return rtn
|
||||
|
||||
|
||||
def graphql_to_sticker(s):
|
||||
if not s:
|
||||
return None
|
||||
sticker = Sticker(
|
||||
uid=s['id']
|
||||
)
|
||||
if s.get('pack'):
|
||||
sticker.pack = s['pack'].get('id')
|
||||
if s.get('sprite_image'):
|
||||
sticker.is_animated = True
|
||||
sticker.medium_sprite_image = s['sprite_image'].get('uri')
|
||||
sticker.large_sprite_image = s['sprite_image_2x'].get('uri')
|
||||
sticker.frames_per_row = s.get('frames_per_row')
|
||||
sticker.frames_per_col = s.get('frames_per_column')
|
||||
sticker.frame_rate = s.get('frame_rate')
|
||||
sticker.url = s.get('url')
|
||||
sticker.width = s.get('width')
|
||||
sticker.height = s.get('height')
|
||||
if s.get('label'):
|
||||
sticker.label = s['label']
|
||||
return sticker
|
||||
|
||||
def graphql_to_attachment(a):
|
||||
_type = a['__typename']
|
||||
if _type in ['MessageImage', 'MessageAnimatedImage']:
|
||||
return ImageAttachment(
|
||||
original_extension=a.get('original_extension') or (a['filename'].split('-')[0] if a.get('filename') else None),
|
||||
width=a.get('original_dimensions', {}).get('width'),
|
||||
height=a.get('original_dimensions', {}).get('height'),
|
||||
is_animated=_type=='MessageAnimatedImage',
|
||||
thumbnail_url=a.get('thumbnail', {}).get('uri'),
|
||||
preview=a.get('preview') or a.get('preview_image'),
|
||||
large_preview=a.get('large_preview'),
|
||||
animated_preview=a.get('animated_image'),
|
||||
uid=a.get('legacy_attachment_id')
|
||||
)
|
||||
elif _type == 'MessageVideo':
|
||||
return VideoAttachment(
|
||||
width=a.get('original_dimensions', {}).get('width'),
|
||||
height=a.get('original_dimensions', {}).get('height'),
|
||||
duration=a.get('playable_duration_in_ms'),
|
||||
preview_url=a.get('playable_url'),
|
||||
small_image=a.get('chat_image'),
|
||||
medium_image=a.get('inbox_image'),
|
||||
large_image=a.get('large_image'),
|
||||
uid=a.get('legacy_attachment_id')
|
||||
)
|
||||
elif _type == 'MessageAudio':
|
||||
return AudioAttachment(
|
||||
filename=a.get('filename'),
|
||||
url=a.get('playable_url'),
|
||||
duration=a.get('playable_duration_in_ms'),
|
||||
audio_type=a.get('audio_type')
|
||||
)
|
||||
elif _type == 'MessageFile':
|
||||
return FileAttachment(
|
||||
url=a.get('url'),
|
||||
name=a.get('filename'),
|
||||
is_malicious=a.get('is_malicious'),
|
||||
uid=a.get('message_file_fbid')
|
||||
)
|
||||
else:
|
||||
return Attachment(
|
||||
uid=a.get('legacy_attachment_id')
|
||||
)
|
||||
|
||||
def graphql_to_message(message):
|
||||
if message.get('message_sender') is None:
|
||||
message['message_sender'] = {}
|
||||
if message.get('message') is None:
|
||||
message['message'] = {}
|
||||
is_read = None
|
||||
if message.get('unread') is not None:
|
||||
is_read = not message['unread']
|
||||
return Message(
|
||||
message.get('message_id'),
|
||||
author=message.get('message_sender').get('id'),
|
||||
timestamp=message.get('timestamp_precise'),
|
||||
is_read=is_read,
|
||||
reactions=message.get('message_reactions'),
|
||||
rtn = Message(
|
||||
text=message.get('message').get('text'),
|
||||
mentions=[Mention(m.get('entity', {}).get('id'), offset=m.get('offset'), length=m.get('length')) for m in message.get('message').get('ranges', [])],
|
||||
sticker=message.get('sticker'),
|
||||
attachments=message.get('blob_attachments')
|
||||
emoji_size=get_emojisize_from_tags(message.get('tags_list')),
|
||||
sticker=graphql_to_sticker(message.get('sticker'))
|
||||
)
|
||||
rtn.uid = str(message.get('message_id'))
|
||||
rtn.author = str(message.get('message_sender').get('id'))
|
||||
rtn.timestamp = message.get('timestamp_precise')
|
||||
if message.get('unread') is not None:
|
||||
rtn.is_read = not message['unread']
|
||||
rtn.reactions = {str(r['user']['id']):MessageReaction(r['reaction']) for r in message.get('message_reactions')}
|
||||
if message.get('blob_attachments') is not None:
|
||||
rtn.attachments = [graphql_to_attachment(attachment) for attachment in message['blob_attachments']]
|
||||
# TODO: This is still missing parsing:
|
||||
# message.get('extensible_attachment')
|
||||
return rtn
|
||||
|
||||
def graphql_to_user(user):
|
||||
if user.get('profile_picture') is None:
|
||||
user['profile_picture'] = {}
|
||||
c_info = get_customization_info(user)
|
||||
return User(
|
||||
user['id'],
|
||||
url=user.get('url'),
|
||||
first_name=user.get('first_name'),
|
||||
last_name=user.get('last_name'),
|
||||
is_friend=user.get('is_viewer_friend'),
|
||||
gender=GENDERS[user.get('gender')],
|
||||
gender=GENDERS.get(user.get('gender')),
|
||||
affinity=user.get('affinity'),
|
||||
nickname=c_info.get('nickname'),
|
||||
color=c_info.get('color'),
|
||||
emoji=c_info.get('emoji'),
|
||||
own_nickname=c_info.get('own_nickname'),
|
||||
photo=user['profile_picture'].get('uri'),
|
||||
name=user.get('name')
|
||||
name=user.get('name'),
|
||||
message_count=user.get('messages_count')
|
||||
)
|
||||
|
||||
def graphql_to_thread(thread):
|
||||
if thread['thread_type'] == 'GROUP':
|
||||
return graphql_to_group(thread)
|
||||
elif thread['thread_type'] == 'ONE_TO_ONE':
|
||||
if thread.get('big_image_src') is None:
|
||||
thread['big_image_src'] = {}
|
||||
c_info = get_customization_info(thread)
|
||||
participants = [node['messaging_actor'] for node in thread['all_participants']['nodes']]
|
||||
user = next(p for p in participants if p['id'] == thread['thread_key']['other_user_id'])
|
||||
last_message_timestamp = None
|
||||
if 'last_message' in thread:
|
||||
last_message_timestamp = thread['last_message']['nodes'][0]['timestamp_precise']
|
||||
|
||||
return User(
|
||||
user['id'],
|
||||
url=user.get('url'),
|
||||
name=user.get('name'),
|
||||
first_name=user.get('short_name'),
|
||||
last_name=user.get('name').split(user.get('short_name'),1).pop().strip(),
|
||||
is_friend=user.get('is_viewer_friend'),
|
||||
gender=GENDERS.get(user.get('gender')),
|
||||
affinity=user.get('affinity'),
|
||||
nickname=c_info.get('nickname'),
|
||||
color=c_info.get('color'),
|
||||
emoji=c_info.get('emoji'),
|
||||
own_nickname=c_info.get('own_nickname'),
|
||||
photo=user['big_image_src'].get('uri'),
|
||||
message_count=thread.get('messages_count'),
|
||||
last_message_timestamp=last_message_timestamp
|
||||
)
|
||||
else:
|
||||
raise FBchatException('Unknown thread type: {}, with data: {}'.format(thread.get('thread_type'), thread))
|
||||
|
||||
def graphql_to_group(group):
|
||||
if group.get('image') is None:
|
||||
group['image'] = {}
|
||||
c_info = get_customization_info(group)
|
||||
last_message_timestamp = None
|
||||
if 'last_message' in group:
|
||||
last_message_timestamp = group['last_message']['nodes'][0]['timestamp_precise']
|
||||
return Group(
|
||||
group['thread_key']['thread_fbid'],
|
||||
participants=[node['messaging_actor']['id'] for node in group['all_participants']['nodes']],
|
||||
participants=set([node['messaging_actor']['id'] for node in group['all_participants']['nodes']]),
|
||||
nicknames=c_info.get('nicknames'),
|
||||
color=c_info.get('color'),
|
||||
emoji=c_info.get('emoji'),
|
||||
photo=group['image'].get('uri'),
|
||||
name=group.get('name')
|
||||
name=group.get('name'),
|
||||
message_count=group.get('messages_count'),
|
||||
last_message_timestamp=last_message_timestamp
|
||||
)
|
||||
|
||||
def graphql_to_room(room):
|
||||
if room.get('image') is None:
|
||||
room['image'] = {}
|
||||
c_info = get_customization_info(room)
|
||||
return Room(
|
||||
room['thread_key']['thread_fbid'],
|
||||
participants=set([node['messaging_actor']['id'] for node in room['all_participants']['nodes']]),
|
||||
nicknames=c_info.get('nicknames'),
|
||||
color=c_info.get('color'),
|
||||
emoji=c_info.get('emoji'),
|
||||
photo=room['image'].get('uri'),
|
||||
name=room.get('name'),
|
||||
message_count=room.get('messages_count'),
|
||||
admins = set([node.get('id') for node in room.get('thread_admins')]),
|
||||
approval_mode = bool(room.get('approval_mode')),
|
||||
approval_requests = set(node.get('id') for node in room['thread_queue_metadata'].get('approval_requests', {}).get('nodes')),
|
||||
join_link = room['joinable_mode'].get('link'),
|
||||
privacy_mode = bool(room.get('privacy_mode')),
|
||||
)
|
||||
|
||||
def graphql_to_page(page):
|
||||
@@ -79,7 +255,8 @@ def graphql_to_page(page):
|
||||
city=page.get('city').get('name'),
|
||||
category=page.get('category_type'),
|
||||
photo=page['profile_picture'].get('uri'),
|
||||
name=page.get('name')
|
||||
name=page.get('name'),
|
||||
message_count=page.get('messages_count')
|
||||
)
|
||||
|
||||
def graphql_queries_to_json(*queries):
|
||||
@@ -92,7 +269,11 @@ def graphql_queries_to_json(*queries):
|
||||
return json.dumps(rtn)
|
||||
|
||||
def graphql_response_to_json(content):
|
||||
j = json.loads(content, cls=ConcatJSONDecoder)
|
||||
content = strip_to_json(content) # Usually only needed in some error cases
|
||||
try:
|
||||
j = json.loads(content, cls=ConcatJSONDecoder)
|
||||
except Exception:
|
||||
raise FBchatException('Error while parsing JSON: {}'.format(repr(content)))
|
||||
|
||||
rtn = [None]*(len(j))
|
||||
for x in j:
|
||||
@@ -112,7 +293,9 @@ def graphql_response_to_json(content):
|
||||
return rtn
|
||||
|
||||
class GraphQL(object):
|
||||
def __init__(self, query=None, doc_id=None, params={}):
|
||||
def __init__(self, query=None, doc_id=None, params=None):
|
||||
if params is None:
|
||||
params = {}
|
||||
if query is not None:
|
||||
self.value = {
|
||||
'priority': 0,
|
||||
@@ -125,7 +308,7 @@ class GraphQL(object):
|
||||
'query_params': params
|
||||
}
|
||||
else:
|
||||
raise Exception('A query or doc_id must be specified')
|
||||
raise FBchatUserError('A query or doc_id must be specified')
|
||||
|
||||
|
||||
FRAGMENT_USER = """
|
||||
@@ -160,6 +343,14 @@ class GraphQL(object):
|
||||
id
|
||||
}
|
||||
}
|
||||
},
|
||||
customization_info {
|
||||
participant_customizations {
|
||||
participant_id,
|
||||
nickname
|
||||
},
|
||||
outgoing_bubble_color,
|
||||
emoji
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
429
fbchat/models.py
429
fbchat/models.py
@@ -4,22 +4,47 @@ from __future__ import unicode_literals
|
||||
import enum
|
||||
|
||||
|
||||
class FBchatException(Exception):
|
||||
"""Custom exception thrown by fbchat. All exceptions in the fbchat module inherits this"""
|
||||
|
||||
class FBchatFacebookError(FBchatException):
|
||||
#: The error code that Facebook returned
|
||||
fb_error_code = None
|
||||
#: The error message that Facebook returned (In the user's own language)
|
||||
fb_error_message = None
|
||||
#: The status code that was sent in the http response (eg. 404) (Usually only set if not successful, aka. not 200)
|
||||
request_status_code = None
|
||||
def __init__(self, message, fb_error_code=None, fb_error_message=None, request_status_code=None):
|
||||
super(FBchatFacebookError, self).__init__(message)
|
||||
"""Thrown by fbchat when Facebook returns an error"""
|
||||
self.fb_error_code = str(fb_error_code)
|
||||
self.fb_error_message = fb_error_message
|
||||
self.request_status_code = request_status_code
|
||||
|
||||
class FBchatUserError(FBchatException):
|
||||
"""Thrown by fbchat when wrong values are entered"""
|
||||
|
||||
class Thread(object):
|
||||
#: The unique identifier of the thread. Can be used a `thread_id`. See :ref:`intro_threads` for more info
|
||||
uid = str
|
||||
uid = None
|
||||
#: Specifies the type of thread. Can be used a `thread_type`. See :ref:`intro_threads` for more info
|
||||
type = None
|
||||
#: The thread's picture
|
||||
photo = str
|
||||
#: A url to the thread's picture
|
||||
photo = None
|
||||
#: The name of the thread
|
||||
name = str
|
||||
|
||||
def __init__(self, _type, uid, photo=None, name=None):
|
||||
name = None
|
||||
#: Timestamp of last message
|
||||
last_message_timestamp = None
|
||||
#: Number of messages in the thread
|
||||
message_count = None
|
||||
def __init__(self, _type, uid, photo=None, name=None, last_message_timestamp=None, message_count=None):
|
||||
"""Represents a Facebook thread"""
|
||||
self.uid = str(uid)
|
||||
self.type = _type
|
||||
self.photo = photo
|
||||
self.name = name
|
||||
self.last_message_timestamp = last_message_timestamp
|
||||
self.message_count = message_count
|
||||
|
||||
def __repr__(self):
|
||||
return self.__unicode__()
|
||||
@@ -30,19 +55,27 @@ class Thread(object):
|
||||
|
||||
class User(Thread):
|
||||
#: The profile url
|
||||
url = str
|
||||
url = None
|
||||
#: The users first name
|
||||
first_name = str
|
||||
first_name = None
|
||||
#: The users last name
|
||||
last_name = str
|
||||
last_name = None
|
||||
#: Whether the user and the client are friends
|
||||
is_friend = bool
|
||||
is_friend = None
|
||||
#: The user's gender
|
||||
gender = str
|
||||
gender = None
|
||||
#: From 0 to 1. How close the client is to the user
|
||||
affinity = float
|
||||
affinity = None
|
||||
#: The user's nickname
|
||||
nickname = None
|
||||
#: The clients nickname, as seen by the user
|
||||
own_nickname = None
|
||||
#: A :class:`ThreadColor`. The message color
|
||||
color = None
|
||||
#: The default emoji
|
||||
emoji = None
|
||||
|
||||
def __init__(self, uid, url=None, first_name=None, last_name=None, is_friend=None, gender=None, affinity=None, **kwargs):
|
||||
def __init__(self, uid, url=None, first_name=None, last_name=None, is_friend=None, gender=None, affinity=None, nickname=None, own_nickname=None, color=None, emoji=None, **kwargs):
|
||||
"""Represents a Facebook user. Inherits `Thread`"""
|
||||
super(User, self).__init__(ThreadType.USER, uid, **kwargs)
|
||||
self.url = url
|
||||
@@ -51,29 +84,73 @@ class User(Thread):
|
||||
self.is_friend = is_friend
|
||||
self.gender = gender
|
||||
self.affinity = affinity
|
||||
self.nickname = nickname
|
||||
self.own_nickname = own_nickname
|
||||
self.color = color
|
||||
self.emoji = emoji
|
||||
|
||||
|
||||
class Group(Thread):
|
||||
#: List of the group thread's participant user IDs
|
||||
participants = list
|
||||
#: Unique list (set) of the group thread's participant user IDs
|
||||
participants = None
|
||||
#: A dict, containing user nicknames mapped to their IDs
|
||||
nicknames = None
|
||||
#: A :class:`ThreadColor`. The groups's message color
|
||||
color = None
|
||||
#: The groups's default emoji
|
||||
emoji = None
|
||||
|
||||
def __init__(self, uid, participants=[], **kwargs):
|
||||
def __init__(self, uid, participants=None, nicknames=None, color=None, emoji=None, **kwargs):
|
||||
"""Represents a Facebook group. Inherits `Thread`"""
|
||||
super(Group, self).__init__(ThreadType.GROUP, uid, **kwargs)
|
||||
if participants is None:
|
||||
participants = set()
|
||||
self.participants = participants
|
||||
if nicknames is None:
|
||||
nicknames = []
|
||||
self.nicknames = nicknames
|
||||
self.color = color
|
||||
self.emoji = emoji
|
||||
|
||||
|
||||
class Room(Group):
|
||||
# Set containing user IDs of thread admins
|
||||
admins = None
|
||||
# True if users need approval to join
|
||||
approval_mode = None
|
||||
# Set containing user IDs requesting to join
|
||||
approval_requests = None
|
||||
# Link for joining room
|
||||
join_link = None
|
||||
# True is room is not discoverable
|
||||
privacy_mode = None
|
||||
|
||||
def __init__(self, uid, admins=None, approval_mode=None, approval_requests=None, join_link=None, privacy_mode=None, **kwargs):
|
||||
"""Represents a Facebook room. Inherits `Group`"""
|
||||
super(Room, self).__init__(uid, **kwargs)
|
||||
self.type = ThreadType.ROOM
|
||||
if admins is None:
|
||||
admins = set()
|
||||
self.admins = admins
|
||||
self.approval_mode = approval_mode
|
||||
if approval_requests is None:
|
||||
approval_requests = set()
|
||||
self.approval_requests = approval_requests
|
||||
self.join_link = join_link
|
||||
self.privacy_mode = privacy_mode
|
||||
|
||||
|
||||
class Page(Thread):
|
||||
#: The page's custom url
|
||||
url = str
|
||||
url = None
|
||||
#: The name of the page's location city
|
||||
city = str
|
||||
city = None
|
||||
#: Amount of likes the page has
|
||||
likes = int
|
||||
likes = None
|
||||
#: Some extra information about the page
|
||||
sub_title = str
|
||||
sub_title = None
|
||||
#: The page's category
|
||||
category = str
|
||||
category = None
|
||||
|
||||
def __init__(self, uid, url=None, city=None, likes=None, sub_title=None, category=None, **kwargs):
|
||||
"""Represents a Facebook page. Inherits `Thread`"""
|
||||
@@ -86,52 +163,279 @@ class Page(Thread):
|
||||
|
||||
|
||||
class Message(object):
|
||||
#: The message ID
|
||||
uid = str
|
||||
#: ID of the sender
|
||||
author = int
|
||||
#: Timestamp of when the message was sent
|
||||
timestamp = str
|
||||
#: Whether the message is read
|
||||
is_read = bool
|
||||
#: A list of message reactions
|
||||
reactions = list
|
||||
#: The actual message
|
||||
text = str
|
||||
text = None
|
||||
#: A list of :class:`Mention` objects
|
||||
mentions = list
|
||||
#: An ID of a sent sticker
|
||||
sticker = str
|
||||
mentions = None
|
||||
#: A :class:`EmojiSize`. Size of a sent emoji
|
||||
emoji_size = None
|
||||
#: The message ID
|
||||
uid = None
|
||||
#: ID of the sender
|
||||
author = None
|
||||
#: Timestamp of when the message was sent
|
||||
timestamp = None
|
||||
#: Whether the message is read
|
||||
is_read = None
|
||||
#: A dict with user's IDs as keys, and their :class:`MessageReaction` as values
|
||||
reactions = None
|
||||
#: The actual message
|
||||
text = None
|
||||
#: A :class:`Sticker`
|
||||
sticker = None
|
||||
#: A list of attachments
|
||||
attachments = list
|
||||
attachments = None
|
||||
|
||||
def __init__(self, uid, author=None, timestamp=None, is_read=None, reactions=[], text=None, mentions=[], sticker=None, attachments=[]):
|
||||
def __init__(self, text=None, mentions=None, emoji_size=None, sticker=None, attachments=None):
|
||||
"""Represents a Facebook message"""
|
||||
self.uid = uid
|
||||
self.author = author
|
||||
self.timestamp = timestamp
|
||||
self.is_read = is_read
|
||||
self.reactions = reactions
|
||||
self.text = text
|
||||
if mentions is None:
|
||||
mentions = []
|
||||
self.mentions = mentions
|
||||
self.emoji_size = emoji_size
|
||||
self.sticker = sticker
|
||||
if attachments is None:
|
||||
attachments = []
|
||||
self.attachments = attachments
|
||||
self.reactions = {}
|
||||
|
||||
def __repr__(self):
|
||||
return self.__unicode__()
|
||||
|
||||
def __unicode__(self):
|
||||
return '<Message ({}): {}, mentions={} emoji_size={} attachments={}>'.format(self.uid, repr(self.text), self.mentions, self.emoji_size, self.attachments)
|
||||
|
||||
class Attachment(object):
|
||||
#: The attachment ID
|
||||
uid = None
|
||||
|
||||
def __init__(self, uid=None):
|
||||
"""Represents a Facebook attachment"""
|
||||
self.uid = uid
|
||||
|
||||
class Sticker(Attachment):
|
||||
#: The sticker-pack's ID
|
||||
pack = None
|
||||
#: Whether the sticker is animated
|
||||
is_animated = False
|
||||
|
||||
# If the sticker is animated, the following should be present
|
||||
#: URL to a medium spritemap
|
||||
medium_sprite_image = None
|
||||
#: URL to a large spritemap
|
||||
large_sprite_image = None
|
||||
#: The amount of frames present in the spritemap pr. row
|
||||
frames_per_row = None
|
||||
#: The amount of frames present in the spritemap pr. coloumn
|
||||
frames_per_col = None
|
||||
#: The frame rate the spritemap is intended to be played in
|
||||
frame_rate = None
|
||||
|
||||
#: URL to the sticker's image
|
||||
url = None
|
||||
#: Width of the sticker
|
||||
width = None
|
||||
#: Height of the sticker
|
||||
height = None
|
||||
#: The sticker's label/name
|
||||
label = None
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Represents a Facebook sticker that has been sent to a Facebook thread as an attachment"""
|
||||
super(Sticker, self).__init__(*args, **kwargs)
|
||||
|
||||
class ShareAttachment(Attachment):
|
||||
def __init__(self, **kwargs):
|
||||
"""Represents a shared item (eg. URL) that has been sent as a Facebook attachment - *Currently Incomplete!*"""
|
||||
super(ShareAttachment, self).__init__(**kwargs)
|
||||
|
||||
class FileAttachment(Attachment):
|
||||
#: Url where you can download the file
|
||||
url = None
|
||||
#: Size of the file in bytes
|
||||
size = None
|
||||
#: Name of the file
|
||||
name = None
|
||||
#: Whether Facebook determines that this file may be harmful
|
||||
is_malicious = None
|
||||
|
||||
def __init__(self, url=None, size=None, name=None, is_malicious=None, **kwargs):
|
||||
"""Represents a file that has been sent as a Facebook attachment"""
|
||||
super(FileAttachment, self).__init__(**kwargs)
|
||||
self.url = url
|
||||
self.size = size
|
||||
self.name = name
|
||||
self.is_malicious = is_malicious
|
||||
|
||||
class AudioAttachment(Attachment):
|
||||
#: Name of the file
|
||||
filename = None
|
||||
#: Url of the audio file
|
||||
url = None
|
||||
#: Duration of the audioclip in milliseconds
|
||||
duration = None
|
||||
#: Audio type
|
||||
audio_type = None
|
||||
|
||||
def __init__(self, filename=None, url=None, duration=None, audio_type=None, **kwargs):
|
||||
"""Represents an audio file that has been sent as a Facebook attachment"""
|
||||
super(AudioAttachment, self).__init__(**kwargs)
|
||||
self.filename = filename
|
||||
self.url = url
|
||||
self.duration = duration
|
||||
self.audio_type = audio_type
|
||||
|
||||
class ImageAttachment(Attachment):
|
||||
#: The extension of the original image (eg. 'png')
|
||||
original_extension = None
|
||||
#: Width of original image
|
||||
width = None
|
||||
#: Height of original image
|
||||
height = None
|
||||
|
||||
#: Whether the image is animated
|
||||
is_animated = None
|
||||
|
||||
#: URL to a thumbnail of the image
|
||||
thumbnail_url = None
|
||||
|
||||
#: URL to a medium preview of the image
|
||||
preview_url = None
|
||||
#: Width of the medium preview image
|
||||
preview_width = None
|
||||
#: Height of the medium preview image
|
||||
preview_height = None
|
||||
|
||||
#: URL to a large preview of the image
|
||||
large_preview_url = None
|
||||
#: Width of the large preview image
|
||||
large_preview_width = None
|
||||
#: Height of the large preview image
|
||||
large_preview_height = None
|
||||
|
||||
#: URL to an animated preview of the image (eg. for gifs)
|
||||
animated_preview_url = None
|
||||
#: Width of the animated preview image
|
||||
animated_preview_width = None
|
||||
#: Height of the animated preview image
|
||||
animated_preview_height = None
|
||||
|
||||
def __init__(self, original_extension=None, width=None, height=None, is_animated=None, thumbnail_url=None, preview=None, large_preview=None, animated_preview=None, **kwargs):
|
||||
"""
|
||||
Represents an image that has been sent as a Facebook attachment
|
||||
To retrieve the full image url, use: :func:`fbchat.Client.fetchImageUrl`,
|
||||
and pass it the uid of the image attachment
|
||||
"""
|
||||
super(ImageAttachment, self).__init__(**kwargs)
|
||||
self.original_extension = original_extension
|
||||
if width is not None:
|
||||
width = int(width)
|
||||
self.width = width
|
||||
if height is not None:
|
||||
height = int(height)
|
||||
self.height = height
|
||||
self.is_animated = is_animated
|
||||
self.thumbnail_url = thumbnail_url
|
||||
|
||||
if preview is None:
|
||||
preview = {}
|
||||
self.preview_url = preview.get('uri')
|
||||
self.preview_width = preview.get('width')
|
||||
self.preview_height = preview.get('height')
|
||||
|
||||
if large_preview is None:
|
||||
large_preview = {}
|
||||
self.large_preview_url = large_preview.get('uri')
|
||||
self.large_preview_width = large_preview.get('width')
|
||||
self.large_preview_height = large_preview.get('height')
|
||||
|
||||
if animated_preview is None:
|
||||
animated_preview = {}
|
||||
self.animated_preview_url = animated_preview.get('uri')
|
||||
self.animated_preview_width = animated_preview.get('width')
|
||||
self.animated_preview_height = animated_preview.get('height')
|
||||
|
||||
class VideoAttachment(Attachment):
|
||||
#: Size of the original video in bytes
|
||||
size = None
|
||||
#: Width of original video
|
||||
width = None
|
||||
#: Height of original video
|
||||
height = None
|
||||
#: Length of video in milliseconds
|
||||
duration = None
|
||||
#: URL to very compressed preview video
|
||||
preview_url = None
|
||||
|
||||
#: URL to a small preview image of the video
|
||||
small_image_url = None
|
||||
#: Width of the small preview image
|
||||
small_image_width = None
|
||||
#: Height of the small preview image
|
||||
small_image_height = None
|
||||
|
||||
#: URL to a medium preview image of the video
|
||||
medium_image_url = None
|
||||
#: Width of the medium preview image
|
||||
medium_image_width = None
|
||||
#: Height of the medium preview image
|
||||
medium_image_height = None
|
||||
|
||||
#: URL to a large preview image of the video
|
||||
large_image_url = None
|
||||
#: Width of the large preview image
|
||||
large_image_width = None
|
||||
#: Height of the large preview image
|
||||
large_image_height = None
|
||||
|
||||
def __init__(self, size=None, width=None, height=None, duration=None, preview_url=None, small_image=None, medium_image=None, large_image=None, **kwargs):
|
||||
"""Represents a video that has been sent as a Facebook attachment"""
|
||||
super(VideoAttachment, self).__init__(**kwargs)
|
||||
self.size = size
|
||||
self.width = width
|
||||
self.height = height
|
||||
self.duration = duration
|
||||
self.preview_url = preview_url
|
||||
|
||||
if small_image is None:
|
||||
small_image = {}
|
||||
self.small_image_url = small_image.get('uri')
|
||||
self.small_image_width = small_image.get('width')
|
||||
self.small_image_height = small_image.get('height')
|
||||
|
||||
if medium_image is None:
|
||||
medium_image = {}
|
||||
self.medium_image_url = medium_image.get('uri')
|
||||
self.medium_image_width = medium_image.get('width')
|
||||
self.medium_image_height = medium_image.get('height')
|
||||
|
||||
if large_image is None:
|
||||
large_image = {}
|
||||
self.large_image_url = large_image.get('uri')
|
||||
self.large_image_width = large_image.get('width')
|
||||
self.large_image_height = large_image.get('height')
|
||||
|
||||
|
||||
class Mention(object):
|
||||
#: The user ID the mention is pointing at
|
||||
user_id = str
|
||||
#: The thread ID the mention is pointing at
|
||||
thread_id = None
|
||||
#: The character where the mention starts
|
||||
offset = int
|
||||
offset = None
|
||||
#: The length of the mention
|
||||
length = int
|
||||
length = None
|
||||
|
||||
def __init__(self, user_id, offset=0, length=10):
|
||||
def __init__(self, thread_id, offset=0, length=10):
|
||||
"""Represents a @mention"""
|
||||
self.user_id = user_id
|
||||
self.thread_id = thread_id
|
||||
self.offset = offset
|
||||
self.length = length
|
||||
|
||||
def __repr__(self):
|
||||
return self.__unicode__()
|
||||
|
||||
def __unicode__(self):
|
||||
return '<Mention {}: offset={} length={}>'.format(self.thread_id, self.offset, self.length)
|
||||
|
||||
class Enum(enum.Enum):
|
||||
"""Used internally by fbchat to support enumerations"""
|
||||
def __repr__(self):
|
||||
@@ -143,6 +447,14 @@ class ThreadType(Enum):
|
||||
USER = 1
|
||||
GROUP = 2
|
||||
PAGE = 3
|
||||
ROOM = 4
|
||||
|
||||
class ThreadLocation(Enum):
|
||||
"""Used to specify where a thread is located (inbox, pending, archived, other)."""
|
||||
INBOX = 'INBOX'
|
||||
PENDING = 'PENDING'
|
||||
ARCHIVED = 'ARCHIVED'
|
||||
OTHER = 'OTHER'
|
||||
|
||||
class TypingStatus(Enum):
|
||||
"""Used to specify whether the user is typing or has stopped typing"""
|
||||
@@ -157,7 +469,7 @@ class EmojiSize(Enum):
|
||||
|
||||
class ThreadColor(Enum):
|
||||
"""Used to specify a thread colors"""
|
||||
MESSENGER_BLUE = ''
|
||||
MESSENGER_BLUE = '#0084ff'
|
||||
VIKING = '#44bec7'
|
||||
GOLDEN_POPPY = '#ffc300'
|
||||
RADICAL_RED = '#fa3c4c'
|
||||
@@ -182,22 +494,3 @@ class MessageReaction(Enum):
|
||||
ANGRY = '😠'
|
||||
YES = '👍'
|
||||
NO = '👎'
|
||||
|
||||
LIKES = {
|
||||
'large': EmojiSize.LARGE,
|
||||
'medium': EmojiSize.MEDIUM,
|
||||
'small': EmojiSize.SMALL,
|
||||
'l': EmojiSize.LARGE,
|
||||
'm': EmojiSize.MEDIUM,
|
||||
's': EmojiSize.SMALL
|
||||
}
|
||||
|
||||
MessageReactionFix = {
|
||||
'😍': ('0001f60d', '%F0%9F%98%8D'),
|
||||
'😆': ('0001f606', '%F0%9F%98%86'),
|
||||
'😮': ('0001f62e', '%F0%9F%98%AE'),
|
||||
'😢': ('0001f622', '%F0%9F%98%A2'),
|
||||
'😠': ('0001f620', '%F0%9F%98%A0'),
|
||||
'👍': ('0001f44d', '%F0%9F%91%8D'),
|
||||
'👎': ('0001f44e', '%F0%9F%91%8E')
|
||||
}
|
||||
|
122
fbchat/utils.py
122
fbchat/utils.py
@@ -9,6 +9,13 @@ import warnings
|
||||
import logging
|
||||
from .models import *
|
||||
|
||||
try:
|
||||
from urllib.parse import urlencode
|
||||
basestring = (str, bytes)
|
||||
except ImportError:
|
||||
from urllib import urlencode
|
||||
basestring = basestring
|
||||
|
||||
# Python 2's `input` executes the input, whereas `raw_input` just returns the input
|
||||
try:
|
||||
input = raw_input
|
||||
@@ -32,12 +39,26 @@ USER_AGENTS = [
|
||||
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6"
|
||||
]
|
||||
|
||||
TYPES = {
|
||||
'Page': ThreadType.PAGE,
|
||||
'User': ThreadType.USER,
|
||||
'Group': ThreadType.GROUP
|
||||
LIKES = {
|
||||
'large': EmojiSize.LARGE,
|
||||
'medium': EmojiSize.MEDIUM,
|
||||
'small': EmojiSize.SMALL,
|
||||
'l': EmojiSize.LARGE,
|
||||
'm': EmojiSize.MEDIUM,
|
||||
's': EmojiSize.SMALL
|
||||
}
|
||||
|
||||
MessageReactionFix = {
|
||||
'😍': ('0001f60d', '%F0%9F%98%8D'),
|
||||
'😆': ('0001f606', '%F0%9F%98%86'),
|
||||
'😮': ('0001f62e', '%F0%9F%98%AE'),
|
||||
'😢': ('0001f622', '%F0%9F%98%A2'),
|
||||
'😠': ('0001f620', '%F0%9F%98%A0'),
|
||||
'👍': ('0001f44d', '%F0%9F%91%8D'),
|
||||
'👎': ('0001f44e', '%F0%9F%91%8E')
|
||||
}
|
||||
|
||||
|
||||
GENDERS = {
|
||||
# For standard requests
|
||||
0: 'unknown',
|
||||
@@ -54,19 +75,18 @@ GENDERS = {
|
||||
11: 'unknown_plural',
|
||||
|
||||
# For graphql requests
|
||||
#'': 'unknown',
|
||||
'UNKNOWN': 'unknown',
|
||||
'FEMALE': 'female_singular',
|
||||
'MALE': 'male_singular',
|
||||
#'': 'female_singular_guess',
|
||||
#'': 'male_singular_guess',
|
||||
#'': 'mixed',
|
||||
#'': 'neuter_singular',
|
||||
'NEUTER': 'neuter_singular',
|
||||
#'': 'unknown_singular',
|
||||
#'': 'female_plural',
|
||||
#'': 'male_plural',
|
||||
#'': 'neuter_plural',
|
||||
#'': 'unknown_plural',
|
||||
None: None
|
||||
}
|
||||
|
||||
class ReqUrl(object):
|
||||
@@ -74,7 +94,8 @@ class ReqUrl(object):
|
||||
SEARCH = "https://www.facebook.com/ajax/typeahead/search.php"
|
||||
LOGIN = "https://m.facebook.com/login.php?login_attempt=1"
|
||||
SEND = "https://www.facebook.com/messaging/send/"
|
||||
THREAD_SYNC = "https://www.facebook.com/ajax/mercury/thread_sync.php"
|
||||
UNREAD_THREADS = "https://www.facebook.com/ajax/mercury/unread_threads.php"
|
||||
UNSEEN_THREADS = "https://www.facebook.com/mercury/unseen_thread_ids/"
|
||||
THREADS = "https://www.facebook.com/ajax/mercury/threadlist_info.php"
|
||||
MESSAGES = "https://www.facebook.com/ajax/mercury/thread_info.php"
|
||||
READ_STATUS = "https://www.facebook.com/ajax/mercury/change_read_status.php"
|
||||
@@ -98,6 +119,19 @@ class ReqUrl(object):
|
||||
MESSAGE_REACTION = "https://www.facebook.com/webgraphql/mutation"
|
||||
TYPING = "https://www.facebook.com/ajax/messaging/typ.php"
|
||||
GRAPHQL = "https://www.facebook.com/api/graphqlbatch/"
|
||||
ATTACHMENT_PHOTO = "https://www.facebook.com/mercury/attachments/photo/"
|
||||
EVENT_REMINDER = "https://www.facebook.com/ajax/eventreminder/create"
|
||||
MODERN_SETTINGS_MENU = "https://www.facebook.com/bluebar/modern_settings_menu/"
|
||||
|
||||
pull_channel = 0
|
||||
|
||||
def change_pull_channel(self, channel=None):
|
||||
if channel is None:
|
||||
self.pull_channel = (self.pull_channel + 1) % 5 # Pull channel will be 0-4
|
||||
else:
|
||||
self.pull_channel = channel
|
||||
self.STICKY = "https://{}-edge-chat.facebook.com/pull".format(self.pull_channel)
|
||||
self.PING = "https://{}-edge-chat.facebook.com/active_ping".format(self.pull_channel)
|
||||
|
||||
|
||||
facebookEncoding = 'UTF-8'
|
||||
@@ -108,17 +142,20 @@ def now():
|
||||
def strip_to_json(text):
|
||||
try:
|
||||
return text[text.index('{'):]
|
||||
except ValueError as e:
|
||||
return None
|
||||
except ValueError:
|
||||
raise FBchatException('No JSON object found: {}, {}'.format(repr(text), text.index('{')))
|
||||
|
||||
def get_decoded(r):
|
||||
if not isinstance(r._content, str):
|
||||
return r._content.decode(facebookEncoding)
|
||||
else:
|
||||
return r._content
|
||||
def get_decoded_r(r):
|
||||
return get_decoded(r._content)
|
||||
|
||||
def get_decoded(content):
|
||||
return content.decode(facebookEncoding)
|
||||
|
||||
def parse_json(content):
|
||||
return json.loads(content)
|
||||
|
||||
def get_json(r):
|
||||
return json.loads(strip_to_json(get_decoded(r)))
|
||||
return json.loads(strip_to_json(get_decoded_r(r)))
|
||||
|
||||
def digitToChar(digit):
|
||||
if digit < 10:
|
||||
@@ -149,30 +186,51 @@ def generateOfflineThreadingID():
|
||||
return str(int(msgs, 2))
|
||||
|
||||
def check_json(j):
|
||||
if 'error' in j and j['error'] is not None:
|
||||
if 'errorDescription' in j:
|
||||
# 'errorDescription' is in the users own language!
|
||||
raise Exception('Error #{} when sending request: {}'.format(j['error'], j['errorDescription']))
|
||||
elif 'debug_info' in j['error']:
|
||||
raise Exception('Error #{} when sending request: {}'.format(j['error']['code'], repr(j['error']['debug_info'])))
|
||||
else:
|
||||
raise Exception('Error {} when sending request'.format(j['error']))
|
||||
if j.get('error') is None:
|
||||
return
|
||||
if 'errorDescription' in j:
|
||||
# 'errorDescription' is in the users own language!
|
||||
raise FBchatFacebookError('Error #{} when sending request: {}'.format(j['error'], j['errorDescription']), fb_error_code=j['error'], fb_error_message=j['errorDescription'])
|
||||
elif 'debug_info' in j['error'] and 'code' in j['error']:
|
||||
raise FBchatFacebookError('Error #{} when sending request: {}'.format(j['error']['code'], repr(j['error']['debug_info'])), fb_error_code=j['error']['code'], fb_error_message=j['error']['debug_info'])
|
||||
else:
|
||||
raise FBchatFacebookError('Error {} when sending request'.format(j['error']), fb_error_code=j['error'])
|
||||
|
||||
def checkRequest(r, do_json_check=True):
|
||||
def check_request(r, as_json=True):
|
||||
if not r.ok:
|
||||
raise Exception('Error when sending request: Got {} response'.format(r.status_code))
|
||||
raise FBchatFacebookError('Error when sending request: Got {} response'.format(r.status_code), request_status_code=r.status_code)
|
||||
|
||||
content = get_decoded(r)
|
||||
content = get_decoded_r(r)
|
||||
|
||||
if content is None or len(content) == 0:
|
||||
raise Exception('Error when sending request: Got empty response')
|
||||
raise FBchatFacebookError('Error when sending request: Got empty response')
|
||||
|
||||
if do_json_check:
|
||||
if as_json:
|
||||
content = strip_to_json(content)
|
||||
try:
|
||||
j = json.loads(strip_to_json(content))
|
||||
except Exception as e:
|
||||
raise Exception('Error while parsing JSON: {}'.format(repr(content)))
|
||||
j = json.loads(content)
|
||||
except ValueError:
|
||||
raise FBchatFacebookError('Error while parsing JSON: {}'.format(repr(content)))
|
||||
check_json(j)
|
||||
return j
|
||||
else:
|
||||
return content
|
||||
|
||||
def get_jsmods_require(j, index):
|
||||
if j.get('jsmods') and j['jsmods'].get('require'):
|
||||
try:
|
||||
return j['jsmods']['require'][0][index][0]
|
||||
except (KeyError, IndexError) as e:
|
||||
log.warning('Error when getting jsmods_require: {}. Facebook might have changed protocol'.format(j))
|
||||
return None
|
||||
|
||||
def get_emojisize_from_tags(tags):
|
||||
if tags is None:
|
||||
return None
|
||||
tmp = [tag for tag in tags if tag.startswith('hot_emoji_size:')]
|
||||
if len(tmp) > 0:
|
||||
try:
|
||||
return LIKES[tmp[0].split(':')[1]]
|
||||
except (KeyError, IndexError):
|
||||
log.exception('Could not determine emoji size from {} - {}'.format(tags, tmp))
|
||||
return None
|
||||
|
6
pytest.ini
Normal file
6
pytest.ini
Normal file
@@ -0,0 +1,6 @@
|
||||
[pytest]
|
||||
xfail_strict=true
|
||||
markers =
|
||||
offline: Offline tests, aka. tests that can be executed without the need of a client
|
||||
expensive: Expensive tests, which should be executed sparingly
|
||||
addopts = -m "not expensive"
|
@@ -1,3 +1,5 @@
|
||||
requests
|
||||
lxml
|
||||
beautifulsoup4
|
||||
enum34; python_version < '3.4'
|
||||
six
|
||||
|
19
setup.py
19
setup.py
@@ -4,22 +4,24 @@
|
||||
"""
|
||||
Setup script for fbchat
|
||||
"""
|
||||
|
||||
|
||||
import os
|
||||
try:
|
||||
from setuptools import setup
|
||||
except ImportError:
|
||||
from distutils.core import setup
|
||||
|
||||
|
||||
with open('README.rst') as f:
|
||||
readme_content = f.read().strip()
|
||||
|
||||
try:
|
||||
requirements = [line.rstrip('\n') for line in open('fbchat.egg-info/requires.txt')]
|
||||
except FileNotFoundError:
|
||||
requirements = [line.rstrip('\n') for line in open('requirements.txt')]
|
||||
requirements = [
|
||||
'requests',
|
||||
'lxml',
|
||||
'beautifulsoup4'
|
||||
]
|
||||
|
||||
extras_requirements = {
|
||||
':python_version < "3.4"': ['enum34']
|
||||
}
|
||||
|
||||
version = None
|
||||
author = None
|
||||
@@ -52,10 +54,8 @@ setup(
|
||||
classifiers=[
|
||||
'Development Status :: 2 - Pre-Alpha',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: Information Technology',
|
||||
'License :: OSI Approved :: BSD License',
|
||||
'License :: OSI Approved :: BSD License',
|
||||
'Operating System :: OS Independent',
|
||||
'Programming Language :: Python :: 2.6',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
@@ -75,6 +75,7 @@ setup(
|
||||
include_package_data=True,
|
||||
packages=['fbchat'],
|
||||
install_requires=requirements,
|
||||
extras_require=extras_requirements,
|
||||
url=source,
|
||||
version=version,
|
||||
zip_safe=True,
|
||||
|
231
tests.py
231
tests.py
@@ -1,231 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
import json
|
||||
import logging
|
||||
import unittest
|
||||
from getpass import getpass
|
||||
from sys import argv
|
||||
from os import path, chdir
|
||||
from glob import glob
|
||||
from fbchat import Client
|
||||
from fbchat.models import *
|
||||
import py_compile
|
||||
|
||||
logging_level = logging.ERROR
|
||||
|
||||
"""
|
||||
|
||||
Testing script for `fbchat`.
|
||||
Full documentation on https://fbchat.readthedocs.io/
|
||||
|
||||
"""
|
||||
|
||||
class CustomClient(Client):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.got_qprimer = False
|
||||
super(type(self), self).__init__(*args, **kwargs)
|
||||
|
||||
def onQprimer(self, msg, **kwargs):
|
||||
self.got_qprimer = True
|
||||
|
||||
class TestFbchat(unittest.TestCase):
|
||||
def test_examples(self):
|
||||
# Checks for syntax errors in the examples
|
||||
chdir('examples')
|
||||
for f in glob('*.txt'):
|
||||
print(f)
|
||||
with self.assertRaises(py_compile.PyCompileError):
|
||||
py_compile.compile(f)
|
||||
|
||||
chdir('..')
|
||||
|
||||
def test_loginFunctions(self):
|
||||
self.assertTrue(client.isLoggedIn())
|
||||
|
||||
client.logout()
|
||||
|
||||
self.assertFalse(client.isLoggedIn())
|
||||
|
||||
with self.assertRaises(Exception):
|
||||
client.login('<email>', '<password>', max_tries=1)
|
||||
|
||||
client.login(email, password)
|
||||
|
||||
self.assertTrue(client.isLoggedIn())
|
||||
|
||||
def test_sessions(self):
|
||||
global client
|
||||
session_cookies = client.getSession()
|
||||
client = CustomClient(email, password, session_cookies=session_cookies, logging_level=logging_level)
|
||||
|
||||
self.assertTrue(client.isLoggedIn())
|
||||
|
||||
def test_defaultThread(self):
|
||||
# setDefaultThread
|
||||
client.setDefaultThread(group_id, ThreadType.GROUP)
|
||||
self.assertTrue(client.sendMessage('test_default_recipient★'))
|
||||
|
||||
client.setDefaultThread(user_id, ThreadType.USER)
|
||||
self.assertTrue(client.sendMessage('test_default_recipient★'))
|
||||
|
||||
# resetDefaultThread
|
||||
client.resetDefaultThread()
|
||||
with self.assertRaises(ValueError):
|
||||
client.sendMessage('should_not_send')
|
||||
|
||||
def test_fetchAllUsers(self):
|
||||
users = client.fetchAllUsers()
|
||||
self.assertGreater(len(users), 0)
|
||||
|
||||
def test_searchForUsers(self):
|
||||
users = client.searchForUsers('Mark Zuckerberg')
|
||||
self.assertGreater(len(users), 0)
|
||||
|
||||
u = users[0]
|
||||
|
||||
# Test if values are set correctly
|
||||
self.assertEqual(u.uid, '4')
|
||||
self.assertEqual(u.type, ThreadType.USER)
|
||||
self.assertEqual(u.photo[:4], 'http')
|
||||
self.assertEqual(u.url[:4], 'http')
|
||||
self.assertEqual(u.name, 'Mark Zuckerberg')
|
||||
|
||||
def test_sendEmoji(self):
|
||||
self.assertIsNotNone(client.sendEmoji(size=EmojiSize.SMALL, thread_id=user_id, thread_type=ThreadType.USER))
|
||||
self.assertIsNotNone(client.sendEmoji(size=EmojiSize.MEDIUM, thread_id=user_id, thread_type=ThreadType.USER))
|
||||
self.assertIsNotNone(client.sendEmoji('😆', EmojiSize.LARGE, user_id, ThreadType.USER))
|
||||
|
||||
self.assertIsNotNone(client.sendEmoji(size=EmojiSize.SMALL, thread_id=group_id, thread_type=ThreadType.GROUP))
|
||||
self.assertIsNotNone(client.sendEmoji(size=EmojiSize.MEDIUM, thread_id=group_id, thread_type=ThreadType.GROUP))
|
||||
self.assertIsNotNone(client.sendEmoji('😆', EmojiSize.LARGE, group_id, ThreadType.GROUP))
|
||||
|
||||
def test_sendMessage(self):
|
||||
self.assertIsNotNone(client.sendMessage('test_send_user★', user_id, ThreadType.USER))
|
||||
self.assertIsNotNone(client.sendMessage('test_send_group★', group_id, ThreadType.GROUP))
|
||||
with self.assertRaises(Exception):
|
||||
client.sendMessage('test_send_user_should_fail★', user_id, ThreadType.GROUP)
|
||||
with self.assertRaises(Exception):
|
||||
client.sendMessage('test_send_group_should_fail★', group_id, ThreadType.USER)
|
||||
|
||||
def test_sendImages(self):
|
||||
image_url = 'https://cdn4.iconfinder.com/data/icons/ionicons/512/icon-image-128.png'
|
||||
image_local_url = path.join(path.dirname(__file__), 'tests/image.png')
|
||||
self.assertTrue(client.sendRemoteImage(image_url, 'test_send_user_images_remote★', user_id, ThreadType.USER))
|
||||
self.assertTrue(client.sendRemoteImage(image_url, 'test_send_group_images_remote★', group_id, ThreadType.GROUP))
|
||||
self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local★', user_id, ThreadType.USER))
|
||||
self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local★', group_id, ThreadType.GROUP))
|
||||
|
||||
def test_fetchThreadList(self):
|
||||
client.fetchThreadList(offset=0, limit=20)
|
||||
|
||||
def test_fetchThreadMessages(self):
|
||||
client.sendMessage('test_user_getThreadInfo★', thread_id=user_id, thread_type=ThreadType.USER)
|
||||
|
||||
messages = client.fetchThreadMessages(thread_id=user_id, limit=1)
|
||||
self.assertEqual(messages[0].author, client.uid)
|
||||
self.assertEqual(messages[0].text, 'test_user_getThreadInfo★')
|
||||
|
||||
client.sendMessage('test_group_getThreadInfo★', thread_id=group_id, thread_type=ThreadType.GROUP)
|
||||
|
||||
messages = client.fetchThreadMessages(thread_id=group_id, limit=1)
|
||||
self.assertEqual(messages[0].author, client.uid)
|
||||
self.assertEqual(messages[0].text, 'test_group_getThreadInfo★')
|
||||
|
||||
def test_listen(self):
|
||||
client.startListening()
|
||||
client.doOneListen()
|
||||
client.stopListening()
|
||||
|
||||
self.assertTrue(client.got_qprimer)
|
||||
|
||||
def test_fetchUserInfo(self):
|
||||
info = client.fetchUserInfo('4')['4']
|
||||
self.assertEqual(info.name, 'Mark Zuckerberg')
|
||||
|
||||
def test_removeAddFromGroup(self):
|
||||
client.removeUserFromGroup(user_id, thread_id=group_id)
|
||||
client.addUsersToGroup(user_id, thread_id=group_id)
|
||||
|
||||
def test_changeThreadTitle(self):
|
||||
client.changeThreadTitle('test_changeThreadTitle★', thread_id=group_id, thread_type=ThreadType.GROUP)
|
||||
client.changeThreadTitle('test_changeThreadTitle★', thread_id=user_id, thread_type=ThreadType.USER)
|
||||
|
||||
def test_changeNickname(self):
|
||||
client.changeNickname('test_changeNicknameSelf★', client.uid, thread_id=user_id, thread_type=ThreadType.USER)
|
||||
client.changeNickname('test_changeNicknameOther★', user_id, thread_id=user_id, thread_type=ThreadType.USER)
|
||||
client.changeNickname('test_changeNicknameSelf★', client.uid, thread_id=group_id, thread_type=ThreadType.GROUP)
|
||||
client.changeNickname('test_changeNicknameOther★', user_id, thread_id=group_id, thread_type=ThreadType.GROUP)
|
||||
|
||||
def test_changeThreadEmoji(self):
|
||||
client.changeThreadEmoji('😀', group_id)
|
||||
client.changeThreadEmoji('😀', user_id)
|
||||
client.changeThreadEmoji('😆', group_id)
|
||||
client.changeThreadEmoji('😆', user_id)
|
||||
|
||||
def test_changeThreadColor(self):
|
||||
client.changeThreadColor(ThreadColor.BRILLIANT_ROSE, group_id)
|
||||
client.changeThreadColor(ThreadColor.MESSENGER_BLUE, group_id)
|
||||
client.changeThreadColor(ThreadColor.BRILLIANT_ROSE, user_id)
|
||||
client.changeThreadColor(ThreadColor.MESSENGER_BLUE, user_id)
|
||||
|
||||
def test_reactToMessage(self):
|
||||
mid = client.sendMessage('test_reactToMessage★', user_id, ThreadType.USER)
|
||||
client.reactToMessage(mid, MessageReaction.LOVE)
|
||||
mid = client.sendMessage('test_reactToMessage★', group_id, ThreadType.GROUP)
|
||||
client.reactToMessage(mid, MessageReaction.LOVE)
|
||||
|
||||
def test_setTypingStatus(self):
|
||||
client.setTypingStatus(TypingStatus.TYPING, thread_id=user_id, thread_type=ThreadType.USER)
|
||||
client.setTypingStatus(TypingStatus.STOPPED, thread_id=user_id, thread_type=ThreadType.USER)
|
||||
client.setTypingStatus(TypingStatus.TYPING, thread_id=group_id, thread_type=ThreadType.GROUP)
|
||||
client.setTypingStatus(TypingStatus.STOPPED, thread_id=group_id, thread_type=ThreadType.GROUP)
|
||||
|
||||
|
||||
def start_test(param_client, param_group_id, param_user_id, tests=[]):
|
||||
global client
|
||||
global group_id
|
||||
global user_id
|
||||
|
||||
client = param_client
|
||||
group_id = param_group_id
|
||||
user_id = param_user_id
|
||||
|
||||
tests = ['test_' + test if 'test_' != test[:5] else test for test in tests]
|
||||
|
||||
if len(tests) == 0:
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(TestFbchat)
|
||||
else:
|
||||
suite = unittest.TestSuite(map(TestFbchat, tests))
|
||||
print('Starting test(s)')
|
||||
unittest.TextTestRunner(verbosity=2).run(suite)
|
||||
|
||||
|
||||
client = None
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Python 3 does not use raw_input, whereas Python 2 does
|
||||
try:
|
||||
input = raw_input
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
try:
|
||||
with open(path.join(path.dirname(__file__), 'tests/my_data.json'), 'r') as f:
|
||||
json = json.load(f)
|
||||
email = json['email']
|
||||
password = json['password']
|
||||
user_id = json['user_thread_id']
|
||||
group_id = json['group_thread_id']
|
||||
except (IOError, IndexError) as e:
|
||||
email = input('Email: ')
|
||||
password = getpass()
|
||||
group_id = input('Please enter a group thread id (To test group functionality): ')
|
||||
user_id = input('Please enter a user thread id (To test kicking/adding functionality): ')
|
||||
|
||||
print('Logging in...')
|
||||
client = CustomClient(email, password, logging_level=logging_level)
|
||||
|
||||
# Warning! Taking user input directly like this could be dangerous! Use only for testing purposes!
|
||||
start_test(client, group_id, user_id, argv[1:])
|
101
tests/conftest.py
Normal file
101
tests/conftest.py
Normal file
@@ -0,0 +1,101 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
import json
|
||||
|
||||
from utils import *
|
||||
from contextlib import contextmanager
|
||||
from fbchat.models import ThreadType
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def user(client2):
|
||||
return {"id": client2.uid, "type": ThreadType.USER}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def group(pytestconfig):
|
||||
return {"id": load_variable("group_id", pytestconfig.cache), "type": ThreadType.GROUP}
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", params=["user", "group"])
|
||||
def thread(request, user, group):
|
||||
return user if request.param == "user" else group
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def client1(pytestconfig):
|
||||
with load_client(1, pytestconfig.cache) as c:
|
||||
yield c
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def client2(pytestconfig):
|
||||
with load_client(2, pytestconfig.cache) as c:
|
||||
yield c
|
||||
|
||||
|
||||
@pytest.fixture # (scope="session")
|
||||
def client(client1, thread):
|
||||
client1.setDefaultThread(thread["id"], thread["type"])
|
||||
yield client1
|
||||
client1.resetDefaultThread()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", params=["client1", "client2"])
|
||||
def client_all(request, client1, client2):
|
||||
return client1 if request.param == "client1" else client2
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def catch_event(client2):
|
||||
t = ClientThread(client2)
|
||||
t.start()
|
||||
|
||||
@contextmanager
|
||||
def inner(method_name):
|
||||
caught = CaughtValue()
|
||||
old_method = getattr(client2, method_name)
|
||||
|
||||
# Will be called by the other thread
|
||||
def catch_value(*args, **kwargs):
|
||||
old_method(*args, **kwargs)
|
||||
# Make sure the `set` is only called once
|
||||
if not caught.is_set():
|
||||
caught.set(kwargs)
|
||||
|
||||
setattr(client2, method_name, catch_value)
|
||||
yield caught
|
||||
caught.wait()
|
||||
if not caught.is_set():
|
||||
raise ValueError("The value could not be caught")
|
||||
setattr(client2, method_name, old_method)
|
||||
|
||||
yield inner
|
||||
|
||||
t.should_stop.set()
|
||||
|
||||
try:
|
||||
# Make the client send a messages to itself, so the blocking pull request will return
|
||||
# This is probably not safe, since the client is making two requests simultaneously
|
||||
client2.sendMessage("Shutdown", client2.uid)
|
||||
finally:
|
||||
t.join()
|
||||
|
||||
|
||||
@pytest.fixture # (scope="session")
|
||||
def compare(client, thread):
|
||||
def inner(caught_event, **kwargs):
|
||||
d = {
|
||||
"author_id": client.uid,
|
||||
"thread_id": client.uid
|
||||
if thread["type"] == ThreadType.USER
|
||||
else thread["id"],
|
||||
"thread_type": thread["type"],
|
||||
}
|
||||
d.update(kwargs)
|
||||
return subset(caught_event.res, **d)
|
||||
|
||||
return inner
|
55
tests/test_base.py
Normal file
55
tests/test_base.py
Normal file
@@ -0,0 +1,55 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
import py_compile
|
||||
|
||||
from glob import glob
|
||||
from os import path, environ
|
||||
from fbchat import Client
|
||||
from fbchat.models import FBchatUserError, Message
|
||||
|
||||
|
||||
@pytest.mark.offline
|
||||
def test_examples():
|
||||
# Compiles the examples, to check for syntax errors
|
||||
for name in glob(path.join(path.dirname(__file__), "../examples", "*.py")):
|
||||
py_compile.compile(name)
|
||||
|
||||
|
||||
@pytest.mark.trylast
|
||||
@pytest.mark.expensive
|
||||
def test_login(client1):
|
||||
assert client1.isLoggedIn()
|
||||
email = client1.email
|
||||
password = client1.password
|
||||
|
||||
client1.logout()
|
||||
|
||||
assert not client1.isLoggedIn()
|
||||
|
||||
with pytest.raises(FBchatUserError):
|
||||
client1.login("<invalid email>", "<invalid password>", max_tries=1)
|
||||
|
||||
client1.login(email, password)
|
||||
|
||||
assert client1.isLoggedIn()
|
||||
|
||||
|
||||
@pytest.mark.trylast
|
||||
def test_sessions(client1):
|
||||
session = client1.getSession()
|
||||
Client("no email needed", "no password needed", session_cookies=session)
|
||||
client1.setSession(session)
|
||||
assert client1.isLoggedIn()
|
||||
|
||||
|
||||
@pytest.mark.tryfirst
|
||||
def test_default_thread(client1, thread):
|
||||
client1.setDefaultThread(thread["id"], thread["type"])
|
||||
assert client1.send(Message(text="Sent to the specified thread"))
|
||||
|
||||
client1.resetDefaultThread()
|
||||
with pytest.raises(ValueError):
|
||||
client1.send(Message(text="Should not be sent"))
|
79
tests/test_fetch.py
Normal file
79
tests/test_fetch.py
Normal file
@@ -0,0 +1,79 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from os import path
|
||||
from fbchat.models import ThreadType, Message, Mention, EmojiSize, Sticker
|
||||
from utils import subset
|
||||
|
||||
|
||||
def test_fetch_all_users(client):
|
||||
users = client.fetchAllUsers()
|
||||
assert len(users) > 0
|
||||
|
||||
|
||||
def test_fetch_thread_list(client):
|
||||
threads = client.fetchThreadList(limit=2)
|
||||
assert len(threads) == 2
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"emoji, emoji_size",
|
||||
[
|
||||
("😆", EmojiSize.SMALL),
|
||||
("😆", EmojiSize.MEDIUM),
|
||||
("😆", EmojiSize.LARGE),
|
||||
# These fail because the emoji is made into a sticker
|
||||
# This should be fixed
|
||||
pytest.mark.xfail((None, EmojiSize.SMALL)),
|
||||
pytest.mark.xfail((None, EmojiSize.MEDIUM)),
|
||||
pytest.mark.xfail((None, EmojiSize.LARGE)),
|
||||
],
|
||||
)
|
||||
def test_fetch_message_emoji(client, emoji, emoji_size):
|
||||
mid = client.sendEmoji(emoji, emoji_size)
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert subset(
|
||||
vars(message), uid=mid, author=client.uid, text=emoji, emoji_size=emoji_size
|
||||
)
|
||||
|
||||
|
||||
def test_fetch_message_mentions(client):
|
||||
text = "This is a test of fetchThreadMessages"
|
||||
mentions = [Mention(client.uid, offset=10, length=4)]
|
||||
|
||||
mid = client.send(Message(text, mentions=mentions))
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert subset(vars(message), uid=mid, author=client.uid, text=text)
|
||||
for i, m in enumerate(mentions):
|
||||
assert vars(message.mentions[i]) == vars(m)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("sticker_id", ["767334476626295"])
|
||||
def test_fetch_message_sticker(client, sticker_id):
|
||||
mid = client.send(Message(sticker=Sticker(sticker_id)))
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert subset(vars(message), uid=mid, author=client.uid)
|
||||
assert subset(vars(message.sticker), uid=sticker_id)
|
||||
|
||||
|
||||
def test_fetch_info(client1, group):
|
||||
info = client1.fetchUserInfo("4")["4"]
|
||||
assert info.name == "Mark Zuckerberg"
|
||||
|
||||
info = client1.fetchGroupInfo(group["id"])[group["id"]]
|
||||
assert info.type == ThreadType.GROUP
|
||||
|
||||
|
||||
def test_fetch_image_url(client):
|
||||
url = path.join(path.dirname(__file__), "image.png")
|
||||
|
||||
client.sendLocalImage(url)
|
||||
message, = client.fetchThreadMessages(limit=1)
|
||||
|
||||
assert client.fetchImageUrl(message.attachments[0].uid)
|
12
tests/test_message_management.py
Normal file
12
tests/test_message_management.py
Normal file
@@ -0,0 +1,12 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from fbchat.models import Message, MessageReaction
|
||||
|
||||
|
||||
def test_set_reaction(client):
|
||||
mid = client.send(Message(text="This message will be reacted to"))
|
||||
client.reactToMessage(mid, MessageReaction.LOVE)
|
18
tests/test_search.py
Normal file
18
tests/test_search.py
Normal file
@@ -0,0 +1,18 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from fbchat.models import ThreadType
|
||||
|
||||
|
||||
def test_search_for(client1):
|
||||
users = client1.searchForUsers("Mark Zuckerberg")
|
||||
assert len(users) > 0
|
||||
|
||||
u = users[0]
|
||||
|
||||
assert u.uid == "4"
|
||||
assert u.type == ThreadType.USER
|
||||
assert u.photo[:4] == "http"
|
||||
assert u.url[:4] == "http"
|
||||
assert u.name == "Mark Zuckerberg"
|
110
tests/test_send.py
Normal file
110
tests/test_send.py
Normal file
@@ -0,0 +1,110 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from os import path
|
||||
from fbchat.models import Message, Mention, EmojiSize, FBchatFacebookError, Sticker
|
||||
from utils import subset
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"text",
|
||||
[
|
||||
"test_send",
|
||||
"😆",
|
||||
"\\\n\t%?&'\"",
|
||||
"ˁҭʚ¹Ʋջوװ՞ޱɣࠚԹБɑȑңКએ֭ʗыԈٌʼőԈ×௴nચϚࠖణٔє܅Ԇޑط",
|
||||
"a" * 20000, # Maximum amount of characters you can send
|
||||
],
|
||||
)
|
||||
def test_send_text(client, catch_event, compare, text):
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.sendMessage(text)
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"emoji, emoji_size",
|
||||
[
|
||||
("😆", EmojiSize.SMALL),
|
||||
("😆", EmojiSize.MEDIUM),
|
||||
("😆", EmojiSize.LARGE),
|
||||
# These fail because the emoji is made into a sticker
|
||||
# This should be fixed
|
||||
pytest.mark.xfail((None, EmojiSize.SMALL)),
|
||||
pytest.mark.xfail((None, EmojiSize.MEDIUM)),
|
||||
pytest.mark.xfail((None, EmojiSize.LARGE)),
|
||||
],
|
||||
)
|
||||
def test_send_emoji(client, catch_event, compare, emoji, emoji_size):
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.sendEmoji(emoji, emoji_size)
|
||||
|
||||
assert compare(x, mid=mid, message=emoji)
|
||||
assert subset(
|
||||
vars(x.res["message_object"]),
|
||||
uid=mid,
|
||||
author=client.uid,
|
||||
text=emoji,
|
||||
emoji_size=emoji_size,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.xfail(raises=FBchatFacebookError)
|
||||
@pytest.mark.parametrize("message", [Message("a" * 20001)])
|
||||
def test_send_invalid(client, message):
|
||||
client.send(message)
|
||||
|
||||
|
||||
def test_send_mentions(client, client2, thread, catch_event, compare):
|
||||
text = "Hi there @me, @other and @thread"
|
||||
mentions = [
|
||||
dict(thread_id=client.uid, offset=9, length=3),
|
||||
dict(thread_id=client2.uid, offset=14, length=6),
|
||||
dict(thread_id=thread["id"], offset=26, length=7),
|
||||
]
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.send(Message(text, mentions=[Mention(**d) for d in mentions]))
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
|
||||
# The mentions are not ordered by offset
|
||||
for m in x.res["message_object"].mentions:
|
||||
assert vars(m) in mentions
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"sticker_id",
|
||||
["767334476626295", pytest.mark.xfail("0", raises=FBchatFacebookError)],
|
||||
)
|
||||
def test_send_sticker(client, catch_event, compare, sticker_id):
|
||||
with catch_event("onMessage") as x:
|
||||
mid = client.send(Message(sticker=Sticker(sticker_id)))
|
||||
|
||||
assert compare(x, mid=mid)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid)
|
||||
assert subset(vars(x.res["message_object"].sticker), uid=sticker_id)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"method_name, url",
|
||||
[
|
||||
(
|
||||
"sendRemoteImage",
|
||||
"https://github.com/carpedm20/fbchat/raw/master/tests/image.png",
|
||||
),
|
||||
("sendLocalImage", path.join(path.dirname(__file__), "image.png")),
|
||||
],
|
||||
)
|
||||
def test_send_images(client, catch_event, compare, method_name, url):
|
||||
text = "An image sent with {}".format(method_name)
|
||||
with catch_event("onMessage") as x:
|
||||
mid = getattr(client, method_name)(url, Message(text))
|
||||
|
||||
assert compare(x, mid=mid, message=text)
|
||||
assert subset(vars(x.res["message_object"]), uid=mid, author=client.uid, text=text)
|
||||
assert x.res["message_object"].attachments[0]
|
12
tests/test_tests.py
Normal file
12
tests/test_tests.py
Normal file
@@ -0,0 +1,12 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_catch_event(client2, catch_event):
|
||||
mid = "test"
|
||||
with catch_event("onMessage") as x:
|
||||
client2.onMessage(mid=mid)
|
||||
assert x.res['mid'] == mid
|
100
tests/test_thread_interraction.py
Normal file
100
tests/test_thread_interraction.py
Normal file
@@ -0,0 +1,100 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import pytest
|
||||
|
||||
from fbchat.models import (
|
||||
Message,
|
||||
ThreadType,
|
||||
FBchatFacebookError,
|
||||
TypingStatus,
|
||||
ThreadColor,
|
||||
)
|
||||
from utils import random_hex, subset
|
||||
from os import environ
|
||||
|
||||
|
||||
def test_remove_from_and_add_to_group(client1, client2, group, catch_event):
|
||||
# Test both methods, while ensuring that the user gets added to the group
|
||||
try:
|
||||
with catch_event("onPersonRemoved") as x:
|
||||
client1.removeUserFromGroup(client2.uid, group["id"])
|
||||
assert subset(
|
||||
x.res, removed_id=client2.uid, author_id=client1.uid, thread_id=group["id"]
|
||||
)
|
||||
finally:
|
||||
with catch_event("onPeopleAdded") as x:
|
||||
client1.addUsersToGroup(client2.uid, group["id"])
|
||||
assert subset(
|
||||
x.res, added_ids=[client2.uid], author_id=client1.uid, thread_id=group["id"]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
raises=FBchatFacebookError, reason="Apparently changeThreadTitle is broken"
|
||||
)
|
||||
def test_change_title(client1, catch_event, group):
|
||||
title = random_hex()
|
||||
with catch_event("onTitleChange") as x:
|
||||
mid = client1.changeThreadTitle(title, group["id"])
|
||||
assert subset(
|
||||
x.res,
|
||||
mid=mid,
|
||||
author_id=client1.uid,
|
||||
new_title=title,
|
||||
thread_id=group["id"],
|
||||
thread_type=ThreadType.GROUP,
|
||||
)
|
||||
|
||||
|
||||
def test_change_nickname(client, client_all, catch_event, compare):
|
||||
nickname = random_hex()
|
||||
with catch_event("onNicknameChange") as x:
|
||||
client.changeNickname(nickname, client_all.uid)
|
||||
assert compare(x, changed_for=client_all.uid, new_nickname=nickname)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("emoji", ["😀", "😂", "😕", "😍"])
|
||||
def test_change_emoji(client, catch_event, compare, emoji):
|
||||
with catch_event("onEmojiChange") as x:
|
||||
client.changeThreadEmoji(emoji)
|
||||
assert compare(x, new_emoji=emoji)
|
||||
|
||||
|
||||
@pytest.mark.xfail(raises=FBchatFacebookError)
|
||||
@pytest.mark.parametrize("emoji", ["🙃", "not an emoji"])
|
||||
def test_change_emoji_invalid(client, emoji):
|
||||
client.changeThreadEmoji(emoji)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"color",
|
||||
[
|
||||
x
|
||||
if x in [ThreadColor.MESSENGER_BLUE, ThreadColor.PUMPKIN]
|
||||
else pytest.mark.expensive(x)
|
||||
for x in ThreadColor
|
||||
],
|
||||
)
|
||||
def test_change_color(client, catch_event, compare, color):
|
||||
with catch_event("onColorChange") as x:
|
||||
client.changeThreadColor(color)
|
||||
assert compare(x, new_color=color)
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
raises=FBchatFacebookError, strict=False, reason="Should fail, but doesn't"
|
||||
)
|
||||
def test_change_color_invalid(client):
|
||||
class InvalidColor:
|
||||
value = "#0077ff"
|
||||
|
||||
client.changeThreadColor(InvalidColor())
|
||||
|
||||
|
||||
@pytest.mark.parametrize("status", TypingStatus)
|
||||
def test_typing_status(client, catch_event, compare, status):
|
||||
with catch_event("onTyping") as x:
|
||||
client.setTypingStatus(status)
|
||||
assert compare(x, status=status)
|
83
tests/utils.py
Normal file
83
tests/utils.py
Normal file
@@ -0,0 +1,83 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import threading
|
||||
import logging
|
||||
import six
|
||||
|
||||
from os import environ
|
||||
from random import randrange
|
||||
from contextlib import contextmanager
|
||||
from six import viewitems
|
||||
from fbchat import Client
|
||||
from fbchat.models import ThreadType
|
||||
|
||||
log = logging.getLogger("fbchat.tests").addHandler(logging.NullHandler())
|
||||
|
||||
|
||||
class ClientThread(threading.Thread):
|
||||
def __init__(self, client, *args, **kwargs):
|
||||
self.client = client
|
||||
self.should_stop = threading.Event()
|
||||
super(ClientThread, self).__init__(*args, **kwargs)
|
||||
|
||||
def start(self):
|
||||
self.client.startListening()
|
||||
self.client.doOneListen() # QPrimer, Facebook now knows we're about to start pulling
|
||||
super(ClientThread, self).start()
|
||||
|
||||
def run(self):
|
||||
while not self.should_stop.is_set() and self.client.doOneListen():
|
||||
pass
|
||||
|
||||
self.client.stopListening()
|
||||
|
||||
|
||||
if six.PY2:
|
||||
event_class = threading._Event
|
||||
else:
|
||||
event_class = threading.Event
|
||||
|
||||
|
||||
class CaughtValue(event_class):
|
||||
def set(self, res):
|
||||
self.res = res
|
||||
super(CaughtValue, self).set()
|
||||
|
||||
def wait(self, timeout=3):
|
||||
super(CaughtValue, self).wait(timeout=timeout)
|
||||
|
||||
|
||||
def random_hex(length=20):
|
||||
return "{:X}".format(randrange(16 ** length))
|
||||
|
||||
|
||||
def subset(a, **b):
|
||||
print(a)
|
||||
print(b)
|
||||
return viewitems(b) <= viewitems(a)
|
||||
|
||||
|
||||
def load_variable(name, cache):
|
||||
var = environ.get(name, None)
|
||||
if var is not None:
|
||||
if cache.get(name, None) != var:
|
||||
cache.set(name, var)
|
||||
return var
|
||||
|
||||
var = cache.get(name, None)
|
||||
if var is None:
|
||||
raise ValueError("Variable {!r} neither in environment nor cache".format(name))
|
||||
return var
|
||||
|
||||
|
||||
@contextmanager
|
||||
def load_client(n, cache):
|
||||
client = Client(
|
||||
load_variable("client{}_email".format(n), cache),
|
||||
load_variable("client{}_password".format(n), cache),
|
||||
session_cookies=cache.get("client{}_session".format(n), None),
|
||||
)
|
||||
yield client
|
||||
cache.set("client{}_session".format(n), client.getSession())
|
Reference in New Issue
Block a user