Move pull delta parsing into _parseDelta (commit 1)

This commit is contained in:
Mads Marquart
2019-02-24 20:45:01 +01:00
parent 76171408cc
commit 80300cd160

View File

@@ -2526,6 +2526,607 @@ class Client(object):
self.seq = j.get("seq", "0")
return j
def _parseDelta(self, m):
def getThreadIdAndThreadType(msg_metadata):
"""Returns a tuple consisting of thread ID and thread type"""
id_thread = None
type_thread = None
if "threadFbId" in msg_metadata["threadKey"]:
id_thread = str(msg_metadata["threadKey"]["threadFbId"])
type_thread = ThreadType.GROUP
elif "otherUserFbId" in msg_metadata["threadKey"]:
id_thread = str(msg_metadata["threadKey"]["otherUserFbId"])
type_thread = ThreadType.USER
return id_thread, type_thread
delta = m["delta"]
delta_type = delta.get("type")
delta_class = delta.get("class")
metadata = delta.get("messageMetadata")
if metadata:
mid = metadata["messageId"]
author_id = str(metadata["actorFbId"])
ts = int(metadata.get("timestamp"))
# Added participants
if "addedParticipants" in delta:
added_ids = [str(x["userFbId"]) for x in delta["addedParticipants"]]
thread_id = str(metadata["threadKey"]["threadFbId"])
self.onPeopleAdded(
mid=mid,
added_ids=added_ids,
author_id=author_id,
thread_id=thread_id,
ts=ts,
msg=m,
)
# Left/removed participants
elif "leftParticipantFbId" in delta:
removed_id = str(delta["leftParticipantFbId"])
thread_id = str(metadata["threadKey"]["threadFbId"])
self.onPersonRemoved(
mid=mid,
removed_id=removed_id,
author_id=author_id,
thread_id=thread_id,
ts=ts,
msg=m,
)
# Color change
elif delta_type == "change_thread_theme":
new_color = graphql_color_to_enum(delta["untypedData"]["theme_color"])
thread_id, thread_type = getThreadIdAndThreadType(metadata)
self.onColorChange(
mid=mid,
author_id=author_id,
new_color=new_color,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Emoji change
elif delta_type == "change_thread_icon":
new_emoji = delta["untypedData"]["thread_icon"]
thread_id, thread_type = getThreadIdAndThreadType(metadata)
self.onEmojiChange(
mid=mid,
author_id=author_id,
new_emoji=new_emoji,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Thread title change
elif delta_class == "ThreadName":
new_title = delta["name"]
thread_id, thread_type = getThreadIdAndThreadType(metadata)
self.onTitleChange(
mid=mid,
author_id=author_id,
new_title=new_title,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Forced fetch
elif delta_class == "ForcedFetch":
mid = delta.get("messageId")
if mid is None:
self.onUnknownMesssageType(msg=m)
else:
thread_id = str(delta["threadKey"]["threadFbId"])
fetch_info = self._forcedFetch(thread_id, mid)
fetch_data = fetch_info["message"]
author_id = fetch_data["message_sender"]["id"]
ts = fetch_data["timestamp_precise"]
if fetch_data.get("__typename") == "ThreadImageMessage":
# Thread image change
image_metadata = fetch_data.get("image_with_metadata")
image_id = (
int(image_metadata["legacy_attachment_id"])
if image_metadata
else None
)
self.onImageChange(
mid=mid,
author_id=author_id,
new_image=image_id,
thread_id=thread_id,
thread_type=ThreadType.GROUP,
ts=ts,
msg=m,
)
# Nickname change
elif delta_type == "change_thread_nickname":
changed_for = str(delta["untypedData"]["participant_id"])
new_nickname = delta["untypedData"]["nickname"]
thread_id, thread_type = getThreadIdAndThreadType(metadata)
self.onNicknameChange(
mid=mid,
author_id=author_id,
changed_for=changed_for,
new_nickname=new_nickname,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Admin added or removed in a group thread
elif delta_type == "change_thread_admins":
thread_id, thread_type = getThreadIdAndThreadType(metadata)
target_id = delta["untypedData"]["TARGET_ID"]
admin_event = delta["untypedData"]["ADMIN_EVENT"]
if admin_event == "add_admin":
self.onAdminAdded(
mid=mid,
added_id=target_id,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
)
elif admin_event == "remove_admin":
self.onAdminRemoved(
mid=mid,
removed_id=target_id,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
)
# Group approval mode change
elif delta_type == "change_thread_approval_mode":
thread_id, thread_type = getThreadIdAndThreadType(metadata)
approval_mode = bool(int(delta["untypedData"]["APPROVAL_MODE"]))
self.onApprovalModeChange(
mid=mid,
approval_mode=approval_mode,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
)
# Message delivered
elif delta_class == "DeliveryReceipt":
message_ids = delta["messageIds"]
delivered_for = str(
delta.get("actorFbId") or delta["threadKey"]["otherUserFbId"]
)
ts = int(delta["deliveredWatermarkTimestampMs"])
thread_id, thread_type = getThreadIdAndThreadType(delta)
self.onMessageDelivered(
msg_ids=message_ids,
delivered_for=delivered_for,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Message seen
elif delta_class == "ReadReceipt":
seen_by = str(delta.get("actorFbId") or delta["threadKey"]["otherUserFbId"])
seen_ts = int(delta["actionTimestampMs"])
delivered_ts = int(delta["watermarkTimestampMs"])
thread_id, thread_type = getThreadIdAndThreadType(delta)
self.onMessageSeen(
seen_by=seen_by,
thread_id=thread_id,
thread_type=thread_type,
seen_ts=seen_ts,
ts=delivered_ts,
metadata=metadata,
msg=m,
)
# Messages marked as seen
elif delta_class == "MarkRead":
seen_ts = int(
delta.get("actionTimestampMs") or delta.get("actionTimestamp")
)
delivered_ts = int(
delta.get("watermarkTimestampMs") or delta.get("watermarkTimestamp")
)
threads = []
if "folders" not in delta:
threads = [
getThreadIdAndThreadType({"threadKey": thr})
for thr in delta.get("threadKeys")
]
# thread_id, thread_type = getThreadIdAndThreadType(delta)
self.onMarkedSeen(
threads=threads, seen_ts=seen_ts, ts=delivered_ts, metadata=delta, msg=m
)
# Game played
elif delta_type == "instant_game_update":
game_id = delta["untypedData"]["game_id"]
game_name = delta["untypedData"]["game_name"]
score = delta["untypedData"].get("score")
if score is not None:
score = int(score)
leaderboard = delta["untypedData"].get("leaderboard")
if leaderboard is not None:
leaderboard = json.loads(leaderboard)["scores"]
thread_id, thread_type = getThreadIdAndThreadType(metadata)
self.onGamePlayed(
mid=mid,
author_id=author_id,
game_id=game_id,
game_name=game_name,
score=score,
leaderboard=leaderboard,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Group call started/ended
elif delta_type == "rtc_call_log":
thread_id, thread_type = getThreadIdAndThreadType(metadata)
call_status = delta["untypedData"]["event"]
call_duration = int(delta["untypedData"]["call_duration"])
is_video_call = bool(int(delta["untypedData"]["is_video_call"]))
if call_status == "call_started":
self.onCallStarted(
mid=mid,
caller_id=author_id,
is_video_call=is_video_call,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
elif call_status == "call_ended":
self.onCallEnded(
mid=mid,
caller_id=author_id,
is_video_call=is_video_call,
call_duration=call_duration,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# User joined to group call
elif delta_type == "participant_joined_group_call":
thread_id, thread_type = getThreadIdAndThreadType(metadata)
is_video_call = bool(int(delta["untypedData"]["group_call_type"]))
self.onUserJoinedCall(
mid=mid,
joined_id=author_id,
is_video_call=is_video_call,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Group poll event
elif delta_type == "group_poll":
thread_id, thread_type = getThreadIdAndThreadType(metadata)
event_type = delta["untypedData"]["event_type"]
poll_json = json.loads(delta["untypedData"]["question_json"])
poll = graphql_to_poll(poll_json)
if event_type == "question_creation":
# User created group poll
self.onPollCreated(
mid=mid,
poll=poll,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
elif event_type == "update_vote":
# User voted on group poll
added_options = json.loads(delta["untypedData"]["added_option_ids"])
removed_options = json.loads(delta["untypedData"]["removed_option_ids"])
self.onPollVoted(
mid=mid,
poll=poll,
added_options=added_options,
removed_options=removed_options,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Plan created
elif delta_type == "lightweight_event_create":
thread_id, thread_type = getThreadIdAndThreadType(metadata)
plan = graphql_to_plan(delta["untypedData"])
self.onPlanCreated(
mid=mid,
plan=plan,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Plan ended
elif delta_type == "lightweight_event_notify":
thread_id, thread_type = getThreadIdAndThreadType(metadata)
plan = graphql_to_plan(delta["untypedData"])
self.onPlanEnded(
mid=mid,
plan=plan,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Plan edited
elif delta_type == "lightweight_event_update":
thread_id, thread_type = getThreadIdAndThreadType(metadata)
plan = graphql_to_plan(delta["untypedData"])
self.onPlanEdited(
mid=mid,
plan=plan,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Plan deleted
elif delta_type == "lightweight_event_delete":
thread_id, thread_type = getThreadIdAndThreadType(metadata)
plan = graphql_to_plan(delta["untypedData"])
self.onPlanDeleted(
mid=mid,
plan=plan,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Plan participation change
elif delta_type == "lightweight_event_rsvp":
thread_id, thread_type = getThreadIdAndThreadType(metadata)
plan = graphql_to_plan(delta["untypedData"])
take_part = delta["untypedData"]["guest_status"] == "GOING"
self.onPlanParticipation(
mid=mid,
plan=plan,
take_part=take_part,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Client payload (that weird numbers)
elif delta_class == "ClientPayload":
payload = json.loads("".join(chr(z) for z in delta["payload"]))
ts = m.get("ofd_ts")
for d in payload.get("deltas", []):
# Message reaction
if d.get("deltaMessageReaction"):
i = d["deltaMessageReaction"]
thread_id, thread_type = getThreadIdAndThreadType(i)
mid = i["messageId"]
author_id = str(i["userId"])
reaction = (
MessageReaction(i["reaction"]) if i.get("reaction") else None
)
add_reaction = not bool(i["action"])
if add_reaction:
self.onReactionAdded(
mid=mid,
reaction=reaction,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
)
else:
self.onReactionRemoved(
mid=mid,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
)
# Viewer status change
elif d.get("deltaChangeViewerStatus"):
i = d["deltaChangeViewerStatus"]
thread_id, thread_type = getThreadIdAndThreadType(i)
author_id = str(i["actorFbid"])
reason = i["reason"]
can_reply = i["canViewerReply"]
if reason == 2:
if can_reply:
self.onUnblock(
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
)
else:
self.onBlock(
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
)
# Live location info
elif d.get("liveLocationData"):
i = d["liveLocationData"]
thread_id, thread_type = getThreadIdAndThreadType(i)
for l in i["messageLiveLocations"]:
mid = l["messageId"]
author_id = str(l["senderId"])
location = graphql_to_live_location(l)
self.onLiveLocation(
mid=mid,
location=location,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
)
# Message deletion
elif d.get("deltaRecallMessageData"):
i = d["deltaRecallMessageData"]
thread_id, thread_type = getThreadIdAndThreadType(i)
mid = i["messageID"]
ts = i["deletionTimestamp"]
author_id = str(i["senderID"])
self.onMessageUnsent(
mid=mid,
author_id=author_id,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
)
# New message
elif delta.get("class") == "NewMessage":
mentions = []
if delta.get("data") and delta["data"].get("prng"):
try:
mentions = [
Mention(
str(mention.get("i")),
offset=mention.get("o"),
length=mention.get("l"),
)
for mention in parse_json(delta["data"]["prng"])
]
except Exception:
log.exception("An exception occured while reading attachments")
sticker = None
attachments = []
unsent = False
if delta.get("attachments"):
try:
for a in delta["attachments"]:
mercury = a["mercury"]
if mercury.get("blob_attachment"):
image_metadata = a.get("imageMetadata", {})
attach_type = mercury["blob_attachment"]["__typename"]
attachment = graphql_to_attachment(
mercury["blob_attachment"]
)
if attach_type in [
"MessageFile",
"MessageVideo",
"MessageAudio",
]:
# TODO: Add more data here for audio files
attachment.size = int(a["fileSize"])
attachments.append(attachment)
elif mercury.get("sticker_attachment"):
sticker = graphql_to_sticker(mercury["sticker_attachment"])
elif mercury.get("extensible_attachment"):
attachment = graphql_to_extensible_attachment(
mercury["extensible_attachment"]
)
if isinstance(attachment, UnsentMessage):
unsent = True
elif attachment:
attachments.append(attachment)
except Exception:
log.exception(
"An exception occured while reading attachments: {}".format(
delta["attachments"]
)
)
if metadata and metadata.get("tags"):
emoji_size = get_emojisize_from_tags(metadata.get("tags"))
message = Message(
text=delta.get("body"),
mentions=mentions,
emoji_size=emoji_size,
sticker=sticker,
attachments=attachments,
)
message.uid = mid
message.author = author_id
message.timestamp = ts
# message.reactions = {}
message.unsent = unsent
thread_id, thread_type = getThreadIdAndThreadType(metadata)
self.onMessage(
mid=mid,
author_id=author_id,
message=delta.get("body", ""),
message_object=message,
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
)
# Unknown message type
else:
self.onUnknownMesssageType(msg=m)
def _parseMessage(self, content):
"""Get message and author name from content. May contain multiple messages in the content."""