Compare commits

...

36 Commits

Author SHA1 Message Date
Mads Marquart
b9b4d57b25 Bump version: 1.9.5 → 1.9.6 2020-01-21 19:50:57 +01:00
Mads Marquart
b4618739f3 Fix MQTT errors after being offline for too long 2020-01-21 19:39:59 +01:00
Mads Marquart
22c6c82c0e Disable /t_rtc MQTT topic 2020-01-20 14:54:25 +01:00
Mads Marquart
19c875c18a Bump version: 1.9.4 → 1.9.5 2020-01-20 09:32:30 +01:00
Mateusz Soszyński
12bbc0058c Add onPendingMessage (#512) 2020-01-20 09:28:41 +01:00
Mads Marquart
9c81806b95 Bump version: 1.9.3 → 1.9.4 2020-01-14 23:29:58 +01:00
Mads Marquart
45303005b8 Fix onFriendRequest 2020-01-14 23:27:50 +01:00
Mads Marquart
881aa9adce Bump version: 1.9.2 → 1.9.3 2020-01-08 09:38:18 +01:00
Mads Marquart
4714be5697 Fix MQTT JSON decoding 2020-01-08 09:35:26 +01:00
Mads Marquart
cb7f4a72d7 Bump version: 1.9.1 → 1.9.2 2020-01-08 08:47:16 +01:00
Mads Marquart
fb63ff0db8 Fix cookie header extraction
Only worked when the cookies were loaded from file, hence the reason I
didn't spot it the first time. Thanks to @gave92 for the suggestion.

Fixes #495
2020-01-08 08:46:22 +01:00
Mads Marquart
c5f447e20b Bump version: 1.9.0 → 1.9.1 2020-01-06 13:23:39 +01:00
Mads Marquart
b4d3769fd5 Fix MQTT error handling
- Fix "Out of memory" errors
- Fix typo
2020-01-06 13:14:07 +01:00
Mads Marquart
b199d597b2 Bump version: 1.8.3 → 1.9.0 2020-01-06 10:57:19 +01:00
Mads Marquart
debfb37a47 Merge pull request #494 from carpedm20/websocket-mqtt-support
Add MQTT over WebSockets support
2020-01-06 10:51:20 +01:00
Mads Marquart
67fd6ffdf6 Better document MQTT topics 2020-01-06 10:34:39 +01:00
Mads Marquart
e57265016e Skip NoOp events 2020-01-06 10:27:40 +01:00
Mads Marquart
cf4c22898c Add undocumented _onSeen callback
Mostly just to slowly document unknown events
2020-01-06 10:27:11 +01:00
Mads Marquart
3bb99541e7 Improve MQTT connection error reporting 2020-01-05 23:44:19 +01:00
Mads Marquart
8c367af0ff Fix Python 2.7 errors 2020-01-05 20:52:50 +01:00
Mads Marquart
bc1e3edf17 Small fixes
Handle more errors, and fix Client.stopListening
2020-01-05 20:29:44 +01:00
Mads Marquart
e488f4a7da Fix typing status parsing
Co-authored-by: Tulir Asokan <tulir@maunium.net>
2020-01-05 19:57:53 +01:00
Mads Marquart
afad38d8e1 Fix chat timestamp parsing 2020-01-05 19:57:53 +01:00
Mads Marquart
e9804d4184 Fix message parsing 2020-01-05 19:57:53 +01:00
Mads Marquart
a1b80a7abb Replace pull channel with MQTT setup 2020-01-05 19:57:53 +01:00
Mads Marquart
803bfa7084 Add proper MQTT error handling 2020-01-05 19:57:53 +01:00
Mads Marquart
d1cb866b44 Refactor MQTT listening 2020-01-05 19:57:52 +01:00
Mads Marquart
a298e0cf16 Refactor MQTT to do proper reconnecting 2020-01-05 14:56:01 +01:00
Mads Marquart
766b0125fb Refactor MQTT connecting, add sync token support 2020-01-05 00:31:58 +01:00
Mads Marquart
998fa43fb2 Refactor MQTT connecting 2020-01-04 23:18:20 +01:00
Mads Marquart
ecc6edac5a Fix message receiving in MQTT 2020-01-04 16:23:51 +01:00
Mads Marquart
ea518ba4c9 Add initial MQTT helper 2020-01-04 16:23:35 +01:00
Mads Marquart
ffdf4222bf Split ._parseMessage to reduce indentation 2019-12-15 16:24:17 +01:00
Mads Marquart
a97ef67411 Backport e348425 2019-12-15 15:26:53 +01:00
Mads Marquart
813219cd9c Bump version: 1.8.2 → 1.8.3 2019-09-08 15:59:29 +02:00
Asiel Díaz Benítez
bb1f7d9294 Fix mimetypes.guess_type (#471)
`mimetypes.guess_type` fails if the url is something like `http://example.com/file.zip?u=10`.

Backported from 6bffb66
2019-09-08 15:58:34 +02:00
10 changed files with 568 additions and 238 deletions

View File

@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.8.2
current_version = 1.9.6
commit = True
tag = True

View File

@@ -13,7 +13,7 @@ from ._client import Client
from ._util import log # TODO: Remove this (from examples too)
__title__ = "fbchat"
__version__ = "1.8.2"
__version__ = "1.9.6"
__description__ = "Facebook Chat (Messenger) for Python"
__copyright__ = "Copyright 2015 - 2019 by Taehoon Kim"

View File

@@ -12,6 +12,7 @@ from ._util import *
from .models import *
from . import _graphql
from ._state import State
from ._mqtt import Mqtt
import time
import json
@@ -85,13 +86,11 @@ class Client(object):
Raises:
FBchatException: On failed login
"""
self._sticky, self._pool = (None, None)
self._seq = "0"
self._default_thread_id = None
self._default_thread_type = None
self._pull_channel = 0
self._markAlive = True
self._buddylist = dict()
self._mqtt = None
handler.setLevel(logging_level)
@@ -280,7 +279,7 @@ class Client(object):
def _forcedFetch(self, thread_id, mid):
params = {"thread_and_message_id": {"thread_id": thread_id, "message_id": mid}}
j, = self.graphql_requests(_graphql.from_doc_id("1768656253222505", params))
(j,) = self.graphql_requests(_graphql.from_doc_id("1768656253222505", params))
return j
def fetchThreads(self, thread_location, before=None, after=None, limit=None):
@@ -404,7 +403,7 @@ class Client(object):
FBchatException: If request failed
"""
params = {"search": name, "limit": limit}
j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_USER, params))
(j,) = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_USER, params))
return [User._from_graphql(node) for node in j[name]["users"]["nodes"]]
@@ -421,7 +420,7 @@ class Client(object):
FBchatException: If request failed
"""
params = {"search": name, "limit": limit}
j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_PAGE, params))
(j,) = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_PAGE, params))
return [Page._from_graphql(node) for node in j[name]["pages"]["nodes"]]
@@ -439,7 +438,7 @@ class Client(object):
FBchatException: If request failed
"""
params = {"search": name, "limit": limit}
j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_GROUP, params))
(j,) = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_GROUP, params))
return [Group._from_graphql(node) for node in j["viewer"]["groups"]["nodes"]]
@@ -457,7 +456,9 @@ class Client(object):
FBchatException: If request failed
"""
params = {"search": name, "limit": limit}
j, = self.graphql_requests(_graphql.from_query(_graphql.SEARCH_THREAD, params))
(j,) = self.graphql_requests(
_graphql.from_query(_graphql.SEARCH_THREAD, params)
)
rtn = []
for node in j[name]["threads"]["nodes"]:
@@ -764,7 +765,7 @@ class Client(object):
"load_read_receipts": True,
"before": before,
}
j, = self.graphql_requests(_graphql.from_doc_id("1860982147341344", params))
(j,) = self.graphql_requests(_graphql.from_doc_id("1860982147341344", params))
if j.get("message_thread") is None:
raise FBchatException("Could not fetch thread {}: {}".format(thread_id, j))
@@ -823,7 +824,7 @@ class Client(object):
"includeDeliveryReceipts": True,
"includeSeqID": False,
}
j, = self.graphql_requests(_graphql.from_doc_id("1349387578499440", params))
(j,) = self.graphql_requests(_graphql.from_doc_id("1349387578499440", params))
rtn = []
for node in j["viewer"]["message_threads"]["nodes"]:
@@ -943,7 +944,7 @@ class Client(object):
return Plan._from_fetch(j)
def _getPrivateData(self):
j, = self.graphql_requests(_graphql.from_doc_id("1868889766468115", {}))
(j,) = self.graphql_requests(_graphql.from_doc_id("1868889766468115", {}))
return j["viewer"]
def getPhoneNumbers(self):
@@ -994,7 +995,7 @@ class Client(object):
thread_id, thread_type = self._getThread(thread_id, None)
data = {"id": thread_id, "first": 48}
thread_id = str(thread_id)
j, = self.graphql_requests(_graphql.from_query_id("515216185516880", data))
(j,) = self.graphql_requests(_graphql.from_query_id("515216185516880", data))
while True:
try:
i = j[thread_id]["message_shared_media"]["edges"][0]
@@ -1005,7 +1006,7 @@ class Client(object):
data["after"] = j[thread_id]["message_shared_media"][
"page_info"
].get("end_cursor")
j, = self.graphql_requests(
(j,) = self.graphql_requests(
_graphql.from_query_id("515216185516880", data)
)
continue
@@ -1534,7 +1535,7 @@ class Client(object):
"response": "ACCEPT" if approve else "DENY",
"surface": "ADMIN_MODEL_APPROVAL_CENTER",
}
j, = self.graphql_requests(
(j,) = self.graphql_requests(
_graphql.from_doc_id("1574519202665847", {"data": data})
)
@@ -1589,7 +1590,7 @@ class Client(object):
Raises:
FBchatException: If request failed
"""
(image_id, mimetype), = self._upload(get_files_from_urls([image_url]))
((image_id, mimetype),) = self._upload(get_files_from_urls([image_url]))
return self._changeGroupImage(image_id, thread_id)
def changeGroupImageLocal(self, image_path, thread_id=None):
@@ -1603,7 +1604,7 @@ class Client(object):
FBchatException: If request failed
"""
with get_files_from_paths([image_path]) as files:
(image_id, mimetype), = self._upload(files)
((image_id, mimetype),) = self._upload(files)
return self._changeGroupImage(image_id, thread_id)
@@ -2167,39 +2168,7 @@ class Client(object):
LISTEN METHODS
"""
def _ping(self):
data = {
"seq": self._seq,
"channel": "p_" + self._uid,
"clientid": self._state._client_id,
"partition": -2,
"cap": 0,
"uid": self._uid,
"sticky_token": self._sticky,
"sticky_pool": self._pool,
"viewer_uid": self._uid,
"state": "active",
}
j = self._get(
"https://{}-edge-chat.facebook.com/active_ping".format(self._pull_channel),
data,
)
def _pullMessage(self):
"""Call pull api to fetch message data."""
data = {
"seq": self._seq,
"msgs_recv": 0,
"sticky_token": self._sticky,
"sticky_pool": self._pool,
"clientid": self._state._client_id,
"state": "active" if self._markAlive else "offline",
}
return self._get(
"https://{}-edge-chat.facebook.com/pull".format(self._pull_channel), data
)
def _parseDelta(self, m):
def _parseDelta(self, delta):
def getThreadIdAndThreadType(msg_metadata):
"""Return a tuple consisting of thread ID and thread type."""
id_thread = None
@@ -2212,7 +2181,6 @@ class Client(object):
type_thread = ThreadType.USER
return id_thread, type_thread
delta = m["delta"]
delta_type = delta.get("type")
delta_class = delta.get("class")
metadata = delta.get("messageMetadata")
@@ -2232,7 +2200,7 @@ class Client(object):
author_id=author_id,
thread_id=thread_id,
ts=ts,
msg=m,
msg=delta,
)
# Left/removed participants
@@ -2245,7 +2213,7 @@ class Client(object):
author_id=author_id,
thread_id=thread_id,
ts=ts,
msg=m,
msg=delta,
)
# Color change
@@ -2260,9 +2228,15 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
elif delta_class == "MarkFolderSeen":
locations = [
ThreadLocation(folder.lstrip("FOLDER_")) for folder in delta["folders"]
]
self._onSeen(locations=locations, ts=delta["timestamp"], msg=delta)
# Emoji change
elif delta_type == "change_thread_icon":
new_emoji = delta["untypedData"]["thread_icon"]
@@ -2275,7 +2249,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Thread title change
@@ -2290,14 +2264,24 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Forced fetch
elif delta_class == "ForcedFetch":
mid = delta.get("messageId")
if mid is None:
self.onUnknownMesssageType(msg=m)
if delta["threadKey"] is not None:
# Looks like the whole delta is metadata in this case
thread_id, thread_type = getThreadIdAndThreadType(delta)
self.onPendingMessage(
thread_id=thread_id,
thread_type=thread_type,
metadata=delta,
msg=delta,
)
else:
self.onUnknownMesssageType(msg=delta)
else:
thread_id = str(delta["threadKey"]["threadFbId"])
fetch_info = self._forcedFetch(thread_id, mid)
@@ -2319,7 +2303,7 @@ class Client(object):
thread_id=thread_id,
thread_type=ThreadType.GROUP,
ts=ts,
msg=m,
msg=delta,
)
# Nickname change
@@ -2336,7 +2320,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Admin added or removed in a group thread
@@ -2352,7 +2336,7 @@ class Client(object):
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
msg=delta,
)
elif admin_event == "remove_admin":
self.onAdminRemoved(
@@ -2362,7 +2346,7 @@ class Client(object):
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
msg=delta,
)
# Group approval mode change
@@ -2376,7 +2360,7 @@ class Client(object):
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
msg=delta,
)
# Message delivered
@@ -2394,7 +2378,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Message seen
@@ -2410,7 +2394,7 @@ class Client(object):
seen_ts=seen_ts,
ts=delivered_ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Messages marked as seen
@@ -2431,7 +2415,11 @@ class Client(object):
# thread_id, thread_type = getThreadIdAndThreadType(delta)
self.onMarkedSeen(
threads=threads, seen_ts=seen_ts, ts=delivered_ts, metadata=delta, msg=m
threads=threads,
seen_ts=seen_ts,
ts=delivered_ts,
metadata=delta,
msg=delta,
)
# Game played
@@ -2456,9 +2444,13 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Skip "no operation" events
elif delta_class == "NoOp":
pass
# Group call started/ended
elif delta_type == "rtc_call_log":
thread_id, thread_type = getThreadIdAndThreadType(metadata)
@@ -2474,7 +2466,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
elif call_status == "call_ended":
self.onCallEnded(
@@ -2486,7 +2478,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# User joined to group call
@@ -2501,7 +2493,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Group poll event
@@ -2520,7 +2512,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
elif event_type == "update_vote":
# User voted on group poll
@@ -2536,7 +2528,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Plan created
@@ -2550,7 +2542,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Plan ended
@@ -2563,7 +2555,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Plan edited
@@ -2577,7 +2569,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Plan deleted
@@ -2591,7 +2583,7 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Plan participation change
@@ -2607,13 +2599,13 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=m,
msg=delta,
)
# Client payload (that weird numbers)
elif delta_class == "ClientPayload":
payload = json.loads("".join(chr(z) for z in delta["payload"]))
ts = m.get("ofd_ts")
ts = now() # Hack
for d in payload.get("deltas", []):
# Message reaction
@@ -2634,7 +2626,7 @@ class Client(object):
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
msg=delta,
)
else:
self.onReactionRemoved(
@@ -2643,7 +2635,7 @@ class Client(object):
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
msg=delta,
)
# Viewer status change
@@ -2660,7 +2652,7 @@ class Client(object):
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
msg=delta,
)
else:
self.onBlock(
@@ -2668,7 +2660,7 @@ class Client(object):
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
msg=delta,
)
# Live location info
@@ -2686,7 +2678,7 @@ class Client(object):
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
msg=delta,
)
# Message deletion
@@ -2702,7 +2694,7 @@ class Client(object):
thread_id=thread_id,
thread_type=thread_type,
ts=ts,
msg=m,
msg=delta,
)
elif d.get("deltaMessageReply"):
@@ -2721,7 +2713,7 @@ class Client(object):
thread_type=thread_type,
ts=message.timestamp,
metadata=metadata,
msg=m,
msg=delta,
)
# New message
@@ -2742,117 +2734,92 @@ class Client(object):
thread_type=thread_type,
ts=ts,
metadata=metadata,
msg=delta,
)
# New pending message
elif delta_class == "ThreadFolder" and delta.get("folder") == "FOLDER_PENDING":
# Looks like the whole delta is metadata in this case
thread_id, thread_type = getThreadIdAndThreadType(delta)
self.onPendingMessage(
thread_id=thread_id, thread_type=thread_type, metadata=delta, msg=delta
)
# Unknown message type
else:
self.onUnknownMesssageType(msg=delta)
def _parse_payload(self, topic, m):
# Things that directly change chat
if topic == "/t_ms":
if "deltas" not in m:
return
for delta in m["deltas"]:
self._parseDelta(delta)
# TODO: Remove old parsing below
# Inbox
elif topic == "inbox":
self.onInbox(
unseen=m["unseen"],
unread=m["unread"],
recent_unread=m["recent_unread"],
msg=m,
)
# Typing
# /thread_typing {'sender_fbid': X, 'state': 1, 'type': 'typ', 'thread': 'Y'}
# /orca_typing_notifications {'type': 'typ', 'sender_fbid': X, 'state': 0}
elif topic in ("/thread_typing", "/orca_typing_notifications"):
author_id = str(m["sender_fbid"])
thread_id = m.get("thread", author_id)
typing_status = TypingStatus(m.get("state"))
thread_type = (
ThreadType.USER if thread_id == author_id else ThreadType.GROUP
)
self.onTyping(
author_id=author_id,
status=typing_status,
thread_id=thread_id,
thread_type=thread_type,
msg=m,
)
# Other notifications
elif topic == "/legacy_web":
# Friend request
if m["type"] == "jewel_requests_add":
from_id = m["from"]
# TODO: from_id = str(from_id)
self.onFriendRequest(from_id=from_id, msg=m)
else:
self.onUnknownMesssageType(msg=m)
# Chat timestamp / Buddylist overlay
elif topic == "/orca_presence":
if m["list_type"] == "full":
self._buddylist = {} # Refresh internal list
statuses = dict()
for data in m["list"]:
user_id = str(data["u"])
statuses[user_id] = ActiveStatus._from_orca_presence(data)
self._buddylist[user_id] = statuses[user_id]
# TODO: Which one should we call?
self.onChatTimestamp(buddylist=statuses, msg=m)
self.onBuddylistOverlay(statuses=statuses, msg=m)
# Unknown message type
else:
self.onUnknownMesssageType(msg=m)
def _parseMessage(self, content):
"""Get message and author name from content.
May contain multiple messages in the content.
"""
self._seq = content.get("seq", "0")
if "lb_info" in content:
self._sticky = content["lb_info"]["sticky"]
self._pool = content["lb_info"]["pool"]
if "batches" in content:
for batch in content["batches"]:
self._parseMessage(batch)
if "ms" not in content:
return
for m in content["ms"]:
mtype = m.get("type")
try:
# Things that directly change chat
if mtype == "delta":
self._parseDelta(m)
# Inbox
elif mtype == "inbox":
self.onInbox(
unseen=m["unseen"],
unread=m["unread"],
recent_unread=m["recent_unread"],
msg=m,
)
# Typing
elif mtype == "typ" or mtype == "ttyp":
author_id = str(m.get("from"))
thread_id = m.get("thread_fbid")
if thread_id:
thread_type = ThreadType.GROUP
thread_id = str(thread_id)
else:
thread_type = ThreadType.USER
if author_id == self._uid:
thread_id = m.get("to")
else:
thread_id = author_id
typing_status = TypingStatus(m.get("st"))
self.onTyping(
author_id=author_id,
status=typing_status,
thread_id=thread_id,
thread_type=thread_type,
msg=m,
)
# Delivered
# Seen
# elif mtype == "m_read_receipt":
#
# self.onSeen(m.get('realtime_viewer_fbid'), m.get('reader'), m.get('time'))
elif mtype in ["jewel_requests_add"]:
from_id = m["from"]
self.onFriendRequest(from_id=from_id, msg=m)
# Happens on every login
elif mtype == "qprimer":
self.onQprimer(ts=m.get("made"), msg=m)
# Is sent before any other message
elif mtype == "deltaflow":
pass
# Chat timestamp
elif mtype == "chatproxy-presence":
statuses = dict()
for id_, data in m.get("buddyList", {}).items():
statuses[id_] = ActiveStatus._from_chatproxy_presence(id_, data)
self._buddylist[id_] = statuses[id_]
self.onChatTimestamp(buddylist=statuses, msg=m)
# Buddylist overlay
elif mtype == "buddylist_overlay":
statuses = dict()
for id_, data in m.get("overlay", {}).items():
old_in_game = None
if id_ in self._buddylist:
old_in_game = self._buddylist[id_].in_game
statuses[id_] = ActiveStatus._from_buddylist_overlay(
data, old_in_game
)
self._buddylist[id_] = statuses[id_]
self.onBuddylistOverlay(statuses=statuses, msg=m)
# Unknown message type
else:
self.onUnknownMesssageType(msg=m)
except Exception as e:
self.onMessageError(exception=e, msg=m)
def _parse_message(self, topic, data):
try:
self._parse_payload(topic, data)
except Exception as e:
self.onMessageError(exception=e, msg=data)
def startListening(self):
"""Start listening from an external event loop.
@@ -2860,6 +2827,15 @@ class Client(object):
Raises:
FBchatException: If request failed
"""
if not self._mqtt:
self._mqtt = Mqtt.connect(
state=self._state,
on_message=self._parse_message,
chat_on=self._markAlive,
foreground=True,
)
# Backwards compat
self.onQprimer(ts=now(), msg=None)
self.listening = True
def doOneListen(self, markAlive=None):
@@ -2877,36 +2853,23 @@ class Client(object):
"""
if markAlive is not None:
self._markAlive = markAlive
try:
if self._markAlive:
self._ping()
content = self._pullMessage()
if content:
self._parseMessage(content)
except KeyboardInterrupt:
return False
except requests.Timeout:
pass
except requests.ConnectionError:
# If the client has lost their internet connection, keep trying every 30 seconds
time.sleep(30)
except FBchatFacebookError as e:
# Fix 502 and 503 pull errors
if e.request_status_code in [502, 503]:
# Bump pull channel, while contraining withing 0-4
self._pull_channel = (self._pull_channel + 1) % 5
self.startListening()
else:
raise e
except Exception as e:
return self.onListenError(exception=e)
return True
# TODO: Remove this wierd check, and let the user handle the chat_on parameter
if self._markAlive != self._mqtt._chat_on:
self._mqtt.set_chat_on(self._markAlive)
# TODO: Remove on_error param
return self._mqtt.loop_once(on_error=lambda e: self.onListenError(exception=e))
def stopListening(self):
"""Clean up the variables from `Client.startListening`."""
"""Stop the listening loop."""
self.listening = False
self._sticky, self._pool = (None, None)
if not self._mqtt:
return
self._mqtt.disconnect()
# TODO: Preserve the _mqtt object
# Currently, there's some issues when disconnecting
self._mqtt = None
def listen(self, markAlive=None):
"""Initialize and runs the listening loop continually.
@@ -3004,6 +2967,21 @@ class Client(object):
"""
log.info("{} from {} in {}".format(message_object, thread_id, thread_type.name))
def onPendingMessage(
self, thread_id=None, thread_type=None, metadata=None, msg=None
):
"""Called when the client is listening, and somebody that isn't
connected with you on either Facebook or Messenger sends a message.
After that, you need to use fetchThreadList to actually read the message.
Args:
thread_id: Thread ID that the message was sent to. See :ref:`intro_threads`
thread_type (ThreadType): Type of thread that the message was sent to. See :ref:`intro_threads`
metadata: Extra metadata about the message
msg: A full set of the data received
"""
log.info("New pending message from {}".format(thread_id))
def onColorChange(
self,
mid=None,
@@ -3365,6 +3343,17 @@ class Client(object):
"""
log.info("Friend request from {}".format(from_id))
def _onSeen(self, locations=None, ts=None, msg=None):
"""
Todo:
Document this, and make it public
Args:
locations: ---
ts: A timestamp of the action
msg: A full set of the data received
"""
def onInbox(self, unseen=None, unread=None, recent_unread=None, msg=None):
"""
Todo:
@@ -3863,7 +3852,6 @@ class Client(object):
statuses (dict): Dictionary with user IDs as keys and :class:`ActiveStatus` as values
msg: A full set of the data received
"""
log.debug("Buddylist overlay received: {}".format(statuses))
def onUnknownMesssageType(self, msg=None):
"""Called when the client is listening, and some unknown data was received.

338
fbchat/_mqtt.py Normal file
View File

@@ -0,0 +1,338 @@
import attr
import random
import paho.mqtt.client
from ._core import log
from . import _util, _exception, _graphql
def generate_session_id():
"""Generate a random session ID between 1 and 9007199254740991."""
return random.randint(1, 2 ** 53)
@attr.s(slots=True)
class Mqtt(object):
_state = attr.ib()
_mqtt = attr.ib()
_on_message = attr.ib()
_chat_on = attr.ib()
_foreground = attr.ib()
_sequence_id = attr.ib()
_sync_token = attr.ib(None)
_HOST = "edge-chat.facebook.com"
@classmethod
def connect(cls, state, on_message, chat_on, foreground):
mqtt = paho.mqtt.client.Client(
client_id="mqttwsclient",
clean_session=True,
protocol=paho.mqtt.client.MQTTv31,
transport="websockets",
)
mqtt.enable_logger()
# mqtt.max_inflight_messages_set(20) # The rest will get queued
# mqtt.max_queued_messages_set(0) # Unlimited messages can be queued
# mqtt.message_retry_set(20) # Retry sending for at least 20 seconds
# mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
# TODO: Is region (lla | atn | odn | others?) important?
mqtt.tls_set()
self = cls(
state=state,
mqtt=mqtt,
on_message=on_message,
chat_on=chat_on,
foreground=foreground,
sequence_id=cls._fetch_sequence_id(state),
)
# Configure callbacks
mqtt.on_message = self._on_message_handler
mqtt.on_connect = self._on_connect_handler
self._configure_connect_options()
# Attempt to connect
try:
rc = mqtt.connect(self._HOST, 443, keepalive=10)
except (
# Taken from .loop_forever
paho.mqtt.client.socket.error,
OSError,
paho.mqtt.client.WebsocketConnectionError,
) as e:
raise _exception.FBchatException("MQTT connection failed")
# Raise error if connecting failed
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
err = paho.mqtt.client.error_string(rc)
raise _exception.FBchatException("MQTT connection failed: {}".format(err))
return self
def _on_message_handler(self, client, userdata, message):
# Parse payload JSON
try:
j = _util.parse_json(message.payload.decode("utf-8"))
except (_exception.FBchatFacebookError, UnicodeDecodeError):
log.exception("Failed parsing MQTT data on %s as JSON", message.topic)
return
log.debug("MQTT payload: %s, %s", message.topic, j)
if message.topic == "/t_ms":
# Update sync_token when received
# This is received in the first message after we've created a messenger
# sync queue.
if "syncToken" in j and "firstDeltaSeqId" in j:
self._sync_token = j["syncToken"]
self._sequence_id = j["firstDeltaSeqId"]
return
# Update last sequence id when received
if "lastIssuedSeqId" in j:
self._sequence_id = j["lastIssuedSeqId"]
if "errorCode" in j:
error = j["errorCode"]
# TODO: 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
if error in ("ERROR_QUEUE_NOT_FOUND", "ERROR_QUEUE_OVERFLOW"):
# ERROR_QUEUE_NOT_FOUND means that the queue was deleted, since too
# much time passed, or that it was simply missing
# ERROR_QUEUE_OVERFLOW means that the sequence id was too small, so
# the desired events could not be retrieved
log.error(
"The MQTT listener was disconnected for too long,"
" events may have been lost"
)
self._sync_token = None
self._sequence_id = self._fetch_sequence_id(self._state)
self._messenger_queue_publish()
# TODO: Signal to the user that they should reload their data!
return
log.error("MQTT error code %s received", error)
return
# Call the external callback
self._on_message(message.topic, j)
@staticmethod
def _fetch_sequence_id(state):
"""Fetch sequence ID."""
params = {
"limit": 1,
"tags": ["INBOX"],
"before": None,
"includeDeliveryReceipts": False,
"includeSeqID": True,
}
log.debug("Fetching MQTT sequence ID")
# Same request as in `Client.fetchThreadList`
(j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params))
try:
return int(j["viewer"]["message_threads"]["sync_sequence_id"])
except (KeyError, ValueError):
# TODO: Proper exceptions
raise
def _on_connect_handler(self, client, userdata, flags, rc):
if rc == 21:
raise _exception.FBchatException(
"Failed connecting. Maybe your cookies are wrong?"
)
if rc != 0:
return # Don't try to send publish if the connection failed
self._messenger_queue_publish()
def _messenger_queue_publish(self):
# configure receiving messages.
payload = {
"sync_api_version": 10,
"max_deltas_able_to_process": 1000,
"delta_batch_size": 500,
"encoding": "JSON",
"entity_fbid": self._state.user_id,
}
# If we don't have a sync_token, create a new messenger queue
# This is done so that across reconnects, if we've received a sync token, we
# SHOULD receive a piece of data in /t_ms exactly once!
if self._sync_token is None:
topic = "/messenger_sync_create_queue"
payload["initial_titan_sequence_id"] = str(self._sequence_id)
payload["device_params"] = None
else:
topic = "/messenger_sync_get_diffs"
payload["last_seq_id"] = str(self._sequence_id)
payload["sync_token"] = self._sync_token
self._mqtt.publish(topic, _util.json_minimal(payload), qos=1)
def _configure_connect_options(self):
# Generate a new session ID on each reconnect
session_id = generate_session_id()
topics = [
# Things that happen in chats (e.g. messages)
"/t_ms",
# Group typing notifications
"/thread_typing",
# Private chat typing notifications
"/orca_typing_notifications",
# Active notifications
"/orca_presence",
# Other notifications not related to chats (e.g. friend requests)
"/legacy_web",
# Facebook's continuous error reporting/logging?
"/br_sr",
# Response to /br_sr
"/sr_res",
# Data about user-to-user calls
# TODO: Investigate the response from this! (A bunch of binary data)
# "/t_rtc",
# TODO: Find out what this does!
# TODO: Investigate the response from this! (A bunch of binary data)
# "/t_p",
# TODO: Find out what this does!
"/webrtc",
# TODO: Find out what this does!
"/onevc",
# TODO: Find out what this does!
"/notify_disconnect",
# Old, no longer active topics
# These are here just in case something interesting pops up
"/inbox",
"/mercury",
"/messaging_events",
"/orca_message_notifications",
"/pp",
"/webrtc_response",
]
username = {
# The user ID
"u": self._state.user_id,
# Session ID
"s": session_id,
# Active status setting
"chat_on": self._chat_on,
# foreground_state - Whether the window is focused
"fg": self._foreground,
# Can be any random ID
"d": self._state._client_id,
# Application ID, taken from facebook.com
"aid": 219994525426954,
# MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing
"st": topics,
# MQTT extension by FB, allows making a PUBLISH while CONNECTing
# Using this is more efficient, but the same can be acheived with:
# def on_connect(*args):
# mqtt.publish(topic, payload, qos=1)
# mqtt.on_connect = on_connect
# TODO: For some reason this doesn't work!
"pm": [
# {
# "topic": topic,
# "payload": payload,
# "qos": 1,
# "messageId": 65536,
# }
],
# Unknown parameters
"cp": 3,
"ecp": 10,
"ct": "websocket",
"mqtt_sid": "",
"dc": "",
"no_auto_fg": True,
"gas": None,
"pack": [],
}
# TODO: Make this thread safe
self._mqtt.username_pw_set(_util.json_minimal(username))
headers = {
# TODO: Make this access thread safe
"Cookie": _util.get_cookie_header(
self._state._session, "https://edge-chat.facebook.com/chat"
),
"User-Agent": self._state._session.headers["User-Agent"],
"Origin": "https://www.facebook.com",
"Host": self._HOST,
}
self._mqtt.ws_set_options(
path="/chat?sid={}".format(session_id), headers=headers
)
def loop_once(self, on_error=None):
"""Run the listening loop once.
Returns whether to keep listening or not.
"""
rc = self._mqtt.loop(timeout=1.0)
# If disconnect() has been called
if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting:
return False # Stop listening
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
# If known/expected error
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
log.warning("Connection lost, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
# This error is wrongly classified
# See https://github.com/eclipse/paho.mqtt.python/issues/340
log.warning("Connection error, retrying")
else:
err = paho.mqtt.client.error_string(rc)
log.error("MQTT Error: %s", err)
# For backwards compatibility
if on_error:
on_error(_exception.FBchatException("MQTT Error {}".format(err)))
# Wait before reconnecting
self._mqtt._reconnect_wait()
# Try reconnecting
self._configure_connect_options()
try:
self._mqtt.reconnect()
except (
# Taken from .loop_forever
paho.mqtt.client.socket.error,
OSError,
paho.mqtt.client.WebsocketConnectionError,
) as e:
log.debug("MQTT reconnection failed: %s", e)
return True # Keep listening
def disconnect(self):
self._mqtt.disconnect()
def set_foreground(self, value):
payload = _util.json_minimal({"foreground": value})
info = self._mqtt.publish("/foreground_state", payload=payload, qos=1)
self._foreground = value
# TODO: We can't wait for this, since the loop is running with .loop_forever()
# info.wait_for_publish()
def set_chat_on(self, value):
# TODO: Is this the right request to make?
data = {"make_user_available_when_in_foreground": value}
payload = _util.json_minimal(data)
info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1)
self._chat_on = value
# TODO: We can't wait for this, since the loop is running with .loop_forever()
# info.wait_for_publish()
# def send_additional_contacts(self, additional_contacts):
# payload = _util.json_minimal({"additional_contacts": additional_contacts})
# info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1)
#
# def browser_close(self):
# info = self._mqtt.publish("/browser_close", payload=b"{}", qos=1)

View File

@@ -192,17 +192,6 @@ class ActiveStatus(object):
in_game = attr.ib(None)
@classmethod
def _from_chatproxy_presence(cls, id_, data):
return cls(
active=data["p"] in [2, 3] if "p" in data else None,
last_active=data.get("lat"),
in_game=int(id_) in data.get("gamers", {}),
)
@classmethod
def _from_buddylist_overlay(cls, data, in_game=None):
return cls(
active=data["a"] in [2, 3] if "a" in data else None,
last_active=data.get("la"),
in_game=None,
)
def _from_orca_presence(cls, data):
# TODO: Handle `c` and `vc` keys (Probably some binary data)
return cls(active=data["p"] in [2, 3], last_active=data.get("l"), in_game=None)

View File

@@ -57,6 +57,11 @@ def now():
return int(time() * 1000)
def json_minimal(data):
"""Get JSON data in minimal form."""
return json.dumps(data, separators=(",", ":"))
def strip_json_cruft(text):
"""Removes `for(;;);` (and other cruft) that preceeds JSON responses."""
try:
@@ -65,6 +70,14 @@ def strip_json_cruft(text):
raise FBchatException("No JSON object found: {!r}".format(text))
def get_cookie_header(session, url):
"""Extract a cookie header from a requests session."""
# The cookies are extracted this way to make sure they're escaped correctly
return requests.cookies.get_cookie_header(
session.cookies, requests.Request("GET", url),
)
def get_decoded_r(r):
return get_decoded(r._content)
@@ -219,11 +232,12 @@ def get_files_from_urls(file_urls):
r = requests.get(file_url)
# We could possibly use r.headers.get('Content-Disposition'), see
# https://stackoverflow.com/a/37060758
file_name = basename(file_url).split("?")[0].split("#")[0]
files.append(
(
basename(file_url).split("?")[0].split("#")[0],
file_name,
r.content,
r.headers.get("Content-Type") or guess_type(file_url)[0],
r.headers.get("Content-Type") or guess_type(file_name)[0],
)
)
return files

View File

@@ -17,6 +17,7 @@ requires = [
"attrs>=18.2",
"requests~=2.19",
"beautifulsoup4~=4.0",
"paho-mqtt~=1.5",
]
description-file = "README.rst"
classifiers = [

View File

@@ -27,7 +27,7 @@ def test_fetch_threads(client1):
@pytest.mark.parametrize("emoji, emoji_size", EMOJI_LIST)
def test_fetch_message_emoji(client, emoji, emoji_size):
mid = client.sendEmoji(emoji, emoji_size)
message, = client.fetchThreadMessages(limit=1)
(message,) = client.fetchThreadMessages(limit=1)
assert subset(
vars(message), uid=mid, author=client.uid, text=emoji, emoji_size=emoji_size
@@ -46,7 +46,7 @@ def test_fetch_message_info_emoji(client, thread, emoji, emoji_size):
def test_fetch_message_mentions(client, thread, message_with_mentions):
mid = client.send(message_with_mentions)
message, = client.fetchThreadMessages(limit=1)
(message,) = client.fetchThreadMessages(limit=1)
assert subset(
vars(message), uid=mid, author=client.uid, text=message_with_mentions.text
@@ -71,7 +71,7 @@ def test_fetch_message_info_mentions(client, thread, message_with_mentions):
@pytest.mark.parametrize("sticker", STICKER_LIST)
def test_fetch_message_sticker(client, sticker):
mid = client.send(Message(sticker=sticker))
message, = client.fetchThreadMessages(limit=1)
(message,) = client.fetchThreadMessages(limit=1)
assert subset(vars(message), uid=mid, author=client.uid)
assert subset(vars(message.sticker), uid=sticker.uid)
@@ -96,6 +96,6 @@ def test_fetch_info(client1, group):
def test_fetch_image_url(client):
client.sendLocalFiles([path.join(path.dirname(__file__), "resources", "image.png")])
message, = client.fetchThreadMessages(limit=1)
(message,) = client.fetchThreadMessages(limit=1)
assert client.fetchImageUrl(message.attachments[0].uid)

View File

@@ -19,5 +19,5 @@ def test_delete_messages(client):
mid1 = client.sendMessage(text1)
mid2 = client.sendMessage(text2)
client.deleteMessages(mid2)
message, = client.fetchThreadMessages(limit=1)
(message,) = client.fetchThreadMessages(limit=1)
assert subset(vars(message), uid=mid1, author=client.uid, text=text1)

View File

@@ -63,7 +63,7 @@ def test_create_poll(client1, group, catch_event, poll_data):
for recv_option in event[
"poll"
].options: # The recieved options may not be the full list
old_option, = list(filter(lambda o: o.text == recv_option.text, poll.options))
(old_option,) = list(filter(lambda o: o.text == recv_option.text, poll.options))
voters = [client1.uid] if old_option.vote else []
assert subset(
vars(recv_option), voters=voters, votes_count=len(voters), vote=False