Stop mutating models

This commit is contained in:
Mads Marquart
2019-12-11 14:14:47 +01:00
parent b03d0ae3b7
commit 27ae1c9f88
10 changed files with 278 additions and 266 deletions

View File

@@ -43,8 +43,19 @@ class ShareAttachment(Attachment):
def _from_graphql(cls, data): def _from_graphql(cls, data):
from . import _file from . import _file
image = None
original_image_url = None
media = data.get("media")
if media and media.get("image"):
image = Image._from_uri(media["image"])
original_image_url = (
_util.get_url_parameter(image.url, "url")
if "/safe_image.php" in image.url
else image.url
)
url = data.get("url") url = data.get("url")
rtn = cls( return cls(
uid=data.get("deduplication_key"), uid=data.get("deduplication_key"),
author=data["target"]["actors"][0]["id"] author=data["target"]["actors"][0]["id"]
if data["target"].get("actors") if data["target"].get("actors")
@@ -58,18 +69,10 @@ class ShareAttachment(Attachment):
if data.get("description") if data.get("description")
else None, else None,
source=data["source"].get("text") if data.get("source") else None, source=data["source"].get("text") if data.get("source") else None,
image=image,
original_image_url=original_image_url,
attachments=[ attachments=[
_file.graphql_to_subattachment(attachment) _file.graphql_to_subattachment(attachment)
for attachment in data.get("subattachments") for attachment in data.get("subattachments")
], ],
) )
media = data.get("media")
if media and media.get("image"):
image = media["image"]
rtn.image = Image._from_uri(image)
rtn.original_image_url = (
_util.get_url_parameter(rtn.image.url, "url")
if "/safe_image.php" in rtn.image.url
else rtn.image.url
)
return rtn

View File

@@ -686,22 +686,14 @@ class Client:
if j.get("message_thread") is None: if j.get("message_thread") is None:
raise FBchatException("Could not fetch thread {}: {}".format(thread_id, j)) raise FBchatException("Could not fetch thread {}: {}".format(thread_id, j))
read_receipts = j["message_thread"]["read_receipts"]["nodes"]
messages = [ messages = [
Message._from_graphql(message) Message._from_graphql(message, read_receipts)
for message in j["message_thread"]["messages"]["nodes"] for message in j["message_thread"]["messages"]["nodes"]
] ]
messages.reverse() messages.reverse()
read_receipts = j["message_thread"]["read_receipts"]["nodes"]
for message in messages:
for receipt in read_receipts:
if (
_util.millis_to_datetime(int(receipt["watermark"]))
>= message.created_at
):
message.read_by.append(receipt["actor"]["id"])
return messages return messages
def fetch_thread_list( def fetch_thread_list(
@@ -1009,11 +1001,16 @@ class Client:
Raises: Raises:
FBchatException: If request failed FBchatException: If request failed
""" """
quick_reply.is_response = True
if isinstance(quick_reply, QuickReplyText): if isinstance(quick_reply, QuickReplyText):
return self.send( new = QuickReplyText(
Message(text=quick_reply.title, quick_replies=[quick_reply]) payload=quick_reply.payload,
external_payload=quick_reply.external_payload,
data=quick_reply.data,
is_response=True,
title=quick_reply.title,
image_url=quick_reply.image_url,
) )
return self.send(Message(text=quick_reply.title, quick_replies=[new]))
elif isinstance(quick_reply, QuickReplyLocation): elif isinstance(quick_reply, QuickReplyLocation):
if not isinstance(payload, LocationAttachment): if not isinstance(payload, LocationAttachment):
raise TypeError( raise TypeError(
@@ -1023,17 +1020,23 @@ class Client:
payload, thread_id=thread_id, thread_type=thread_type payload, thread_id=thread_id, thread_type=thread_type
) )
elif isinstance(quick_reply, QuickReplyEmail): elif isinstance(quick_reply, QuickReplyEmail):
if not payload: new = QuickReplyEmail(
payload = self.get_emails()[0] payload=payload if payload else self.get_emails()[0],
quick_reply.external_payload = quick_reply.payload external_payload=quick_reply.payload,
quick_reply.payload = payload data=quick_reply.data,
return self.send(Message(text=payload, quick_replies=[quick_reply])) is_response=True,
image_url=quick_reply.image_url,
)
return self.send(Message(text=payload, quick_replies=[new]))
elif isinstance(quick_reply, QuickReplyPhoneNumber): elif isinstance(quick_reply, QuickReplyPhoneNumber):
if not payload: new = QuickReplyPhoneNumber(
payload = self.get_phone_numbers()[0] payload=payload if payload else self.get_phone_numbers()[0],
quick_reply.external_payload = quick_reply.payload external_payload=quick_reply.payload,
quick_reply.payload = payload data=quick_reply.data,
return self.send(Message(text=payload, quick_replies=[quick_reply])) is_response=True,
image_url=quick_reply.image_url,
)
return self.send(Message(text=payload, quick_replies=[new]))
def unsend(self, mid): def unsend(self, mid):
"""Unsend message by it's ID (removes it for everyone). """Unsend message by it's ID (removes it for everyone).
@@ -2533,9 +2536,8 @@ class Client:
i = d["deltaMessageReply"] i = d["deltaMessageReply"]
metadata = i["message"]["messageMetadata"] metadata = i["message"]["messageMetadata"]
thread_id, thread_type = get_thread_id_and_thread_type(metadata) thread_id, thread_type = get_thread_id_and_thread_type(metadata)
message = Message._from_reply(i["message"]) replied_to = Message._from_reply(i["repliedToMessage"])
message.replied_to = Message._from_reply(i["repliedToMessage"]) message = Message._from_reply(i["message"], replied_to)
message.reply_to_id = message.replied_to.uid
self.on_message( self.on_message(
mid=message.uid, mid=message.uid,
author_id=message.author, author_id=message.author,

View File

@@ -18,9 +18,10 @@ class FileAttachment(Attachment):
is_malicious = attr.ib(None) is_malicious = attr.ib(None)
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data, size=None):
return cls( return cls(
url=data.get("url"), url=data.get("url"),
size=size,
name=data.get("filename"), name=data.get("filename"),
is_malicious=data.get("is_malicious"), is_malicious=data.get("is_malicious"),
uid=data.get("message_file_fbid"), uid=data.get("message_file_fbid"),
@@ -130,8 +131,9 @@ class VideoAttachment(Attachment):
large_image = attr.ib(None) large_image = attr.ib(None)
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data, size=None):
return cls( return cls(
size=size,
width=data.get("original_dimensions", {}).get("width"), width=data.get("original_dimensions", {}).get("width"),
height=data.get("original_dimensions", {}).get("height"), height=data.get("original_dimensions", {}).get("height"),
duration=_util.millis_to_timedelta(data.get("playable_duration_in_ms")), duration=_util.millis_to_timedelta(data.get("playable_duration_in_ms")),
@@ -165,16 +167,16 @@ class VideoAttachment(Attachment):
) )
def graphql_to_attachment(data): def graphql_to_attachment(data, size=None):
_type = data["__typename"] _type = data["__typename"]
if _type in ["MessageImage", "MessageAnimatedImage"]: if _type in ["MessageImage", "MessageAnimatedImage"]:
return ImageAttachment._from_graphql(data) return ImageAttachment._from_graphql(data)
elif _type == "MessageVideo": elif _type == "MessageVideo":
return VideoAttachment._from_graphql(data) return VideoAttachment._from_graphql(data, size=size)
elif _type == "MessageAudio": elif _type == "MessageAudio":
return AudioAttachment._from_graphql(data) return AudioAttachment._from_graphql(data)
elif _type == "MessageFile": elif _type == "MessageFile":
return FileAttachment._from_graphql(data) return FileAttachment._from_graphql(data, size=size)
return Attachment(uid=data.get("legacy_attachment_id")) return Attachment(uid=data.get("legacy_attachment_id"))

View File

@@ -31,17 +31,17 @@ class LocationAttachment(Attachment):
address = None address = None
except ValueError: except ValueError:
latitude, longitude = None, None latitude, longitude = None, None
rtn = cls(
return cls(
uid=int(data["deduplication_key"]), uid=int(data["deduplication_key"]),
latitude=latitude, latitude=latitude,
longitude=longitude, longitude=longitude,
image=Image._from_uri_or_none(data["media"].get("image"))
if data.get("media")
else None,
url=url,
address=address, address=address,
) )
media = data.get("media")
if media and media.get("image"):
rtn.image = Image._from_uri(media["image"])
rtn.url = url
return rtn
@attr.s @attr.s
@@ -73,7 +73,13 @@ class LiveLocationAttachment(LocationAttachment):
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data):
target = data["target"] target = data["target"]
rtn = cls(
image = None
media = data.get("media")
if media and media.get("image"):
image = Image._from_uri(media["image"])
return cls(
uid=int(target["live_location_id"]), uid=int(target["live_location_id"]),
latitude=target["coordinate"]["latitude"] latitude=target["coordinate"]["latitude"]
if target.get("coordinate") if target.get("coordinate")
@@ -81,12 +87,9 @@ class LiveLocationAttachment(LocationAttachment):
longitude=target["coordinate"]["longitude"] longitude=target["coordinate"]["longitude"]
if target.get("coordinate") if target.get("coordinate")
else None, else None,
image=image,
url=data.get("url"),
name=data["title_with_entities"]["text"], name=data["title_with_entities"]["text"],
expires_at=_util.seconds_to_datetime(target.get("expiration_time")), expires_at=_util.seconds_to_datetime(target.get("expiration_time")),
is_expired=target.get("is_expired"), is_expired=target.get("is_expired"),
) )
media = data.get("media")
if media and media.get("image"):
rtn.image = Image._from_uri(media["image"])
rtn.url = data.get("url")
return rtn

View File

@@ -139,8 +139,7 @@ class Message:
) )
offset += len(name) offset += len(name)
message = cls(text=result, mentions=mentions) return cls(text=result, mentions=mentions)
return message
@staticmethod @staticmethod
def _get_forwarded_from_tags(tags): def _get_forwarded_from_tags(tags):
@@ -197,14 +196,43 @@ class Message:
return data return data
@staticmethod
def _parse_quick_replies(data):
if data:
data = json.loads(data).get("quick_replies")
if isinstance(data, list):
return [_quick_reply.graphql_to_quick_reply(q) for q in data]
elif isinstance(data, dict):
return [_quick_reply.graphql_to_quick_reply(data, is_response=True)]
return []
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data, read_receipts=None):
if data.get("message_sender") is None: if data.get("message_sender") is None:
data["message_sender"] = {} data["message_sender"] = {}
if data.get("message") is None: if data.get("message") is None:
data["message"] = {} data["message"] = {}
tags = data.get("tags_list") tags = data.get("tags_list")
rtn = cls(
created_at = _util.millis_to_datetime(int(data.get("timestamp_precise")))
attachments = [
_file.graphql_to_attachment(attachment)
for attachment in data["blob_attachments"] or ()
]
unsent = False
if data.get("extensible_attachment") is not None:
attachment = graphql_to_extensible_attachment(data["extensible_attachment"])
if isinstance(attachment, _attachment.UnsentMessage):
unsent = True
elif attachment:
attachments.append(attachment)
replied_to = None
if data.get("replied_to_message"):
replied_to = cls._from_graphql(data["replied_to_message"]["message"])
return cls(
text=data["message"].get("text"), text=data["message"].get("text"),
mentions=[ mentions=[
Mention( Mention(
@@ -215,107 +243,80 @@ class Message:
for m in data["message"].get("ranges") or () for m in data["message"].get("ranges") or ()
], ],
emoji_size=EmojiSize._from_tags(tags), emoji_size=EmojiSize._from_tags(tags),
uid=str(data["message_id"]),
author=str(data["message_sender"]["id"]),
created_at=created_at,
is_read=not data["unread"] if data.get("unread") is not None else None,
read_by=[
receipt["actor"]["id"]
for receipt in read_receipts or ()
if _util.millis_to_datetime(int(receipt["watermark"])) >= created_at
],
reactions={
str(r["user"]["id"]): MessageReaction._extend_if_invalid(r["reaction"])
for r in data["message_reactions"]
},
sticker=_sticker.Sticker._from_graphql(data.get("sticker")), sticker=_sticker.Sticker._from_graphql(data.get("sticker")),
attachments=attachments,
quick_replies=cls._parse_quick_replies(data.get("platform_xmd_encoded")),
unsent=unsent,
reply_to_id=replied_to.uid if replied_to else None,
replied_to=replied_to,
forwarded=cls._get_forwarded_from_tags(tags),
) )
rtn.forwarded = cls._get_forwarded_from_tags(tags)
rtn.uid = str(data["message_id"])
rtn.author = str(data["message_sender"]["id"])
rtn.created_at = _util.millis_to_datetime(int(data.get("timestamp_precise")))
rtn.unsent = False
if data.get("unread") is not None:
rtn.is_read = not data["unread"]
rtn.reactions = {
str(r["user"]["id"]): MessageReaction._extend_if_invalid(r["reaction"])
for r in data["message_reactions"]
}
if data.get("blob_attachments") is not None:
rtn.attachments = [
_file.graphql_to_attachment(attachment)
for attachment in data["blob_attachments"]
]
if data.get("platform_xmd_encoded"):
quick_replies = json.loads(data["platform_xmd_encoded"]).get(
"quick_replies"
)
if isinstance(quick_replies, list):
rtn.quick_replies = [
_quick_reply.graphql_to_quick_reply(q) for q in quick_replies
]
elif isinstance(quick_replies, dict):
rtn.quick_replies = [
_quick_reply.graphql_to_quick_reply(quick_replies, is_response=True)
]
if data.get("extensible_attachment") is not None:
attachment = graphql_to_extensible_attachment(data["extensible_attachment"])
if isinstance(attachment, _attachment.UnsentMessage):
rtn.unsent = True
elif attachment:
rtn.attachments.append(attachment)
if data.get("replied_to_message") is not None:
rtn.replied_to = cls._from_graphql(data["replied_to_message"]["message"])
rtn.reply_to_id = rtn.replied_to.uid
return rtn
@classmethod @classmethod
def _from_reply(cls, data): def _from_reply(cls, data, replied_to=None):
tags = data["messageMetadata"].get("tags") tags = data["messageMetadata"].get("tags")
rtn = cls( metadata = data.get("messageMetadata", {})
attachments = []
unsent = False
sticker = None
for attachment in data.get("attachments") or ():
attachment = json.loads(attachment["mercuryJSON"])
if attachment.get("blob_attachment"):
attachments.append(
_file.graphql_to_attachment(attachment["blob_attachment"])
)
if attachment.get("extensible_attachment"):
extensible_attachment = graphql_to_extensible_attachment(
attachment["extensible_attachment"]
)
if isinstance(extensible_attachment, _attachment.UnsentMessage):
unsent = True
else:
attachments.append(extensible_attachment)
if attachment.get("sticker_attachment"):
sticker = _sticker.Sticker._from_graphql(
attachment["sticker_attachment"]
)
return cls(
text=data.get("body"), text=data.get("body"),
mentions=[ mentions=[
Mention(m.get("i"), offset=m.get("o"), length=m.get("l")) Mention(m.get("i"), offset=m.get("o"), length=m.get("l"))
for m in json.loads(data.get("data", {}).get("prng", "[]")) for m in json.loads(data.get("data", {}).get("prng", "[]"))
], ],
emoji_size=EmojiSize._from_tags(tags), emoji_size=EmojiSize._from_tags(tags),
uid=metadata.get("messageId"),
author=str(metadata.get("actorFbId")),
created_at=_util.millis_to_datetime(metadata.get("timestamp")),
sticker=sticker,
attachments=attachments,
quick_replies=cls._parse_quick_replies(data.get("platform_xmd_encoded")),
unsent=unsent,
reply_to_id=replied_to.uid if replied_to else None,
replied_to=replied_to,
forwarded=cls._get_forwarded_from_tags(tags),
) )
metadata = data.get("messageMetadata", {})
rtn.forwarded = cls._get_forwarded_from_tags(tags)
rtn.uid = metadata.get("messageId")
rtn.author = str(metadata.get("actorFbId"))
rtn.created_at = _util.millis_to_datetime(metadata.get("timestamp"))
rtn.unsent = False
if data.get("data", {}).get("platform_xmd"):
quick_replies = json.loads(data["data"]["platform_xmd"]).get(
"quick_replies"
)
if isinstance(quick_replies, list):
rtn.quick_replies = [
_quick_reply.graphql_to_quick_reply(q) for q in quick_replies
]
elif isinstance(quick_replies, dict):
rtn.quick_replies = [
_quick_reply.graphql_to_quick_reply(quick_replies, is_response=True)
]
if data.get("attachments") is not None:
for attachment in data["attachments"]:
attachment = json.loads(attachment["mercuryJSON"])
if attachment.get("blob_attachment"):
rtn.attachments.append(
_file.graphql_to_attachment(attachment["blob_attachment"])
)
if attachment.get("extensible_attachment"):
extensible_attachment = graphql_to_extensible_attachment(
attachment["extensible_attachment"]
)
if isinstance(extensible_attachment, _attachment.UnsentMessage):
rtn.unsent = True
else:
rtn.attachments.append(extensible_attachment)
if attachment.get("sticker_attachment"):
rtn.sticker = _sticker.Sticker._from_graphql(
attachment["sticker_attachment"]
)
return rtn
@classmethod @classmethod
def _from_pull(cls, data, mid=None, tags=None, author=None, created_at=None): def _from_pull(cls, data, mid=None, tags=None, author=None, created_at=None):
rtn = cls(text=data.get("body")) mentions = []
rtn.uid = mid
rtn.author = author
rtn.created_at = created_at
if data.get("data") and data["data"].get("prng"): if data.get("data") and data["data"].get("prng"):
try: try:
rtn.mentions = [ mentions = [
Mention( Mention(
str(mention.get("i")), str(mention.get("i")),
offset=mention.get("o"), offset=mention.get("o"),
@@ -326,50 +327,53 @@ class Message:
except Exception: except Exception:
log.exception("An exception occured while reading attachments") log.exception("An exception occured while reading attachments")
if data.get("attachments"): attachments = []
try: unsent = False
for a in data["attachments"]: sticker = None
mercury = a["mercury"] try:
if mercury.get("blob_attachment"): for a in data.get("attachments") or ():
image_metadata = a.get("imageMetadata", {}) mercury = a["mercury"]
attach_type = mercury["blob_attachment"]["__typename"] if mercury.get("blob_attachment"):
attachment = _file.graphql_to_attachment( image_metadata = a.get("imageMetadata", {})
mercury["blob_attachment"] attach_type = mercury["blob_attachment"]["__typename"]
) attachment = _file.graphql_to_attachment(
mercury["blob_attachment"], a["fileSize"]
if attach_type in [
"MessageFile",
"MessageVideo",
"MessageAudio",
]:
# TODO: Add more data here for audio files
attachment.size = int(a["fileSize"])
rtn.attachments.append(attachment)
elif mercury.get("sticker_attachment"):
rtn.sticker = _sticker.Sticker._from_graphql(
mercury["sticker_attachment"]
)
elif mercury.get("extensible_attachment"):
attachment = graphql_to_extensible_attachment(
mercury["extensible_attachment"]
)
if isinstance(attachment, _attachment.UnsentMessage):
rtn.unsent = True
elif attachment:
rtn.attachments.append(attachment)
except Exception:
log.exception(
"An exception occured while reading attachments: {}".format(
data["attachments"]
) )
) attachments.append(attachment)
rtn.emoji_size = EmojiSize._from_tags(tags) elif mercury.get("sticker_attachment"):
rtn.forwarded = cls._get_forwarded_from_tags(tags) sticker = _sticker.Sticker._from_graphql(
return rtn mercury["sticker_attachment"]
)
elif mercury.get("extensible_attachment"):
attachment = graphql_to_extensible_attachment(
mercury["extensible_attachment"]
)
if isinstance(attachment, _attachment.UnsentMessage):
unsent = True
elif attachment:
attachments.append(attachment)
except Exception:
log.exception(
"An exception occured while reading attachments: {}".format(
data["attachments"]
)
)
return cls(
text=data.get("body"),
mentions=mentions,
emoji_size=EmojiSize._from_tags(tags),
uid=mid,
author=author,
created_at=created_at,
sticker=sticker,
attachments=attachments,
unsent=unsent,
forwarded=cls._get_forwarded_from_tags(tags),
)
def graphql_to_extensible_attachment(data): def graphql_to_extensible_attachment(data):

View File

@@ -58,44 +58,41 @@ class Plan:
@classmethod @classmethod
def _from_pull(cls, data): def _from_pull(cls, data):
rtn = cls( return cls(
uid=data.get("event_id"),
time=_util.seconds_to_datetime(int(data.get("event_time"))), time=_util.seconds_to_datetime(int(data.get("event_time"))),
title=data.get("event_title"), title=data.get("event_title"),
location=data.get("event_location_name"), location=data.get("event_location_name"),
location_id=data.get("event_location_id"), location_id=data.get("event_location_id"),
author_id=data.get("event_creator_id"),
guests={
x["node"]["id"]: GuestStatus[x["guest_list_state"]]
for x in json.loads(data["guest_state_list"])
},
) )
rtn.uid = data.get("event_id")
rtn.author_id = data.get("event_creator_id")
rtn.guests = {
x["node"]["id"]: GuestStatus[x["guest_list_state"]]
for x in json.loads(data["guest_state_list"])
}
return rtn
@classmethod @classmethod
def _from_fetch(cls, data): def _from_fetch(cls, data):
rtn = cls( return cls(
uid=data.get("oid"),
time=_util.seconds_to_datetime(data.get("event_time")), time=_util.seconds_to_datetime(data.get("event_time")),
title=data.get("title"), title=data.get("title"),
location=data.get("location_name"), location=data.get("location_name"),
location_id=str(data["location_id"]) if data.get("location_id") else None, location_id=str(data["location_id"]) if data.get("location_id") else None,
author_id=data.get("creator_id"),
guests={id_: GuestStatus[s] for id_, s in data["event_members"].items()},
) )
rtn.uid = data.get("oid")
rtn.author_id = data.get("creator_id")
rtn.guests = {id_: GuestStatus[s] for id_, s in data["event_members"].items()}
return rtn
@classmethod @classmethod
def _from_graphql(cls, data): def _from_graphql(cls, data):
rtn = cls( return cls(
uid=data.get("id"),
time=_util.seconds_to_datetime(data.get("time")), time=_util.seconds_to_datetime(data.get("time")),
title=data.get("event_title"), title=data.get("event_title"),
location=data.get("location_name"), location=data.get("location_name"),
author_id=data["lightweight_event_creator"].get("id"),
guests={
x["node"]["id"]: GuestStatus[x["guest_list_state"]]
for x in data["event_reminder_members"]["edges"]
},
) )
rtn.uid = data.get("id")
rtn.author_id = data["lightweight_event_creator"].get("id")
rtn.guests = {
x["node"]["id"]: GuestStatus[x["guest_list_state"]]
for x in data["event_reminder_members"]["edges"]
}
return rtn

View File

@@ -9,7 +9,7 @@ class QuickReply:
#: Payload of the quick reply #: Payload of the quick reply
payload = attr.ib(None) payload = attr.ib(None)
#: External payload for responses #: External payload for responses
external_payload = attr.ib(None, init=False) external_payload = attr.ib(None)
#: Additional data #: Additional data
data = attr.ib(None) data = attr.ib(None)
#: Whether it's a response for a quick reply #: Whether it's a response for a quick reply

View File

@@ -33,17 +33,20 @@ class Sticker(Attachment):
def _from_graphql(cls, data): def _from_graphql(cls, data):
if not data: if not data:
return None return None
self = cls(uid=data["id"])
if data.get("pack"): return cls(
self.pack = data["pack"].get("id") uid=data["id"],
if data.get("sprite_image"): pack=data["pack"].get("id") if data.get("pack") else None,
self.is_animated = True is_animated=bool(data.get("sprite_image")),
self.medium_sprite_image = data["sprite_image"].get("uri") medium_sprite_image=data["sprite_image"].get("uri")
self.large_sprite_image = data["sprite_image_2x"].get("uri") if data.get("sprite_image")
self.frames_per_row = data.get("frames_per_row") else None,
self.frames_per_col = data.get("frames_per_column") large_sprite_image=data["sprite_image_2x"].get("uri")
self.frame_rate = data.get("frame_rate") if data.get("sprite_image_2x")
self.image = Image._from_url_or_none(data) else None,
if data.get("label"): frames_per_row=data.get("frames_per_row"),
self.label = data["label"] frames_per_col=data.get("frames_per_column"),
return self frame_rate=data.get("frame_rate"),
image=Image._from_url_or_none(data),
label=data["label"] if data.get("label") else None,
)

View File

@@ -3,13 +3,16 @@ from fbchat._plan import GuestStatus, Plan
def test_plan_properties(): def test_plan_properties():
plan = Plan(time=..., title=...) plan = Plan(
plan.guests = { time=...,
"1234": GuestStatus.INVITED, title=...,
"2345": GuestStatus.INVITED, guests={
"3456": GuestStatus.GOING, "1234": GuestStatus.INVITED,
"4567": GuestStatus.DECLINED, "2345": GuestStatus.INVITED,
} "3456": GuestStatus.GOING,
"4567": GuestStatus.DECLINED,
},
)
assert set(plan.invited) == {"1234", "2345"} assert set(plan.invited) == {"1234", "2345"}
assert plan.going == ["3456"] assert plan.going == ["3456"]
assert plan.declined == ["4567"] assert plan.declined == ["4567"]
@@ -32,19 +35,18 @@ def test_plan_from_pull():
'{"guest_list_state":"GOING","node":{"id":"4567"}}]' '{"guest_list_state":"GOING","node":{"id":"4567"}}]'
), ),
} }
plan = Plan( assert Plan(
uid="1111",
time=datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc), time=datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc),
title="abc", title="abc",
) author_id="1234",
plan.uid = "1111" guests={
plan.author_id = "1234" "1234": GuestStatus.INVITED,
plan.guests = { "2356": GuestStatus.INVITED,
"1234": GuestStatus.INVITED, "3456": GuestStatus.DECLINED,
"2356": GuestStatus.INVITED, "4567": GuestStatus.GOING,
"3456": GuestStatus.DECLINED, },
"4567": GuestStatus.GOING, ) == Plan._from_pull(data)
}
assert plan == Plan._from_pull(data)
def test_plan_from_fetch(): def test_plan_from_fetch():
@@ -90,21 +92,20 @@ def test_plan_from_fetch():
"4567": "GOING", "4567": "GOING",
}, },
} }
plan = Plan( assert Plan(
uid=1111,
time=datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc), time=datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc),
title="abc", title="abc",
location="", location="",
location_id="", location_id="",
) author_id=1234,
plan.uid = 1111 guests={
plan.author_id = 1234 "1234": GuestStatus.INVITED,
plan.guests = { "2356": GuestStatus.INVITED,
"1234": GuestStatus.INVITED, "3456": GuestStatus.DECLINED,
"2356": GuestStatus.INVITED, "4567": GuestStatus.GOING,
"3456": GuestStatus.DECLINED, },
"4567": GuestStatus.GOING, ) == Plan._from_fetch(data)
}
assert plan == Plan._from_fetch(data)
def test_plan_from_graphql(): def test_plan_from_graphql():
@@ -133,18 +134,17 @@ def test_plan_from_graphql():
] ]
}, },
} }
plan = Plan( assert Plan(
time=datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc), time=datetime.datetime(2017, 7, 14, 2, 40, tzinfo=datetime.timezone.utc),
title="abc", title="abc",
location="", location="",
location_id="", location_id="",
) uid="1111",
plan.uid = "1111" author_id="1234",
plan.author_id = "1234" guests={
plan.guests = { "1234": GuestStatus.INVITED,
"1234": GuestStatus.INVITED, "2356": GuestStatus.INVITED,
"2356": GuestStatus.INVITED, "3456": GuestStatus.DECLINED,
"3456": GuestStatus.DECLINED, "4567": GuestStatus.GOING,
"4567": GuestStatus.GOING, },
} ) == Plan._from_graphql(data)
assert plan == Plan._from_graphql(data)

View File

@@ -16,11 +16,9 @@ def test_from_graphql_normal():
uid="369239383222810", uid="369239383222810",
pack="227877430692340", pack="227877430692340",
is_animated=False, is_animated=False,
medium_sprite_image=None, frames_per_row=1,
large_sprite_image=None, frames_per_col=1,
frames_per_row=None, frame_rate=83,
frames_per_col=None,
frame_rate=None,
image=fbchat.Image( image=fbchat.Image(
url="https://scontent-arn2-1.xx.fbcdn.net/v/redacted.png", url="https://scontent-arn2-1.xx.fbcdn.net/v/redacted.png",
width=274, width=274,