add: tester: valgrind test

This commit is contained in:
2025-12-17 06:56:48 -06:00
parent f45128e380
commit cefd7e7c1f
5 changed files with 36 additions and 10 deletions

View File

@@ -4,11 +4,12 @@ import os
import signal
import thread_manager
from print import print_test
def _cmp_header(expected: str, value: str):
pass
def _test(title: str, excutable: str, args: str, exit_code: int, delay: float, log_path: str) -> None:
def _test(valgrind: bool, title: str, excutable: str, args: str, exit_code: int, delay: float, log_path: str) -> None:
log_path = f"{log_path}/{title}"
if not os.path.exists(log_path):
os.makedirs(log_path)
@@ -24,11 +25,23 @@ def _test(title: str, excutable: str, args: str, exit_code: int, delay: float, l
your_file.close()
normal_file.close()
def test_excution(excutable: str, tests_category: dict[str, list[dict[str, str]]], log_path: str):
if valgrind == False:
return
f = open(f"{log_path}/valgrind.log", "w")
p = subprocess.Popen(f"valgrind --leak-check=full --show-leak-kinds=all --errors-for-leak-kinds=all --error-exitcode=79 {excutable} {args}".split(" "), stdout=f, stderr=f)
time.sleep(1)
p.send_signal(signal.SIGINT)
value = p.wait()
f.close()
print_test(exit_code, value, f"{title}_VALGRIND", args)
def test_excution(valgrind: bool, excutable: str, tests_category: dict[str, list[dict[str, str]]], log_path: str):
for category_name, tests in tests_category.items():
print(category_name)
for test in tests:
_test(test["title"], excutable, test['args'], test.get("exit", 0), test.get("delay", 1), f"{log_path}/{category_name}")
thread_manager.add_to_queu(_test, (test["title"], excutable, test['args'], test.get("exit", 0), test.get("delay", 1), f"{log_path}/{category_name}"))
#thread_manager.add_to_queu(
_test(valgrind, test["title"], excutable, test['args'], test.get("exit", 0), test.get("delay", 1), f"{log_path}/{category_name}")
#)
thread_manager.wait_pool()
print("\n" * 1)

View File

@@ -6,7 +6,7 @@ import time
import thread_manager
from print import print_test
def _test(title: str, excutable: str, args: str, exit_code: int, log_path: str) -> None:
def _test(valgrind: bool, title: str, excutable: str, args: str, exit_code: int, log_path: str) -> None:
if not os.path.exists(log_path):
os.makedirs(log_path)
f = open(f"{log_path}/{title}.log", "w")
@@ -17,10 +17,21 @@ def _test(title: str, excutable: str, args: str, exit_code: int, log_path: str)
f.close()
print_test(exit_code, value, title, args)
def test_parsing(excutable: str, tests_category: dict[str, list[dict[str, str]]], log_path: str):
if valgrind == False:
return
f = open(f"{log_path}/{title}_valgrind.log", "w")
p = subprocess.Popen(f"valgrind --leak-check=full --show-leak-kinds=all --errors-for-leak-kinds=all --error-exitcode=79 {excutable} {args}".split(" "), stdout=f, stderr=f)
time.sleep(1)
p.send_signal(signal.SIGINT)
value = p.wait()
f.close()
print_test(exit_code, value, f"{title}_VALGRIND", args)
def test_parsing(valgrind: bool, excutable: str, tests_category: dict[str, list[dict[str, str]]], log_path: str):
for category_name, tests in tests_category.items():
print(category_name)
for test in tests:
thread_manager.add_to_queu(_test, (test["title"], excutable, test['args'], test['exit'], f"{log_path}/{category_name}"))
thread_manager.add_to_queu(_test, (valgrind, test["title"], excutable, test['args'], test['exit'], f"{log_path}/{category_name}"))
thread_manager.wait_pool()
print("\n" * 1)

View File

@@ -9,6 +9,7 @@ import parsing
import execution
excutable: str = sys.argv[1]
valgrind: bool = bool(sys.argv[2])
with open('test/tests.json', 'r') as file:
tests: dict[str, dict] = json.load(file)
@@ -19,5 +20,5 @@ shutil.rmtree("log")
for category_name, tests in tests.items():
print(category_name, end="\n" * 2)
tester[category_name](excutable, tests, f"log/{category_name}")
tester[category_name](valgrind, excutable, tests, f"log/{category_name}")
thread_manager.wait_pool()

View File

@@ -90,7 +90,8 @@
},
{
"title": "invalid",
"args": "domain.invalid"
"args": "domain.invalid",
"exit": 1
}
],
"payload_size": [