|
|
|
@@ -79,6 +79,8 @@ class Mqtt(object):
|
|
|
|
|
log.exception("Failed parsing MQTT data on %s as JSON", message.topic)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
log.debug("MQTT payload: %s, %s", message.topic, j)
|
|
|
|
|
|
|
|
|
|
if message.topic == "/t_ms":
|
|
|
|
|
# Update sync_token when received
|
|
|
|
|
# This is received in the first message after we've created a messenger
|
|
|
|
@@ -86,18 +88,31 @@ class Mqtt(object):
|
|
|
|
|
if "syncToken" in j and "firstDeltaSeqId" in j:
|
|
|
|
|
self._sync_token = j["syncToken"]
|
|
|
|
|
self._sequence_id = j["firstDeltaSeqId"]
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# Update last sequence id when received
|
|
|
|
|
if "lastIssuedSeqId" in j:
|
|
|
|
|
self._sequence_id = j["lastIssuedSeqId"]
|
|
|
|
|
|
|
|
|
|
if "errorCode" in j:
|
|
|
|
|
# Known types: ERROR_QUEUE_OVERFLOW | ERROR_QUEUE_NOT_FOUND
|
|
|
|
|
# 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
|
|
|
|
|
log.error("MQTT error code %s received", j["errorCode"])
|
|
|
|
|
# TODO: Consider resetting the sync_token and sequence ID here?
|
|
|
|
|
|
|
|
|
|
log.debug("MQTT payload: %s, %s", message.topic, j)
|
|
|
|
|
error = j["errorCode"]
|
|
|
|
|
# TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
|
|
|
|
|
if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"):
|
|
|
|
|
# ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too
|
|
|
|
|
# much time passed, or that it was simply missing
|
|
|
|
|
# ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so
|
|
|
|
|
# the desired events could not be retrieved
|
|
|
|
|
log.error(
|
|
|
|
|
"The MQTT listener was disconnected for too long,"
|
|
|
|
|
" events may have been lost"
|
|
|
|
|
)
|
|
|
|
|
self._sync_token = None
|
|
|
|
|
self._sequence_id = self._fetch_sequence_id(self._state)
|
|
|
|
|
self._messenger_queue_publish()
|
|
|
|
|
# TODO: Signal to the user that they should reload their data!
|
|
|
|
|
return
|
|
|
|
|
log.error("MQTT error code %s received", error)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# Call the external callback
|
|
|
|
|
self._on_message(message.topic, j)
|
|
|
|
@@ -115,11 +130,10 @@ class Mqtt(object):
|
|
|
|
|
log.debug("Fetching MQTT sequence ID")
|
|
|
|
|
# Same request as in `Client.fetchThreadList`
|
|
|
|
|
(j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params))
|
|
|
|
|
try:
|
|
|
|
|
return int(j["viewer"]["message_threads"]["sync_sequence_id"])
|
|
|
|
|
except (KeyError, ValueError):
|
|
|
|
|
# TODO: Proper exceptions
|
|
|
|
|
raise
|
|
|
|
|
sequence_id = j["viewer"]["message_threads"]["sync_sequence_id"]
|
|
|
|
|
if not sequence_id:
|
|
|
|
|
raise _exception.FBchatNotLoggedIn("Failed fetching sequence id")
|
|
|
|
|
return int(sequence_id)
|
|
|
|
|
|
|
|
|
|
def _on_connect_handler(self, client, userdata, flags, rc):
|
|
|
|
|
if rc == 21:
|
|
|
|
@@ -129,6 +143,9 @@ class Mqtt(object):
|
|
|
|
|
if rc != 0:
|
|
|
|
|
return # Don't try to send publish if the connection failed
|
|
|
|
|
|
|
|
|
|
self._messenger_queue_publish()
|
|
|
|
|
|
|
|
|
|
def _messenger_queue_publish(self):
|
|
|
|
|
# configure receiving messages.
|
|
|
|
|
payload = {
|
|
|
|
|
"sync_api_version": 10,
|
|
|
|
@@ -171,6 +188,10 @@ class Mqtt(object):
|
|
|
|
|
"/br_sr",
|
|
|
|
|
# Response to /br_sr
|
|
|
|
|
"/sr_res",
|
|
|
|
|
# Data about user-to-user calls
|
|
|
|
|
# TODO: Investigate the response from this! (A bunch of binary data)
|
|
|
|
|
# "/t_rtc",
|
|
|
|
|
# TODO: Find out what this does!
|
|
|
|
|
# TODO: Investigate the response from this! (A bunch of binary data)
|
|
|
|
|
# "/t_p",
|
|
|
|
|
# TODO: Find out what this does!
|
|
|
|
@@ -186,7 +207,6 @@ class Mqtt(object):
|
|
|
|
|
"/messaging_events",
|
|
|
|
|
"/orca_message_notifications",
|
|
|
|
|
"/pp",
|
|
|
|
|
"/t_rtc",
|
|
|
|
|
"/webrtc_response",
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
@@ -266,6 +286,8 @@ class Mqtt(object):
|
|
|
|
|
# This error is wrongly classified
|
|
|
|
|
# See https://github.com/eclipse/paho.mqtt.python/issues/340
|
|
|
|
|
log.warning("Connection error, retrying")
|
|
|
|
|
elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
|
|
|
|
|
raise _exception.FBchatNotLoggedIn("MQTT connection refused")
|
|
|
|
|
else:
|
|
|
|
|
err = paho.mqtt.client.error_string(rc)
|
|
|
|
|
log.error("MQTT Error: %s", err)
|
|
|
|
|