1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-22 22:03:54 +03:00

Fixed missing method for testing publication queue

This commit is contained in:
Adolfo Gómez García 2024-08-31 19:49:24 +02:00
parent ef4b7e5bac
commit f4ee6e50c0
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
2 changed files with 33 additions and 28 deletions

View File

@ -631,6 +631,10 @@ class DynamicTestingPublicationQueue(dynamic_publication.DynamicPublication):
self.mock.shutdown_completed_checker()
return TaskState.FINISHED
def op_delete_checker(self) -> types.states.TaskState:
self.mock.remove_checker()
return types.states.TaskState.FINISHED
def op_delete_completed_checker(self) -> types.states.TaskState:
self.mock.remove_completed_checker()
return TaskState.FINISHED
@ -665,6 +669,7 @@ class DynamicTestingServiceForDeferredDeletion(dynamic_service.DynamicService):
return True
def notify_deleted(self, vmid: str) -> None:
super().notify_deleted(vmid) # to update delete_running flag
self.mock.notify_deleted(vmid)
return
@ -716,7 +721,7 @@ class DynamicTestingProvider(services.provider.ServiceProvider):
def create_dynamic_provider() -> DynamicTestingProvider:
uuid_ = str(uuid.uuid4())
return DynamicTestingProvider(environment=environment.Environment.private_environment(uuid), uuid=uuid_)
return DynamicTestingProvider(environment=environment.Environment.private_environment(uuid_), uuid=uuid_)
def create_dynamic_service(

View File

@ -48,7 +48,7 @@ from . import fixtures
class DynamicPublicationIterationInfo:
queue: list[types.services.Operation]
service_calls: list[mock._Call] = dataclasses.field(default_factory=list)
user_service_calls: list[mock._Call] = dataclasses.field(default_factory=list)
publication_calls: list[mock._Call] = dataclasses.field(default_factory=list)
state: str = types.states.TaskState.RUNNING
def __mul__(self, other: int) -> list['DynamicPublicationIterationInfo']:
@ -94,8 +94,8 @@ class DynamicPublicationTest(UDSTestCase):
)
self.assertEqual(
publication.mock.mock_calls,
info.user_service_calls,
f'************ Userservice calls: {iteration} {publication.mock.mock_calls}',
info.publication_calls,
f'************ Publication calls: {iteration} {publication.mock.mock_calls}',
)
def test_publication_queue_works_fine(self) -> None:
@ -202,132 +202,132 @@ EXPECTED_DEPLOY_ITERATIONS_INFO: typing.Final[list[DynamicPublicationIterationIn
# Initial state for queue
DynamicPublicationIterationInfo( # 1, INITIALIZE
queue=fixtures.PUB_TESTEABLE_OPERATIONS,
user_service_calls=[
publication_calls=[
call.initialize(),
],
),
DynamicPublicationIterationInfo( # 2, CREATE
queue=fixtures.PUB_TESTEABLE_OPERATIONS[1:],
user_service_calls=[call.initialize_checker(), call.create()],
publication_calls=[call.initialize_checker(), call.create()],
),
DynamicPublicationIterationInfo( # 3, CREATE_COMPLETED
queue=fixtures.PUB_TESTEABLE_OPERATIONS[2:],
user_service_calls=[call.create_checker(), call.create_completed()],
publication_calls=[call.create_checker(), call.create_completed()],
),
DynamicPublicationIterationInfo( # 4, START
queue=fixtures.PUB_TESTEABLE_OPERATIONS[3:],
user_service_calls=[call.create_completed_checker()],
publication_calls=[call.create_completed_checker()],
service_calls=[
call.start(MustBeOfType(fixtures.DynamicTestingPublicationQueue), MustBeOfType(str))
],
),
DynamicPublicationIterationInfo( # 5, START_COMPLETED
queue=fixtures.PUB_TESTEABLE_OPERATIONS[4:],
user_service_calls=[call.start_completed()],
publication_calls=[call.start_completed()],
service_calls=[
call.is_running(MustBeOfType(fixtures.DynamicTestingPublicationQueue), MustBeOfType(str))
],
),
DynamicPublicationIterationInfo( # 6, STOP
queue=fixtures.PUB_TESTEABLE_OPERATIONS[5:],
user_service_calls=[call.start_completed_checker()],
publication_calls=[call.start_completed_checker()],
service_calls=[
call.stop(MustBeOfType(fixtures.DynamicTestingPublicationQueue), MustBeOfType(str))
],
),
DynamicPublicationIterationInfo( # 7, STOP_COMPLETED
queue=fixtures.PUB_TESTEABLE_OPERATIONS[6:],
user_service_calls=[call.stop_completed()],
publication_calls=[call.stop_completed()],
service_calls=[
call.is_running(MustBeOfType(fixtures.DynamicTestingPublicationQueue), MustBeOfType(str))
],
),
DynamicPublicationIterationInfo( # 8, SHUTDOWN
queue=fixtures.PUB_TESTEABLE_OPERATIONS[7:],
user_service_calls=[call.stop_completed_checker()],
publication_calls=[call.stop_completed_checker()],
service_calls=[
call.shutdown(MustBeOfType(fixtures.DynamicTestingPublicationQueue), MustBeOfType(str)),
],
),
DynamicPublicationIterationInfo( # 9, SHUTDOWN_COMPLETED
queue=fixtures.PUB_TESTEABLE_OPERATIONS[8:],
user_service_calls=[call.shutdown_completed()],
publication_calls=[call.shutdown_completed()],
),
DynamicPublicationIterationInfo( # 10, REMOVE
queue=fixtures.PUB_TESTEABLE_OPERATIONS[9:],
user_service_calls=[call.shutdown_completed_checker()],
publication_calls=[call.shutdown_completed_checker()],
service_calls=[
call.remove(MustBeOfType(fixtures.DynamicTestingPublicationQueue), MustBeOfType(str))
],
),
DynamicPublicationIterationInfo( # 11, REMOVE_COMPLETED
queue=fixtures.PUB_TESTEABLE_OPERATIONS[10:],
user_service_calls=[call.remove_checker(), call.remove_completed()],
publication_calls=[call.remove_checker(), call.remove_completed()],
),
DynamicPublicationIterationInfo( # 12, NOP
queue=fixtures.PUB_TESTEABLE_OPERATIONS[11:],
user_service_calls=[call.remove_completed_checker(), call.nop()],
publication_calls=[call.remove_completed_checker(), call.nop()],
),
DynamicPublicationIterationInfo( # 13, DESTROY_VALIDATOR
queue=fixtures.PUB_TESTEABLE_OPERATIONS[12:],
user_service_calls=[call.nop_checker(), call.destroy_validator()],
publication_calls=[call.nop_checker(), call.destroy_validator()],
),
DynamicPublicationIterationInfo( # 14, CUSTOM_1
queue=fixtures.PUB_TESTEABLE_OPERATIONS[13:],
user_service_calls=[call.destroy_validator_checker(), call.custom(types.services.Operation.CUSTOM_1)],
publication_calls=[call.destroy_validator_checker(), call.custom(types.services.Operation.CUSTOM_1)],
),
DynamicPublicationIterationInfo( # 15, CUSTOM_2
queue=fixtures.PUB_TESTEABLE_OPERATIONS[14:],
user_service_calls=[
publication_calls=[
call.custom_checker(types.services.Operation.CUSTOM_1),
call.custom(types.services.Operation.CUSTOM_2),
],
),
DynamicPublicationIterationInfo( # 16, CUSTOM_3
queue=fixtures.PUB_TESTEABLE_OPERATIONS[15:],
user_service_calls=[
publication_calls=[
call.custom_checker(types.services.Operation.CUSTOM_2),
call.custom(types.services.Operation.CUSTOM_3),
],
),
DynamicPublicationIterationInfo( # 17, CUSTOM_4
queue=fixtures.PUB_TESTEABLE_OPERATIONS[16:],
user_service_calls=[
publication_calls=[
call.custom_checker(types.services.Operation.CUSTOM_3),
call.custom(types.services.Operation.CUSTOM_4),
],
),
DynamicPublicationIterationInfo( # 18, CUSTOM_5
queue=fixtures.PUB_TESTEABLE_OPERATIONS[17:],
user_service_calls=[
publication_calls=[
call.custom_checker(types.services.Operation.CUSTOM_4),
call.custom(types.services.Operation.CUSTOM_5),
],
),
DynamicPublicationIterationInfo( # 19, CUSTOM_6
queue=fixtures.PUB_TESTEABLE_OPERATIONS[18:],
user_service_calls=[
publication_calls=[
call.custom_checker(types.services.Operation.CUSTOM_5),
call.custom(types.services.Operation.CUSTOM_6),
],
),
DynamicPublicationIterationInfo( # 20, CUSTOM_7
queue=fixtures.PUB_TESTEABLE_OPERATIONS[19:],
user_service_calls=[
publication_calls=[
call.custom_checker(types.services.Operation.CUSTOM_6),
call.custom(types.services.Operation.CUSTOM_7),
],
),
DynamicPublicationIterationInfo( # 21, CUSTOM_8
queue=fixtures.PUB_TESTEABLE_OPERATIONS[20:],
user_service_calls=[
publication_calls=[
call.custom_checker(types.services.Operation.CUSTOM_7),
call.custom(types.services.Operation.CUSTOM_8),
],
),
DynamicPublicationIterationInfo( # 22, CUSTOM_9
queue=fixtures.PUB_TESTEABLE_OPERATIONS[21:],
user_service_calls=[
publication_calls=[
call.custom_checker(types.services.Operation.CUSTOM_8),
call.custom(types.services.Operation.CUSTOM_9),
],
@ -335,7 +335,7 @@ EXPECTED_DEPLOY_ITERATIONS_INFO: typing.Final[list[DynamicPublicationIterationIn
DynamicPublicationIterationInfo(
queue=fixtures.PUB_TESTEABLE_OPERATIONS[22:],
state=types.states.TaskState.FINISHED,
user_service_calls=[
publication_calls=[
call.custom_checker(types.services.Operation.CUSTOM_9),
],
), # 28, FINISH