Fix MQTT errors after being offline for too long
This commit is contained in:
@@ -79,6 +79,8 @@ class Mqtt(object):
|
|||||||
log.exception("Failed parsing MQTT data on %s as JSON", message.topic)
|
log.exception("Failed parsing MQTT data on %s as JSON", message.topic)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
log.debug("MQTT payload: %s, %s", message.topic, j)
|
||||||
|
|
||||||
if message.topic == "/t_ms":
|
if message.topic == "/t_ms":
|
||||||
# Update sync_token when received
|
# Update sync_token when received
|
||||||
# This is received in the first message after we've created a messenger
|
# This is received in the first message after we've created a messenger
|
||||||
@@ -86,18 +88,31 @@ class Mqtt(object):
|
|||||||
if "syncToken" in j and "firstDeltaSeqId" in j:
|
if "syncToken" in j and "firstDeltaSeqId" in j:
|
||||||
self._sync_token = j["syncToken"]
|
self._sync_token = j["syncToken"]
|
||||||
self._sequence_id = j["firstDeltaSeqId"]
|
self._sequence_id = j["firstDeltaSeqId"]
|
||||||
|
return
|
||||||
|
|
||||||
# Update last sequence id when received
|
# Update last sequence id when received
|
||||||
if "lastIssuedSeqId" in j:
|
if "lastIssuedSeqId" in j:
|
||||||
self._sequence_id = j["lastIssuedSeqId"]
|
self._sequence_id = j["lastIssuedSeqId"]
|
||||||
|
|
||||||
if "errorCode" in j:
|
if "errorCode" in j:
|
||||||
# Known types: ERROR_QUEUE_OVERFLOW | ERROR_QUEUE_NOT_FOUND
|
error = j["errorCode"]
|
||||||
# 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
|
# TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
|
||||||
log.error("MQTT error code %s received", j["errorCode"])
|
if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"):
|
||||||
# TODO: Consider resetting the sync_token and sequence ID here?
|
# ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too
|
||||||
|
# much time passed, or that it was simply missing
|
||||||
log.debug("MQTT payload: %s, %s", message.topic, j)
|
# ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so
|
||||||
|
# the desired events could not be retrieved
|
||||||
|
log.error(
|
||||||
|
"The MQTT listener was disconnected for too long,"
|
||||||
|
" events may have been lost"
|
||||||
|
)
|
||||||
|
self._sync_token = None
|
||||||
|
self._sequence_id = self._fetch_sequence_id(self._state)
|
||||||
|
self._messenger_queue_publish()
|
||||||
|
# TODO: Signal to the user that they should reload their data!
|
||||||
|
return
|
||||||
|
log.error("MQTT error code %s received", error)
|
||||||
|
return
|
||||||
|
|
||||||
# Call the external callback
|
# Call the external callback
|
||||||
self._on_message(message.topic, j)
|
self._on_message(message.topic, j)
|
||||||
@@ -129,6 +144,9 @@ class Mqtt(object):
|
|||||||
if rc != 0:
|
if rc != 0:
|
||||||
return # Don't try to send publish if the connection failed
|
return # Don't try to send publish if the connection failed
|
||||||
|
|
||||||
|
self._messenger_queue_publish()
|
||||||
|
|
||||||
|
def _messenger_queue_publish(self):
|
||||||
# configure receiving messages.
|
# configure receiving messages.
|
||||||
payload = {
|
payload = {
|
||||||
"sync_api_version": 10,
|
"sync_api_version": 10,
|
||||||
|
Reference in New Issue
Block a user