See commit description

- Deprecated `sendMessage` and `sendEmoji` in favor of `send`
- (Almost) Fully integrated attachment support
- Updated tests
- General cleanup
This commit is contained in:
Mads Marquart
2017-10-21 17:59:44 +02:00
parent dda75c6099
commit 7ecf229db5
10 changed files with 337 additions and 280 deletions

View File

@@ -70,8 +70,8 @@ The same method can be applied to some user accounts, though if they've set a cu
Here's an snippet showing the usage of thread IDs and thread types, where ``<user id>`` and ``<group id>``
corresponds to the ID of a single user, and the ID of a group respectively::
client.sendMessage('<message>', thread_id='<user id>', thread_type=ThreadType.USER)
client.sendMessage('<message>', thread_id='<group id>', thread_type=ThreadType.GROUP)
client.send(Message(text='<message>'), thread_id='<user id>', thread_type=ThreadType.USER)
client.send(Message(text='<message>'), thread_id='<group id>', thread_type=ThreadType.GROUP)
Some functions (e.g. :func:`Client.changeThreadColor`) don't require a thread type, so in these cases you just provide the thread ID::
@@ -91,7 +91,7 @@ Some of `fbchat`'s functions require these ID's, like :func:`Client.reactToMessa
and some of then provide this ID, like :func:`Client.sendMessage`.
This snippet shows how to send a message, and then use the returned ID to react to that message with a 😍 emoji::
message_id = client.sendMessage('message', thread_id=thread_id, thread_type=thread_type)
message_id = client.send(Message(text='message'), thread_id=thread_id, thread_type=thread_type)
client.reactToMessage(message_id, MessageReaction.LOVE)
@@ -108,7 +108,7 @@ like adding users to and removing users from a group chat, logically only works
The simplest way of using `fbchat` is to send a message.
The following snippet will, as you've probably already figured out, send the message `test message` to your account::
message_id = client.sendMessage('test message', thread_id=client.uid, thread_type=ThreadType.USER)
message_id = client.send(Message(text='test message'), thread_id=client.uid, thread_type=ThreadType.USER)
You can see a full example showing all the possible thread interactions with `fbchat` by going to :ref:`examples`
@@ -176,7 +176,7 @@ The event actions can be changed by subclassing the :class:`Client`, and then ov
class CustomClient(Client):
def onMessage(self, mid, author_id, message_object, thread_id, thread_type, ts, metadata, msg, **kwargs):
# Do something with the message_object here
# Do something with message_object here
pass
client = CustomClient('<email>', '<password>')
@@ -185,7 +185,7 @@ The event actions can be changed by subclassing the :class:`Client`, and then ov
class CustomClient(Client):
def onMessage(self, message_object, author_id, thread_id, thread_type, **kwargs):
# Do something with the message here
# Do something with message_object here
pass
client = CustomClient('<email>', '<password>')

View File

@@ -7,6 +7,6 @@ client = Client('<email>', '<password>')
print('Own id: {}'.format(client.uid))
client.sendMessage('Hi me!', thread_id=client.uid, thread_type=ThreadType.USER)
client.send(Message(text='Hi me!'), thread_id=client.uid, thread_type=ThreadType.USER)
client.logout()

View File

@@ -10,9 +10,9 @@ class EchoBot(Client):
log.info("{} from {} in {}".format(message_object, thread_id, thread_type.name))
# If you're not the author, and the message was a message containing text, echo
if author_id != self.uid and message_object.text is not None:
self.sendMessage(message_object.text, thread_id=thread_id, thread_type=thread_type)
# If you're not the author, echo
if author_id != self.uid:
self.send(message_object, thread_id=thread_id, thread_type=thread_type)
client = EchoBot("<email>", "<password>")
client.listen()

View File

@@ -9,19 +9,25 @@ thread_id = '1234567890'
thread_type = ThreadType.GROUP
# Will send a message to the thread
client.sendMessage('<message>', thread_id=thread_id, thread_type=thread_type)
client.send(Message(text='<message>'), thread_id=thread_id, thread_type=thread_type)
# Will send the default `like` emoji
client.sendEmoji(emoji=None, size=EmojiSize.LARGE, thread_id=thread_id, thread_type=thread_type)
client.send(Message(emoji_size=EmojiSize.LARGE), thread_id=thread_id, thread_type=thread_type)
# Will send the emoji `👍`
client.sendEmoji(emoji='👍', size=EmojiSize.LARGE, thread_id=thread_id, thread_type=thread_type)
client.send(Message(text='👍', emoji_size=EmojiSize.LARGE), thread_id=thread_id, thread_type=thread_type)
# Will send the sticker with ID `767334476626295`
client.send(Message(sticker=Sticker('767334476626295')), thread_id=thread_id, thread_type=thread_type)
# Will send a message with a mention
client.send(Message(text='This is a @mention', mentions=[Mention(thread_id, offset=10, length=8)]), thread_id=thread_id, thread_type=thread_type)
# Will send the image located at `<image path>`
client.sendLocalImage('<image path>', message='This is a local image', thread_id=thread_id, thread_type=thread_type)
client.sendLocalImage('<image path>', message=Message(text='This is a local image'), thread_id=thread_id, thread_type=thread_type)
# Will download the image at the url `<image url>`, and then send it
client.sendRemoteImage('<image url>', message='This is a remote image', thread_id=thread_id, thread_type=thread_type)
client.sendRemoteImage('<image url>', message=Message(text='This is a remote image'), thread_id=thread_id, thread_type=thread_type)
# Only do these actions if the thread is a group

View File

@@ -17,7 +17,7 @@ from .client import *
__copyright__ = 'Copyright 2015 - {} by Taehoon Kim'.format(datetime.now().year)
__version__ = '1.0.25'
__version__ = '1.1.0'
__license__ = 'BSD'
__author__ = 'Taehoon Kim; Moreels Pieter-Jan; Mads Marquart'
__email__ = 'carpedm20@gmail.com'

View File

@@ -5,7 +5,6 @@ import requests
import urllib
from uuid import uuid1
from random import choice
from datetime import datetime
from bs4 import BeautifulSoup as bs
from mimetypes import guess_type
from .utils import *
@@ -730,7 +729,7 @@ class Client(object):
"""
Get the last messages in a thread
:param thread_id: User/Group ID to default to. See :ref:`intro_threads`
:param thread_id: User/Group ID to get messages from. See :ref:`intro_threads`
:param limit: Max. number of messages to retrieve
:param before: A timestamp, indicating from which point to retrieve messages
:type limit: int
@@ -740,6 +739,8 @@ class Client(object):
:raises: FBchatException if request failed
"""
thread_id, thread_type = self._getThread(thread_id, None)
j = self.graphql_request(GraphQL(doc_id='1386147188135407', params={
'id': thread_id,
'message_limit': limit,
@@ -867,45 +868,53 @@ class Client(object):
SEND METHODS
"""
def _getSendData(self, thread_id=None, thread_type=ThreadType.USER):
def _oldMessage(self, message):
return message if isinstance(message, Message) else Message(text=message)
def _getSendData(self, message=None, thread_id=None, thread_type=ThreadType.USER):
"""Returns the data needed to send a request to `SendURL`"""
messageAndOTID = generateOfflineThreadingID()
timestamp = now()
date = datetime.now()
data = {
'client': self.client,
'author' : 'fbid:' + str(self.uid),
'timestamp' : timestamp,
'timestamp_absolute' : 'Today',
'timestamp_relative' : str(date.hour) + ":" + str(date.minute).zfill(2),
'timestamp_time_passed' : '0',
'is_unread' : False,
'is_cleared' : False,
'is_forward' : False,
'is_filtered_content' : False,
'is_filtered_content_bh': False,
'is_filtered_content_account': False,
'is_filtered_content_quasar': False,
'is_filtered_content_invalid_app': False,
'is_spoof_warning' : False,
'source' : 'source:chat:web',
'source_tags[0]' : 'source:chat',
'html_body' : False,
'ui_push_phase' : 'V3',
'status' : '0',
'offline_threading_id': messageAndOTID,
'message_id' : messageAndOTID,
'threading_id': generateMessageID(self.client_id),
'ephemeral_ttl_mode:': '0',
'manual_retry_cnt' : '0',
'signatureID' : getSignatureID()
'ephemeral_ttl_mode:': '0'
}
# Set recipient
if thread_type in [ThreadType.USER, ThreadType.PAGE]:
data["other_user_fbid"] = thread_id
data['other_user_fbid'] = thread_id
elif thread_type == ThreadType.GROUP:
data["thread_fbid"] = thread_id
data['thread_fbid'] = thread_id
if message is None:
message = Message()
if message.text or message.sticker or message.emoji_size:
data['action_type'] = 'ma-type:user-generated-message'
if message.text:
data['body'] = message.text
for i, mention in enumerate(message.mentions):
data['profile_xmd[{}][id]'.format(i)] = mention.thread_id
data['profile_xmd[{}][offset]'.format(i)] = mention.offset
data['profile_xmd[{}][length]'.format(i)] = mention.length
data['profile_xmd[{}][type]'.format(i)] = 'p'
if message.emoji_size:
if message.text:
data['tags[0]'] = 'hot_emoji_size:' + message.emoji_size.name.lower()
else:
data['sticker_id'] = message.emoji_size.value
if message.sticker:
data['sticker_id'] = message.sticker.uid
return data
@@ -928,67 +937,34 @@ class Client(object):
return message_id
def sendMessage(self, message, mention=None, thread_id=None,
thread_type=ThreadType.USER):
def send(self, message, thread_id=None, thread_type=ThreadType.USER):
"""
Sends a message to a thread
:param message: Message to send
:param thread_id: User/Group ID to send to. See :ref:`intro_threads`
:param thread_type: See :ref:`intro_threads`
:type message: models.Message
:type thread_type: models.ThreadType
:mention is in this format {userID : (start, end)},
where start is relative start position of @mention in a message
and end is relative end position of @mention
:return: :ref:`Message ID <intro_message_ids>` of the sent message
:raises: FBchatException if request failed
"""
thread_id, thread_type = self._getThread(thread_id, thread_type)
data = self._getSendData(thread_id, thread_type)
data['action_type'] = 'ma-type:user-generated-message'
data['body'] = message or ''
if mention:
n = 0
for key, value in mention.items():
data['profile_xmd[%d][id]'%n] = key
data['profile_xmd[%d][offset]'%n] = value[0]
data['profile_xmd[%d][length]'%n] = value[1] - value[0]
data['profile_xmd[%d][type]'%n] = 'p'
n += 1
data['has_attachment'] = False
data['specific_to_list[0]'] = 'fbid:' + thread_id
data['specific_to_list[1]'] = 'fbid:' + self.uid
data = self._getSendData(message=message, thread_id=thread_id, thread_type=thread_type)
return self._doSendRequest(data)
def sendMessage(self, message, thread_id=None, thread_type=ThreadType.USER):
"""
Deprecated. Use :func:`fbchat.Client.send` instead
"""
return self.send(Message(text=message), thread_id=thread_id, thread_type=thread_type)
def sendEmoji(self, emoji=None, size=EmojiSize.SMALL, thread_id=None, thread_type=ThreadType.USER):
"""
Sends an emoji to a thread
:param emoji: The chosen emoji to send. If not specified, the default `like` emoji is sent
:param size: If not specified, a small emoji is sent
:param thread_id: User/Group ID to send to. See :ref:`intro_threads`
:param thread_type: See :ref:`intro_threads`
:type size: models.EmojiSize
:type thread_type: models.ThreadType
:return: :ref:`Message ID <intro_message_ids>` of the sent emoji
:raises: FBchatException if request failed
Deprecated. Use :func:`fbchat.Client.send` instead
"""
thread_id, thread_type = self._getThread(thread_id, thread_type)
data = self._getSendData(thread_id, thread_type)
data['action_type'] = 'ma-type:user-generated-message'
data['has_attachment'] = False
data['specific_to_list[0]'] = 'fbid:' + thread_id
data['specific_to_list[1]'] = 'fbid:' + self.uid
if emoji:
data['body'] = emoji
data['tags[0]'] = 'hot_emoji_size:' + size.name.lower()
else:
data["sticker_id"] = size.value
return self._doSendRequest(data)
return self.send(Message(text=emoji, emoji_size=size), thread_id=thread_id, thread_type=thread_type)
def _uploadImage(self, image_path, data, mimetype):
"""Upload an image and get the image_id for sending in a message"""
@@ -1008,25 +984,13 @@ class Client(object):
def sendImage(self, image_id, message=None, thread_id=None, thread_type=ThreadType.USER, is_gif=False):
"""
Sends an already uploaded image to a thread. (Used by :func:`Client.sendRemoteImage` and :func:`Client.sendLocalImage`)
:param image_id: ID of an image that's already uploaded to Facebook
:param message: Additional message
:param thread_id: User/Group ID to send to. See :ref:`intro_threads`
:param thread_type: See :ref:`intro_threads`
:param is_gif: if sending GIF, True, else False
:type thread_type: models.ThreadType
:return: :ref:`Message ID <intro_message_ids>` of the sent image
:raises: FBchatException if request failed
Deprecated. Use :func:`fbchat.Client.send` instead
"""
thread_id, thread_type = self._getThread(thread_id, thread_type)
data = self._getSendData(thread_id, thread_type)
data = self._getSendData(message=message, thread_id=thread_id, thread_type=thread_type)
data['action_type'] = 'ma-type:user-generated-message'
data['body'] = message or ''
data['has_attachment'] = True
data['specific_to_list[0]'] = 'fbid:' + str(thread_id)
data['specific_to_list[1]'] = 'fbid:' + str(self.uid)
if not is_gif:
data['image_ids[0]'] = image_id
@@ -1083,7 +1047,7 @@ class Client(object):
:raises: FBchatException if request failed
"""
thread_id, thread_type = self._getThread(thread_id, None)
data = self._getSendData(thread_id, ThreadType.GROUP)
data = self._getSendData(thread_id=thread_id, thread_type=ThreadType.GROUP)
data['action_type'] = 'ma-type:log-message'
data['log_message_type'] = 'log:subscribe'
@@ -1138,7 +1102,7 @@ class Client(object):
# The thread is a user, so we change the user's nickname
return self.changeNickname(title, thread_id, thread_id=thread_id, thread_type=thread_type)
else:
data = self._getSendData(thread_id, thread_type)
data = self._getSendData(thread_id=thread_id, thread_type=thread_type)
data['action_type'] = 'ma-type:log-message'
data['log_message_data[name]'] = title
@@ -1240,11 +1204,17 @@ class Client(object):
"""
Sets an event reminder
:param thread_id: :ref:`Thread ID <intro_thread_ids>` to send event to
..warning::
Does not work in Python2.7
..todo::
Make this work in Python2.7
:param thread_id: User/Group ID to send event to. See :ref:`intro_threads`
:param time: Event time (unix time stamp)
:param title: Event title
:param location: Event location
:param location_ir: Event location ID
:param location: Event location name
:param location_id: Event location ID
:raises: FBchatException if request failed
"""
full_data = {
@@ -1506,82 +1476,43 @@ class Client(object):
except Exception:
log.exception('An exception occured while reading attachments')
sticker = None
attachments = []
if delta.get('attachments'):
try:
for a in delta['attachments']:
mercury = a['mercury']
blob = mercury.get('blob_attachment', {})
image_metadata = a.get('imageMetadata', {})
attach_type = mercury['attach_type']
if attach_type in ['photo', 'animated_image']:
attachments.append(ImageAttachment(
original_extension=blob.get('original_extension') or (blob['filename'].split('-')[0] if blob.get('filename') else None),
width=int(image_metadata['width']),
height=int(image_metadata['height']),
is_animated=attach_type=='animated_image',
thumbnail_url=mercury.get('thumbnail_url'),
preview=blob.get('preview') or blob.get('preview_image'),
large_preview=blob.get('large_preview'),
animated_preview=blob.get('animated_image'),
uid=a['id']
))
elif attach_type == 'file':
# Add more data here for audio files
attachments.append(FileAttachment(
url=mercury.get('url'),
size=int(a['fileSize']),
name=mercury.get('name'),
is_malicious=blob.get('is_malicious'),
uid=a['id']
))
elif attach_type == 'video':
attachments.append(VideoAttachment(
size=int(a['fileSize']),
width=int(image_metadata['width']),
height=int(image_metadata['height']),
duration=blob.get('playable_duration_in_ms'),
preview_url=blob.get('playable_url'),
small_image=blob.get('chat_image'),
medium_image=blob.get('inbox_image'),
large_image=blob.get('large_image'),
uid=a['id']
))
elif attach_type == 'sticker':
# Add more data here for stickers
attachments.append(StickerAttachment(
uid=mercury.get('metadata', {}).get('stickerID')
))
elif attach_type == 'share':
# Add more data here for shared stuff (URLs, events and so on)
attachments.append(ShareAttachment(
uid=a.get('id')
))
else:
attachments.append(Attachment(
uid=a.get('id')
))
if mercury.get('attach_type'):
image_metadata = a.get('imageMetadata', {})
attach_type = mercury['attach_type']
attachment = graphql_to_attachment(mercury.get('blob_attachment', {}))
if attach_type == ['file', 'video']:
# TODO: Add more data here for audio files
attachment.size = int(a['fileSize'])
elif attach_type == 'share':
# TODO: Add more data here for shared stuff (URLs, events and so on)
pass
attachments.append(attachment)
if a['mercury'].get('sticker_attachment'):
sticker = graphql_to_sticker(a['mercury']['sticker_attachment'])
except Exception:
log.exception('An exception occured while reading attachments: {}'.format(delta['attachments']))
emoji_size = None
if metadata and metadata.get('tags'):
for tag in metadata['tags']:
if tag.startswith('hot_emoji_size:'):
emoji_size = LIKES[tag.split(':')[1]]
break
emoji_size = get_emojisize_from_tags(metadata.get('tags'))
message = Message(
text=delta.get('body'),
mentions=mentions,
emoji_size=emoji_size
emoji_size=emoji_size,
sticker=sticker,
attachments=attachments
)
message.uid = mid
message.author = author_id
message.timestamp = ts
message.attachments = attachments
#message.is_read = None
#message.reactions = []
#message.reactions = {}
thread_id, thread_type = getThreadIdAndThreadType(metadata)
self.onMessage(mid=mid, author_id=author_id, message=delta.get('body', ''), message_object=message,
thread_id=thread_id, thread_type=thread_type, ts=ts, metadata=metadata, msg=m)

View File

@@ -61,26 +61,88 @@ def get_customization_info(thread):
rtn['own_nickname'] = pc[1].get('nickname')
return rtn
def graphql_to_sticker(s):
if not s:
return None
sticker = Sticker(
uid=s['id']
)
if s.get('pack'):
sticker.pack = s['pack'].get('id')
if s.get('sprite_image'):
sticker.is_animated = True
sticker.medium_sprite_image = s['sprite_image'].get('uri')
sticker.large_sprite_image = s['sprite_image_2x'].get('uri')
sticker.frames_per_row = s.get('frames_per_row')
sticker.frames_per_col = s.get('frames_per_column')
sticker.frame_rate = s.get('frame_rate')
sticker.url = s.get('url')
sticker.width = s.get('width')
sticker.height = s.get('height')
if s.get('label'):
sticker.label = s['label']
return sticker
def graphql_to_attachment(a):
_type = a['__typename']
if _type in ['MessageImage', 'MessageAnimatedImage']:
return ImageAttachment(
original_extension=a.get('original_extension') or (a['filename'].split('-')[0] if a.get('filename') else None),
width=a.get('original_dimensions', {}).get('width'),
height=a.get('original_dimensions', {}).get('height'),
is_animated=_type=='MessageAnimatedImage',
thumbnail_url=a.get('thumbnail', {}).get('uri'),
preview=a.get('preview') or a.get('preview_image'),
large_preview=a.get('large_preview'),
animated_preview=a.get('animated_image'),
uid=a.get('legacy_attachment_id')
)
elif _type == 'MessageVideo':
return VideoAttachment(
width=a.get('original_dimensions', {}).get('width'),
height=a.get('original_dimensions', {}).get('height'),
duration=a.get('playable_duration_in_ms'),
preview_url=a.get('playable_url'),
small_image=a.get('chat_image'),
medium_image=a.get('inbox_image'),
large_image=a.get('large_image'),
uid=a.get('legacy_attachment_id')
)
elif _type == 'MessageFile':
return FileAttachment(
url=a.get('url'),
name=a.get('filename'),
is_malicious=a.get('is_malicious'),
uid=a.get('message_file_fbid')
)
else:
return Attachment(
uid=a.get('legacy_attachment_id')
)
def graphql_to_message(message):
if message.get('message_sender') is None:
message['message_sender'] = {}
if message.get('message') is None:
message['message'] = {}
is_read = None
if message.get('unread') is not None:
is_read = not message['unread']
return Message(
message.get('message_id'),
author=message.get('message_sender').get('id'),
timestamp=message.get('timestamp_precise'),
is_read=is_read,
reactions=message.get('message_reactions'),
rtn = Message(
text=message.get('message').get('text'),
mentions=[Mention(m.get('entity', {}).get('id'), offset=m.get('offset'), length=m.get('length')) for m in message.get('message').get('ranges', [])],
sticker=message.get('sticker'),
attachments=message.get('blob_attachments'),
extensible_attachment=message.get('extensible_attachment')
emoji_size=get_emojisize_from_tags(message.get('tags_list')),
sticker=graphql_to_sticker(message.get('sticker'))
)
rtn.uid = str(message.get('message_id'))
rtn.author = str(message.get('message_sender').get('id'))
rtn.timestamp = message.get('timestamp_precise')
if message.get('unread') is not None:
rtn.is_read = not message['unread']
rtn.reactions = {str(r['user']['id']):MessageReaction(r['reaction']) for r in message.get('message_reactions')}
if message.get('blob_attachments') is not None:
rtn.attachments = [graphql_to_attachment(attachment) for attachment in message['blob_attachments']]
# TODO: This is still missing parsing:
# message.get('extensible_attachment')
return rtn
def graphql_to_user(user):
if user.get('profile_picture') is None:

View File

@@ -164,51 +164,40 @@ class Page(Thread):
class Message(object):
#: The actual message
text = str
text = None
#: A list of :class:`Mention` objects
mentions = list
#: The message ID
uid = str
#: ID of the sender
author = int
#: Timestamp of when the message was sent
timestamp = str
#: Whether the message is read
is_read = bool
#: A list of message reactions
reactions = list
#: The actual message
text = str
#: A list of :class:`Mention` objects
mentions = list
#: An ID of a sent sticker
sticker = str
mentions = []
#: A :class:`EmojiSize`. Size of a sent emoji
emoji_size = None
#: The message ID
uid = None
#: ID of the sender
author = None
#: Timestamp of when the message was sent
timestamp = None
#: Whether the message is read
is_read = None
#: A dict with user's IDs as keys, and their :class:`MessageReaction` as values
reactions = {}
#: The actual message
text = None
#: A :class:`Sticker`
sticker = None
#: A list of attachments
attachments = list
attachments = []
def __init__(self, uid, author=None, timestamp=None, is_read=None, reactions=None, text=None, mentions=None, sticker=None, attachments=None, extensible_attachment=None, emoji_size=None):
def __init__(self, text=None, mentions=None, emoji_size=None, sticker=None, attachments=None):
"""Represents a Facebook message"""
self.uid = uid
self.author = author
self.timestamp = timestamp
self.is_read = is_read
if reactions is None:
reactions = []
self.reactions = reactions
self.text = text
if mentions is None:
mentions = []
self.mentions = mentions
self.emoji_size = emoji_size
self.sticker = sticker
if attachments is None:
attachments = []
self.attachments = attachments
if extensible_attachment is None:
extensible_attachment = {}
self.extensible_attachment = extensible_attachment
self.emoji_size = emoji_size
self.reactions = {}
def __repr__(self):
return self.__unicode__()
@@ -220,14 +209,40 @@ class Attachment(object):
#: The attachment ID
uid = str
def __init__(self, uid=None, mime_type=None):
def __init__(self, uid=None):
"""Represents a Facebook attachment"""
self.uid = uid
class StickerAttachment(Attachment):
def __init__(self, **kwargs):
"""Represents a sticker that has been sent as a Facebook attachment - *Currently Incomplete!*"""
super(StickerAttachment, self).__init__(**kwargs)
class Sticker(Attachment):
#: The sticker-pack's ID
pack = None
#: Whether the sticker is animated
is_animated = False
# If the sticker is animated, the following should be present
#: URL to a medium spritemap
medium_sprite_image = None
#: URL to a large spritemap
large_sprite_image = None
#: The amount of frames present in the spritemap pr. row
frames_per_row = None
#: The amount of frames present in the spritemap pr. coloumn
frames_per_col = None
#: The frame rate the spritemap is intended to be played in
frame_rate = None
#: URL to the sticker's image
url = None
#: Width of the sticker
width = None
#: Height of the sticker
height = None
#: The sticker's label/name
label = None
def __init__(self, *args, **kwargs):
"""Represents a Facebook sticker that has been sent to a Facebook thread as an attachment"""
super(Sticker, self).__init__(*args, **kwargs)
class ShareAttachment(Attachment):
def __init__(self, **kwargs):
@@ -268,7 +283,7 @@ class ImageAttachment(Attachment):
#: Whether the image is animated
is_animated = bool
#: Url to a thumbnail of the image
#: URL to a thumbnail of the image
thumbnail_url = str
#: URL to a medium preview of the image
@@ -300,7 +315,11 @@ class ImageAttachment(Attachment):
"""
super(ImageAttachment, self).__init__(**kwargs)
self.original_extension = original_extension
if width is not None:
width = int(width)
self.width = width
if height is not None:
height = int(height)
self.height = height
self.is_animated = is_animated
self.thumbnail_url = thumbnail_url
@@ -385,19 +404,25 @@ class VideoAttachment(Attachment):
class Mention(object):
#: The user ID the mention is pointing at
user_id = str
#: The thread ID the mention is pointing at
thread_id = str
#: The character where the mention starts
offset = int
#: The length of the mention
length = int
def __init__(self, user_id, offset=0, length=10):
def __init__(self, thread_id, offset=0, length=10):
"""Represents a @mention"""
self.user_id = user_id
self.thread_id = thread_id
self.offset = offset
self.length = length
def __repr__(self):
return self.__unicode__()
def __unicode__(self):
return '<Mention {}: offset={} length={}>'.format(self.thread_id, self.offset, self.length)
class Enum(enum.Enum):
"""Used internally by fbchat to support enumerations"""
def __repr__(self):

View File

@@ -215,3 +215,14 @@ def get_jsmods_require(j, index):
except (KeyError, IndexError) as e:
log.warning('Error when getting jsmods_require: {}. Facebook might have changed protocol'.format(j))
return None
def get_emojisize_from_tags(tags):
if tags is None:
return None
tmp = [tag for tag in tags if tag.startswith('hot_emoji_size:')]
if len(tmp) > 0:
try:
return LIKES[tmp[0].split(':')[1]]
except (KeyError, IndexError):
log.exception('Could not determine emoji size from {} - {}'.format(tags, tmp))
return None

150
tests.py
View File

@@ -22,6 +22,8 @@ Full documentation on https://fbchat.readthedocs.io/
"""
test_sticker_id = '767334476626295'
class CustomClient(Client):
def __init__(self, *args, **kwargs):
self.got_qprimer = False
@@ -64,16 +66,14 @@ class TestFbchat(unittest.TestCase):
def test_defaultThread(self):
# setDefaultThread
client.setDefaultThread(group_id, ThreadType.GROUP)
self.assertTrue(client.sendMessage('test_default_recipient★'))
client.setDefaultThread(user_id, ThreadType.USER)
self.assertTrue(client.sendMessage('test_default_recipient★'))
for thread in threads:
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
self.assertTrue(client.send(Message(text='test_default_recipient★')))
# resetDefaultThread
client.resetDefaultThread()
with self.assertRaises(ValueError):
client.sendMessage('should_not_send')
client.send(Message(text='should_not_send'))
def test_fetchAllUsers(self):
users = client.fetchAllUsers()
@@ -96,46 +96,43 @@ class TestFbchat(unittest.TestCase):
groups = client.searchForGroups('')
self.assertGreater(len(groups), 0)
def test_sendEmoji(self):
self.assertIsNotNone(client.sendEmoji(size=EmojiSize.SMALL, thread_id=user_id, thread_type=ThreadType.USER))
self.assertIsNotNone(client.sendEmoji(size=EmojiSize.MEDIUM, thread_id=user_id, thread_type=ThreadType.USER))
self.assertIsNotNone(client.sendEmoji('😆', EmojiSize.LARGE, user_id, ThreadType.USER))
def test_send(self):
for thread in threads:
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
self.assertIsNotNone(client.sendEmoji(size=EmojiSize.SMALL, thread_id=group_id, thread_type=ThreadType.GROUP))
self.assertIsNotNone(client.sendEmoji(size=EmojiSize.MEDIUM, thread_id=group_id, thread_type=ThreadType.GROUP))
self.assertIsNotNone(client.sendEmoji('😆', EmojiSize.LARGE, group_id, ThreadType.GROUP))
self.assertIsNotNone(client.send(Message(emoji_size=EmojiSize.SMALL)))
self.assertIsNotNone(client.send(Message(emoji_size=EmojiSize.MEDIUM)))
self.assertIsNotNone(client.send(Message(text='😆', emoji_size=EmojiSize.LARGE)))
def test_sendMessage(self):
self.assertIsNotNone(client.sendMessage('test_send_user★', user_id, ThreadType.USER))
self.assertIsNotNone(client.sendMessage('test_send_group', group_id, ThreadType.GROUP))
with self.assertRaises(Exception):
client.sendMessage('test_send_user_should_fail★', user_id, ThreadType.GROUP)
with self.assertRaises(Exception):
client.sendMessage('test_send_group_should_fail★', group_id, ThreadType.USER)
self.assertIsNotNone(client.send(Message(text='test_send★')))
with self.assertRaises(FBchatFacebookError):
self.assertIsNotNone(client.send(Message(text='test_send_should_fail'), thread_id=thread['id'], thread_type=(ThreadType.GROUP if thread['type'] == ThreadType.USER else ThreadType.USER)))
self.assertIsNotNone(client.send(Message(text='Hi there @user', mentions=[Mention(user_id, offset=9, length=5)])))
self.assertIsNotNone(client.send(Message(text='Hi there @group', mentions=[Mention(group_id, offset=9, length=6)])))
self.assertIsNotNone(client.send(Message(sticker=Sticker(test_sticker_id))))
def test_sendImages(self):
image_url = 'https://cdn4.iconfinder.com/data/icons/ionicons/512/icon-image-128.png'
image_local_url = path.join(path.dirname(__file__), 'tests/image.png')
self.assertTrue(client.sendRemoteImage(image_url, 'test_send_user_images_remote★', user_id, ThreadType.USER))
self.assertTrue(client.sendRemoteImage(image_url, 'test_send_group_images_remote★', group_id, ThreadType.GROUP))
self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local★', user_id, ThreadType.USER))
self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local★', group_id, ThreadType.GROUP))
for thread in threads:
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
mentions = [Mention(thread['id'], offset=26, length=4)]
self.assertTrue(client.sendRemoteImage(image_url, Message(text='test_send_image_remote_to_@you★', mentions=mentions)))
self.assertTrue(client.sendLocalImage(image_local_url, Message(text='test_send_image_local_to__@you★', mentions=mentions)))
def test_fetchThreadList(self):
client.fetchThreadList(offset=0, limit=20)
def test_fetchThreadMessages(self):
client.sendMessage('test_user_getThreadInfo★', thread_id=user_id, thread_type=ThreadType.USER)
for thread in threads:
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
client.send(Message(text='test_getThreadInfo★'))
messages = client.fetchThreadMessages(thread_id=user_id, limit=1)
self.assertEqual(messages[0].author, client.uid)
self.assertEqual(messages[0].text, 'test_user_getThreadInfo★')
client.sendMessage('test_group_getThreadInfo★', thread_id=group_id, thread_type=ThreadType.GROUP)
messages = client.fetchThreadMessages(thread_id=group_id, limit=1)
self.assertEqual(messages[0].author, client.uid)
self.assertEqual(messages[0].text, 'test_group_getThreadInfo★')
messages = client.fetchThreadMessages(limit=1)
self.assertEqual(messages[0].author, client.uid)
self.assertEqual(messages[0].text, 'test_getThreadInfo★')
def test_listen(self):
client.startListening()
@@ -156,48 +153,48 @@ class TestFbchat(unittest.TestCase):
client.addUsersToGroup(user_id, thread_id=group_id)
def test_changeThreadTitle(self):
client.changeThreadTitle('test_changeThreadTitle★', thread_id=group_id, thread_type=ThreadType.GROUP)
client.changeThreadTitle('test_changeThreadTitle★', thread_id=user_id, thread_type=ThreadType.USER)
for thread in threads:
client.changeThreadTitle('test_changeThreadTitle★', thread_id=thread['id'], thread_type=thread['type'])
def test_changeNickname(self):
client.changeNickname('test_changeNicknameSelf★', client.uid, thread_id=user_id, thread_type=ThreadType.USER)
client.changeNickname('test_changeNicknameOther★', user_id, thread_id=user_id, thread_type=ThreadType.USER)
client.changeNickname('test_changeNicknameSelf★', client.uid, thread_id=group_id, thread_type=ThreadType.GROUP)
client.changeNickname('test_changeNicknameOther★', user_id, thread_id=group_id, thread_type=ThreadType.GROUP)
for thread in threads:
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
client.changeNickname('test_changeNicknameSelf★', client.uid)
client.changeNickname('test_changeNicknameOther★', user_id)
def test_changeThreadEmoji(self):
client.changeThreadEmoji('😀', group_id)
client.changeThreadEmoji('😀', user_id)
client.changeThreadEmoji('😆', group_id)
client.changeThreadEmoji('😆', user_id)
for thread in threads:
client.changeThreadEmoji('😀', thread_id=thread['id'])
client.changeThreadEmoji('😀', thread_id=thread['id'])
def test_changeThreadColor(self):
client.changeThreadColor(ThreadColor.BRILLIANT_ROSE, group_id)
client.changeThreadColor(ThreadColor.MESSENGER_BLUE, group_id)
client.changeThreadColor(ThreadColor.BRILLIANT_ROSE, user_id)
client.changeThreadColor(ThreadColor.MESSENGER_BLUE, user_id)
for thread in threads:
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
client.changeThreadColor(ThreadColor.BRILLIANT_ROSE)
client.changeThreadColor(ThreadColor.MESSENGER_BLUE)
def test_reactToMessage(self):
mid = client.sendMessage('test_reactToMessage★', user_id, ThreadType.USER)
client.reactToMessage(mid, MessageReaction.LOVE)
mid = client.sendMessage('test_reactToMessage★', group_id, ThreadType.GROUP)
client.reactToMessage(mid, MessageReaction.LOVE)
for thread in threads:
mid = client.send(Message(text='test_reactToMessage★'), thread_id=thread['id'], thread_type=thread['type'])
client.reactToMessage(mid, MessageReaction.LOVE)
def test_setTypingStatus(self):
client.setTypingStatus(TypingStatus.TYPING, thread_id=user_id, thread_type=ThreadType.USER)
client.setTypingStatus(TypingStatus.STOPPED, thread_id=user_id, thread_type=ThreadType.USER)
client.setTypingStatus(TypingStatus.TYPING, thread_id=group_id, thread_type=ThreadType.GROUP)
client.setTypingStatus(TypingStatus.STOPPED, thread_id=group_id, thread_type=ThreadType.GROUP)
for thread in threads:
client.setDefaultThread(thread_id=thread['id'], thread_type=thread['type'])
client.setTypingStatus(TypingStatus.TYPING)
client.setTypingStatus(TypingStatus.STOPPED)
def start_test(param_client, param_group_id, param_user_id, tests=[]):
def start_test(param_client, param_group_id, param_user_id, param_threads, tests=[]):
global client
global group_id
global user_id
global threads
client = param_client
group_id = param_group_id
user_id = param_user_id
threads = param_threads
tests = ['test_' + test if 'test_' != test[:5] else test for test in tests]
@@ -220,19 +217,44 @@ if __name__ == '__main__':
try:
with open(path.join(path.dirname(__file__), 'tests/my_data.json'), 'r') as f:
json = json.load(f)
email = json['email']
password = json['password']
user_id = json['user_thread_id']
group_id = json['group_thread_id']
j = json.load(f)
email = j['email']
password = j['password']
user_id = j['user_thread_id']
group_id = j['group_thread_id']
session = j.get('session')
except (IOError, IndexError) as e:
email = input('Email: ')
password = getpass()
group_id = input('Please enter a group thread id (To test group functionality): ')
user_id = input('Please enter a user thread id (To test kicking/adding functionality): ')
threads = [
{
'id': user_id,
'type': ThreadType.USER
},
{
'id': group_id,
'type': ThreadType.GROUP
}
]
print('Logging in...')
client = CustomClient(email, password, logging_level=logging_level)
client = CustomClient(email, password, logging_level=logging_level, session_cookies=session)
# Warning! Taking user input directly like this could be dangerous! Use only for testing purposes!
start_test(client, group_id, user_id, argv[1:])
start_test(client, group_id, user_id, threads, argv[1:])
with open(path.join(path.dirname(__file__), 'tests/my_data.json'), 'w') as f:
session = None
try:
session = client.getSession()
except Exception:
print('Unable to fetch client session!')
json.dump({
'email': email,
'password': password,
'user_thread_id': user_id,
'group_thread_id': group_id,
'session': session
}, f)