2019-09-23 02:02:43 -07:00
# SPDX-License-Identifier: GPL-2.0
#
# Parses test results from a kernel dmesg log.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
import re
from collections import namedtuple
from datetime import datetime
from enum import Enum , auto
from functools import reduce
2021-01-14 16:39:12 -08:00
from typing import Iterable , Iterator , List , Optional , Tuple
2019-09-23 02:02:43 -07:00
TestResult = namedtuple ( ' TestResult ' , [ ' status ' , ' suites ' , ' log ' ] )
class TestSuite ( object ) :
2021-01-14 16:39:11 -08:00
def __init__ ( self ) - > None :
2021-01-14 16:39:12 -08:00
self . status = TestStatus . SUCCESS
2021-01-14 16:39:11 -08:00
self . name = ' '
self . cases = [ ] # type: List[TestCase]
2019-09-23 02:02:43 -07:00
2021-01-14 16:39:11 -08:00
def __str__ ( self ) - > str :
return ' TestSuite( ' + str ( self . status ) + ' , ' + self . name + ' , ' + str ( self . cases ) + ' ) '
2019-09-23 02:02:43 -07:00
2021-01-14 16:39:11 -08:00
def __repr__ ( self ) - > str :
2019-09-23 02:02:43 -07:00
return str ( self )
class TestCase ( object ) :
2021-01-14 16:39:11 -08:00
def __init__ ( self ) - > None :
2021-01-14 16:39:12 -08:00
self . status = TestStatus . SUCCESS
2019-09-23 02:02:43 -07:00
self . name = ' '
2021-01-14 16:39:11 -08:00
self . log = [ ] # type: List[str]
2019-09-23 02:02:43 -07:00
2021-01-14 16:39:11 -08:00
def __str__ ( self ) - > str :
return ' TestCase( ' + str ( self . status ) + ' , ' + self . name + ' , ' + str ( self . log ) + ' ) '
2019-09-23 02:02:43 -07:00
2021-01-14 16:39:11 -08:00
def __repr__ ( self ) - > str :
2019-09-23 02:02:43 -07:00
return str ( self )
class TestStatus ( Enum ) :
SUCCESS = auto ( )
FAILURE = auto ( )
TEST_CRASHED = auto ( )
NO_TESTS = auto ( )
2020-08-04 13:47:44 -07:00
FAILURE_TO_PARSE_TESTS = auto ( )
2019-09-23 02:02:43 -07:00
2020-03-16 13:21:24 -07:00
kunit_start_re = re . compile ( r ' TAP version [0-9]+$ ' )
kunit_end_re = re . compile ( ' (List of all partitions:| '
2020-08-04 13:47:44 -07:00
' Kernel panic - not syncing: VFS:) ' )
2019-09-23 02:02:43 -07:00
2021-01-14 16:39:11 -08:00
def isolate_kunit_output ( kernel_output ) - > Iterator [ str ] :
2019-09-23 02:02:43 -07:00
started = False
for line in kernel_output :
2020-10-30 15:38:53 -07:00
line = line . rstrip ( ) # line always has a trailing \n
2020-03-16 13:21:24 -07:00
if kunit_start_re . search ( line ) :
prefix_len = len ( line . split ( ' TAP version ' ) [ 0 ] )
2019-09-23 02:02:43 -07:00
started = True
2020-03-16 13:21:24 -07:00
yield line [ prefix_len : ] if prefix_len > 0 else line
elif kunit_end_re . search ( line ) :
2019-09-23 02:02:43 -07:00
break
elif started :
2020-03-16 13:21:24 -07:00
yield line [ prefix_len : ] if prefix_len > 0 else line
2019-09-23 02:02:43 -07:00
2021-01-14 16:39:11 -08:00
def raw_output ( kernel_output ) - > None :
2019-09-23 02:02:43 -07:00
for line in kernel_output :
2020-10-30 15:38:53 -07:00
print ( line . rstrip ( ) )
2019-09-23 02:02:43 -07:00
DIVIDER = ' = ' * 60
RESET = ' \033 [0;0m '
2021-01-14 16:39:11 -08:00
def red ( text ) - > str :
2019-09-23 02:02:43 -07:00
return ' \033 [1;31m ' + text + RESET
2021-01-14 16:39:11 -08:00
def yellow ( text ) - > str :
2019-09-23 02:02:43 -07:00
return ' \033 [1;33m ' + text + RESET
2021-01-14 16:39:11 -08:00
def green ( text ) - > str :
2019-09-23 02:02:43 -07:00
return ' \033 [1;32m ' + text + RESET
2021-01-14 16:39:11 -08:00
def print_with_timestamp ( message ) - > None :
2019-09-23 02:02:43 -07:00
print ( ' [ %s ] %s ' % ( datetime . now ( ) . strftime ( ' % H: % M: % S ' ) , message ) )
2021-01-14 16:39:11 -08:00
def format_suite_divider ( message ) - > str :
2019-09-23 02:02:43 -07:00
return ' ======== ' + message + ' ======== '
2021-01-14 16:39:11 -08:00
def print_suite_divider ( message ) - > None :
2019-09-23 02:02:43 -07:00
print_with_timestamp ( DIVIDER )
print_with_timestamp ( format_suite_divider ( message ) )
2021-01-14 16:39:11 -08:00
def print_log ( log ) - > None :
2019-09-23 02:02:43 -07:00
for m in log :
print_with_timestamp ( m )
2020-03-26 14:25:09 +00:00
TAP_ENTRIES = re . compile ( r ' ^(TAP|[ \ s]*ok|[ \ s]*not ok|[ \ s]*[0-9]+ \ . \ .[0-9]+|[ \ s]*#).*$ ' )
2019-09-23 02:02:43 -07:00
2020-12-11 14:32:32 -08:00
def consume_non_diagnostic ( lines : List [ str ] ) - > None :
2019-09-23 02:02:43 -07:00
while lines and not TAP_ENTRIES . match ( lines [ 0 ] ) :
lines . pop ( 0 )
2020-12-11 14:32:32 -08:00
def save_non_diagnostic ( lines : List [ str ] , test_case : TestCase ) - > None :
2019-09-23 02:02:43 -07:00
while lines and not TAP_ENTRIES . match ( lines [ 0 ] ) :
test_case . log . append ( lines [ 0 ] )
lines . pop ( 0 )
OkNotOkResult = namedtuple ( ' OkNotOkResult ' , [ ' is_ok ' , ' description ' , ' text ' ] )
2020-03-26 14:25:09 +00:00
OK_NOT_OK_SUBTEST = re . compile ( r ' ^[ \ s]+(ok|not ok) [0-9]+ - (.*)$ ' )
2019-09-23 02:02:43 -07:00
2020-08-04 13:47:44 -07:00
OK_NOT_OK_MODULE = re . compile ( r ' ^(ok|not ok) ([0-9]+) - (.*)$ ' )
2019-09-23 02:02:43 -07:00
2020-03-16 13:21:24 -07:00
def parse_ok_not_ok_test_case ( lines : List [ str ] , test_case : TestCase ) - > bool :
2020-12-11 14:32:32 -08:00
save_non_diagnostic ( lines , test_case )
2019-09-23 02:02:43 -07:00
if not lines :
2020-03-16 13:21:24 -07:00
test_case . status = TestStatus . TEST_CRASHED
return True
2019-09-23 02:02:43 -07:00
line = lines [ 0 ]
match = OK_NOT_OK_SUBTEST . match ( line )
2020-03-16 13:21:24 -07:00
while not match and lines :
line = lines . pop ( 0 )
match = OK_NOT_OK_SUBTEST . match ( line )
2019-09-23 02:02:43 -07:00
if match :
test_case . log . append ( lines . pop ( 0 ) )
test_case . name = match . group ( 2 )
if test_case . status == TestStatus . TEST_CRASHED :
return True
if match . group ( 1 ) == ' ok ' :
test_case . status = TestStatus . SUCCESS
else :
test_case . status = TestStatus . FAILURE
return True
else :
return False
2020-11-09 23:29:36 -08:00
SUBTEST_DIAGNOSTIC = re . compile ( r ' ^[ \ s]+# (.*)$ ' )
DIAGNOSTIC_CRASH_MESSAGE = re . compile ( r ' ^[ \ s]+# .*?: kunit test case crashed!$ ' )
2019-09-23 02:02:43 -07:00
def parse_diagnostic ( lines : List [ str ] , test_case : TestCase ) - > bool :
2020-12-11 14:32:32 -08:00
save_non_diagnostic ( lines , test_case )
2019-09-23 02:02:43 -07:00
if not lines :
return False
line = lines [ 0 ]
match = SUBTEST_DIAGNOSTIC . match ( line )
if match :
test_case . log . append ( lines . pop ( 0 ) )
2020-11-09 23:29:36 -08:00
crash_match = DIAGNOSTIC_CRASH_MESSAGE . match ( line )
if crash_match :
2019-09-23 02:02:43 -07:00
test_case . status = TestStatus . TEST_CRASHED
return True
else :
return False
2020-10-21 15:07:52 -07:00
def parse_test_case ( lines : List [ str ] ) - > Optional [ TestCase ] :
2019-09-23 02:02:43 -07:00
test_case = TestCase ( )
2020-12-11 14:32:32 -08:00
save_non_diagnostic ( lines , test_case )
2019-09-23 02:02:43 -07:00
while parse_diagnostic ( lines , test_case ) :
pass
2020-03-16 13:21:24 -07:00
if parse_ok_not_ok_test_case ( lines , test_case ) :
2019-09-23 02:02:43 -07:00
return test_case
else :
return None
2020-03-26 14:25:09 +00:00
SUBTEST_HEADER = re . compile ( r ' ^[ \ s]+# Subtest: (.*)$ ' )
2019-09-23 02:02:43 -07:00
2020-10-21 15:07:52 -07:00
def parse_subtest_header ( lines : List [ str ] ) - > Optional [ str ] :
2020-12-11 14:32:32 -08:00
consume_non_diagnostic ( lines )
2019-09-23 02:02:43 -07:00
if not lines :
return None
match = SUBTEST_HEADER . match ( lines [ 0 ] )
if match :
lines . pop ( 0 )
return match . group ( 1 )
else :
return None
2020-03-26 14:25:09 +00:00
SUBTEST_PLAN = re . compile ( r ' [ \ s]+[0-9]+ \ . \ .([0-9]+) ' )
2019-09-23 02:02:43 -07:00
2020-10-21 15:07:52 -07:00
def parse_subtest_plan ( lines : List [ str ] ) - > Optional [ int ] :
2020-12-11 14:32:32 -08:00
consume_non_diagnostic ( lines )
2019-09-23 02:02:43 -07:00
match = SUBTEST_PLAN . match ( lines [ 0 ] )
if match :
lines . pop ( 0 )
return int ( match . group ( 1 ) )
else :
return None
def max_status ( left : TestStatus , right : TestStatus ) - > TestStatus :
if left == TestStatus . TEST_CRASHED or right == TestStatus . TEST_CRASHED :
return TestStatus . TEST_CRASHED
elif left == TestStatus . FAILURE or right == TestStatus . FAILURE :
return TestStatus . FAILURE
elif left != TestStatus . SUCCESS :
return left
elif right != TestStatus . SUCCESS :
return right
else :
return TestStatus . SUCCESS
2020-08-04 13:47:44 -07:00
def parse_ok_not_ok_test_suite ( lines : List [ str ] ,
test_suite : TestSuite ,
expected_suite_index : int ) - > bool :
2020-12-11 14:32:32 -08:00
consume_non_diagnostic ( lines )
2019-09-23 02:02:43 -07:00
if not lines :
test_suite . status = TestStatus . TEST_CRASHED
return False
line = lines [ 0 ]
match = OK_NOT_OK_MODULE . match ( line )
if match :
lines . pop ( 0 )
if match . group ( 1 ) == ' ok ' :
test_suite . status = TestStatus . SUCCESS
else :
test_suite . status = TestStatus . FAILURE
2020-08-04 13:47:44 -07:00
suite_index = int ( match . group ( 2 ) )
if suite_index != expected_suite_index :
print_with_timestamp (
red ( ' [ERROR] ' ) + ' expected_suite_index ' +
str ( expected_suite_index ) + ' , but got ' +
str ( suite_index ) )
2019-09-23 02:02:43 -07:00
return True
else :
return False
2021-01-14 16:39:12 -08:00
def bubble_up_errors ( statuses : Iterable [ TestStatus ] ) - > TestStatus :
return reduce ( max_status , statuses , TestStatus . SUCCESS )
2019-09-23 02:02:43 -07:00
def bubble_up_test_case_errors ( test_suite : TestSuite ) - > TestStatus :
2021-01-14 16:39:12 -08:00
max_test_case_status = bubble_up_errors ( x . status for x in test_suite . cases )
2019-09-23 02:02:43 -07:00
return max_status ( max_test_case_status , test_suite . status )
2020-10-21 15:07:52 -07:00
def parse_test_suite ( lines : List [ str ] , expected_suite_index : int ) - > Optional [ TestSuite ] :
2019-09-23 02:02:43 -07:00
if not lines :
return None
2020-12-11 14:32:32 -08:00
consume_non_diagnostic ( lines )
2019-09-23 02:02:43 -07:00
test_suite = TestSuite ( )
test_suite . status = TestStatus . SUCCESS
name = parse_subtest_header ( lines )
if not name :
return None
test_suite . name = name
expected_test_case_num = parse_subtest_plan ( lines )
2020-10-15 19:28:17 +03:00
if expected_test_case_num is None :
2019-09-23 02:02:43 -07:00
return None
2020-03-16 13:21:24 -07:00
while expected_test_case_num > 0 :
test_case = parse_test_case ( lines )
if not test_case :
break
2019-09-23 02:02:43 -07:00
test_suite . cases . append ( test_case )
expected_test_case_num - = 1
2020-08-04 13:47:44 -07:00
if parse_ok_not_ok_test_suite ( lines , test_suite , expected_suite_index ) :
2019-09-23 02:02:43 -07:00
test_suite . status = bubble_up_test_case_errors ( test_suite )
return test_suite
elif not lines :
print_with_timestamp ( red ( ' [ERROR] ' ) + ' ran out of lines before end token ' )
return test_suite
else :
print ( ' failed to parse end of suite ' + lines [ 0 ] )
return None
TAP_HEADER = re . compile ( r ' ^TAP version 14$ ' )
def parse_tap_header ( lines : List [ str ] ) - > bool :
2020-12-11 14:32:32 -08:00
consume_non_diagnostic ( lines )
2019-09-23 02:02:43 -07:00
if TAP_HEADER . match ( lines [ 0 ] ) :
lines . pop ( 0 )
return True
else :
return False
2020-08-04 13:47:44 -07:00
TEST_PLAN = re . compile ( r ' [0-9]+ \ . \ .([0-9]+) ' )
2020-10-21 15:07:52 -07:00
def parse_test_plan ( lines : List [ str ] ) - > Optional [ int ] :
2020-12-11 14:32:32 -08:00
consume_non_diagnostic ( lines )
2020-08-04 13:47:44 -07:00
match = TEST_PLAN . match ( lines [ 0 ] )
if match :
lines . pop ( 0 )
return int ( match . group ( 1 ) )
else :
return None
2021-01-14 16:39:12 -08:00
def bubble_up_suite_errors ( test_suites : Iterable [ TestSuite ] ) - > TestStatus :
return bubble_up_errors ( x . status for x in test_suites )
2019-09-23 02:02:43 -07:00
def parse_test_result ( lines : List [ str ] ) - > TestResult :
2020-12-11 14:32:32 -08:00
consume_non_diagnostic ( lines )
2020-06-11 21:05:45 +00:00
if not lines or not parse_tap_header ( lines ) :
return TestResult ( TestStatus . NO_TESTS , [ ] , lines )
2020-08-04 13:47:44 -07:00
expected_test_suite_num = parse_test_plan ( lines )
if not expected_test_suite_num :
return TestResult ( TestStatus . FAILURE_TO_PARSE_TESTS , [ ] , lines )
2019-09-23 02:02:43 -07:00
test_suites = [ ]
2020-08-04 13:47:44 -07:00
for i in range ( 1 , expected_test_suite_num + 1 ) :
test_suite = parse_test_suite ( lines , i )
if test_suite :
test_suites . append ( test_suite )
else :
print_with_timestamp (
red ( ' [ERROR] ' ) + ' expected ' +
str ( expected_test_suite_num ) +
' test suites, but got ' + str ( i - 2 ) )
break
test_suite = parse_test_suite ( lines , - 1 )
if test_suite :
print_with_timestamp ( red ( ' [ERROR] ' ) +
' got unexpected test suite: ' + test_suite . name )
if test_suites :
return TestResult ( bubble_up_suite_errors ( test_suites ) , test_suites , lines )
else :
return TestResult ( TestStatus . NO_TESTS , [ ] , lines )
2019-09-23 02:02:43 -07:00
2020-10-21 15:07:52 -07:00
def print_and_count_results ( test_result : TestResult ) - > Tuple [ int , int , int ] :
2019-09-23 02:02:43 -07:00
total_tests = 0
failed_tests = 0
crashed_tests = 0
for test_suite in test_result . suites :
if test_suite . status == TestStatus . SUCCESS :
print_suite_divider ( green ( ' [PASSED] ' ) + test_suite . name )
elif test_suite . status == TestStatus . TEST_CRASHED :
print_suite_divider ( red ( ' [CRASHED] ' + test_suite . name ) )
else :
print_suite_divider ( red ( ' [FAILED] ' ) + test_suite . name )
for test_case in test_suite . cases :
total_tests + = 1
if test_case . status == TestStatus . SUCCESS :
print_with_timestamp ( green ( ' [PASSED] ' ) + test_case . name )
elif test_case . status == TestStatus . TEST_CRASHED :
crashed_tests + = 1
print_with_timestamp ( red ( ' [CRASHED] ' + test_case . name ) )
print_log ( map ( yellow , test_case . log ) )
print_with_timestamp ( ' ' )
else :
failed_tests + = 1
print_with_timestamp ( red ( ' [FAILED] ' ) + test_case . name )
print_log ( map ( yellow , test_case . log ) )
print_with_timestamp ( ' ' )
2020-08-04 13:47:44 -07:00
return total_tests , failed_tests , crashed_tests
def parse_run_tests ( kernel_output ) - > TestResult :
total_tests = 0
failed_tests = 0
crashed_tests = 0
test_result = parse_test_result ( list ( isolate_kunit_output ( kernel_output ) ) )
if test_result . status == TestStatus . NO_TESTS :
print ( red ( ' [ERROR] ' ) + yellow ( ' no tests run! ' ) )
elif test_result . status == TestStatus . FAILURE_TO_PARSE_TESTS :
print ( red ( ' [ERROR] ' ) + yellow ( ' could not parse test results! ' ) )
else :
( total_tests ,
failed_tests ,
crashed_tests ) = print_and_count_results ( test_result )
2019-09-23 02:02:43 -07:00
print_with_timestamp ( DIVIDER )
fmt = green if test_result . status == TestStatus . SUCCESS else red
print_with_timestamp (
fmt ( ' Testing complete. %d tests run. %d failed. %d crashed. ' %
( total_tests , failed_tests , crashed_tests ) ) )
return test_result