Merge branch 'v1'

This commit is contained in:
Mads Marquart
2020-01-21 22:04:22 +01:00
6 changed files with 98 additions and 18 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.9.4
current_version = 1.9.6
commit = True
tag = True

View File

@@ -59,6 +59,7 @@ from ._delta_class import (
MessagesDelivered,
ThreadsRead,
MessageEvent,
ThreadFolder,
)
from ._delta_type import (
ColorSet,
@@ -84,7 +85,7 @@ from ._mqtt import Listener
from ._client import Client
__title__ = "fbchat"
__version__ = "1.9.4"
__version__ = "1.9.6"
__description__ = "Facebook Chat (Messenger) for Python"
__copyright__ = "Copyright 2015 - 2019 by Taehoon Kim"

View File

@@ -3,7 +3,7 @@ import datetime
from ._event_common import attrs_event, Event, UnknownEvent, ThreadEvent
from . import _util, _user, _group, _thread, _message
from typing import Sequence
from typing import Sequence, Optional
@attrs_event
@@ -79,12 +79,14 @@ class UnfetchedThreadEvent(Event):
#: The thread the message was sent to
thread = attr.ib(type=_thread.ThreadABC)
#: The message
message = attr.ib(type=_message.Message)
message = attr.ib(type=Optional[_message.Message])
@classmethod
def _parse(cls, session, data):
thread = ThreadEvent._get_thread(session, data)
message = _message.Message(thread=thread, id=data["messageId"])
message = None
if "messageId" in data:
message = _message.Message(thread=thread, id=data["messageId"])
return cls(thread=thread, message=message)
@@ -156,6 +158,28 @@ class MessageEvent(ThreadEvent):
return cls(author=author, thread=thread, message=message, at=at)
@attrs_event
class ThreadFolder(Event):
"""A thread was created in a folder.
Somebody that isn't connected with you on either Facebook or Messenger sends a
message. After that, you need to use `ThreadABC.fetch_messages` to actually read it.
"""
# TODO: Finish this
#: The created thread
thread = attr.ib(type=_thread.ThreadABC)
#: The folder/location
folder = attr.ib(type=_thread.ThreadLocation)
@classmethod
def _parse(cls, session, data):
thread = ThreadEvent._get_thread(session, data)
folder = _thread.ThreadLocation._parse(data["folder"])
return cls(thread=thread, folder=folder)
def parse_delta(session, data):
class_ = data.get("class")
if class_ == "ParticipantsAddedToGroupThread":
@@ -164,10 +188,7 @@ def parse_delta(session, data):
return PersonRemoved._parse(session, data)
elif class_ == "MarkFolderSeen":
# TODO: Finish this
folders = [
_thread.ThreadLocation(folder.lstrip("FOLDER_"))
for folder in data["folders"]
]
folders = [_thread.ThreadLocation._parse(folder) for folder in data["folders"]]
at = _util.millis_to_datetime(int(data["timestamp"]))
return None
elif class_ == "ThreadName":
@@ -187,4 +208,6 @@ def parse_delta(session, data):
return X._parse(session, data)
elif class_ == "NewMessage":
return MessageEvent._parse(session, data)
elif class_ == "ThreadFolder":
return ThreadFolder._parse(session, data)
return UnknownEvent(source="Delta class", data=data)

View File

@@ -99,6 +99,8 @@ class Listener:
log.exception("Failed parsing MQTT data on %s as JSON", message.topic)
return
log.debug("MQTT payload: %s, %s", message.topic, j)
if message.topic == "/t_ms":
# Update sync_token when received
# This is received in the first message after we've created a messenger
@@ -106,18 +108,33 @@ class Listener:
if "syncToken" in j and "firstDeltaSeqId" in j:
self._sync_token = j["syncToken"]
self._sequence_id = j["firstDeltaSeqId"]
return
if "errorCode" in j:
error = j["errorCode"]
# TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"):
# ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too
# much time passed, or that it was simply missing
# ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so
# the desired events could not be retrieved
log.error(
"The MQTT listener was disconnected for too long,"
" events may have been lost"
)
self._sync_token = None
self._sequence_id = self._fetch_sequence_id(self._session)
self._messenger_queue_publish()
# TODO: Signal to the user that they should reload their data!
return
log.error("MQTT error code %s received", error)
return
# Update last sequence id when received
if "lastIssuedSeqId" in j:
self._sequence_id = j["lastIssuedSeqId"]
if "errorCode" in j:
# Known types: ERROR_QUEUE_OVERFLOW | ERROR_QUEUE_NOT_FOUND
# 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
log.error("MQTT error code %s received", j["errorCode"])
# TODO: Consider resetting the sync_token and sequence ID here?
log.debug("MQTT payload: %s, %s", message.topic, j)
else:
log.error("Missing last sequence id: %s", j)
try:
# TODO: Don't handle this in a callback
@@ -153,6 +170,9 @@ class Listener:
if rc != 0:
return # Don't try to send publish if the connection failed
self._messenger_queue_publish()
def _messenger_queue_publish(self):
# configure receiving messages.
payload = {
"sync_api_version": 10,
@@ -195,6 +215,10 @@ class Listener:
"/br_sr",
# Response to /br_sr
"/sr_res",
# Data about user-to-user calls
# TODO: Investigate the response from this! (A bunch of binary data)
# "/t_rtc",
# TODO: Find out what this does!
# TODO: Investigate the response from this! (A bunch of binary data)
# "/t_p",
# TODO: Find out what this does!
@@ -210,7 +234,6 @@ class Listener:
"/messaging_events",
"/orca_message_notifications",
"/pp",
"/t_rtc",
"/webrtc_response",
]

View File

@@ -16,6 +16,10 @@ class ThreadLocation(enum.Enum):
ARCHIVED = "ARCHIVED"
OTHER = "OTHER"
@classmethod
def _parse(cls, value: str):
return cls(value.lstrip("FOLDER_"))
DEFAULT_COLOR = "#0084ff"
SETABLE_COLORS = (

View File

@@ -4,6 +4,7 @@ from fbchat import (
ParseError,
User,
Group,
ThreadLocation,
UnknownEvent,
PeopleAdded,
PersonRemoved,
@@ -12,6 +13,7 @@ from fbchat import (
MessagesDelivered,
ThreadsRead,
MessageEvent,
ThreadFolder,
)
from fbchat._message import Message, MessageData
from fbchat._delta_class import parse_delta
@@ -133,6 +135,19 @@ def test_forced_fetch(session):
) == parse_delta(session, data)
def test_forced_fetch_pending(session):
data = {
"forceInsert": False,
"irisSeqId": "1111",
"isLazy": False,
"threadKey": {"threadFbId": "1234"},
"class": "ForcedFetch",
}
assert UnfetchedThreadEvent(
thread=Group(session=session, id="1234"), message=None
) == parse_delta(session, data)
def test_delivery_receipt_group(session):
data = {
"actorFbId": "1234",
@@ -289,6 +304,20 @@ def test_new_message_group(session):
) == parse_delta(session, data)
def test_thread_folder(session):
data = {
"class": "ThreadFolder",
"folder": "FOLDER_PENDING",
"irisSeqId": "1111",
"irisTags": ["DeltaThreadFolder", "is_from_iris_fanout"],
"requestContext": {"apiArgs": {}},
"threadKey": {"otherUserFbId": "1234"},
}
assert ThreadFolder(
thread=User(session=session, id="1234"), folder=ThreadLocation.PENDING
) == parse_delta(session, data)
def test_noop(session):
assert parse_delta(session, {"class": "NoOp"}) is None