Merge branch 'mqtt-improve'
This commit is contained in:
@@ -134,17 +134,18 @@ Listening & Events
|
|||||||
|
|
||||||
Now, we are finally at the point we have all been waiting for: Creating an automatic Facebook bot!
|
Now, we are finally at the point we have all been waiting for: Creating an automatic Facebook bot!
|
||||||
|
|
||||||
To get started, you create your methods that will handle your events::
|
To get started, you create a listener object::
|
||||||
|
|
||||||
def on_message(event):
|
listener = fbchat.Listener(session=session, chat_on=False, foreground=False)
|
||||||
|
|
||||||
|
The you use that to register methods that will handle your events::
|
||||||
|
|
||||||
|
@listener.register
|
||||||
|
def on_message(event: fbchat.MessageEvent):
|
||||||
print(f"Message from {event.author.id}: {event.message.text}")
|
print(f"Message from {event.author.id}: {event.message.text}")
|
||||||
|
|
||||||
And then you create a listener object, and start handling the incoming events::
|
And then you start handling the incoming events::
|
||||||
|
|
||||||
listener = fbchat.Listener.connect(session, False, False)
|
listener.run()
|
||||||
|
|
||||||
for event in listener.listen():
|
|
||||||
if isinstance(event, fbchat.MessageEvent):
|
|
||||||
on_message(event)
|
|
||||||
|
|
||||||
View the :ref:`examples` to see some more examples illustrating the event system.
|
View the :ref:`examples` to see some more examples illustrating the event system.
|
||||||
|
@@ -1,17 +1,15 @@
|
|||||||
import fbchat
|
import fbchat
|
||||||
|
|
||||||
session = fbchat.Session.login("<email>", "<password>")
|
session = fbchat.Session.login("<email>", "<password>")
|
||||||
|
listener = fbchat.Listener(session=session, chat_on=False, foreground=False)
|
||||||
listener = fbchat.Listener.connect(session, chat_on=False, foreground=False)
|
|
||||||
|
|
||||||
|
|
||||||
def on_message(event):
|
@listener.register
|
||||||
|
def on_message(event: fbchat.MessageEvent):
|
||||||
print(f"{event.message.text} from {event.author.id} in {event.thread.id}")
|
print(f"{event.message.text} from {event.author.id} in {event.thread.id}")
|
||||||
# If you're not the author, echo
|
# If you're not the author, echo
|
||||||
if event.author.id != session.user.id:
|
if event.author.id != session.user.id:
|
||||||
event.thread.send_text(event.message.text)
|
event.thread.send_text(event.message.text)
|
||||||
|
|
||||||
|
|
||||||
for event in listener.listen():
|
listener.run()
|
||||||
if isinstance(event, fbchat.MessageEvent):
|
|
||||||
on_message(event)
|
|
||||||
|
@@ -15,10 +15,10 @@ old_nicknames = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
session = fbchat.Session.login("<email>", "<password>")
|
session = fbchat.Session.login("<email>", "<password>")
|
||||||
|
listener = fbchat.Listener(session=session, chat_on=False, foreground=False)
|
||||||
listener = fbchat.Listener.connect(session, chat_on=False, foreground=False)
|
|
||||||
|
|
||||||
|
|
||||||
|
@listener.register
|
||||||
def on_color_set(event: fbchat.ColorSet):
|
def on_color_set(event: fbchat.ColorSet):
|
||||||
if old_thread_id != event.thread.id:
|
if old_thread_id != event.thread.id:
|
||||||
return
|
return
|
||||||
@@ -27,6 +27,7 @@ def on_color_set(event: fbchat.ColorSet):
|
|||||||
event.thread.set_color(old_color)
|
event.thread.set_color(old_color)
|
||||||
|
|
||||||
|
|
||||||
|
@listener.register
|
||||||
def on_emoji_set(event: fbchat.EmojiSet):
|
def on_emoji_set(event: fbchat.EmojiSet):
|
||||||
if old_thread_id != event.thread.id:
|
if old_thread_id != event.thread.id:
|
||||||
return
|
return
|
||||||
@@ -35,6 +36,7 @@ def on_emoji_set(event: fbchat.EmojiSet):
|
|||||||
event.thread.set_emoji(old_emoji)
|
event.thread.set_emoji(old_emoji)
|
||||||
|
|
||||||
|
|
||||||
|
@listener.register
|
||||||
def on_title_set(event: fbchat.TitleSet):
|
def on_title_set(event: fbchat.TitleSet):
|
||||||
if old_thread_id != event.thread.id:
|
if old_thread_id != event.thread.id:
|
||||||
return
|
return
|
||||||
@@ -43,6 +45,7 @@ def on_title_set(event: fbchat.TitleSet):
|
|||||||
event.thread.set_title(old_title)
|
event.thread.set_title(old_title)
|
||||||
|
|
||||||
|
|
||||||
|
@listener.register
|
||||||
def on_nickname_set(event: fbchat.NicknameSet):
|
def on_nickname_set(event: fbchat.NicknameSet):
|
||||||
if old_thread_id != event.thread.id:
|
if old_thread_id != event.thread.id:
|
||||||
return
|
return
|
||||||
@@ -55,6 +58,7 @@ def on_nickname_set(event: fbchat.NicknameSet):
|
|||||||
event.thread.set_nickname(event.subject.id, old_nickname)
|
event.thread.set_nickname(event.subject.id, old_nickname)
|
||||||
|
|
||||||
|
|
||||||
|
@listener.register
|
||||||
def on_people_added(event: fbchat.PeopleAdded):
|
def on_people_added(event: fbchat.PeopleAdded):
|
||||||
if old_thread_id != event.thread.id:
|
if old_thread_id != event.thread.id:
|
||||||
return
|
return
|
||||||
@@ -64,6 +68,7 @@ def on_people_added(event: fbchat.PeopleAdded):
|
|||||||
event.thread.remove_participant(added.id)
|
event.thread.remove_participant(added.id)
|
||||||
|
|
||||||
|
|
||||||
|
@listener.register
|
||||||
def on_person_removed(event: fbchat.PersonRemoved):
|
def on_person_removed(event: fbchat.PersonRemoved):
|
||||||
if old_thread_id != event.thread.id:
|
if old_thread_id != event.thread.id:
|
||||||
return
|
return
|
||||||
@@ -75,16 +80,4 @@ def on_person_removed(event: fbchat.PersonRemoved):
|
|||||||
event.thread.add_participants([removed.id])
|
event.thread.add_participants([removed.id])
|
||||||
|
|
||||||
|
|
||||||
for event in listener.listen():
|
listener.run()
|
||||||
if isinstance(event, fbchat.ColorSet):
|
|
||||||
on_color_set(event)
|
|
||||||
elif isinstance(event, fbchat.EmojiSet):
|
|
||||||
on_emoji_set(event)
|
|
||||||
elif isinstance(event, fbchat.TitleSet):
|
|
||||||
on_title_set(event)
|
|
||||||
elif isinstance(event, fbchat.NicknameSet):
|
|
||||||
on_nickname_set(event)
|
|
||||||
elif isinstance(event, fbchat.PeopleAdded):
|
|
||||||
on_people_added(event)
|
|
||||||
elif isinstance(event, fbchat.PersonRemoved):
|
|
||||||
on_person_removed(event)
|
|
||||||
|
@@ -1,10 +1,10 @@
|
|||||||
import fbchat
|
import fbchat
|
||||||
|
|
||||||
session = fbchat.Session.login("<email>", "<password>")
|
session = fbchat.Session.login("<email>", "<password>")
|
||||||
|
listener = fbchat.Listener(session=session, chat_on=False, foreground=False)
|
||||||
listener = fbchat.Listener.connect(session, chat_on=False, foreground=False)
|
|
||||||
|
|
||||||
|
|
||||||
|
@listener.register
|
||||||
def on_message(event):
|
def on_message(event):
|
||||||
# We can only kick people from group chats, so no need to try if it's a user chat
|
# We can only kick people from group chats, so no need to try if it's a user chat
|
||||||
if not isinstance(event.thread, fbchat.Group):
|
if not isinstance(event.thread, fbchat.Group):
|
||||||
@@ -14,6 +14,4 @@ def on_message(event):
|
|||||||
event.thread.remove_participant(event.author.id)
|
event.thread.remove_participant(event.author.id)
|
||||||
|
|
||||||
|
|
||||||
for event in listener.listen():
|
listener.run()
|
||||||
if isinstance(event, fbchat.MessageEvent):
|
|
||||||
on_message(event)
|
|
||||||
|
@@ -111,7 +111,7 @@ from ._events import (
|
|||||||
FriendRequest,
|
FriendRequest,
|
||||||
Presence,
|
Presence,
|
||||||
)
|
)
|
||||||
from ._mqtt import Listener
|
from ._listen import Listener
|
||||||
|
|
||||||
from ._client import Client
|
from ._client import Client
|
||||||
|
|
||||||
|
@@ -46,6 +46,11 @@ class ParseError(FacebookError):
|
|||||||
return msg.format(self.message, self.data)
|
return msg.format(self.message, self.data)
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(slots=True, auto_exc=True)
|
||||||
|
class NotLoggedIn(FacebookError):
|
||||||
|
"""Raised by Facebook if the client has been logged out."""
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, auto_exc=True)
|
@attr.s(slots=True, auto_exc=True)
|
||||||
class ExternalError(FacebookError):
|
class ExternalError(FacebookError):
|
||||||
"""Base class for errors that Facebook return."""
|
"""Base class for errors that Facebook return."""
|
||||||
@@ -86,13 +91,6 @@ class InvalidParameters(ExternalError):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, auto_exc=True)
|
|
||||||
class NotLoggedIn(ExternalError):
|
|
||||||
"""Raised by Facebook if the client has been logged out."""
|
|
||||||
|
|
||||||
code = attr.ib()
|
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, auto_exc=True)
|
@attr.s(slots=True, auto_exc=True)
|
||||||
class PleaseRefresh(ExternalError):
|
class PleaseRefresh(ExternalError):
|
||||||
"""Raised by Facebook if the client has been inactive for too long.
|
"""Raised by Facebook if the client has been inactive for too long.
|
||||||
@@ -108,7 +106,7 @@ def handle_payload_error(j):
|
|||||||
return
|
return
|
||||||
code = j["error"]
|
code = j["error"]
|
||||||
if code == 1357001:
|
if code == 1357001:
|
||||||
error_cls = NotLoggedIn
|
raise NotLoggedIn(j["errorSummary"])
|
||||||
elif code == 1357004:
|
elif code == 1357004:
|
||||||
error_cls = PleaseRefresh
|
error_cls = PleaseRefresh
|
||||||
elif code in (1357031, 1545010, 1545003):
|
elif code in (1357031, 1545010, 1545003):
|
||||||
|
460
fbchat/_listen.py
Normal file
460
fbchat/_listen.py
Normal file
@@ -0,0 +1,460 @@
|
|||||||
|
import attr
|
||||||
|
import inspect
|
||||||
|
import random
|
||||||
|
import paho.mqtt.client
|
||||||
|
import requests
|
||||||
|
from ._common import log, kw_only
|
||||||
|
from . import _util, _exception, _session, _graphql, _events
|
||||||
|
|
||||||
|
from typing import Iterable, Optional, Mapping, Callable
|
||||||
|
|
||||||
|
|
||||||
|
HOST = "edge-chat.facebook.com"
|
||||||
|
|
||||||
|
TOPICS = [
|
||||||
|
# Things that happen in chats (e.g. messages)
|
||||||
|
"/t_ms",
|
||||||
|
# Group typing notifications
|
||||||
|
"/thread_typing",
|
||||||
|
# Private chat typing notifications
|
||||||
|
"/orca_typing_notifications",
|
||||||
|
# Active notifications
|
||||||
|
"/orca_presence",
|
||||||
|
# Other notifications not related to chats (e.g. friend requests)
|
||||||
|
"/legacy_web",
|
||||||
|
# Facebook's continuous error reporting/logging?
|
||||||
|
"/br_sr",
|
||||||
|
# Response to /br_sr
|
||||||
|
"/sr_res",
|
||||||
|
# Data about user-to-user calls
|
||||||
|
# TODO: Investigate the response from this! (A bunch of binary data)
|
||||||
|
# "/t_rtc",
|
||||||
|
# TODO: Find out what this does!
|
||||||
|
# TODO: Investigate the response from this! (A bunch of binary data)
|
||||||
|
# "/t_p",
|
||||||
|
# TODO: Find out what this does!
|
||||||
|
"/webrtc",
|
||||||
|
# TODO: Find out what this does!
|
||||||
|
"/onevc",
|
||||||
|
# TODO: Find out what this does!
|
||||||
|
"/notify_disconnect",
|
||||||
|
# Old, no longer active topics
|
||||||
|
# These are here just in case something interesting pops up
|
||||||
|
"/inbox",
|
||||||
|
"/mercury",
|
||||||
|
"/messaging_events",
|
||||||
|
"/orca_message_notifications",
|
||||||
|
"/pp",
|
||||||
|
"/webrtc_response",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def get_cookie_header(session: requests.Session, url: str) -> str:
|
||||||
|
"""Extract a cookie header from a requests session."""
|
||||||
|
# The cookies are extracted this way to make sure they're escaped correctly
|
||||||
|
return requests.cookies.get_cookie_header(
|
||||||
|
session.cookies, requests.Request("GET", url),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_session_id() -> int:
|
||||||
|
"""Generate a random session ID between 1 and 9007199254740991."""
|
||||||
|
return random.randint(1, 2 ** 53)
|
||||||
|
|
||||||
|
|
||||||
|
def mqtt_factory() -> paho.mqtt.client.Client:
|
||||||
|
# Configure internal MQTT handler
|
||||||
|
mqtt = paho.mqtt.client.Client(
|
||||||
|
client_id="mqttwsclient",
|
||||||
|
clean_session=True,
|
||||||
|
protocol=paho.mqtt.client.MQTTv31,
|
||||||
|
transport="websockets",
|
||||||
|
)
|
||||||
|
mqtt.enable_logger()
|
||||||
|
# mqtt.max_inflight_messages_set(20) # The rest will get queued
|
||||||
|
# mqtt.max_queued_messages_set(0) # Unlimited messages can be queued
|
||||||
|
# mqtt.message_retry_set(20) # Retry sending for at least 20 seconds
|
||||||
|
# mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
|
||||||
|
mqtt.tls_set()
|
||||||
|
mqtt.connect_async(HOST, 443, keepalive=10)
|
||||||
|
return mqtt
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_sequence_id(session: _session.Session) -> int:
|
||||||
|
"""Fetch sequence ID."""
|
||||||
|
params = {
|
||||||
|
"limit": 0,
|
||||||
|
"tags": ["INBOX"],
|
||||||
|
"before": None,
|
||||||
|
"includeDeliveryReceipts": False,
|
||||||
|
"includeSeqID": True,
|
||||||
|
}
|
||||||
|
log.debug("Fetching MQTT sequence ID")
|
||||||
|
# Same doc id as in `Client.fetch_threads`
|
||||||
|
(j,) = session._graphql_requests(_graphql.from_doc_id("1349387578499440", params))
|
||||||
|
sequence_id = j["viewer"]["message_threads"]["sync_sequence_id"]
|
||||||
|
if not sequence_id:
|
||||||
|
raise _exception.NotLoggedIn("Failed fetching sequence id")
|
||||||
|
return int(sequence_id)
|
||||||
|
|
||||||
|
|
||||||
|
HandlerT = Callable[[_events.Event], None]
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(slots=True, kw_only=kw_only, eq=False)
|
||||||
|
class Listener:
|
||||||
|
"""Helper, to listen for incoming Facebook events.
|
||||||
|
|
||||||
|
Initialize a connection to the Facebook MQTT service.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session: The session to use when making requests.
|
||||||
|
chat_on: Whether ...
|
||||||
|
foreground: Whether ...
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> listener = fbchat.Listener(session, chat_on=True, foreground=True)
|
||||||
|
"""
|
||||||
|
|
||||||
|
session = attr.ib(type=_session.Session)
|
||||||
|
_chat_on = attr.ib(type=bool)
|
||||||
|
_foreground = attr.ib(type=bool)
|
||||||
|
_mqtt = attr.ib(factory=mqtt_factory, type=paho.mqtt.client.Client)
|
||||||
|
_sync_token = attr.ib(None, type=Optional[str])
|
||||||
|
_sequence_id = attr.ib(None, type=Optional[int])
|
||||||
|
_tmp_events = attr.ib(factory=list, type=Iterable[_events.Event])
|
||||||
|
_handlers = attr.ib(factory=dict, type=Mapping[HandlerT, _events.Event])
|
||||||
|
|
||||||
|
def __attrs_post_init__(self):
|
||||||
|
# Configure callbacks
|
||||||
|
self._mqtt.on_message = self._on_message_handler
|
||||||
|
self._mqtt.on_connect = self._on_connect_handler
|
||||||
|
|
||||||
|
def _handle_ms(self, j):
|
||||||
|
"""Handle /t_ms special logic.
|
||||||
|
|
||||||
|
Returns whether to continue parsing the message.
|
||||||
|
"""
|
||||||
|
# TODO: Merge this with the parsing in _events
|
||||||
|
|
||||||
|
# Update sync_token when received
|
||||||
|
# This is received in the first message after we've created a messenger
|
||||||
|
# sync queue.
|
||||||
|
if "syncToken" in j and "firstDeltaSeqId" in j:
|
||||||
|
self._sync_token = j["syncToken"]
|
||||||
|
self._sequence_id = j["firstDeltaSeqId"]
|
||||||
|
return False
|
||||||
|
|
||||||
|
if "errorCode" in j:
|
||||||
|
error = j["errorCode"]
|
||||||
|
# TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
|
||||||
|
if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"):
|
||||||
|
# ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too
|
||||||
|
# much time passed, or that it was simply missing
|
||||||
|
# ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so
|
||||||
|
# the desired events could not be retrieved
|
||||||
|
log.error(
|
||||||
|
"The MQTT listener was disconnected for too long,"
|
||||||
|
" events may have been lost"
|
||||||
|
)
|
||||||
|
self._sync_token = None
|
||||||
|
self._sequence_id = None
|
||||||
|
return False
|
||||||
|
log.error("MQTT error code %s received", error)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Update last sequence id
|
||||||
|
# Except for the two cases above, this is always received
|
||||||
|
self._sequence_id = j["lastIssuedSeqId"]
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _on_message_handler(self, client, userdata, message):
|
||||||
|
# Parse payload JSON
|
||||||
|
try:
|
||||||
|
j = _util.parse_json(message.payload.decode("utf-8"))
|
||||||
|
except (_exception.FacebookError, UnicodeDecodeError):
|
||||||
|
log.debug(message.payload)
|
||||||
|
log.exception("Failed parsing MQTT data on %s as JSON", message.topic)
|
||||||
|
return
|
||||||
|
|
||||||
|
log.debug("MQTT payload: %s, %s", message.topic, j)
|
||||||
|
|
||||||
|
if message.topic == "/t_ms":
|
||||||
|
if not self._handle_ms(j):
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
# TODO: Don't handle this in a callback
|
||||||
|
self._tmp_events = list(
|
||||||
|
_events.parse_events(self.session, message.topic, j)
|
||||||
|
)
|
||||||
|
except _exception.ParseError:
|
||||||
|
log.exception("Failed parsing MQTT data")
|
||||||
|
|
||||||
|
def _on_connect_handler(self, client, userdata, flags, rc):
|
||||||
|
if rc == 21:
|
||||||
|
raise _exception.FacebookError(
|
||||||
|
"Failed connecting. Maybe your cookies are wrong?"
|
||||||
|
)
|
||||||
|
if rc != 0:
|
||||||
|
err = paho.mqtt.client.connack_string(rc)
|
||||||
|
log.error("MQTT Connection Error: %s", err)
|
||||||
|
return # Don't try to send publish if the connection failed
|
||||||
|
|
||||||
|
self._messenger_queue_publish()
|
||||||
|
|
||||||
|
def _messenger_queue_publish(self):
|
||||||
|
# configure receiving messages.
|
||||||
|
payload = {
|
||||||
|
"sync_api_version": 10,
|
||||||
|
"max_deltas_able_to_process": 1000,
|
||||||
|
"delta_batch_size": 500,
|
||||||
|
"encoding": "JSON",
|
||||||
|
"entity_fbid": self.session.user.id,
|
||||||
|
}
|
||||||
|
|
||||||
|
# If we don't have a sync_token, create a new messenger queue
|
||||||
|
# This is done so that across reconnects, if we've received a sync token, we
|
||||||
|
# SHOULD receive a piece of data in /t_ms exactly once!
|
||||||
|
if self._sync_token is None:
|
||||||
|
topic = "/messenger_sync_create_queue"
|
||||||
|
payload["initial_titan_sequence_id"] = str(self._sequence_id)
|
||||||
|
payload["device_params"] = None
|
||||||
|
else:
|
||||||
|
topic = "/messenger_sync_get_diffs"
|
||||||
|
payload["last_seq_id"] = str(self._sequence_id)
|
||||||
|
payload["sync_token"] = self._sync_token
|
||||||
|
|
||||||
|
self._mqtt.publish(topic, _util.json_minimal(payload), qos=1)
|
||||||
|
|
||||||
|
def _configure_connect_options(self):
|
||||||
|
# Generate a new session ID on each reconnect
|
||||||
|
session_id = generate_session_id()
|
||||||
|
|
||||||
|
username = {
|
||||||
|
# The user ID
|
||||||
|
"u": self.session.user.id,
|
||||||
|
# Session ID
|
||||||
|
"s": session_id,
|
||||||
|
# Active status setting
|
||||||
|
"chat_on": self._chat_on,
|
||||||
|
# foreground_state - Whether the window is focused
|
||||||
|
"fg": self._foreground,
|
||||||
|
# Can be any random ID
|
||||||
|
"d": self.session._client_id,
|
||||||
|
# Application ID, taken from facebook.com
|
||||||
|
"aid": 219994525426954,
|
||||||
|
# MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing
|
||||||
|
"st": TOPICS,
|
||||||
|
# MQTT extension by FB, allows making a PUBLISH while CONNECTing
|
||||||
|
# Using this is more efficient, but the same can be acheived with:
|
||||||
|
# def on_connect(*args):
|
||||||
|
# mqtt.publish(topic, payload, qos=1)
|
||||||
|
# mqtt.on_connect = on_connect
|
||||||
|
# TODO: For some reason this doesn't work!
|
||||||
|
"pm": [
|
||||||
|
# {
|
||||||
|
# "topic": topic,
|
||||||
|
# "payload": payload,
|
||||||
|
# "qos": 1,
|
||||||
|
# "messageId": 65536,
|
||||||
|
# }
|
||||||
|
],
|
||||||
|
# Unknown parameters
|
||||||
|
"cp": 3,
|
||||||
|
"ecp": 10,
|
||||||
|
"ct": "websocket",
|
||||||
|
"mqtt_sid": "",
|
||||||
|
"dc": "",
|
||||||
|
"no_auto_fg": True,
|
||||||
|
"gas": None,
|
||||||
|
"pack": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
self._mqtt.username_pw_set(_util.json_minimal(username))
|
||||||
|
|
||||||
|
headers = {
|
||||||
|
"Cookie": get_cookie_header(
|
||||||
|
self.session._session, "https://edge-chat.facebook.com/chat"
|
||||||
|
),
|
||||||
|
"User-Agent": self.session._session.headers["User-Agent"],
|
||||||
|
"Origin": "https://www.facebook.com",
|
||||||
|
"Host": HOST,
|
||||||
|
}
|
||||||
|
|
||||||
|
# TODO: Is region (lla | atn | odn | others?) important?
|
||||||
|
self._mqtt.ws_set_options(
|
||||||
|
path="/chat?sid={}".format(session_id), headers=headers
|
||||||
|
)
|
||||||
|
|
||||||
|
def _reconnect(self):
|
||||||
|
# Try reconnecting
|
||||||
|
self._configure_connect_options()
|
||||||
|
try:
|
||||||
|
self._mqtt.reconnect()
|
||||||
|
except (
|
||||||
|
# Taken from .loop_forever
|
||||||
|
paho.mqtt.client.socket.error,
|
||||||
|
OSError,
|
||||||
|
paho.mqtt.client.WebsocketConnectionError,
|
||||||
|
) as e:
|
||||||
|
log.debug("MQTT reconnection failed: %s", e)
|
||||||
|
# Wait before reconnecting
|
||||||
|
self._mqtt._reconnect_wait()
|
||||||
|
|
||||||
|
def listen(self) -> Iterable[_events.Event]:
|
||||||
|
"""Run the listening loop continually.
|
||||||
|
|
||||||
|
Yields events when they arrive.
|
||||||
|
|
||||||
|
This will automatically reconnect on errors, except if the errors are one of
|
||||||
|
`PleaseRefresh` or `NotLoggedIn`.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
Print events continually.
|
||||||
|
|
||||||
|
>>> for event in listener.listen():
|
||||||
|
... print(event)
|
||||||
|
"""
|
||||||
|
if self._sequence_id is None:
|
||||||
|
self._sequence_id = fetch_sequence_id(self.session)
|
||||||
|
|
||||||
|
# Make sure we're connected
|
||||||
|
while True:
|
||||||
|
# Beware, internal API, may have to change this to something more stable!
|
||||||
|
if self._mqtt._state == paho.mqtt.client.mqtt_cs_connect_async:
|
||||||
|
self._reconnect()
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
while True:
|
||||||
|
rc = self._mqtt.loop(timeout=1.0)
|
||||||
|
|
||||||
|
# The sequence ID was reset in _handle_ms
|
||||||
|
# TODO: Signal to the user that they should reload their data!
|
||||||
|
if self._sequence_id is None:
|
||||||
|
self._sequence_id = fetch_sequence_id(self.session)
|
||||||
|
self._messenger_queue_publish()
|
||||||
|
|
||||||
|
# If disconnect() has been called
|
||||||
|
# Beware, internal API, may have to change this to something more stable!
|
||||||
|
if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting:
|
||||||
|
break # Stop listening
|
||||||
|
|
||||||
|
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
|
||||||
|
# If known/expected error
|
||||||
|
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
|
||||||
|
log.warning("Connection lost, retrying")
|
||||||
|
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
|
||||||
|
# This error is wrongly classified
|
||||||
|
# See https://github.com/eclipse/paho.mqtt.python/issues/340
|
||||||
|
log.warning("Connection error, retrying")
|
||||||
|
elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
|
||||||
|
raise _exception.NotLoggedIn("MQTT connection refused")
|
||||||
|
else:
|
||||||
|
err = paho.mqtt.client.error_string(rc)
|
||||||
|
log.error("MQTT Error: %s", err)
|
||||||
|
|
||||||
|
self._reconnect()
|
||||||
|
|
||||||
|
if self._tmp_events:
|
||||||
|
yield from self._tmp_events
|
||||||
|
self._tmp_events = []
|
||||||
|
|
||||||
|
def disconnect(self) -> None:
|
||||||
|
"""Disconnect the MQTT listener.
|
||||||
|
|
||||||
|
Can be called while listening, which will stop the listening loop.
|
||||||
|
|
||||||
|
The `Listener` object should not be used after this is called!
|
||||||
|
|
||||||
|
Example:
|
||||||
|
Stop the listener when recieving a message with the text "/stop"
|
||||||
|
|
||||||
|
>>> for event in listener.listen():
|
||||||
|
... if isinstance(event, fbchat.MessageEvent):
|
||||||
|
... if event.message.text == "/stop":
|
||||||
|
... listener.disconnect() # Almost the same "break"
|
||||||
|
"""
|
||||||
|
self._mqtt.disconnect()
|
||||||
|
|
||||||
|
def set_foreground(self, value: bool) -> None:
|
||||||
|
"""Set the `foreground` value while listening."""
|
||||||
|
# TODO: Document what this actually does!
|
||||||
|
payload = _util.json_minimal({"foreground": value})
|
||||||
|
info = self._mqtt.publish("/foreground_state", payload=payload, qos=1)
|
||||||
|
self._foreground = value
|
||||||
|
# TODO: We can't wait for this, since the loop is running within the same thread
|
||||||
|
# info.wait_for_publish()
|
||||||
|
|
||||||
|
def set_chat_on(self, value: bool) -> None:
|
||||||
|
"""Set the `chat_on` value while listening."""
|
||||||
|
# TODO: Document what this actually does!
|
||||||
|
# TODO: Is this the right request to make?
|
||||||
|
data = {"make_user_available_when_in_foreground": value}
|
||||||
|
payload = _util.json_minimal(data)
|
||||||
|
info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1)
|
||||||
|
self._chat_on = value
|
||||||
|
# TODO: We can't wait for this, since the loop is running within the same thread
|
||||||
|
# info.wait_for_publish()
|
||||||
|
|
||||||
|
# def send_additional_contacts(self, additional_contacts):
|
||||||
|
# payload = _util.json_minimal({"additional_contacts": additional_contacts})
|
||||||
|
# info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1)
|
||||||
|
#
|
||||||
|
# def browser_close(self):
|
||||||
|
# info = self._mqtt.publish("/browser_close", payload=b"{}", qos=1)
|
||||||
|
|
||||||
|
def register(func: HandlerT) -> HandlerT:
|
||||||
|
"""Register a function that will be called when .run is called.
|
||||||
|
|
||||||
|
The input function must take a single annotated argument.
|
||||||
|
|
||||||
|
Should be used as a function decorator.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> @listener.register
|
||||||
|
>>> def my_handler(event: fbchat.Event):
|
||||||
|
... print(f"New event: {event}")
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
parameter = next(iter(inspect.signature(func).parameters.values()))
|
||||||
|
except Exception as e: # TODO: More precise exceptions
|
||||||
|
raise ValueError("Invalid function. Must have at least an argument") from e
|
||||||
|
|
||||||
|
if parameter.annotation is parameter.empty:
|
||||||
|
raise ValueError("Invalid function. Must be annotated")
|
||||||
|
|
||||||
|
if not issubclass(parameter.annotation, _events.Event):
|
||||||
|
raise ValueError("Invalid function. Annotation must be an event class")
|
||||||
|
|
||||||
|
# TODO: More error checks, e.g. kw_only parameters
|
||||||
|
|
||||||
|
self._handlers[func] = parameter.annotation
|
||||||
|
|
||||||
|
def unregister(func: HandlerT):
|
||||||
|
"""Unregister a previously registered function."""
|
||||||
|
try:
|
||||||
|
self._handlers.pop(func)
|
||||||
|
except KeyError:
|
||||||
|
raise ValueError("Tried to unregister a function that was not registered")
|
||||||
|
|
||||||
|
def run(self) -> None:
|
||||||
|
"""Run the listening loop, and dispatch incoming events to registered handlers.
|
||||||
|
|
||||||
|
This uses `.listen`, which reconnect on errors, except if the errors are one of
|
||||||
|
`PleaseRefresh` or `NotLoggedIn`.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
Print incoming messages.
|
||||||
|
|
||||||
|
>>> @listener.register
|
||||||
|
>>> def print_msg(event: fbchat.MessageEvent):
|
||||||
|
... print(event.message.text)
|
||||||
|
...
|
||||||
|
>>> listener.run()
|
||||||
|
"""
|
||||||
|
for event in self.listen():
|
||||||
|
for handler, event_cls in self._handlers.items():
|
||||||
|
if isinstance(event, event_cls):
|
||||||
|
handler(event)
|
407
fbchat/_mqtt.py
407
fbchat/_mqtt.py
@@ -1,407 +0,0 @@
|
|||||||
import attr
|
|
||||||
import random
|
|
||||||
import paho.mqtt.client
|
|
||||||
import requests
|
|
||||||
from ._common import log, kw_only
|
|
||||||
from . import _util, _exception, _session, _graphql, _events
|
|
||||||
|
|
||||||
from typing import Iterable, Optional
|
|
||||||
|
|
||||||
|
|
||||||
def get_cookie_header(session: requests.Session, url: str) -> str:
|
|
||||||
"""Extract a cookie header from a requests session."""
|
|
||||||
# The cookies are extracted this way to make sure they're escaped correctly
|
|
||||||
return requests.cookies.get_cookie_header(
|
|
||||||
session.cookies, requests.Request("GET", url),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def generate_session_id() -> int:
|
|
||||||
"""Generate a random session ID between 1 and 9007199254740991."""
|
|
||||||
return random.randint(1, 2 ** 53)
|
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
|
|
||||||
class Listener:
|
|
||||||
"""Helper, to listen for incoming Facebook events."""
|
|
||||||
|
|
||||||
session = attr.ib(type=_session.Session)
|
|
||||||
_mqtt = attr.ib(type=paho.mqtt.client.Client)
|
|
||||||
_chat_on = attr.ib(type=bool)
|
|
||||||
_foreground = attr.ib(type=bool)
|
|
||||||
_sequence_id = attr.ib(type=int)
|
|
||||||
_sync_token = attr.ib(None, type=str)
|
|
||||||
_tmp_events = attr.ib(None, type=Optional[Iterable[_events.Event]])
|
|
||||||
|
|
||||||
_HOST = "edge-chat.facebook.com"
|
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
|
||||||
# An alternative repr, to illustrate that you can't create the class directly
|
|
||||||
return "<fbchat.Listener session={} chat_on={} foreground={}>".format(
|
|
||||||
self.session, self._chat_on, self._foreground
|
|
||||||
)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def connect(cls, session: _session.Session, chat_on: bool, foreground: bool):
|
|
||||||
"""Initialize a connection to the Facebook MQTT service.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
session: The session to use when making requests.
|
|
||||||
chat_on: Whether ...
|
|
||||||
foreground: Whether ...
|
|
||||||
|
|
||||||
Example:
|
|
||||||
>>> listener = fbchat.Listener.connect(session, chat_on=True, foreground=True)
|
|
||||||
"""
|
|
||||||
mqtt = paho.mqtt.client.Client(
|
|
||||||
client_id="mqttwsclient",
|
|
||||||
clean_session=True,
|
|
||||||
protocol=paho.mqtt.client.MQTTv31,
|
|
||||||
transport="websockets",
|
|
||||||
)
|
|
||||||
mqtt.enable_logger()
|
|
||||||
# mqtt.max_inflight_messages_set(20) # The rest will get queued
|
|
||||||
# mqtt.max_queued_messages_set(0) # Unlimited messages can be queued
|
|
||||||
# mqtt.message_retry_set(20) # Retry sending for at least 20 seconds
|
|
||||||
# mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
|
|
||||||
# TODO: Is region (lla | atn | odn | others?) important?
|
|
||||||
mqtt.tls_set()
|
|
||||||
|
|
||||||
self = cls(
|
|
||||||
session=session,
|
|
||||||
mqtt=mqtt,
|
|
||||||
chat_on=chat_on,
|
|
||||||
foreground=foreground,
|
|
||||||
sequence_id=cls._fetch_sequence_id(session),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Configure callbacks
|
|
||||||
mqtt.on_message = self._on_message_handler
|
|
||||||
mqtt.on_connect = self._on_connect_handler
|
|
||||||
|
|
||||||
self._configure_connect_options()
|
|
||||||
|
|
||||||
# Attempt to connect
|
|
||||||
try:
|
|
||||||
rc = mqtt.connect(self._HOST, 443, keepalive=10)
|
|
||||||
except (
|
|
||||||
# Taken from .loop_forever
|
|
||||||
paho.mqtt.client.socket.error,
|
|
||||||
OSError,
|
|
||||||
paho.mqtt.client.WebsocketConnectionError,
|
|
||||||
) as e:
|
|
||||||
raise _exception.FacebookError("MQTT connection failed") from e
|
|
||||||
|
|
||||||
# Raise error if connecting failed
|
|
||||||
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
|
|
||||||
err = paho.mqtt.client.error_string(rc)
|
|
||||||
raise _exception.FacebookError("MQTT connection failed: {}".format(err))
|
|
||||||
|
|
||||||
return self
|
|
||||||
|
|
||||||
def _on_message_handler(self, client, userdata, message):
|
|
||||||
# Parse payload JSON
|
|
||||||
try:
|
|
||||||
j = _util.parse_json(message.payload.decode("utf-8"))
|
|
||||||
except (_exception.FacebookError, UnicodeDecodeError):
|
|
||||||
log.debug(message.payload)
|
|
||||||
log.exception("Failed parsing MQTT data on %s as JSON", message.topic)
|
|
||||||
return
|
|
||||||
|
|
||||||
log.debug("MQTT payload: %s, %s", message.topic, j)
|
|
||||||
|
|
||||||
if message.topic == "/t_ms":
|
|
||||||
# Update sync_token when received
|
|
||||||
# This is received in the first message after we've created a messenger
|
|
||||||
# sync queue.
|
|
||||||
if "syncToken" in j and "firstDeltaSeqId" in j:
|
|
||||||
self._sync_token = j["syncToken"]
|
|
||||||
self._sequence_id = j["firstDeltaSeqId"]
|
|
||||||
return
|
|
||||||
|
|
||||||
if "errorCode" in j:
|
|
||||||
error = j["errorCode"]
|
|
||||||
# TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
|
|
||||||
if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"):
|
|
||||||
# ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too
|
|
||||||
# much time passed, or that it was simply missing
|
|
||||||
# ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so
|
|
||||||
# the desired events could not be retrieved
|
|
||||||
log.error(
|
|
||||||
"The MQTT listener was disconnected for too long,"
|
|
||||||
" events may have been lost"
|
|
||||||
)
|
|
||||||
self._sync_token = None
|
|
||||||
self._sequence_id = self._fetch_sequence_id(self.session)
|
|
||||||
self._messenger_queue_publish()
|
|
||||||
# TODO: Signal to the user that they should reload their data!
|
|
||||||
return
|
|
||||||
log.error("MQTT error code %s received", error)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Update last sequence id when received
|
|
||||||
if "lastIssuedSeqId" in j:
|
|
||||||
self._sequence_id = j["lastIssuedSeqId"]
|
|
||||||
else:
|
|
||||||
log.error("Missing last sequence id: %s", j)
|
|
||||||
|
|
||||||
try:
|
|
||||||
# TODO: Don't handle this in a callback
|
|
||||||
self._tmp_events = list(
|
|
||||||
_events.parse_events(self.session, message.topic, j)
|
|
||||||
)
|
|
||||||
except _exception.ParseError:
|
|
||||||
log.exception("Failed parsing MQTT data")
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _fetch_sequence_id(session: _session.Session) -> int:
|
|
||||||
"""Fetch sequence ID."""
|
|
||||||
params = {
|
|
||||||
"limit": 0,
|
|
||||||
"tags": ["INBOX"],
|
|
||||||
"before": None,
|
|
||||||
"includeDeliveryReceipts": False,
|
|
||||||
"includeSeqID": True,
|
|
||||||
}
|
|
||||||
log.debug("Fetching MQTT sequence ID")
|
|
||||||
# Same request as in `Client.fetch_threads`
|
|
||||||
(j,) = session._graphql_requests(
|
|
||||||
_graphql.from_doc_id("1349387578499440", params)
|
|
||||||
)
|
|
||||||
sequence_id = j["viewer"]["message_threads"]["sync_sequence_id"]
|
|
||||||
if not sequence_id:
|
|
||||||
raise _exception.NotLoggedIn("Failed fetching sequence id")
|
|
||||||
return int(sequence_id)
|
|
||||||
|
|
||||||
def _on_connect_handler(self, client, userdata, flags, rc):
|
|
||||||
if rc == 21:
|
|
||||||
raise _exception.FacebookError(
|
|
||||||
"Failed connecting. Maybe your cookies are wrong?"
|
|
||||||
)
|
|
||||||
if rc != 0:
|
|
||||||
return # Don't try to send publish if the connection failed
|
|
||||||
|
|
||||||
self._messenger_queue_publish()
|
|
||||||
|
|
||||||
def _messenger_queue_publish(self):
|
|
||||||
# configure receiving messages.
|
|
||||||
payload = {
|
|
||||||
"sync_api_version": 10,
|
|
||||||
"max_deltas_able_to_process": 1000,
|
|
||||||
"delta_batch_size": 500,
|
|
||||||
"encoding": "JSON",
|
|
||||||
"entity_fbid": self.session.user.id,
|
|
||||||
}
|
|
||||||
|
|
||||||
# If we don't have a sync_token, create a new messenger queue
|
|
||||||
# This is done so that across reconnects, if we've received a sync token, we
|
|
||||||
# SHOULD receive a piece of data in /t_ms exactly once!
|
|
||||||
if self._sync_token is None:
|
|
||||||
topic = "/messenger_sync_create_queue"
|
|
||||||
payload["initial_titan_sequence_id"] = str(self._sequence_id)
|
|
||||||
payload["device_params"] = None
|
|
||||||
else:
|
|
||||||
topic = "/messenger_sync_get_diffs"
|
|
||||||
payload["last_seq_id"] = str(self._sequence_id)
|
|
||||||
payload["sync_token"] = self._sync_token
|
|
||||||
|
|
||||||
self._mqtt.publish(topic, _util.json_minimal(payload), qos=1)
|
|
||||||
|
|
||||||
def _configure_connect_options(self):
|
|
||||||
# Generate a new session ID on each reconnect
|
|
||||||
session_id = generate_session_id()
|
|
||||||
|
|
||||||
topics = [
|
|
||||||
# Things that happen in chats (e.g. messages)
|
|
||||||
"/t_ms",
|
|
||||||
# Group typing notifications
|
|
||||||
"/thread_typing",
|
|
||||||
# Private chat typing notifications
|
|
||||||
"/orca_typing_notifications",
|
|
||||||
# Active notifications
|
|
||||||
"/orca_presence",
|
|
||||||
# Other notifications not related to chats (e.g. friend requests)
|
|
||||||
"/legacy_web",
|
|
||||||
# Facebook's continuous error reporting/logging?
|
|
||||||
"/br_sr",
|
|
||||||
# Response to /br_sr
|
|
||||||
"/sr_res",
|
|
||||||
# Data about user-to-user calls
|
|
||||||
# TODO: Investigate the response from this! (A bunch of binary data)
|
|
||||||
# "/t_rtc",
|
|
||||||
# TODO: Find out what this does!
|
|
||||||
# TODO: Investigate the response from this! (A bunch of binary data)
|
|
||||||
# "/t_p",
|
|
||||||
# TODO: Find out what this does!
|
|
||||||
"/webrtc",
|
|
||||||
# TODO: Find out what this does!
|
|
||||||
"/onevc",
|
|
||||||
# TODO: Find out what this does!
|
|
||||||
"/notify_disconnect",
|
|
||||||
# Old, no longer active topics
|
|
||||||
# These are here just in case something interesting pops up
|
|
||||||
"/inbox",
|
|
||||||
"/mercury",
|
|
||||||
"/messaging_events",
|
|
||||||
"/orca_message_notifications",
|
|
||||||
"/pp",
|
|
||||||
"/webrtc_response",
|
|
||||||
]
|
|
||||||
|
|
||||||
username = {
|
|
||||||
# The user ID
|
|
||||||
"u": self.session.user.id,
|
|
||||||
# Session ID
|
|
||||||
"s": session_id,
|
|
||||||
# Active status setting
|
|
||||||
"chat_on": self._chat_on,
|
|
||||||
# foreground_state - Whether the window is focused
|
|
||||||
"fg": self._foreground,
|
|
||||||
# Can be any random ID
|
|
||||||
"d": self.session._client_id,
|
|
||||||
# Application ID, taken from facebook.com
|
|
||||||
"aid": 219994525426954,
|
|
||||||
# MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing
|
|
||||||
"st": topics,
|
|
||||||
# MQTT extension by FB, allows making a PUBLISH while CONNECTing
|
|
||||||
# Using this is more efficient, but the same can be acheived with:
|
|
||||||
# def on_connect(*args):
|
|
||||||
# mqtt.publish(topic, payload, qos=1)
|
|
||||||
# mqtt.on_connect = on_connect
|
|
||||||
# TODO: For some reason this doesn't work!
|
|
||||||
"pm": [
|
|
||||||
# {
|
|
||||||
# "topic": topic,
|
|
||||||
# "payload": payload,
|
|
||||||
# "qos": 1,
|
|
||||||
# "messageId": 65536,
|
|
||||||
# }
|
|
||||||
],
|
|
||||||
# Unknown parameters
|
|
||||||
"cp": 3,
|
|
||||||
"ecp": 10,
|
|
||||||
"ct": "websocket",
|
|
||||||
"mqtt_sid": "",
|
|
||||||
"dc": "",
|
|
||||||
"no_auto_fg": True,
|
|
||||||
"gas": None,
|
|
||||||
"pack": [],
|
|
||||||
}
|
|
||||||
|
|
||||||
# TODO: Make this thread safe
|
|
||||||
self._mqtt.username_pw_set(_util.json_minimal(username))
|
|
||||||
|
|
||||||
headers = {
|
|
||||||
# TODO: Make this access thread safe
|
|
||||||
"Cookie": get_cookie_header(
|
|
||||||
self.session._session, "https://edge-chat.facebook.com/chat"
|
|
||||||
),
|
|
||||||
"User-Agent": self.session._session.headers["User-Agent"],
|
|
||||||
"Origin": "https://www.facebook.com",
|
|
||||||
"Host": self._HOST,
|
|
||||||
}
|
|
||||||
|
|
||||||
self._mqtt.ws_set_options(
|
|
||||||
path="/chat?sid={}".format(session_id), headers=headers
|
|
||||||
)
|
|
||||||
|
|
||||||
def _loop_once(self) -> bool:
|
|
||||||
rc = self._mqtt.loop(timeout=1.0)
|
|
||||||
|
|
||||||
# If disconnect() has been called
|
|
||||||
# Beware, internal API, may have to change this to something more stable!
|
|
||||||
if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting:
|
|
||||||
return False # Stop listening
|
|
||||||
|
|
||||||
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
|
|
||||||
# If known/expected error
|
|
||||||
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
|
|
||||||
log.warning("Connection lost, retrying")
|
|
||||||
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
|
|
||||||
# This error is wrongly classified
|
|
||||||
# See https://github.com/eclipse/paho.mqtt.python/issues/340
|
|
||||||
log.warning("Connection error, retrying")
|
|
||||||
elif rc == paho.mqtt.client.MQTT_ERR_CONN_REFUSED:
|
|
||||||
raise _exception.NotLoggedIn("MQTT connection refused")
|
|
||||||
else:
|
|
||||||
err = paho.mqtt.client.error_string(rc)
|
|
||||||
log.error("MQTT Error: %s", err)
|
|
||||||
|
|
||||||
# Wait before reconnecting
|
|
||||||
self._mqtt._reconnect_wait()
|
|
||||||
|
|
||||||
# Try reconnecting
|
|
||||||
self._configure_connect_options()
|
|
||||||
try:
|
|
||||||
self._mqtt.reconnect()
|
|
||||||
except (
|
|
||||||
# Taken from .loop_forever
|
|
||||||
paho.mqtt.client.socket.error,
|
|
||||||
OSError,
|
|
||||||
paho.mqtt.client.WebsocketConnectionError,
|
|
||||||
) as e:
|
|
||||||
log.debug("MQTT reconnection failed: %s", e)
|
|
||||||
|
|
||||||
return True # Keep listening
|
|
||||||
|
|
||||||
def listen(self) -> Iterable[_events.Event]:
|
|
||||||
"""Run the listening loop continually.
|
|
||||||
|
|
||||||
Yields events when they arrive.
|
|
||||||
|
|
||||||
This will automatically reconnect on errors.
|
|
||||||
|
|
||||||
Example:
|
|
||||||
Print events continually.
|
|
||||||
|
|
||||||
>>> for event in listener.listen():
|
|
||||||
... print(event)
|
|
||||||
"""
|
|
||||||
while self._loop_once():
|
|
||||||
if self._tmp_events:
|
|
||||||
yield from self._tmp_events
|
|
||||||
self._tmp_events = None
|
|
||||||
|
|
||||||
def disconnect(self) -> None:
|
|
||||||
"""Disconnect the MQTT listener.
|
|
||||||
|
|
||||||
Can be called while listening, which will stop the listening loop.
|
|
||||||
|
|
||||||
The `Listener` object should not be used after this is called!
|
|
||||||
|
|
||||||
Example:
|
|
||||||
Stop the listener when recieving a message with the text "/stop"
|
|
||||||
|
|
||||||
>>> for event in listener.listen():
|
|
||||||
... if isinstance(event, fbchat.MessageEvent):
|
|
||||||
... if event.message.text == "/stop":
|
|
||||||
... listener.disconnect() # Almost the same "break"
|
|
||||||
"""
|
|
||||||
self._mqtt.disconnect()
|
|
||||||
|
|
||||||
def set_foreground(self, value: bool) -> None:
|
|
||||||
"""Set the `foreground` value while listening."""
|
|
||||||
# TODO: Document what this actually does!
|
|
||||||
payload = _util.json_minimal({"foreground": value})
|
|
||||||
info = self._mqtt.publish("/foreground_state", payload=payload, qos=1)
|
|
||||||
self._foreground = value
|
|
||||||
# TODO: We can't wait for this, since the loop is running within the same thread
|
|
||||||
# info.wait_for_publish()
|
|
||||||
|
|
||||||
def set_chat_on(self, value: bool) -> None:
|
|
||||||
"""Set the `chat_on` value while listening."""
|
|
||||||
# TODO: Document what this actually does!
|
|
||||||
# TODO: Is this the right request to make?
|
|
||||||
data = {"make_user_available_when_in_foreground": value}
|
|
||||||
payload = _util.json_minimal(data)
|
|
||||||
info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1)
|
|
||||||
self._chat_on = value
|
|
||||||
# TODO: We can't wait for this, since the loop is running within the same thread
|
|
||||||
# info.wait_for_publish()
|
|
||||||
|
|
||||||
# def send_additional_contacts(self, additional_contacts):
|
|
||||||
# payload = _util.json_minimal({"additional_contacts": additional_contacts})
|
|
||||||
# info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1)
|
|
||||||
#
|
|
||||||
# def browser_close(self):
|
|
||||||
# info = self._mqtt.publish("/browser_close", payload=b"{}", qos=1)
|
|
@@ -308,7 +308,7 @@ class Session:
|
|||||||
# Fall back to searching with a regex
|
# Fall back to searching with a regex
|
||||||
res = FB_DTSG_REGEX.search(r.text)
|
res = FB_DTSG_REGEX.search(r.text)
|
||||||
if not res:
|
if not res:
|
||||||
raise ValueError("Failed loading session, could not find fb_dtsg")
|
raise _exception.NotLoggedIn("Could not find fb_dtsg")
|
||||||
fb_dtsg = res.group(1)
|
fb_dtsg = res.group(1)
|
||||||
|
|
||||||
revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0])
|
revision = int(r.text.split('"client_revision":', 1)[1].split(",", 1)[0])
|
||||||
|
Reference in New Issue
Block a user