1
0
mirror of git://sourceware.org/git/lvm2.git synced 2024-12-21 13:34:40 +03:00

lvmdbusd: refactor and correct fetch thread logic

Simplify the fetch thread and correct the logic used for selecting the
options which are used when we batch update a state refresh.
This commit is contained in:
Tony Asleson 2022-08-29 16:18:06 -05:00
parent 25abe41b00
commit e5c41b94b8

View File

@ -140,15 +140,29 @@ class StateUpdate(object):
except queue.Empty: except queue.Empty:
pass pass
def _load_args(requests):
"""
If we have multiple requests in the queue, they might not all have the same options. If any of the requests
have an option set we need to honor it.
"""
refresh = any([r.refresh for r in requests])
emit_signal = any([r.emit_signal for r in requests])
cache_refresh = any([r.cache_refresh for r in requests])
log = any([r.log for r in requests])
need_main_thread = any([r.need_main_thread for r in requests])
return refresh, emit_signal, cache_refresh, log, need_main_thread
def _drain_queue(queued, incoming):
try:
while True:
queued.append(incoming.get(block=False))
except queue.Empty:
pass
while cfg.run.value != 0: while cfg.run.value != 0:
# noinspection PyBroadException # noinspection PyBroadException
try: try:
refresh = True
emit_signal = True
cache_refresh = True
log = True
need_main_thread = True
with obj.lock: with obj.lock:
wait = not obj.deferred wait = not obj.deferred
obj.deferred = False obj.deferred = False
@ -156,36 +170,17 @@ class StateUpdate(object):
if len(queued_requests) == 0 and wait: if len(queued_requests) == 0 and wait:
# Note: If we don't have anything for 2 seconds we will # Note: If we don't have anything for 2 seconds we will
# get a queue.Empty exception raised here # get a queue.Empty exception raised here
queued_requests.append(obj.queue.get(True, 2)) queued_requests.append(obj.queue.get(block=True, timeout=2))
# Ok we have one or the deferred queue has some, # Ok we have one or the deferred queue has some,
# check if any others # check if any others and grab them too
try: _drain_queue(queued_requests, obj.queue)
while True:
queued_requests.append(obj.queue.get(False))
except queue.Empty:
pass
if len(queued_requests) > 1: if len(queued_requests) > 1:
log_debug("Processing %d updates!" % len(queued_requests), log_debug("Processing %d updates!" % len(queued_requests),
'bg_black', 'fg_light_green') 'bg_black', 'fg_light_green')
# We have what we can, run the update with the needed options num_changes = load(*_load_args(queued_requests))
for i in queued_requests:
if not i.refresh:
refresh = False
if not i.emit_signal:
emit_signal = False
if not i.cache_refresh:
cache_refresh = False
if not i.log:
log = False
if not i.need_main_thread:
need_main_thread = False
num_changes = load(refresh, emit_signal, cache_refresh, log,
need_main_thread)
# Update is done, let everyone know! # Update is done, let everyone know!
set_results(num_changes) set_results(num_changes)