From b4618739f30d9f227070c140297406fa746f78da Mon Sep 17 00:00:00 2001 From: Mads Marquart Date: Tue, 21 Jan 2020 18:51:32 +0100 Subject: [PATCH] Fix MQTT errors after being offline for too long --- fbchat/_mqtt.py | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/fbchat/_mqtt.py b/fbchat/_mqtt.py index 3b4f5f1..9bb361a 100644 --- a/fbchat/_mqtt.py +++ b/fbchat/_mqtt.py @@ -79,6 +79,8 @@ class Mqtt(object): log.exception("Failed parsing MQTT data on %s as JSON", message.topic) return + log.debug("MQTT payload: %s, %s", message.topic, j) + if message.topic == "/t_ms": # Update sync_token when received # This is received in the first message after we've created a messenger @@ -86,18 +88,31 @@ class Mqtt(object): if "syncToken" in j and "firstDeltaSeqId" in j: self._sync_token = j["syncToken"] self._sequence_id = j["firstDeltaSeqId"] + return # Update last sequence id when received if "lastIssuedSeqId" in j: self._sequence_id = j["lastIssuedSeqId"] if "errorCode" in j: - # Known types: ERROR_QUEUE_OVERFLOW | ERROR_QUEUE_NOT_FOUND - # 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00' - log.error("MQTT error code %s received", j["errorCode"]) - # TODO: Consider resetting the sync_token and sequence ID here? - - log.debug("MQTT payload: %s, %s", message.topic, j) + error = j["errorCode"] + # TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00' + if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"): + # ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too + # much time passed, or that it was simply missing + # ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so + # the desired events could not be retrieved + log.error( + "The MQTT listener was disconnected for too long," + " events may have been lost" + ) + self._sync_token = None + self._sequence_id = self._fetch_sequence_id(self._state) + self._messenger_queue_publish() + # TODO: Signal to the user that they should reload their data! + return + log.error("MQTT error code %s received", error) + return # Call the external callback self._on_message(message.topic, j) @@ -129,6 +144,9 @@ class Mqtt(object): if rc != 0: return # Don't try to send publish if the connection failed + self._messenger_queue_publish() + + def _messenger_queue_publish(self): # configure receiving messages. payload = { "sync_api_version": 10,