# Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved. # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions # of the GNU General Public License v.2. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from .pv import load_pvs from .vg import load_vgs from .lv import load_lvs from . import cfg from .utils import MThreadRunner, log_debug, log_error import threading import queue import time import traceback def _main_thread_load(refresh=True, emit_signal=True): num_total_changes = 0 to_remove = [] (changes, remove) = load_pvs( refresh=refresh, emit_signal=emit_signal, cache_refresh=False)[1:] num_total_changes += changes to_remove.extend(remove) (changes, remove) = load_vgs( refresh=refresh, emit_signal=emit_signal, cache_refresh=False)[1:] num_total_changes += changes to_remove.extend(remove) (lv_changes, remove) = load_lvs( refresh=refresh, emit_signal=emit_signal, cache_refresh=False)[1:] num_total_changes += lv_changes to_remove.extend(remove) # When the LVs change it can cause another change in the VGs which is # missed if we don't scan through the VGs again. We could achieve this # the other way and re-scan the LVs, but in general there are more LVs than # VGs, thus this should be more efficient. This happens when a LV interface # changes causing the dbus object representing it to be removed and # recreated. if refresh and lv_changes > 0: (changes, remove) = load_vgs( refresh=refresh, emit_signal=emit_signal, cache_refresh=False)[1:] num_total_changes += changes to_remove.extend(remove) # Remove any objects that are no longer needed. We do this after we process # all the objects to ensure that references still exist for objects that # are processed after them. to_remove.reverse() for i in to_remove: dbus_obj = cfg.om.get_object_by_path(i) if dbus_obj: cfg.om.remove_object(dbus_obj, True) num_total_changes += 1 return num_total_changes def load(refresh=True, emit_signal=True, cache_refresh=True, log=True, need_main_thread=True): # Go through and load all the PVs, VGs and LVs if cache_refresh: cfg.db.refresh(log) if need_main_thread: rc = MThreadRunner(_main_thread_load, refresh, emit_signal).done() else: rc = _main_thread_load(refresh, emit_signal) return rc # Even though lvm can handle multiple changes concurrently it really doesn't # make sense to make a 1-1 fetch of data for each change of lvm because when # we fetch the data once all previous changes are reflected. class StateUpdate(object): class UpdateRequest(object): def __init__(self, refresh, emit_signal, cache_refresh, log, need_main_thread): self.is_done = False self.refresh = refresh self.emit_signal = emit_signal self.cache_refresh = cache_refresh self.log = log self.need_main_thread = need_main_thread self.result = None self.cond = threading.Condition(threading.Lock()) def done(self): with self.cond: if not self.is_done: self.cond.wait() return self.result def set_result(self, result): with self.cond: self.result = result self.is_done = True self.cond.notify_all() @staticmethod def update_thread(obj): exception_count = 0 queued_requests = [] def set_results(val): nonlocal queued_requests for idx in queued_requests: idx.set_result(val) # Only clear out the requests after we have given them a result # otherwise we can orphan the waiting threads, and they never # wake up if we get an exception queued_requests = [] def bailing(rv): set_results(rv) try: while True: item = obj.queue.get(False) item.set_result(rv) except queue.Empty: pass while cfg.run.value != 0: # noinspection PyBroadException try: refresh = True emit_signal = True cache_refresh = True log = True need_main_thread = True with obj.lock: wait = not obj.deferred obj.deferred = False if len(queued_requests) == 0 and wait: queued_requests.append(obj.queue.get(True, 2)) # Ok we have one or the deferred queue has some, # check if any others try: while True: queued_requests.append(obj.queue.get(False)) except queue.Empty: pass if len(queued_requests) > 1: log_debug("Processing %d updates!" % len(queued_requests), 'bg_black', 'fg_light_green') # We have what we can, run the update with the needed options for i in queued_requests: if not i.refresh: refresh = False if not i.emit_signal: emit_signal = False if not i.cache_refresh: cache_refresh = False if not i.log: log = False if not i.need_main_thread: need_main_thread = False num_changes = load(refresh, emit_signal, cache_refresh, log, need_main_thread) # Update is done, let everyone know! set_results(num_changes) # We retrieved OK, clear exception count exception_count = 0 except queue.Empty: pass except Exception as e: st = traceback.format_exc() log_error("update_thread exception: \n%s" % st) cfg.flightrecorder.dump() exception_count += 1 if exception_count >= 5: bailing(e) log_error("Too many errors in update_thread, exiting daemon") cfg.exit_daemon() else: # Slow things down when encountering errors time.sleep(1) # Make sure to unblock any that may be waiting before we exit this thread # otherwise they hang forever ... bailing(Exception("update thread exiting")) log_debug("update thread exiting!") def __init__(self): self.lock = threading.RLock() self.queue = queue.Queue() self.deferred = False # Do initial load load(refresh=False, emit_signal=False, need_main_thread=False) self.thread = threading.Thread(target=StateUpdate.update_thread, args=(self,), name="StateUpdate.update_thread") def load(self, refresh=True, emit_signal=True, cache_refresh=True, log=True, need_main_thread=True): # Place this request on the queue and wait for it to be completed req = StateUpdate.UpdateRequest(refresh, emit_signal, cache_refresh, log, need_main_thread) self.queue.put(req) return req.done() def event(self): with self.lock: self.deferred = True