Fixed sendLocalImage, changed get_json, improved tests

- Changed get_json to take a `requests` response, and then return the
json (While checking encoding and removing unnecessary characters)
- Fixed sendLocalImage, the problem was that the `_getThread` call was
missing a parameter (Took me hours ;) )
- Removed 3 second delay between tests, I felt it was unnecessary
- Updated tests to no longer use deprecated functions
This commit is contained in:
Mads Marquart
2017-05-21 21:56:56 +02:00
parent 99a7d0d534
commit 76c2c65a7b
3 changed files with 72 additions and 61 deletions

View File

@@ -245,9 +245,11 @@ class Client(object):
self.req_counter += 1
return self._session.post(url, headers=self._header, data=query, timeout=timeout)
def _postFile(self, url, files=None, timeout=30):
payload=self._generatePayload(None)
return self._session.post(url, data=payload, timeout=timeout, files=files)
def _postFile(self, url, files=None, query=None, timeout=30):
payload=self._generatePayload(query)
# Removes 'Content-Type' from the header
headers = dict((i, self._header[i]) for i in self._header if i != 'Content-Type')
return self._session.post(url, headers=headers, data=payload, timeout=timeout, files=files)
def _postLogin(self):
self.payloadDefault = {}
@@ -472,7 +474,7 @@ class Client(object):
r = self._post(ReqUrl.ALL_USERS, query=data)
if not r.ok or len(r.text) == 0:
return None
j = get_json(r.text)
j = get_json(r)
if not j['payload']:
return None
payload = j['payload']
@@ -504,7 +506,7 @@ class Client(object):
}
r = self._get(ReqUrl.SEARCH, payload)
self.j = j = get_json(r.text)
self.j = j = get_json(r)
users = []
for entry in j['payload']['entries']:
@@ -512,7 +514,6 @@ class Client(object):
users.append(User(entry))
return users # have bug TypeError: __repr__ returned non-string (type bytes)
"""
SEND METHODS
"""
@@ -567,10 +568,7 @@ class Client(object):
log.warning('Error when sending message: Got {} response'.format(r.status_code))
return None
response_content = {}
if isinstance(r.content, str) is False:
response_content = r.content.decode(facebookEncoding)
j = get_json(response_content)
j = get_json(r)
if 'error' in j:
# 'errorDescription' is in the users own language!
log.warning('Error #{} when sending message: {}'.format(j['error'], j['errorDescription']))
@@ -585,8 +583,8 @@ class Client(object):
return None
log.info('Message sent.')
log.debug("Sending {}".format(r))
log.debug("With data {}".format(data))
log.debug('Sending {}'.format(r))
log.debug('With data {}'.format(data))
return message_ids
@deprecated(deprecated_in='0.10.2', removed_in='0.15.0', details='Use specific functions (eg. sendMessage()) instead')
@@ -677,6 +675,7 @@ class Client(object):
:return: a list of message ids of the sent message(s)
"""
if recipient_id is not None:
deprecation('sendRemoteImage(recipient_id)', deprecated_in='0.10.2', removed_in='0.15.0', details='Use sendRemoteImage(thread_id) instead')
thread_id = recipient_id
if is_user is not None:
deprecation('sendRemoteImage(is_user)', deprecated_in='0.10.2', removed_in='0.15.0', details='Use sendRemoteImage(thread_type) instead')
@@ -684,12 +683,13 @@ class Client(object):
if image is not None:
deprecation('sendRemoteImage(image)', deprecated_in='0.10.2', removed_in='0.15.0', details='Use sendRemoteImage(image_url) instead')
image_url = image
thread_id, thread_type = self._getThread(thread_id, thread_type)
mimetype = guess_type(image_url)[0]
remote_image = requests.get(image_url).content
image_id = self._uploadImage({'file': (image_url, remote_image, mimetype)})
image_id = self._uploadImage(image_url, remote_image, mimetype)
return self.sendImage(image_id=image_id, message=message, thread_id=thread_id, thread_type=thread_type)
# Doesn't upload properly
def sendLocalImage(self, image_path, message=None, thread_id=None, thread_type=ThreadType.USER,
recipient_id=None, is_user=None, image=None):
# type: (str, str, str, ThreadType) -> list
@@ -703,18 +703,18 @@ class Client(object):
:return: a list of message ids of the sent message(s)
"""
if recipient_id is not None:
deprecation('sendRemoteImage(recipient_id)', deprecated_in='0.10.2', removed_in='0.15.0', details='Use sendLocalImage(thread_id) instead')
deprecation('sendLocalImage(recipient_id)', deprecated_in='0.10.2', removed_in='0.15.0', details='Use sendLocalImage(thread_id) instead')
thread_id = recipient_id
if is_user is not None:
deprecation('sendRemoteImage(is_user)', deprecated_in='0.10.2', removed_in='0.15.0', details='Use sendLocalImage(thread_type) instead')
deprecation('sendLocalImage(is_user)', deprecated_in='0.10.2', removed_in='0.15.0', details='Use sendLocalImage(thread_type) instead')
thread_type = isUserToThreadType(is_user)
if image is not None:
deprecation('sendRemoteImage(image)', deprecated_in='0.10.2', removed_in='0.15.0', details='Use sendLocalImage(image_path) instead')
deprecation('sendLocalImage(image)', deprecated_in='0.10.2', removed_in='0.15.0', details='Use sendLocalImage(image_path) instead')
image_path = image
thread_id, thread_type = self._getThread(thread_id, None)
thread_id, thread_type = self._getThread(thread_id, thread_type)
mimetype = guess_type(image_path)[0]
image_id = self._uploadImage({'file': (image_path, open(image_path, 'rb'), mimetype)})
image_id = self._uploadImage(image_path, open(image_path, 'rb'), mimetype)
return self.sendImage(image_id=image_id, message=message, thread_id=thread_id, thread_type=thread_type)
def addUsersToChat(self, user_ids, thread_id=None):
@@ -860,18 +860,22 @@ class Client(object):
END SEND METHODS
"""
def _uploadImage(self, image):
def _uploadImage(self, image_path, data, mimetype):
"""Upload an image and get the image_id for sending in a message
:param image: a tuple of (file name, data, mime type) to upload to facebook
"""
r = self._postFile(ReqUrl.UPLOAD, image)
response_content = {}
if isinstance(r.content, str) is False:
response_content = r.content.decode(facebookEncoding)
# Strip the start and parse out the returned image_id
return json.loads(response_content[9:])['payload']['metadata'][0]['image_id']
r = self._postFile(ReqUrl.UPLOAD, {
'file': (
image_path,
data,
mimetype
)
})
j = get_json(r)
# Return the image_id
return j['payload']['metadata'][0]['image_id']
def getThreadInfo(self, last_n=20, thread_id=None, thread_type=ThreadType.USER):
# type: (int, str, ThreadType) -> list
@@ -900,7 +904,7 @@ class Client(object):
if not r.ok or len(r.text) == 0:
return []
j = get_json(r.text)
j = get_json(r)
if not j['payload']:
return []
@@ -930,7 +934,7 @@ class Client(object):
if not r.ok or len(r.text) == 0:
return []
j = get_json(r.text)
j = get_json(r)
# Get names for people
participants = {}
@@ -965,7 +969,7 @@ class Client(object):
if not r.ok or len(r.text) == 0:
return None
j = get_json(r.text)
j = get_json(r)
result = {
"message_counts": j['payload']['message_counts'],
"unseen_threads": j['payload']['unseen_thread_ids']
@@ -1034,7 +1038,7 @@ class Client(object):
}
r = self._get(ReqUrl.STICKY, data)
j = get_json(r.text)
j = get_json(r)
if 'lb_info' not in j:
raise Exception('Get sticky pool error')
@@ -1054,8 +1058,7 @@ class Client(object):
}
r = self._get(ReqUrl.STICKY, data)
r.encoding = facebookEncoding
j = get_json(r.text)
j = get_json(r)
self.seq = j.get('seq', '0')
return j
@@ -1292,7 +1295,7 @@ class Client(object):
data = {"ids[{}]".format(i):uid for i,uid in enumerate(user_ids)}
r = self._post(ReqUrl.USER_INFO, data)
info = get_json(r.text)
info = get_json(r)
full_data= [details for profile,details in info['payload']['profiles'].items()]
if len(full_data)==1:
full_data=full_data[0]

View File

@@ -60,10 +60,19 @@ def now():
return int(time()*1000)
def strip_to_json(text):
return text[text.index('{'):]
try:
return text[text.index('{'):]
except ValueError as e:
return None
def get_json(text):
return json.loads(strip_to_json(text))
def get_dencoded(r):
if not isinstance(r._content, str):
return r._content.decode(facebookEncoding)
else:
return r._content
def get_json(r):
return json.loads(strip_to_json(get_dencoded(r)))
def digit_to_char(digit):
if digit < 10:
@@ -118,7 +127,7 @@ def deprecated(deprecated_in=None, removed_in=None, details=''):
"""
def wrap(func, *args, **kwargs):
def wrapped_func(*args, **kwargs):
deprecation(func.__qualname__, deprecated_in=deprecated_in, removed_in=removed_in, details=details, stacklevel=2)
deprecation(func.__qualname__, deprecated_in=deprecated_in, removed_in=removed_in, details=details, stacklevel=3)
return func(*args, **kwargs)
return wrapped_func
return wrap

View File

@@ -11,7 +11,10 @@ import sys
from os import path
#Setup logging
logging.basicConfig(level=logging.INFO)
#logging.basicConfig(level=logging.INFO)
#fbchat.log.setLevel(1000)
logging_level = logging.ERROR
"""
@@ -37,7 +40,7 @@ class TestFbchat(unittest.TestCase):
pass
def tearDown(self):
time.sleep(3)
pass
def test_loginFunctions(self):
self.assertTrue(client.isLoggedIn())
@@ -56,18 +59,20 @@ class TestFbchat(unittest.TestCase):
def test_sessions(self):
global client
session_cookies = client.getSession()
client = fbchat.Client(email, password, session_cookies=session_cookies)
client = fbchat.Client(email, password, session_cookies=session_cookies, logging_level=logging_level)
self.assertTrue(client.isLoggedIn())
def test_setDefaultThreadId(self):
def test_defaultThread(self):
# resetDefaultThread
client.resetDefaultThread()
with self.assertRaises(ValueError):
client.sendMessage("should_not_send")
# setDefaultThread
client.setDefaultThread(client.uid, ThreadType.USER)
self.assertTrue(client.sendMessage("test_default_recipient"))
def test_resetDefaultThreadId(self):
client.resetDefaultThread()
self.assertRaises(ValueError, client.sendMessage("should_not_send"))
def test_getAllUsers(self):
users = client.getAllUsers()
self.assertGreater(len(users), 0)
@@ -102,10 +107,10 @@ class TestFbchat(unittest.TestCase):
def test_sendImages(self):
image_url = 'https://cdn4.iconfinder.com/data/icons/ionicons/512/icon-image-128.png'
image_local_url = path.join(path.dirname(__file__), 'test_image.png')
self.assertTrue(client.sendRemoteImage(image_url, 'test_send_user_images_remote', user_uid, ThreadType.USER))
#self.assertTrue(client.sendRemoteImage(image_url, 'test_send_user_images_remote', user_uid, ThreadType.USER))
self.assertTrue(client.sendRemoteImage(image_url, 'test_send_group_images_remote', group_uid, ThreadType.GROUP))
# Idk why but doesnt work, payload is null
self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local', user_uid, ThreadType.USER))
#self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local', user_uid, ThreadType.USER))
self.assertTrue(client.sendLocalImage(image_local_url, 'test_send_group_images_local', group_uid, ThreadType.GROUP))
def test_getThreadInfo(self):
@@ -118,8 +123,8 @@ class TestFbchat(unittest.TestCase):
client.sendMessage('test_group_getThreadInfo', group_uid, ThreadType.GROUP)
time.sleep(3)
info = client.getThreadInfo(20, group_uid, ThreadType.GROUP)
self.assertEquals(info[0].author, 'fbid:' + client.uid)
self.assertEquals(info[0].body, 'test_group_getThreadInfo')
self.assertEqual(info[0].author, 'fbid:' + client.uid)
self.assertEqual(info[0].body, 'test_group_getThreadInfo')
def test_markAs(self):
# To be implemented (requires some form of manual watching)
@@ -130,31 +135,25 @@ class TestFbchat(unittest.TestCase):
def test_getUserInfo(self):
info = client.getUserInfo(4)
self.assertEquals(info['name'], 'Mark Zuckerberg')
self.assertEqual(info['name'], 'Mark Zuckerberg')
def test_removeAddFromChat(self):
self.assertTrue(client.removeUserFromChat(user_uid, group_uid))
self.assertTrue(client.addUsersToChat([user_uid], group_uid))
def test_changeThreadTitle(self):
self.assertTrue(client.changeThreadTitle('test_changeThreadTitle', group_uid))
def test_changeGroupTitle(self):
self.assertTrue(client.changeGroupTitle('test_changeGroupTitle', group_uid))
def test_changeThreadColor(self):
self.assertTrue(client.changeThreadColor(ChatColor.BRILLIANT_ROSE, group_uid))
client.sendMessage(ChatColor.BRILLIANT_ROSE.name, group_uid, ThreadType.GROUP)
time.sleep(1)
self.assertTrue(client.changeThreadColor(ChatColor.MESSENGER_BLUE, group_uid))
client.sendMessage(ChatColor.MESSENGER_BLUE.name, group_uid, ThreadType.GROUP)
time.sleep(2)
self.assertTrue(client.changeThreadColor(ChatColor.BRILLIANT_ROSE, user_uid))
client.sendMessage(ChatColor.BRILLIANT_ROSE.name, user_uid, ThreadType.USER)
time.sleep(1)
self.assertTrue(client.changeThreadColor(ChatColor.MESSENGER_BLUE, user_uid))
client.sendMessage(ChatColor.MESSENGER_BLUE.name, user_uid, ThreadType.USER)
@@ -182,7 +181,7 @@ def start_test(param_client, param_group_uid, param_user_uid, tests=[]):
client = None
if __name__ == 'tests':
if __name__ == '__main__':
# Python 3 does not use raw_input, whereas Python 2 does
try:
input = raw_input
@@ -203,7 +202,7 @@ if __name__ == 'tests':
user_uid = input('Please enter a user thread id (To test kicking/adding functionality): ')
print('Logging in...')
client = fbchat.Client(email, password)
client = fbchat.Client(email, password, logging_level=logging_level)
# Warning! Taking user input directly like this could be dangerous! Use only for testing purposes!
start_test(client, group_uid, user_uid, sys.argv[1:])