tpm: selftest: add test covering async mode
Add a test that sends a tpm cmd in an async mode. Currently there is a gap in test coverage with regards to this functionality. Signed-off-by: Tadeusz Struk <tadeusz.struk@intel.com> Reviewed-by: Jarkko Sakkinen <jarkko.sakkinen@linux.intel.com> Signed-off-by: Jarkko Sakkinen <jarkko.sakkinen@linux.intel.com>
This commit is contained in:
parent
d23d124843
commit
8f84bddcfa
@ -2,3 +2,4 @@
|
|||||||
# SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause)
|
# SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause)
|
||||||
|
|
||||||
python -m unittest -v tpm2_tests.SmokeTest
|
python -m unittest -v tpm2_tests.SmokeTest
|
||||||
|
python -m unittest -v tpm2_tests.AsyncTest
|
||||||
|
@ -6,8 +6,8 @@ import socket
|
|||||||
import struct
|
import struct
|
||||||
import sys
|
import sys
|
||||||
import unittest
|
import unittest
|
||||||
from fcntl import ioctl
|
import fcntl
|
||||||
|
import select
|
||||||
|
|
||||||
TPM2_ST_NO_SESSIONS = 0x8001
|
TPM2_ST_NO_SESSIONS = 0x8001
|
||||||
TPM2_ST_SESSIONS = 0x8002
|
TPM2_ST_SESSIONS = 0x8002
|
||||||
@ -352,6 +352,7 @@ def hex_dump(d):
|
|||||||
class Client:
|
class Client:
|
||||||
FLAG_DEBUG = 0x01
|
FLAG_DEBUG = 0x01
|
||||||
FLAG_SPACE = 0x02
|
FLAG_SPACE = 0x02
|
||||||
|
FLAG_NONBLOCK = 0x04
|
||||||
TPM_IOC_NEW_SPACE = 0xa200
|
TPM_IOC_NEW_SPACE = 0xa200
|
||||||
|
|
||||||
def __init__(self, flags = 0):
|
def __init__(self, flags = 0):
|
||||||
@ -362,13 +363,27 @@ class Client:
|
|||||||
else:
|
else:
|
||||||
self.tpm = open('/dev/tpmrm0', 'r+b', buffering=0)
|
self.tpm = open('/dev/tpmrm0', 'r+b', buffering=0)
|
||||||
|
|
||||||
|
if (self.flags & Client.FLAG_NONBLOCK):
|
||||||
|
flags = fcntl.fcntl(self.tpm, fcntl.F_GETFL)
|
||||||
|
flags |= os.O_NONBLOCK
|
||||||
|
fcntl.fcntl(self.tpm, fcntl.F_SETFL, flags)
|
||||||
|
self.tpm_poll = select.poll()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.tpm.close()
|
self.tpm.close()
|
||||||
|
|
||||||
def send_cmd(self, cmd):
|
def send_cmd(self, cmd):
|
||||||
self.tpm.write(cmd)
|
self.tpm.write(cmd)
|
||||||
|
|
||||||
|
if (self.flags & Client.FLAG_NONBLOCK):
|
||||||
|
self.tpm_poll.register(self.tpm, select.POLLIN)
|
||||||
|
self.tpm_poll.poll(10000)
|
||||||
|
|
||||||
rsp = self.tpm.read()
|
rsp = self.tpm.read()
|
||||||
|
|
||||||
|
if (self.flags & Client.FLAG_NONBLOCK):
|
||||||
|
self.tpm_poll.unregister(self.tpm)
|
||||||
|
|
||||||
if (self.flags & Client.FLAG_DEBUG) != 0:
|
if (self.flags & Client.FLAG_DEBUG) != 0:
|
||||||
sys.stderr.write('cmd' + os.linesep)
|
sys.stderr.write('cmd' + os.linesep)
|
||||||
sys.stderr.write(hex_dump(cmd) + os.linesep)
|
sys.stderr.write(hex_dump(cmd) + os.linesep)
|
||||||
|
@ -288,3 +288,16 @@ class SpaceTest(unittest.TestCase):
|
|||||||
|
|
||||||
self.assertEqual(rc, tpm2.TPM2_RC_COMMAND_CODE |
|
self.assertEqual(rc, tpm2.TPM2_RC_COMMAND_CODE |
|
||||||
tpm2.TSS2_RESMGR_TPM_RC_LAYER)
|
tpm2.TSS2_RESMGR_TPM_RC_LAYER)
|
||||||
|
|
||||||
|
class AsyncTest(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
logging.basicConfig(filename='AsyncTest.log', level=logging.DEBUG)
|
||||||
|
|
||||||
|
def test_async(self):
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
log.debug(sys._getframe().f_code.co_name)
|
||||||
|
|
||||||
|
async_client = tpm2.Client(tpm2.Client.FLAG_NONBLOCK)
|
||||||
|
log.debug("Calling get_cap in a NON_BLOCKING mode")
|
||||||
|
async_client.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_LOADED_SESSION)
|
||||||
|
async_client.close()
|
||||||
|
Loading…
Reference in New Issue
Block a user