diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py
index 4cef92391f..341ed3877c 100644
--- a/awx/main/management/commands/inventory_import.py
+++ b/awx/main/management/commands/inventory_import.py
@@ -585,7 +585,7 @@ class Command(NoArgsCommand):
             self._batch_add_m2m_cache = {}
         cached_objs = self._batch_add_m2m_cache.setdefault(key, [])
         cached_objs.extend(objs)
-        if len(cached_objs) > 100 or flush:
+        if len(cached_objs) > self._batch_size or flush:
             if len(cached_objs):
                 related_manager.add(*cached_objs)
             self._batch_add_m2m_cache[key] = []
@@ -635,25 +635,39 @@ class Command(NoArgsCommand):
         if settings.SQL_DEBUG:
             queries_before = len(connection.queries)
         if self.inventory_source.group:
-            del_hosts = self.inventory_source.group.all_hosts
+            hosts_qs = self.inventory_source.group.all_hosts
             # FIXME: Also include hosts from inventory_source.managed_hosts?
         else:
-            del_hosts = self.inventory.hosts.filter(active=True)
+            hosts_qs = self.inventory.hosts.filter(active=True)
+        # Build list of all host pks, remove all that should not be deleted.
+        del_host_pks = set(hosts_qs.values_list('pk', flat=True))
         if self.instance_id_var:
-            instance_ids = set(self.mem_instance_id_map.keys())
-            host_pks = set([v for k,v in self.db_instance_id_map.items() if k in instance_ids])
-            host_names = set(self.mem_instance_id_map.values()) - set(self.all_group.all_hosts.keys())
-            del_hosts = del_hosts.exclude(Q(name__in=host_names) | Q(instance_id__in=instance_ids) | Q(pk__in=host_pks))
+            all_instance_ids = self.mem_instance_id_map.keys()
+            for offset in xrange(0, len(all_instance_ids), self._batch_size):
+                instance_ids = all_instance_ids[offset:(offset + self._batch_size)]
+                for host_pk in hosts_qs.filter(instance_id__in=instance_ids).values_list('pk', flat=True):
+                    del_host_pks.discard(host_pk)
+                for host_pk in set([v for k,v in self.db_instance_id_map.items() if k in instance_ids]):
+                    del_host_pks.discard(host_pk)
+            all_host_names = list(set(self.mem_instance_id_map.values()) - set(self.all_group.all_hosts.keys()))
         else:
-            del_hosts = del_hosts.exclude(name__in=self.all_group.all_hosts.keys())
-        for host in del_hosts:
-            host_name = host.name
-            host.mark_inactive()#from_inventory_import=True)
-            self.logger.info('Deleted host "%s"', host_name)
+            all_host_names = self.all_group.all_hosts.keys()
+        for offset in xrange(0, len(all_host_names), self._batch_size):
+            host_names = all_host_names[offset:(offset + self._batch_size)]
+            for host_pk in hosts_qs.filter(name__in=host_names).values_list('pk', flat=True):
+                del_host_pks.discard(host_pk)
+        # Now delete all remaining hosts in batches.
+        all_del_pks = list(del_host_pks)
+        for offset in xrange(0, len(all_del_pks), self._batch_size):
+            del_pks = all_del_pks[offset:(offset + self._batch_size)]
+            for host in hosts_qs.filter(pk__in=del_pks):
+                host_name = host.name
+                host.mark_inactive()
+                self.logger.info('Deleted host "%s"', host_name)
         if settings.SQL_DEBUG:
             self.logger.warning('host deletions took %d queries for %d hosts',
                                 len(connection.queries) - queries_before,
-                                del_hosts.count())
+                                len(all_del_pks))
 
     def _delete_groups(self):
         '''
@@ -665,20 +679,29 @@ class Command(NoArgsCommand):
         if settings.SQL_DEBUG:
             queries_before = len(connection.queries)
         if self.inventory_source.group:
-            del_groups = self.inventory_source.group.all_children
+            groups_qs = self.inventory_source.group.all_children
             # FIXME: Also include groups from inventory_source.managed_groups?
         else:
-            del_groups = self.inventory.groups.filter(active=True)
-        group_names = set(self.all_group.all_groups.keys())
-        del_groups = del_groups.exclude(name__in=group_names)
-        for group in del_groups:
-            group_name = group.name
-            group.mark_inactive(recompute=False)#from_inventory_import=True)
-            self.logger.info('Group "%s" deleted', group_name)
+            groups_qs = self.inventory.groups.filter(active=True)
+        # Build list of all group pks, remove those that should not be deleted.
+        del_group_pks = set(groups_qs.values_list('pk', flat=True))
+        all_group_names = self.all_group.all_groups.keys()
+        for offset in xrange(0, len(all_group_names), self._batch_size):
+            group_names = all_group_names[offset:(offset + self._batch_size)]
+            for group_pk in groups_qs.filter(name__in=group_names).values_list('pk', flat=True):
+                del_group_pks.discard(group_pk)
+        # Now delete all remaining groups in batches.
+        all_del_pks = list(del_group_pks)
+        for offset in xrange(0, len(all_del_pks), self._batch_size):
+            del_pks = all_del_pks[offset:(offset + self._batch_size)]
+            for group in groups_qs.filter(pk__in=del_pks):
+                group_name = group.name
+                group.mark_inactive(recompute=False)
+                self.logger.info('Group "%s" deleted', group_name)
         if settings.SQL_DEBUG:
             self.logger.warning('group deletions took %d queries for %d groups',
                                 len(connection.queries) - queries_before,
-                                del_groups.count())
+                                len(all_del_pks))
 
     def _delete_group_children_and_hosts(self):
         '''
@@ -697,28 +720,47 @@ class Command(NoArgsCommand):
         else:
             db_groups = self.inventory.groups.filter(active=True)
         for db_group in db_groups:
+            # Delete child group relationships not present in imported data.
             db_children = db_group.children.filter(active=True)
+            db_children_name_pk_map = dict(db_children.values_list('name', 'pk'))
             mem_children = self.all_group.all_groups[db_group.name].children
-            mem_children_names = [g.name for g in mem_children]
-            for db_child in db_children.exclude(name__in=mem_children_names):
-                group_group_count += 1
-                if db_child not in db_group.children.filter(active=True):
-                    continue
-                db_group.children.remove(db_child)
-                self.logger.info('Group "%s" removed from group "%s"',
-                                 db_child.name, db_group.name)
+            for mem_group in mem_children:
+                db_children_name_pk_map.pop(mem_group.name, None)
+            del_child_group_pks = list(set(db_children_name_pk_map.values()))
+            for offset in xrange(0, len(del_child_group_pks), self._batch_size):
+                child_group_pks = del_child_group_pks[offset:(offset + self._batch_size)]
+                for db_child in db_children.filter(pk__in=child_group_pks):
+                    group_group_count += 1
+                    db_group.children.remove(db_child)
+                    self.logger.info('Group "%s" removed from group "%s"',
+                                     db_child.name, db_group.name)
+            # Delete group/host relationships not present in imported data.
             db_hosts = db_group.hosts.filter(active=True)
+            del_host_pks = set(db_hosts.values_list('pk', flat=True))
             mem_hosts = self.all_group.all_groups[db_group.name].hosts
-            mem_host_names = set([h.name for h in mem_hosts if not h.instance_id])
-            mem_instance_ids = set([h.instance_id for h in mem_hosts if h.instance_id])
-            db_host_pks = set([v for k,v in self.db_instance_id_map.items() if k in mem_instance_ids])
-            for db_host in db_hosts.exclude(Q(name__in=mem_host_names) | Q(instance_id__in=mem_instance_ids) | Q(pk__in=db_host_pks)):
-                group_host_count += 1
-                if db_host not in db_group.hosts.filter(active=True):
-                    continue
-                db_group.hosts.remove(db_host)
-                self.logger.info('Host "%s" removed from group "%s"',
-                                 db_host.name, db_group.name)
+            all_mem_host_names = [h.name for h in mem_hosts if not h.instance_id]
+            for offset in xrange(0, len(all_mem_host_names), self._batch_size):
+                mem_host_names = all_mem_host_names[offset:(offset + self._batch_size)]
+                for db_host_pk in db_hosts.filter(name__in=mem_host_names).values_list('pk', flat=True):
+                    del_host_pks.discard(db_host_pk)
+            all_mem_instance_ids = [h.instance_id for h in mem_hosts if h.instance_id]
+            for offset in xrange(0, len(all_mem_instance_ids), self._batch_size):
+                mem_instance_ids = all_mem_instance_ids[offset:(offset + self._batch_size)]
+                for db_host_pk in db_hosts.filter(instance_id__in=mem_instance_ids).values_list('pk', flat=True):
+                    del_host_pks.discard(db_host_pk)
+                all_db_host_pks = [v for k,v in self.db_instance_id_map.items() if k in mem_instance_ids]
+                for db_host_pk in all_db_host_pks:
+                    del_host_pks.discard(db_host_pk)
+            del_host_pks = list(del_host_pks)
+            for offset in xrange(0, len(del_host_pks), self._batch_size):
+                del_pks = del_host_pks[offset:(offset + self._batch_size)]
+                for db_host in db_hosts.filter(pk__in=del_pks):
+                    group_host_count += 1
+                    if db_host not in db_group.hosts.filter(active=True):
+                        continue
+                    db_group.hosts.remove(db_host)
+                    self.logger.info('Host "%s" removed from group "%s"',
+                                     db_host.name, db_group.name)
         if settings.SQL_DEBUG:
             self.logger.warning('group-group and group-host deletions took %d queries for %d relationships',
                                 len(connection.queries) - queries_before,
@@ -762,28 +804,30 @@ class Command(NoArgsCommand):
         if settings.SQL_DEBUG:
             queries_before = len(connection.queries)
         inv_src_group = self.inventory_source.group
-        group_names = set(self.all_group.all_groups.keys())
-        for group in self.inventory.groups.filter(name__in=group_names):
-            mem_group = self.all_group.all_groups[group.name]
-            db_variables = group.variables_dict
-            if self.overwrite_vars or self.overwrite:
-                db_variables = mem_group.variables
-            else:
-                db_variables.update(mem_group.variables)
-            if db_variables != group.variables_dict:
-                group.variables = json.dumps(db_variables)
-                group.save(update_fields=['variables'])
+        all_group_names = self.all_group.all_groups.keys()
+        for offset in xrange(0, len(all_group_names), self._batch_size):
+            group_names = all_group_names[offset:(offset + self._batch_size)]
+            for group in self.inventory.groups.filter(name__in=group_names):
+                mem_group = self.all_group.all_groups[group.name]
+                db_variables = group.variables_dict
                 if self.overwrite_vars or self.overwrite:
-                    self.logger.info('Group "%s" variables replaced', group.name)
+                    db_variables = mem_group.variables
                 else:
-                    self.logger.info('Group "%s" variables updated', group.name)
-            else:
-                self.logger.info('Group "%s" variables unmodified', group.name)
-            group_names.remove(group.name)
-            if inv_src_group and inv_src_group != group:
-                self._batch_add_m2m(inv_src_group.children, group)
-            self._batch_add_m2m(self.inventory_source.groups, group)
-        for group_name in group_names:
+                    db_variables.update(mem_group.variables)
+                if db_variables != group.variables_dict:
+                    group.variables = json.dumps(db_variables)
+                    group.save(update_fields=['variables'])
+                    if self.overwrite_vars or self.overwrite:
+                        self.logger.info('Group "%s" variables replaced', group.name)
+                    else:
+                        self.logger.info('Group "%s" variables updated', group.name)
+                else:
+                    self.logger.info('Group "%s" variables unmodified', group.name)
+                all_group_names.remove(group.name)
+                if inv_src_group and inv_src_group != group:
+                    self._batch_add_m2m(inv_src_group.children, group)
+                self._batch_add_m2m(self.inventory_source.groups, group)
+        for group_name in all_group_names:
             mem_group = self.all_group.all_groups[group_name]
             group = self.inventory.groups.create(name=group_name, variables=json.dumps(mem_group.variables), description='imported')
             # Access auto one-to-one attribute to create related object.
@@ -890,25 +934,40 @@ class Command(NoArgsCommand):
                 mem_host_name_map[k] = v
 
         # Update all existing hosts where we know the PK based on instance_id.
-        for db_host in self.inventory.hosts.filter(active=True, pk__in=mem_host_pk_map.keys()):
-            mem_host = mem_host_pk_map[db_host.pk]
-            self._update_db_host_from_mem_host(db_host, mem_host)
-            host_pks_updated.add(db_host.pk)
-            mem_host_names_to_update.discard(mem_host.name)
+        all_host_pks = mem_host_pk_map.keys()
+        for offset in xrange(0, len(all_host_pks), self._batch_size):
+            host_pks = all_host_pks[offset:(offset + self._batch_size)]
+            for db_host in self.inventory.hosts.filter(active=True, pk__in=host_pks):
+                if db_host.pk in host_pks_updated:
+                    continue
+                mem_host = mem_host_pk_map[db_host.pk]
+                self._update_db_host_from_mem_host(db_host, mem_host)
+                host_pks_updated.add(db_host.pk)
+                mem_host_names_to_update.discard(mem_host.name)
 
         # Update all existing hosts where we know the instance_id.
-        for db_host in self.inventory.hosts.filter(active=True, instance_id__in=mem_host_instance_id_map.keys()).exclude(pk__in=host_pks_updated):
-            mem_host = mem_host_instance_id_map[db_host.instance_id]
-            self._update_db_host_from_mem_host(db_host, mem_host)
-            host_pks_updated.add(db_host.pk)
-            mem_host_names_to_update.discard(mem_host.name)
+        all_instance_ids = mem_host_instance_id_map.keys()
+        for offset in xrange(0, len(all_instance_ids), self._batch_size):
+            instance_ids = all_instance_ids[offset:(offset + self._batch_size)]
+            for db_host in self.inventory.hosts.filter(active=True, instance_id__in=instance_ids):
+                if db_host.pk in host_pks_updated:
+                    continue
+                mem_host = mem_host_instance_id_map[db_host.instance_id]
+                self._update_db_host_from_mem_host(db_host, mem_host)
+                host_pks_updated.add(db_host.pk)
+                mem_host_names_to_update.discard(mem_host.name)
 
         # Update all existing hosts by name.
-        for db_host in self.inventory.hosts.filter(active=True, name__in=mem_host_name_map.keys()).exclude(pk__in=host_pks_updated):
-            mem_host = mem_host_name_map[db_host.name]
-            self._update_db_host_from_mem_host(db_host, mem_host)
-            host_pks_updated.add(db_host.pk)
-            mem_host_names_to_update.discard(mem_host.name)
+        all_host_names = mem_host_name_map.keys()
+        for offset in xrange(0, len(all_host_names), self._batch_size):
+            host_names = all_host_names[offset:(offset + self._batch_size)]
+            for db_host in self.inventory.hosts.filter(active=True, name__in=host_names):
+                if db_host.pk in host_pks_updated:
+                    continue
+                mem_host = mem_host_name_map[db_host.name]
+                self._update_db_host_from_mem_host(db_host, mem_host)
+                host_pks_updated.add(db_host.pk)
+                mem_host_names_to_update.discard(mem_host.name)
 
         # Create any new hosts.
         for mem_host_name in mem_host_names_to_update:
@@ -951,19 +1010,22 @@ class Command(NoArgsCommand):
         '''
         if settings.SQL_DEBUG:
             queries_before = len(connection.queries)
-        group_names = [k for k,v in self.all_group.all_groups.iteritems() if v.children]
+        all_group_names = [k for k,v in self.all_group.all_groups.iteritems() if v.children]
         group_group_count = 0
-        for db_group in self.inventory.groups.filter(name__in=group_names):
-            mem_group = self.all_group.all_groups[db_group.name]
-            group_group_count += len(mem_group.children)
-            child_names = set([g.name for g in mem_group.children])
-            db_children_qs = self.inventory.groups.filter(name__in=child_names)
-            for db_child in db_children_qs.filter(children__id=db_group.id):
-                self.logger.info('Group "%s" already child of group "%s"', db_child.name, db_group.name)
-            for db_child in db_children_qs.exclude(children__id=db_group.id):
-                self._batch_add_m2m(db_group.children, db_child)
-                self.logger.info('Group "%s" added as child of "%s"', db_child.name, db_group.name)
-        self._batch_add_m2m(db_group.children, flush=True)
+        for offset in xrange(0, len(all_group_names), self._batch_size):
+            group_names = all_group_names[offset:(offset + self._batch_size)]
+            for db_group in self.inventory.groups.filter(name__in=group_names):
+                mem_group = self.all_group.all_groups[db_group.name]
+                group_group_count += len(mem_group.children)
+                child_names = set([g.name for g in mem_group.children])
+                db_children_qs = self.inventory.groups.filter(name__in=child_names)
+                # FIXME: May fail unit tests when len(child_names) > 1000.
+                for db_child in db_children_qs.filter(children__id=db_group.id):
+                    self.logger.info('Group "%s" already child of group "%s"', db_child.name, db_group.name)
+                for db_child in db_children_qs.exclude(children__id=db_group.id):
+                    self._batch_add_m2m(db_group.children, db_child)
+                    self.logger.info('Group "%s" added as child of "%s"', db_child.name, db_group.name)
+        self._batch_add_m2m(db_group.children, flush=True)
         if settings.SQL_DEBUG:
             self.logger.warning('Group-group updates took %d queries for %d group-group relationships',
                                 len(connection.queries) - queries_before, group_group_count)
@@ -973,20 +1035,32 @@ class Command(NoArgsCommand):
         # belongs.
         if settings.SQL_DEBUG:
             queries_before = len(connection.queries)
-        group_names = [k for k,v in self.all_group.all_groups.iteritems() if v.hosts]
+        all_group_names = [k for k,v in self.all_group.all_groups.iteritems() if v.hosts]
         group_host_count = 0
-        for db_group in self.inventory.groups.filter(name__in=group_names):
-            mem_group = self.all_group.all_groups[db_group.name]
-            group_host_count += len(mem_group.hosts)
-            host_names = set([h.name for h in mem_group.hosts if not h.instance_id])
-            host_instance_ids = set([h.instance_id for h in mem_group.hosts if h.instance_id])
-            db_hosts_qs = self.inventory.hosts.filter(Q(name__in=host_names) | Q(instance_id__in=host_instance_ids))
-            for db_host in db_hosts_qs.filter(groups__id=db_group.id):
-                self.logger.info('Host "%s" already in group "%s"', db_host.name, db_group.name)
-            for db_host in db_hosts_qs.exclude(groups__id=db_group.id):
-                self._batch_add_m2m(db_group.hosts, db_host)
-                self.logger.info('Host "%s" added to group "%s"', db_host.name, db_group.name)
-        self._batch_add_m2m(db_group.hosts, flush=True)
+        for offset in xrange(0, len(all_group_names), self._batch_size):
+            group_names = all_group_names[offset:(offset + self._batch_size)]
+            for db_group in self.inventory.groups.filter(name__in=group_names):
+                mem_group = self.all_group.all_groups[db_group.name]
+                group_host_count += len(mem_group.hosts)
+                all_host_names = [h.name for h in mem_group.hosts if not h.instance_id]
+                for offset2 in xrange(0, len(all_host_names), self._batch_size):
+                    host_names = all_host_names[offset2:(offset2 + self._batch_size)]
+                    db_hosts_qs = self.inventory.hosts.filter(name__in=host_names)
+                    for db_host in db_hosts_qs.filter(groups__id=db_group.id):
+                        self.logger.info('Host "%s" already in group "%s"', db_host.name, db_group.name)
+                    for db_host in db_hosts_qs.exclude(groups__id=db_group.id):
+                        self._batch_add_m2m(db_group.hosts, db_host)
+                        self.logger.info('Host "%s" added to group "%s"', db_host.name, db_group.name)
+                all_instance_ids = [h.instance_id for h in mem_group.hosts if h.instance_id]
+                for offset2 in xrange(0, len(all_instance_ids), self._batch_size):
+                    instance_ids = all_instance_ids[offset2:(offset2 + self._batch_size)]
+                    db_hosts_qs = self.inventory.hosts.filter(instance_id__in=instance_ids)
+                    for db_host in db_hosts_qs.filter(groups__id=db_group.id):
+                        self.logger.info('Host "%s" already in group "%s"', db_host.name, db_group.name)
+                    for db_host in db_hosts_qs.exclude(groups__id=db_group.id):
+                        self._batch_add_m2m(db_group.hosts, db_host)
+                        self.logger.info('Host "%s" added to group "%s"', db_host.name, db_group.name)
+        self._batch_add_m2m(db_group.hosts, flush=True)
         if settings.SQL_DEBUG:
             self.logger.warning('Group-host updates took %d queries for %d group-host relationships',
                                 len(connection.queries) - queries_before, group_host_count)
@@ -997,6 +1071,8 @@ class Command(NoArgsCommand):
         merging as appropriate.
         '''
         # FIXME: Attribute changes to superuser?
+        # Perform __in queries in batches (mainly for unit tests using SQLite).
+        self._batch_size = 500
         self._build_db_instance_id_map()
         self._build_mem_instance_id_map()
         if self.overwrite:
diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py
index 47d064e469..0bced52af6 100644
--- a/awx/main/models/inventory.py
+++ b/awx/main/models/inventory.py
@@ -202,12 +202,15 @@ class Inventory(CommonModel):
         for host_pk in hosts_to_clear.values_list('pk', flat=True):
             host_updates = hosts_to_update.setdefault(host_pk, {})
             host_updates['has_inventory_sources'] = False
-        # Now apply updates to hosts where needed.
-        for host in hosts_qs.filter(pk__in=hosts_to_update.keys()):
-            host_updates = hosts_to_update[host.pk]
-            for field, value in host_updates.items():
-                setattr(host, field, value)
-            host.save(update_fields=host_updates.keys())
+        # Now apply updates to hosts where needed (in batches).
+        all_update_pks = hosts_to_update.keys()
+        for offset in xrange(0, len(all_update_pks), 500):
+            update_pks = all_update_pks[offset:(offset + 500)]
+            for host in hosts_qs.filter(pk__in=update_pks):
+                host_updates = hosts_to_update[host.pk]
+                for field, value in host_updates.items():
+                    setattr(host, field, value)
+                host.save(update_fields=host_updates.keys())
 
     def update_group_computed_fields(self):
         '''
@@ -264,16 +267,19 @@ class Inventory(CommonModel):
            if group_updates['has_active_failures']:
                failed_group_pks.add(group_pk)
 
-        # Now apply updates to each group as needed.
-        for group in self.groups.filter(pk__in=groups_to_update.keys()):
-            group_updates = groups_to_update[group.pk]
-            for field, value in group_updates.items():
-                if getattr(group, field) != value:
-                    setattr(group, field, value)
-                else:
-                    group_updates.pop(field)
-            if group_updates:
-                group.save(update_fields=group_updates.keys())
+        # Now apply updates to each group as needed (in batches).
+        all_update_pks = groups_to_update.keys()
+        for offset in xrange(0, len(all_update_pks), 500):
+            update_pks = all_update_pks[offset:(offset + 500)]
+            for group in self.groups.filter(pk__in=update_pks):
+                group_updates = groups_to_update[group.pk]
+                for field, value in group_updates.items():
+                    if getattr(group, field) != value:
+                        setattr(group, field, value)
+                    else:
+                        group_updates.pop(field)
+                if group_updates:
+                    group.save(update_fields=group_updates.keys())
 
     def update_computed_fields(self, update_groups=True, update_hosts=True):
         '''
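
Note on the idiom this patch repeats everywhere: SQLite caps the number of bound parameters in a single statement (SQLITE_MAX_VARIABLE_NUMBER, historically 999 by default), so a Django `__in` lookup fails once the value list grows past that limit; slicing the list and issuing one query per slice keeps every statement under the cap. The following is a minimal standalone sketch of that failure mode and the fix, not part of the patch itself; the table name, the 2000-row figure, and the assert are illustrative, while the 500 batch size mirrors what the patch hard-codes.

    # Demonstrates batching IN (...) queries under SQLite's parameter limit.
    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE host (id INTEGER PRIMARY KEY)')
    conn.executemany('INSERT INTO host (id) VALUES (?)',
                     [(i,) for i in xrange(2000)])

    all_pks = range(2000)
    batch_size = 500  # same figure the patch uses for self._batch_size

    # One query per slice; a single IN (...) with 2000 placeholders would
    # raise sqlite3.OperationalError ("too many SQL variables").
    found = set()
    for offset in xrange(0, len(all_pks), batch_size):
        batch = all_pks[offset:(offset + batch_size)]
        placeholders = ','.join('?' * len(batch))
        sql = 'SELECT id FROM host WHERE id IN (%s)' % placeholders
        found.update(row[0] for row in conn.execute(sql, batch))
    assert len(found) == 2000

The diff applies the same slice-and-query loop to Django querysets (`filter(pk__in=...)`, `filter(name__in=...)`, `filter(instance_id__in=...)`), accumulating results across batches in a set, which is why the delete paths first collect all candidate pks and then `discard()` the ones each batch matches.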