paasio: refactor/simplify test file(s)

This commit is contained in:
Corey McCandless
2021-02-06 17:12:26 -05:00
committed by Corey McCandless
parent 21aedf159e
commit 753bcccdad
3 changed files with 193 additions and 482 deletions

View File

@@ -0,0 +1,5 @@
{
"files": {
"test": ["paasio_test.py", "test_utils.py"]
}
}

View File

@@ -1,463 +1,14 @@
import errno
import inspect
import io
import logging
import os
import unittest
from unittest.mock import ANY, call, NonCallableMagicMock, patch
from test_utils import MockSock, MockFile, MockException, ZEN, SuperMock
from paasio import MeteredFile, MeteredSocket
LOGGER = logging.getLogger(__name__)
ZEN = b"""Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
"""
# Implementation tests begin on line 456
# Code below this line consists of mocks and tests for mocks compliance
class MockException(Exception):
pass
class MockFile(io.BytesIO):
def __init__(self, *args, chunk=None, exception=None, **kwargs):
super(MockFile, self).__init__(*args, **kwargs)
self.__chunk = chunk
self.__exception = exception
def __exit__(self, exc_type, exc_val, exc_tb):
ret = super(MockFile, self).__exit__(exc_type, exc_val, exc_tb)
if exc_type is not None and "suppress" in exc_val.args[0]:
return True
return ret
def read(self, size=-1):
if self.__exception is not None:
raise self.__exception
if self.__chunk is None:
return super(MockFile, self).read(size)
if size is None:
return super(MockFile, self).read(self.__chunk)
if size < 0:
return super(MockFile, self).read(self.__chunk)
return super(MockFile, self).read(min(self.__chunk, size))
def write(self, data):
if self.__chunk is None:
return super(MockFile, self).write(data)
return super(MockFile, self).write(data[: self.__chunk])
class MockSock:
def __init__(self, *, chunk=None, exception=None):
self._recver = io.BytesIO(ZEN)
self._sender = io.BytesIO()
self.__closed = False
self.__chunk = chunk
self.__exception = exception
self.flags = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._recver.close()
self._sender.close()
self.__closed = True
if exc_type is not None and "suppress" in exc_val.args[0]:
return True
return False
def recv(self, bufsize, flags=0):
if self.__closed:
raise OSError(errno.EBADF, os.strerror(errno.EBADF))
if bufsize is None:
raise TypeError(
"'NoneType' object cannot be interpreted as an integer"
)
if not isinstance(flags, int):
raise TypeError(
"an integer is required (got type {})".format(
type(flags).__name__
)
)
self.flags = flags
if self.__exception is not None:
raise self.__exception
if self.__chunk is None:
return self._recver.read(bufsize)
else:
return self._recver.read(min(self.__chunk, bufsize))
def send(self, data, flags=0):
if self.__closed:
raise OSError(errno.EBADF, os.strerror(errno.EBADF))
if not isinstance(flags, int):
raise TypeError(
"an integer is required (got type {})".format(
type(flags).__name__
)
)
self.flags = flags
if self.__chunk is None:
return self._sender.write(data)
return self._sender.write(data[: self.__chunk])
class SuperMock:
"""Mock for super().__init__ calls only, as mock.MagicMock cannot."""
def __init__(self, *args, **kwargs):
if self.initialized:
self.init_called += 1
else:
self.initialized = True
def __call__(self, *args, **kwargs):
frame = inspect.currentframe()
if frame is None:
raise RuntimeError("Could not get current frame object")
stack = inspect.getouterframes(frame)
if any(
frame[3] == "__init__" and "paasio" in frame[1] for frame in stack
):
return self
else:
return self.mock_object
def __repr__(self):
return "<SuperMock at {} with mock object: {!r}>".format(
hex(id(self)), self.mock_object
)
mock_object = None
init_called = 0
initialized = False
class MockTestCase(unittest.TestCase):
def setUp(self):
self.test_passed = False
def tearDown(self):
if not self.test_passed:
LOGGER.critical(
"\nError in mocks, please report to Exercism: %s", self.id()
)
def test_fixture_exception(self):
self.assertEqual(False, self.test_passed)
with self.assertLogs(__name__, level="CRITICAL") as logs:
self.tearDown()
self.assertEqual(1, len(logs.output))
self.assertRegex(logs.output[0], ":\nError in mocks")
self.assertRegex(logs.output[0], "{}$".format(self.id()))
self.test_passed = True
class SuperMockTest(MockTestCase):
def test_super_mock_repr(self):
mock = SuperMock()
expected = "<SuperMock at {} with mock object: {}>".format(
hex(id(mock)), repr(None)
)
self.assertEqual(expected, repr(mock))
file = MockFile()
mock.mock_object = file
expected = "<SuperMock at {} with mock object: {}>".format(
hex(id(mock)), repr(file)
)
self.assertEqual(expected, repr(mock))
self.test_passed = True
def test_super_mock_init(self):
self.assertIs(False, SuperMock.initialized)
mock = SuperMock()
self.assertIs(True, mock.initialized)
for calls in range(5):
self.assertEqual(calls, mock.init_called)
mock.__init__()
self.test_passed = True
def test_super_mock_call(self):
def call2(obj):
return obj()
def __init__(obj):
ret = call2(obj)
self.assertEqual(obj(), ret)
return ret
mock = SuperMock()
self.assertIs(None, mock.mock_object)
wrapped = object()
mock.mock_object = wrapped
self.assertIs(wrapped, mock())
self.assertIs(mock, __init__(mock))
self.test_passed = True
@patch("inspect.currentframe", return_value=None)
def test_super_mock_frame_error(self, inspect_mock):
with self.assertRaisesRegex(
RuntimeError, "^Could not get current frame object$"
):
SuperMock()()
self.test_passed = True
class MockSockTest(MockTestCase):
def test_context_manager(self):
with MockSock() as socket:
socket.recv(30)
socket.send(b"data")
with self.assertRaisesRegex(OSError, os.strerror(errno.EBADF)):
socket.recv(30)
with self.assertRaisesRegex(OSError, os.strerror(errno.EBADF)):
socket.send(b"")
self.test_passed = True
def test_context_manager_exception_raise(self):
exception = MockException("Should raise")
with self.assertRaisesRegex(MockException, "Should raise") as err:
with MockSock(exception=exception) as socket:
socket.recv(4096)
self.assertEqual(exception, err.exception)
self.test_passed = True
def test_context_manager_exception_suppress(self):
exception = MockException("Should suppress")
with MockSock(exception=exception) as socket:
socket.recv(4096)
self.test_passed = True
def test_recv_once(self):
with MockSock() as socket:
actual_recv = socket.recv(4096)
self.assertEqual(ZEN, actual_recv)
self.test_passed = True
def test_recv_multiple(self):
actual_recv = b""
with MockSock() as socket:
for _ in range(5):
actual_recv += socket.recv(30)
self.assertEqual(ZEN[:150], actual_recv)
self.test_passed = True
def test_recv_multiple_chunk(self):
actual_recv = b""
with MockSock(chunk=20) as socket:
for _ in range(5):
actual_recv += socket.recv(4096)
actual_recv += socket.recv(10)
self.assertEqual(ZEN[:110], actual_recv)
self.test_passed = True
def test_recv_under_size(self):
with MockSock(chunk=257) as socket:
actual_recv = socket.recv(4096)
self.assertEqual(ZEN[:257], actual_recv)
self.test_passed = True
def test_send_once(self):
with MockSock(chunk=257) as socket:
send_len = socket.send(ZEN)
self.assertEqual(ZEN[:257], socket._sender.getbuffer())
self.assertEqual(257, send_len)
self.test_passed = True
def test_send_multiple(self):
send_len = 0
expected = b"Tomorrow's victory is today's practice."
with MockSock() as socket:
send_len += socket.send(b"Tomorro")
send_len += socket.send(b"w's victo")
send_len += socket.send(b"ry is today")
send_len += socket.send(b"'s practice.")
self.assertEqual(expected, socket._sender.getbuffer())
self.assertEqual(39, send_len)
self.test_passed = True
def test_send_under_size(self):
with MockSock(chunk=257) as socket:
send_len = socket.send(ZEN[:123])
self.assertEqual(ZEN[:123], socket._sender.getbuffer())
self.assertEqual(123, send_len)
self.test_passed = True
def test_bufsize_required(self):
with self.assertRaisesRegex(TypeError, "argument"):
with MockSock() as socket:
socket.recv()
with self.assertRaisesRegex(TypeError, "'NoneType'.+integer"):
with MockSock() as socket:
socket.recv(None)
self.test_passed = True
def test_flags_support(self):
with MockSock() as socket:
self.assertEqual(len(ZEN), socket.send(ZEN, 42))
self.assertEqual(ZEN, socket.recv(4096, 24))
with MockSock() as mock_sock:
self.assertIs(None, mock_sock.flags)
mock_sock.recv(50)
self.assertEqual(0, mock_sock.flags)
mock_sock.send(b"no flags")
self.assertEqual(0, mock_sock.flags)
mock_sock.recv(30, 30)
self.assertEqual(30, mock_sock.flags)
mock_sock.send(b"flags", 1024)
self.assertEqual(
1024, mock_sock.flags,
)
with self.assertRaisesRegex(
TypeError, r"^an integer is.+NoneType\)$"
):
mock_sock.send(b"data", None)
with self.assertRaisesRegex(
TypeError, r"^an integer is.+bytes\)$"
):
mock_sock.send(b"data", b"flags")
with self.assertRaisesRegex(
TypeError, r"^an integer is.+NoneType\)$"
):
mock_sock.recv(b"data", None)
with self.assertRaisesRegex(
TypeError, r"^an integer is.+bytes\)$"
):
mock_sock.recv(b"data", b"flags")
self.test_passed = True
class MockFileTest(MockTestCase):
def test_context_manager(self):
with MockFile(ZEN) as file:
file.read()
with self.assertRaisesRegex(
ValueError, "I/O operation on closed file."
):
file.read()
with self.assertRaisesRegex(
ValueError, "I/O operation on closed file."
):
file.write(b"data")
self.test_passed = True
def test_context_manager_exception_raise(self):
exception = MockException("Should raise")
mock = MockFile(ZEN, exception=exception)
with self.assertRaisesRegex(MockException, "Should raise") as err:
with mock as file:
file.read()
self.assertEqual(exception, err.exception)
self.test_passed = True
def test_context_manager_exception_suppress(self):
exception = MockException("Should suppress")
mock = MockFile(ZEN, exception=exception)
with mock as file:
file.read()
self.test_passed = True
def test_read_once(self):
with MockFile(ZEN) as file:
actual_read = file.read()
self.assertEqual(ZEN, actual_read)
with MockFile(ZEN) as file:
actual_read = file.read(None)
self.assertEqual(ZEN, actual_read)
with MockFile(ZEN) as file:
actual_read = file.read(-1)
self.assertEqual(ZEN, actual_read)
self.test_passed = True
def test_read_multiple(self):
actual_read = b""
with MockFile(ZEN) as file:
for _ in range(5):
actual_read += file.read(30)
self.assertEqual(ZEN[:150], actual_read)
self.test_passed = True
def test_read_multiple_chunk(self):
actual_read = b""
with MockFile(ZEN, chunk=20) as file:
for _ in range(5):
actual_read += file.read()
actual_read += file.read(10)
self.assertEqual(ZEN[:110], actual_read)
actual_read = b""
with MockFile(ZEN, chunk=20) as file:
for size in [None, -2, -1, 0, 1, 2]:
actual_read += file.read(size)
actual_read += file.read(10)
self.assertEqual(ZEN[:73], actual_read)
self.test_passed = True
def test_read_under_size(self):
with MockFile(ZEN, chunk=257) as file:
actual_read = file.read()
self.assertEqual(ZEN[:257], actual_read)
self.test_passed = True
def test_write_once(self):
with MockFile(chunk=257) as file:
write_len = file.write(ZEN)
self.assertEqual(ZEN[:257], file.getbuffer())
self.assertEqual(257, write_len)
self.test_passed = True
def test_write_multiple(self):
write_len = 0
expected = b"Tomorrow's victory is today's practice."
with MockFile() as file:
write_len += file.write(b"Tomorro")
write_len += file.write(b"w's victo")
write_len += file.write(b"ry is today")
write_len += file.write(b"'s practice.")
self.assertEqual(expected, file.getbuffer())
self.assertEqual(len(expected), write_len)
self.test_passed = True
def test_write_under_size(self):
with MockFile(chunk=257) as file:
write_len = file.write(ZEN[:123])
self.assertEqual(ZEN[:123], file.getbuffer())
self.assertEqual(123, write_len)
self.test_passed = True
# Tests for paasio.py begin here
class MeteredSocketTest(unittest.TestCase):
def test_context_manager(self):
class PaasioTest(unittest.TestCase):
def test_meteredsocket_context_manager(self):
wrapped = MockSock()
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
mock.__exit__.side_effect = wrapped.__exit__
@@ -472,7 +23,7 @@ class MeteredSocketTest(unittest.TestCase):
with self.assertRaisesRegex(OSError, os.strerror(errno.EBADF)):
socket.send(b"")
def test_context_manager_exception_raise(self):
def test_meteredsocket_context_manager_exception_raise(self):
exception = MockException("Should raise")
wrapped = MockSock(exception=exception)
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
@@ -487,7 +38,7 @@ class MeteredSocketTest(unittest.TestCase):
)
self.assertEqual(exception, err.exception)
def test_context_manager_exception_suppress(self):
def test_meteredsocket_context_manager_exception_suppress(self):
exception = MockException("Should suppress")
wrapped = MockSock(exception=exception)
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
@@ -500,7 +51,7 @@ class MeteredSocketTest(unittest.TestCase):
MockException, exception, ANY,
)
def test_recv_once(self):
def test_meteredsocket_recv_once(self):
mock = NonCallableMagicMock(wraps=MockSock(), autospec=True)
with MeteredSocket(mock) as socket:
actual_recv = socket.recv(4096)
@@ -509,7 +60,7 @@ class MeteredSocketTest(unittest.TestCase):
self.assertEqual(len(ZEN), socket.recv_bytes)
self.assertEqual(1, mock.recv.call_count)
def test_recv_multiple(self):
def test_meteredsocket_recv_multiple(self):
wrapped = MockSock()
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
actual_recv = b""
@@ -521,7 +72,7 @@ class MeteredSocketTest(unittest.TestCase):
self.assertEqual(150, socket.recv_bytes)
self.assertEqual(5, mock.recv.call_count)
def test_recv_multiple_chunk(self):
def test_meteredsocket_recv_multiple_chunk(self):
wrapped = MockSock(chunk=20)
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
actual_recv = b""
@@ -534,7 +85,7 @@ class MeteredSocketTest(unittest.TestCase):
self.assertEqual(110, socket.recv_bytes)
self.assertEqual(6, mock.recv.call_count)
def test_recv_under_size(self):
def test_meteredsocket_recv_under_size(self):
wrapped = MockSock(chunk=257) # largish odd number
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
with MeteredSocket(mock) as socket:
@@ -544,7 +95,7 @@ class MeteredSocketTest(unittest.TestCase):
self.assertEqual(257, socket.recv_bytes)
self.assertEqual(1, mock.recv.call_count)
def test_send_once(self):
def test_meteredsocket_send_once(self):
wrapped = MockSock(chunk=257) # largish odd number
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
with MeteredSocket(mock) as socket:
@@ -555,7 +106,7 @@ class MeteredSocketTest(unittest.TestCase):
self.assertEqual(257, socket.send_bytes)
self.assertEqual(1, mock.send.call_count)
def test_send_multiple(self):
def test_meteredsocket_send_multiple(self):
wrapped = MockSock()
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
send_len = 0
@@ -571,7 +122,7 @@ class MeteredSocketTest(unittest.TestCase):
self.assertEqual(39, socket.send_bytes)
self.assertEqual(4, mock.send.call_count)
def test_send_under_size(self):
def test_meteredsocket_send_under_size(self):
wrapped = MockSock(chunk=257) # largish odd number
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
with MeteredSocket(mock) as socket:
@@ -582,7 +133,7 @@ class MeteredSocketTest(unittest.TestCase):
self.assertEqual(123, socket.send_bytes)
self.assertEqual(1, mock.send.call_count)
def test_bufsize_required(self):
def test_meteredsocket_bufsize_required(self):
mock = NonCallableMagicMock(wraps=MockSock(), autospec=True)
with self.assertRaisesRegex(TypeError, "argument"):
with MeteredSocket(mock) as socket:
@@ -598,7 +149,7 @@ class MeteredSocketTest(unittest.TestCase):
or call(None, ANY) in mock.recv.mock_calls
)
def test_flags_support(self):
def test_meteredsocket_flags_support(self):
mock = NonCallableMagicMock(wraps=MockSock(), autospec=True)
with MeteredSocket(mock) as socket:
self.assertEqual(len(ZEN), socket.send(ZEN, 42))
@@ -626,7 +177,7 @@ class MeteredSocketTest(unittest.TestCase):
with self.assertRaisesRegex(TypeError, "integer is required"):
socket.recv(b"data", b"flags")
def test_stats_read_only(self):
def test_meteredsocket_stats_read_only(self):
mock = NonCallableMagicMock(wraps=MockSock(), autospec=True)
with MeteredSocket(mock) as socket:
self.assertEqual(0, socket.send_ops)
@@ -655,11 +206,9 @@ class MeteredSocketTest(unittest.TestCase):
self.assertEqual(282, socket.send_bytes)
self.assertEqual(258, socket.recv_ops)
self.assertEqual(259, socket.recv_bytes)
@patch("paasio.super", create=True, new_callable=SuperMock)
class MeteredFileTest(unittest.TestCase):
def test_context_manager(self, super_mock):
"""
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_context_manager(self, super_mock):
wrapped = MockFile(ZEN)
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
mock.__exit__.side_effect = wrapped.__exit__
@@ -680,7 +229,8 @@ class MeteredFileTest(unittest.TestCase):
):
file.write(b"data")
def test_context_manager_exception_raise(self, super_mock):
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_context_manager_exception_raise(self, super_mock):
exception = MockException("Should raise")
wrapped = MockFile(ZEN, exception=exception)
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
@@ -696,7 +246,8 @@ class MeteredFileTest(unittest.TestCase):
)
self.assertEqual(exception, err.exception)
def test_context_manager_exception_suppress(self, super_mock):
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_context_manager_exception_suppress(self, super_mock):
exception = MockException("Should suppress")
wrapped = MockFile(ZEN, exception=exception)
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
@@ -710,7 +261,8 @@ class MeteredFileTest(unittest.TestCase):
MockException, exception, ANY,
)
def test_iteration(self, super_mock):
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_iteration(self, super_mock):
mock = NonCallableMagicMock(wraps=MockFile(ZEN), autospec=True)
super_mock.mock_object = mock
actual_reads = b""
@@ -728,7 +280,8 @@ class MeteredFileTest(unittest.TestCase):
self.assertEqual(len(ZEN), file.read_bytes)
self.assertEqual(ZEN, actual_reads)
def test_read_once(self, super_mock):
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_read_once(self, super_mock):
mock = NonCallableMagicMock(wraps=MockFile(ZEN), autospec=True)
super_mock.mock_object = mock
with MeteredFile() as file:
@@ -756,7 +309,8 @@ class MeteredFileTest(unittest.TestCase):
self.assertEqual(1, file.read_ops)
self.assertEqual(mock.read.call_count, file.read_ops)
def test_read_multiple(self, super_mock):
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_read_multiple(self, super_mock):
wrapped = MockFile(ZEN)
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
super_mock.mock_object = mock
@@ -769,7 +323,8 @@ class MeteredFileTest(unittest.TestCase):
self.assertEqual(150, file.read_bytes)
self.assertEqual(5, mock.read.call_count)
def test_read_multiple_chunk(self, super_mock):
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_read_multiple_chunk(self, super_mock):
wrapped = MockFile(ZEN, chunk=20)
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
super_mock.mock_object = mock
@@ -796,7 +351,8 @@ class MeteredFileTest(unittest.TestCase):
self.assertEqual(73, file.read_bytes)
self.assertEqual(7, mock.read.call_count)
def test_read_under_size(self, super_mock):
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_read_under_size(self, super_mock):
wrapped = MockFile(ZEN, chunk=257) # largish odd number
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
super_mock.mock_object = mock
@@ -807,7 +363,8 @@ class MeteredFileTest(unittest.TestCase):
self.assertEqual(257, file.read_bytes)
self.assertEqual(1, mock.read.call_count)
def test_write_once(self, super_mock):
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_write_once(self, super_mock):
wrapped = MockFile(chunk=257) # largish odd number
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
super_mock.mock_object = mock
@@ -819,7 +376,8 @@ class MeteredFileTest(unittest.TestCase):
self.assertEqual(257, file.write_bytes)
self.assertEqual(1, mock.write.call_count)
def test_write_multiple(self, super_mock):
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_write_multiple(self, super_mock):
wrapped = MockFile()
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
super_mock.mock_object = mock
@@ -836,7 +394,8 @@ class MeteredFileTest(unittest.TestCase):
self.assertEqual(39, file.write_bytes)
self.assertEqual(4, mock.write.call_count)
def test_write_under_size(self, super_mock):
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_write_under_size(self, super_mock):
wrapped = MockFile(chunk=257) # largish odd number
mock = NonCallableMagicMock(wraps=wrapped, autospec=True)
super_mock.mock_object = mock
@@ -848,7 +407,8 @@ class MeteredFileTest(unittest.TestCase):
self.assertEqual(123, file.write_bytes)
self.assertEqual(1, mock.write.call_count)
def test_stats_read_only(self, super_mock):
@patch("paasio.super", create=True, new_callable=SuperMock)
def test_meteredfile_stats_read_only(self, super_mock):
mock = NonCallableMagicMock(wraps=MockFile(ZEN), autospec=True)
super_mock.mock_object = mock
with MeteredFile() as file:
@@ -878,7 +438,7 @@ class MeteredFileTest(unittest.TestCase):
self.assertEqual(82, file.write_bytes)
self.assertEqual(58, file.read_ops)
self.assertEqual(59, file.read_bytes)
"""
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,146 @@
import errno
import inspect
import io
import os
ZEN = b"""Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
"""
class MockException(Exception):
pass
class MockFile(io.BytesIO):
def __init__(self, *args, chunk=None, exception=None, **kwargs):
super(MockFile, self).__init__(*args, **kwargs)
self.__chunk = chunk
self.__exception = exception
def __exit__(self, exc_type, exc_val, exc_tb):
ret = super(MockFile, self).__exit__(exc_type, exc_val, exc_tb)
if exc_type is not None and "suppress" in exc_val.args[0]:
return True
return ret
def read(self, size=-1):
if self.__exception is not None:
raise self.__exception
if self.__chunk is None:
return super(MockFile, self).read(size)
if size is None:
return super(MockFile, self).read(self.__chunk)
if size < 0:
return super(MockFile, self).read(self.__chunk)
return super(MockFile, self).read(min(self.__chunk, size))
def write(self, data):
if self.__chunk is None:
return super(MockFile, self).write(data)
return super(MockFile, self).write(data[: self.__chunk])
class MockSock:
def __init__(self, *, chunk=None, exception=None):
self._recver = io.BytesIO(ZEN)
self._sender = io.BytesIO()
self.__closed = False
self.__chunk = chunk
self.__exception = exception
self.flags = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._recver.close()
self._sender.close()
self.__closed = True
if exc_type is not None and "suppress" in exc_val.args[0]:
return True
return False
def recv(self, bufsize, flags=0):
if self.__closed:
raise OSError(errno.EBADF, os.strerror(errno.EBADF))
if bufsize is None:
raise TypeError(
"'NoneType' object cannot be interpreted as an integer"
)
if not isinstance(flags, int):
raise TypeError(
"an integer is required (got type {})".format(
type(flags).__name__
)
)
self.flags = flags
if self.__exception is not None:
raise self.__exception
if self.__chunk is None:
return self._recver.read(bufsize)
else:
return self._recver.read(min(self.__chunk, bufsize))
def send(self, data, flags=0):
if self.__closed:
raise OSError(errno.EBADF, os.strerror(errno.EBADF))
if not isinstance(flags, int):
raise TypeError(
"an integer is required (got type {})".format(
type(flags).__name__
)
)
self.flags = flags
if self.__chunk is None:
return self._sender.write(data)
return self._sender.write(data[: self.__chunk])
class SuperMock:
"""Mock for super().__init__ calls only, as mock.MagicMock cannot."""
def __init__(self, *args, **kwargs):
if self.initialized:
self.init_called += 1
else:
self.initialized = True
def __call__(self, *args, **kwargs):
frame = inspect.currentframe()
if frame is None:
raise RuntimeError("Could not get current frame object")
stack = inspect.getouterframes(frame)
if any(
frame[3] == "__init__" and "paasio" in frame[1] for frame in stack
):
return self
else:
return self.mock_object
def __repr__(self):
return "<SuperMock at {} with mock object: {!r}>".format(
hex(id(self)), self.mock_object
)
mock_object = None
init_called = 0
initialized = False