Revision dbd56c149072e656ca8d6a43a59588f3e7513da2 authored by Hubert Kario on 29 September 2021, 13:05:34 UTC, committed by GitHub on 29 September 2021, 13:05:34 UTC
ExpectAlert - add __repr__
test_tlsfuzzer_timing_runner.py
# Author: Jan Koscielniak, (c) 2020
# Released under Gnu GPL v2.0, see LICENSE file for details
try:
import unittest2 as unittest
except ImportError:
import unittest
try:
import mock
except ImportError:
import unittest.mock as mock
import io
import subprocess
from sys import version_info
from tlsfuzzer.timing_runner import TimingRunner
from tlsfuzzer.utils.statics import WARM_UP
def is_33():
return (3, 3) == version_info[:2]
class TestRunner(unittest.TestCase):
def setUp(self):
with mock.patch('tlsfuzzer.timing_runner.os.mkdir'):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.check_tcpdump'):
self.runner = TimingRunner("test", [], "/outdir", "localhost", 4433, "lo")
@staticmethod
def _mock_open(*args, **kwargs):
"""Fix mock not supporting iterators in all Python versions."""
mock_open = mock.mock_open(*args, **kwargs)
mock_open.return_value.__iter__ = lambda s: iter(s.readline, '')
return mock_open
def test_init_without_tcpdump(self):
mock_check = mock.Mock()
mock_check.return_value = False
with mock.patch('tlsfuzzer.timing_runner.os.mkdir'):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.check_tcpdump', mock_check):
self.assertRaises(Exception, TimingRunner, "test", [], "/outdir", "localhost", 4433, "lo")
def test_check_tcpdump(self):
mock_call = mock.Mock(spec=subprocess.check_call)
def raise_error(*args, **kwargs):
raise subprocess.CalledProcessError(1, "")
with mock.patch("tlsfuzzer.timing_runner.subprocess.check_call", mock_call) as mock_c:
ret = TimingRunner.check_tcpdump()
mock_c.assert_called_once()
self.assertTrue(ret)
mock_c.side_effect = raise_error
ret = TimingRunner.check_tcpdump()
self.assertFalse(ret)
def test_sniff(self):
self.runner.tcpdump_running = False
mock_popen = mock.Mock(spec=subprocess.Popen)
mock_popen.return_value.stderr = io.BytesIO(b"listening\n")
with mock.patch("tlsfuzzer.timing_runner.subprocess.Popen", mock_popen):
self.runner.sniff()
self.assertTrue(self.runner.tcpdump_running)
mock_popen.return_value.stderr = io.BytesIO(b"\n")
self.assertRaises(SystemExit, self.runner.sniff)
self.assertFalse(self.runner.tcpdump_running)
def test_tcpdump_status(self):
mock_popen = mock.Mock(spec=subprocess.Popen)
mock_popen.communicate.return_value = ("", b"Error")
mock_popen.returncode = 1
self.runner.tcpdump_running = True
self.runner.tcpdump_status(mock_popen)
self.assertFalse(self.runner.tcpdump_running)
self.runner.tcpdump_running = False
self.runner.tcpdump_status(mock_popen)
self.assertFalse(self.runner.tcpdump_running)
def test_generate_log_sanity(self):
self.runner.tests = [("sanity", None), ("regular", None)]
with mock.patch('__main__.__builtins__.open', mock.mock_open()) as mock_file:
self.runner.generate_log(set(), set(), 10)
self.assertEqual(self.runner.log.classes, ["regular"])
self.assertEqual(self.runner.tests, {"regular": None})
self.assertEqual(mock_file.return_value.write.call_count, 11)
def test_generate_log_exclusion(self):
self.runner.tests = [("regular", None), ("exclude", None)]
with mock.patch('__main__.__builtins__.open', mock.mock_open()) as mock_file:
self.runner.generate_log(set(), set(["exclude"]), 10)
self.assertEqual(self.runner.log.classes, ["regular"])
self.assertEqual(mock_file.return_value.write.call_count, 11)
def test_generate_log_run_only(self):
self.runner.tests = [("regular", None), ("exclude", None)]
with mock.patch('__main__.__builtins__.open', mock.mock_open()) as mock_file:
self.runner.generate_log(set(["regular"]), set(), 10)
self.assertEqual(self.runner.log.classes, ["regular"])
self.assertEqual(mock_file.return_value.write.call_count, 11)
def test_create_dir(self):
with mock.patch('tlsfuzzer.timing_runner.os.mkdir') as mock_mkdir:
TimingRunner("test", [], "/outdir", "localhost", 4433, "lo")
mock_mkdir.assert_called_once()
def test_check_extraction_availability(self):
extraction_present = True
try:
from tlsfuzzer.extract import Extract
except ImportError:
extraction_present = False
self.assertEqual(TimingRunner.check_extraction_availability(), extraction_present)
def test_check_analysis_availability(self):
analysis_present = True
try:
from tlsfuzzer.analysis import Analysis
except ImportError:
analysis_present = False
self.assertEqual(TimingRunner.check_analysis_availability(), analysis_present)
def test_extract(self):
check_extract = mock.Mock()
check_extract.return_value = False
with mock.patch("tlsfuzzer.timing_runner.TimingRunner.check_extraction_availability", check_extract):
self.assertFalse(self.runner.extract())
self.runner.log = mock.Mock(autospec=True)
with mock.patch("__main__.__builtins__.__import__"):
self.assertTrue(self.runner.extract())
def test_analyse(self):
check_analysis = mock.Mock()
check_analysis.return_value = False
with mock.patch("tlsfuzzer.timing_runner.TimingRunner.check_analysis_availability", check_analysis):
self.assertEqual(self.runner.analyse(), 2)
self.runner.log = mock.Mock(autospec=True)
with mock.patch("__main__.__builtins__.__import__"):
self.assertNotEqual(self.runner.analyse(), 2)
def test_run(self):
self.runner.tests = {"A": None, "B": None, "C": None}
self.runner.tcpdump_running = True
analyse = mock.Mock()
analyse.return_value = 1
with mock.patch('__main__.__builtins__.open',
self._mock_open(read_data="A,B,C\r\n0,2,1\r\n2,0,1\r\n2,1,0\r\n")):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.sniff'):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.extract') as extract:
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.analyse', analyse):
with mock.patch('tlsfuzzer.timing_runner.Thread'):
with mock.patch('tlsfuzzer.timing_runner.time.sleep'):
with mock.patch('tlsfuzzer.timing_runner.Runner') as runner:
ret = self.runner.run()
self.assertEqual(runner.call_count, WARM_UP + 9)
extract.assert_called_once()
analyse.assert_called_once()
self.assertEqual(ret, 1)
def test_run_no_extraction(self):
self.runner.tests = {"A": None, "B": None, "C": None}
self.runner.tcpdump_running = True
extract = mock.Mock()
extract.return_value = False
with mock.patch('__main__.__builtins__.open',
self._mock_open(read_data="A,B,C\r\n0,2,1\r\n2,0,1\r\n2,1,0\r\n")):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.sniff'):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.extract', extract):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.analyse') as analyse:
with mock.patch('tlsfuzzer.timing_runner.Thread'):
with mock.patch('tlsfuzzer.timing_runner.time.sleep'):
with mock.patch('tlsfuzzer.timing_runner.Runner') as runner:
ret = self.runner.run()
self.assertEqual(runner.call_count, WARM_UP + 9)
extract.assert_called_once()
self.assertEqual(analyse.call_count, 0)
self.assertEqual(ret, 2)
def test_run_tcpdump_failure(self):
self.runner.tests = {"A": None, "B": None, "C": None}
with mock.patch('__main__.__builtins__.open',
self._mock_open(read_data="A,B,C\r\n0,2,1\r\n2,0,1\r\n2,1,0\r\n")):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.sniff'):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.extract'):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.analyse'):
with mock.patch('tlsfuzzer.timing_runner.Thread'):
with mock.patch('tlsfuzzer.timing_runner.time.sleep'):
with mock.patch('tlsfuzzer.timing_runner.Runner') as runner:
self.runner.tcpdump_running = False
self.assertRaises(SystemExit, self.runner.run)
self.assertEqual(runner.call_count, 0)
@unittest.skipIf(is_33(), reason="Skipping because of a bug in unittest")
def test_run_test_failure(self):
self.runner.tests = {"A": None, "B": None, "C": None}
self.runner.tcpdump_running = True
def raise_error(*args, **kwargs):
raise Exception()
with mock.patch('__main__.__builtins__.open',
self._mock_open(read_data="A,B,C\r\n0,2,1\r\n2,0,1\r\n2,1,0\r\n")):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.sniff'):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.analyse'):
with mock.patch('tlsfuzzer.timing_runner.TimingRunner.extract'):
with mock.patch('tlsfuzzer.timing_runner.Thread'):
with mock.patch('tlsfuzzer.timing_runner.time.sleep'):
with mock.patch('tlsfuzzer.timing_runner.Runner') as runner:
runner.return_value.run.side_effect = raise_error
self.assertRaises(AssertionError, self.runner.run)
def test__format_seconds_with_seconds(self):
self.assertEqual(TimingRunner._format_seconds(12.5), "12.50s")
def test__format_seconds_with_minutes(self):
self.assertEqual(TimingRunner._format_seconds(60*35), "35m 0.00s")
def test__format_seconds_with_hours(self):
self.assertEqual(TimingRunner._format_seconds(60*60), "1h 0m 0.00s")
def test__format_seconds_with_days(self):
self.assertEqual(TimingRunner._format_seconds(24*60*60*2),
"2d 0h 0m 0.00s")

Computing file changes ...