Compare commits

..

13 Commits

Author SHA1 Message Date
Mads Marquart
b9b4d57b25 Bump version: 1.9.5 → 1.9.6 2020-01-21 19:50:57 +01:00
Mads Marquart
b4618739f3 Fix MQTT errors after being offline for too long 2020-01-21 19:39:59 +01:00
Mads Marquart
22c6c82c0e Disable /t_rtc MQTT topic 2020-01-20 14:54:25 +01:00
Mads Marquart
19c875c18a Bump version: 1.9.4 → 1.9.5 2020-01-20 09:32:30 +01:00
Mateusz Soszyński
12bbc0058c Add onPendingMessage (#512) 2020-01-20 09:28:41 +01:00
Mads Marquart
9c81806b95 Bump version: 1.9.3 → 1.9.4 2020-01-14 23:29:58 +01:00
Mads Marquart
45303005b8 Fix onFriendRequest 2020-01-14 23:27:50 +01:00
Mads Marquart
881aa9adce Bump version: 1.9.2 → 1.9.3 2020-01-08 09:38:18 +01:00
Mads Marquart
4714be5697 Fix MQTT JSON decoding 2020-01-08 09:35:26 +01:00
Mads Marquart
cb7f4a72d7 Bump version: 1.9.1 → 1.9.2 2020-01-08 08:47:16 +01:00
Mads Marquart
fb63ff0db8 Fix cookie header extraction
Only worked when the cookies were loaded from file, hence the reason I
didn't spot it the first time. Thanks to @gave92 for the suggestion.

Fixes #495
2020-01-08 08:46:22 +01:00
Mads Marquart
c5f447e20b Bump version: 1.9.0 → 1.9.1 2020-01-06 13:23:39 +01:00
Mads Marquart
b4d3769fd5 Fix MQTT error handling
- Fix "Out of memory" errors
- Fix typo
2020-01-06 13:14:07 +01:00
5 changed files with 103 additions and 30 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.9.0
current_version = 1.9.6
commit = True
tag = True

View File

@@ -13,7 +13,7 @@ from ._client import Client
from ._util import log # TODO: Remove this (from examples too)
__title__ = "fbchat"
__version__ = "1.9.0"
__version__ = "1.9.6"
__description__ = "Facebook Chat (Messenger) for Python"
__copyright__ = "Copyright 2015 - 2019 by Taehoon Kim"

View File

@@ -2271,7 +2271,17 @@ class Client(object):
elif delta_class == "ForcedFetch":
mid = delta.get("messageId")
if mid is None:
self.onUnknownMesssageType(msg=delta)
if delta["threadKey"] is not None:
# Looks like the whole delta is metadata in this case
thread_id, thread_type = getThreadIdAndThreadType(delta)
self.onPendingMessage(
thread_id=thread_id,
thread_type=thread_type,
metadata=delta,
msg=delta,
)
else:
self.onUnknownMesssageType(msg=delta)
else:
thread_id = str(delta["threadKey"]["threadFbId"])
fetch_info = self._forcedFetch(thread_id, mid)
@@ -2727,6 +2737,14 @@ class Client(object):
msg=delta,
)
# New pending message
elif delta_class == "ThreadFolder" and delta.get("folder") == "FOLDER_PENDING":
# Looks like the whole delta is metadata in this case
thread_id, thread_type = getThreadIdAndThreadType(delta)
self.onPendingMessage(
thread_id=thread_id, thread_type=thread_type, metadata=delta, msg=delta
)
# Unknown message type
else:
self.onUnknownMesssageType(msg=delta)
@@ -2768,9 +2786,15 @@ class Client(object):
msg=m,
)
elif topic == "jewel_requests_add":
from_id = m["from"]
self.onFriendRequest(from_id=from_id, msg=m)
# Other notifications
elif topic == "/legacy_web":
# Friend request
if m["type"] == "jewel_requests_add":
from_id = m["from"]
# TODO: from_id = str(from_id)
self.onFriendRequest(from_id=from_id, msg=m)
else:
self.onUnknownMesssageType(msg=m)
# Chat timestamp / Buddylist overlay
elif topic == "/orca_presence":
@@ -2835,7 +2859,7 @@ class Client(object):
self._mqtt.set_chat_on(self._markAlive)
# TODO: Remove on_error param
return self._mqtt.loop_once(on_error=self.onListenError)
return self._mqtt.loop_once(on_error=lambda e: self.onListenError(exception=e))
def stopListening(self):
"""Stop the listening loop."""
@@ -2943,6 +2967,21 @@ class Client(object):
"""
log.info("{} from {} in {}".format(message_object, thread_id, thread_type.name))
def onPendingMessage(
self, thread_id=None, thread_type=None, metadata=None, msg=None
):
"""Called when the client is listening, and somebody that isn't
connected with you on either Facebook or Messenger sends a message.
After that, you need to use fetchThreadList to actually read the message.
Args:
thread_id: Thread ID that the message was sent to. See :ref:`intro_threads`
thread_type (ThreadType): Type of thread that the message was sent to. See :ref:`intro_threads`
metadata: Extra metadata about the message
msg: A full set of the data received
"""
log.info("New pending message from {}".format(thread_id))
def onColorChange(
self,
mid=None,

View File

@@ -31,9 +31,9 @@ class Mqtt(object):
transport="websockets",
)
mqtt.enable_logger()
# mqtt.max_inflight_messages_set(20)
# mqtt.max_queued_messages_set(0) # unlimited
# mqtt.message_retry_set(5)
# mqtt.max_inflight_messages_set(20) # The rest will get queued
# mqtt.max_queued_messages_set(0) # Unlimited messages can be queued
# mqtt.message_retry_set(20) # Retry sending for at least 20 seconds
# mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
# TODO: Is region (lla | atn | odn | others?) important?
mqtt.tls_set()
@@ -74,11 +74,13 @@ class Mqtt(object):
def _on_message_handler(self, client, userdata, message):
# Parse payload JSON
try:
j = _util.parse_json(message.payload)
except _exception.FBchatFacebookError:
j = _util.parse_json(message.payload.decode("utf-8"))
except (_exception.FBchatFacebookError, UnicodeDecodeError):
log.exception("Failed parsing MQTT data on %s as JSON", message.topic)
return
log.debug("MQTT payload: %s, %s", message.topic, j)
if message.topic == "/t_ms":
# Update sync_token when received
# This is received in the first message after we've created a messenger
@@ -86,18 +88,31 @@ class Mqtt(object):
if "syncToken" in j and "firstDeltaSeqId" in j:
self._sync_token = j["syncToken"]
self._sequence_id = j["firstDeltaSeqId"]
return
# Update last sequence id when received
if "lastIssuedSeqId" in j:
self._sequence_id = j["lastIssuedSeqId"]
if "errorCode" in j:
# Known types: ERROR_QUEUE_OVERFLOW | ERROR_QUEUE_NOT_FOUND
# 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
log.error("MQTT error code %s received", j["errorCode"])
# TODO: Consider resetting the sync_token and sequence ID here?
log.debug("MQTT payload: %s, %s", message.topic, j)
error = j["errorCode"]
# TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"):
# ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too
# much time passed, or that it was simply missing
# ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so
# the desired events could not be retrieved
log.error(
"The MQTT listener was disconnected for too long,"
" events may have been lost"
)
self._sync_token = None
self._sequence_id = self._fetch_sequence_id(self._state)
self._messenger_queue_publish()
# TODO: Signal to the user that they should reload their data!
return
log.error("MQTT error code %s received", error)
return
# Call the external callback
self._on_message(message.topic, j)
@@ -122,6 +137,16 @@ class Mqtt(object):
raise
def _on_connect_handler(self, client, userdata, flags, rc):
if rc == 21:
raise _exception.FBchatException(
"Failed connecting. Maybe your cookies are wrong?"
)
if rc != 0:
return # Don't try to send publish if the connection failed
self._messenger_queue_publish()
def _messenger_queue_publish(self):
# configure receiving messages.
payload = {
"sync_api_version": 10,
@@ -164,6 +189,10 @@ class Mqtt(object):
"/br_sr",
# Response to /br_sr
"/sr_res",
# Data about user-to-user calls
# TODO: Investigate the response from this! (A bunch of binary data)
# "/t_rtc",
# TODO: Find out what this does!
# TODO: Investigate the response from this! (A bunch of binary data)
# "/t_p",
# TODO: Find out what this does!
@@ -179,7 +208,6 @@ class Mqtt(object):
"/messaging_events",
"/orca_message_notifications",
"/pp",
"/t_rtc",
"/webrtc_response",
]
@@ -228,7 +256,9 @@ class Mqtt(object):
headers = {
# TODO: Make this access thread safe
"Cookie": _util.get_cookie_header(self._state._session, self._HOST),
"Cookie": _util.get_cookie_header(
self._state._session, "https://edge-chat.facebook.com/chat"
),
"User-Agent": self._state._session.headers["User-Agent"],
"Origin": "https://www.facebook.com",
"Host": self._HOST,
@@ -250,16 +280,19 @@ class Mqtt(object):
return False # Stop listening
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
err = paho.mqtt.client.error_string(rc)
# If known/expected error
if rc in [paho.mqtt.client.MQTT_ERR_CONN_LOST]:
log.warning(err)
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
log.warning("Connection lost, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
# This error is wrongly classified
# See https://github.com/eclipse/paho.mqtt.python/issues/340
log.warning("Connection error, retrying")
else:
log.warning("MQTT Error: %s", err)
err = paho.mqtt.client.error_string(rc)
log.error("MQTT Error: %s", err)
# For backwards compatibility
if on_error:
on_error(exception=FBchatException("MQTT Error {}".format(err)))
on_error(_exception.FBchatException("MQTT Error {}".format(err)))
# Wait before reconnecting
self._mqtt._reconnect_wait()
@@ -273,8 +306,8 @@ class Mqtt(object):
paho.mqtt.client.socket.error,
OSError,
paho.mqtt.client.WebsocketConnectionError,
):
log.debug("MQTT reconnection failed")
) as e:
log.debug("MQTT reconnection failed: %s", e)
return True # Keep listening

View File

@@ -70,10 +70,11 @@ def strip_json_cruft(text):
raise FBchatException("No JSON object found: {!r}".format(text))
def get_cookie_header(session, host):
def get_cookie_header(session, url):
"""Extract a cookie header from a requests session."""
# The cookies are extracted this way to make sure they're escaped correctly
return requests.cookies.get_cookie_header(
session.cookies, requests.Request("GET", host),
session.cookies, requests.Request("GET", url),
)