2020-01-21 21:27:53 +03:00
#! /usr/bin/env awx-python
#
# !!! READ BEFORE POINTING THIS AT YOUR FOOT !!!
#
# This script attempts to connect to an AWX database and insert (by default)
# a billion main_jobevent rows as screamingly fast as possible.
#
# tl;dr for best results, feed it high IOPS.
#
# this script exists *solely* for the purpose of generating *test* data very
# quickly; do *not* point this at a production installation or you *will* be
# very unhappy
#
# Before running this script, you should give postgres *GOBS* of memory
# and disk so it can create indexes and constraints as quickly as possible.
# In fact, it's probably not smart to attempt this on anything less than 8 cores,
# 32GB of RAM, and tens of thousands of IOPS.
#
# Also, a billion events is a *lot* of data; make sure you've
# provisioned *at least* 750GB of disk space
#
# if you want this script to complete in a few hours, a good starting point
# is something like m5.4xlarge w/ 1TB provisioned IOPS SSD (io1)
#
import argparse
import datetime
import json
import multiprocessing
2020-02-18 16:55:06 +03:00
import pkg_resources
2020-01-21 21:27:53 +03:00
import subprocess
2020-02-18 16:55:06 +03:00
import sys
2020-01-21 21:27:53 +03:00
from io import StringIO
2020-02-18 16:55:06 +03:00
from time import time
from random import randint
2020-01-21 21:27:53 +03:00
from uuid import uuid4
import psycopg2
2020-02-18 16:55:06 +03:00
from django import setup as setup_django
from django . db import connection
from django . db . models . sql import InsertQuery
from django . utils . timezone import now
2020-01-21 21:27:53 +03:00
# Read the DATABASES setting by running `awx-manage print_settings`; the
# 'default' entry supplies the connection parameters used below.
db = json.loads(
    subprocess.check_output(
        ['awx-manage', 'print_settings', 'DATABASES', '--format', 'json']
    )
)
_default_db = db['DATABASES']['default']
name = _default_db['NAME']
user = _default_db['USER']
pw = _default_db['PASSWORD']
host = _default_db['HOST']

# Connection string handed to every psycopg2.connect() call in this script.
dsn = f'dbname={name} user={user} password={pw} host={host}'

# A single uuid generated once per run; it is written into the uuid column of
# every synthetic event row.
u = str(uuid4())

# Statuses that generated jobs are cycled through.
STATUS_OPTIONS = ('successful', 'failed', 'error', 'canceled')
2020-01-21 21:27:53 +03:00
class YieldedRows(StringIO):
    """File-like object that synthesizes identical tab-separated event rows.

    Nothing is ever stored in the underlying buffer: every ``read()`` call
    returns ``ROWS_PER_READ`` copies of one pre-built row until the requested
    number of rows has been handed out, then returns ``''`` to signal EOF.
    Because rows are produced in whole chunks, the total emitted is ``rows``
    rounded up to a multiple of ``ROWS_PER_READ``.

    :param job_id: value written into the job_id column (converted to str)
    :param rows: number of rows still to be produced (int or float)
    """

    # Number of copies of the row returned by each read() call.
    ROWS_PER_READ = 10000

    def __init__(self, job_id, rows, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rows = rows
        # One row in COPY text format; the values are listed in the column
        # order used by the COPY statement that consumes this object.
        self.row = "\t".join([
            "2020-01-02 12:00:00",  # created
            "2020-01-02 12:00:01",  # modified
            "playbook_on_start",    # event
            "{}",                   # event_data
            'false',                # failed
            'false',                # changed
            "localhost",            # host_name
            "Example Play",         # play
            "Hello World",          # role
            "",                     # task
            "0",                    # counter
            "1",                    # host_id
            str(job_id),            # job_id
            u,                      # uuid (module-level, shared by all rows)
            "",                     # parent_uuid
            "1",                    # end_line
            "hello_world.yml",      # playbook
            "0",                    # start_line
            "X",                    # stdout
            "1",                    # verbosity
        ]) + '\n'

    def read(self, x=-1):
        """Return the next chunk of rows; the requested size ``x`` is ignored.

        :returns: ``ROWS_PER_READ`` copies of the row, or ``''`` once the row
                  budget is exhausted (the object is closed at that point).
        """
        if self.rows <= 0:
            self.close()
            # An empty string is what tells the reader there is no more data.
            return ''
        self.rows -= self.ROWS_PER_READ
        return self.row * self.ROWS_PER_READ
def firehose(job, count):
    """COPY roughly ``count`` synthetic rows into main_jobevent for ``job``.

    Opens its own database connection, streams rows produced by
    ``YieldedRows(job, count)`` through ``COPY ... FROM STDIN`` and commits.

    :param job: job id written into the job_id column of every row
    :param count: number of rows requested from the row generator
    """
    conn = psycopg2.connect(dsn)
    try:
        source = YieldedRows(job, count)
        with conn.cursor() as cursor:
            cursor.copy_expert((
                'COPY '
                'main_jobevent('
                'created, modified, event, event_data, failed, changed, '
                'host_name, play, role, task, counter, host_id, job_id, uuid, '
                'parent_uuid, end_line, playbook, start_line, stdout, verbosity'
                ') '
                'FROM STDIN'
            ), source, size=1024 * 1000)
        conn.commit()
    finally:
        # Always release the connection, including when the COPY fails.
        conn.close()
def cleanup(sql):
    """Print and execute one SQL statement on a fresh connection, then commit.

    :param sql: the statement to run
    """
    print(sql)
    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql)
        conn.commit()
    finally:
        # Always release the connection, including when the statement fails.
        conn.close()
2020-02-18 16:55:06 +03:00
def generate_jobs(jobs):
    """Bulk-insert ``jobs`` Job rows in batches of up to 1000.

    Each batch takes its defaults from one existing JobTemplate; the template
    position advances on every attempt, so consecutive batches rotate through
    the available templates.

    :param jobs: number of jobs to create; must be positive
    :returns: the last object of the final batch returned by ``bulk_create``
    :raises ValueError: if ``jobs`` is not positive (there would be nothing
        to return)
    :raises RuntimeError: if no JobTemplate exists to copy defaults from
    """
    if jobs <= 0:
        raise ValueError(f'jobs must be a positive integer, got {jobs!r}')

    print(f'inserting {jobs} job(s)')
    sys.path.insert(0, pkg_resources.get_distribution('awx').module_path)
    from awx import prepare_env
    prepare_env()
    setup_django()
    from awx.main.models import UnifiedJob, Job, JobTemplate

    # Fields declared on Job but not on UnifiedJob; these are the columns
    # written by the explicit INSERT issued after bulk_create below.
    fields = list(set(Job._meta.fields) - set(UnifiedJob._meta.fields))
    job_field_names = {f.attname for f in fields}
    # extra unified job field names from base class
    job_field_names.update(('name', 'created_by_id', 'modified_by_id'))

    jt_count = JobTemplate.objects.count()
    if jt_count == 0:
        # Without this the modulo below would fail with ZeroDivisionError.
        raise RuntimeError('at least one JobTemplate must exist to generate jobs')

    batch_size = 1000

    def make_batch(N, jt_pos=0):
        """Insert ``N`` jobs based on one template; return (last job, next jt_pos)."""
        jt = None
        while not jt:
            try:
                jt = JobTemplate.objects.all()[jt_pos % jt_count]
            except IndexError as e:
                # seems to happen every now and then due to some race condition
                print('Warning: IndexError on {} JT, error: {}'.format(
                    jt_pos % jt_count, e
                ))
            jt_pos += 1

        # Copy every editable, truthy template attribute whose name is one of
        # the job field names collected above.
        jt_defaults = {
            f.attname: getattr(jt, f.attname)
            for f in JobTemplate._meta.get_fields()
            if f.editable and f.attname in job_field_names and getattr(jt, f.attname)
        }
        jt_defaults['job_template_id'] = jt.pk
        jt_defaults['unified_job_template_id'] = jt.pk  # populated by save method

        batch = [
            Job(
                status=STATUS_OPTIONS[i % len(STATUS_OPTIONS)],
                started=now(), created=now(), modified=now(), finished=now(),
                elapsed=0., **jt_defaults)
            for i in range(N)
        ]
        # bulk_create goes through the UnifiedJob manager; the Job-only
        # fields are then inserted with a hand-built INSERT for the same objects.
        ujs = UnifiedJob.objects.bulk_create(batch)
        insert = InsertQuery(Job)
        insert.insert_values(fields, ujs)
        with connection.cursor() as cursor:
            sql, params = insert.sql_with_params()[0]
            cursor.execute(sql, params)
        return ujs[-1], jt_pos

    batch_number = 1
    jt_pos = 0
    started_at = time()
    while jobs > 0:
        batch_started_at = time()
        print('running batch {}, runtime {}'.format(batch_number, time() - started_at))
        created, jt_pos = make_batch(min(jobs, batch_size), jt_pos)
        print('took {}'.format(time() - batch_started_at))
        batch_number += 1
        jobs -= batch_size
    return created
def _run_in_parallel(target, argument_sets):
    """Run ``target(*args)`` in one daemon process per entry and wait for all."""
    workers = []
    for args in argument_sets:
        p = multiprocessing.Process(target=target, args=args)
        p.daemon = True
        workers.append(p)
    for w in workers:
        w.start()
    for w in workers:
        w.join()


def generate_events(events, job):
    """Attach roughly ``events`` synthetic main_jobevent rows to ``job``.

    Sequence of operations:

    1. switch main_jobevent to UNLOGGED, then record and drop every index and
       constraint other than the primary key;
    2. start one ``firehose`` process per CPU core, each loading an equal
       share of the rows;
    3. renumber counter / start_line / end_line for the job from sequences;
    4. in all cases, recreate the recorded indexes (in parallel) and the
       recorded constraints, except check constraints.

    NOTE: nothing in this function switches the table back to LOGGED.

    :param events: total number of events to create; the per-worker share is
        rounded up, and the row generator emits whole chunks, so the number
        actually inserted can be larger
    :param job: primary key of the target job as a string; it is interpolated
        directly into the UPDATE statement
    """
    conn = psycopg2.connect(dsn)
    # Bound up front so the restore step can always iterate them, even when a
    # statement fails before the catalog queries have run.
    indexes = []
    constraints = []
    try:
        cursor = conn.cursor()
        print('removing indexes and constraints')
        # get all the indexes for main_jobevent
        try:
            # disable WAL to drastically increase write speed
            # we're not doing replication, and the goal of this script is to just
            # insert data as quickly as possible without concern for the risk of
            # data loss on crash
            # see: https://www.compose.com/articles/faster-performance-with-unlogged-tables-in-postgresql/
            cursor.execute('ALTER TABLE main_jobevent SET UNLOGGED')

            cursor.execute(
                "SELECT indexname, indexdef FROM pg_indexes WHERE tablename='main_jobevent' AND indexname != 'main_jobevent_pkey';"
            )
            indexes = cursor.fetchall()

            cursor.execute(
                "SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = 'main_jobevent'::regclass AND conname != 'main_jobevent_pkey';"
            )
            constraints = cursor.fetchall()

            # drop all indexes for speed
            for indexname, indexdef in indexes:
                cursor.execute(f'DROP INDEX IF EXISTS {indexname}')
                print(f'DROP INDEX IF EXISTS {indexname}')
            for conname, contype, condef in constraints:
                cursor.execute(f'ALTER TABLE main_jobevent DROP CONSTRAINT IF EXISTS {conname}')
                print(f'ALTER TABLE main_jobevent DROP CONSTRAINT IF EXISTS {conname}')
            conn.commit()

            print(f'attaching {events} events to job {job}')
            cores = multiprocessing.cpu_count()
            # Integer ceiling division: every worker gets a whole number of
            # rows and the shares together cover at least ``events``.
            per_worker = -(-events // cores)
            _run_in_parallel(firehose, [(job, per_worker)] * cores)

            print('generating unique start/end line counts')
            cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_seq;')
            cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_line_seq MINVALUE 0;')
            cursor.execute('ALTER SEQUENCE firehose_seq RESTART WITH 1;')
            cursor.execute('ALTER SEQUENCE firehose_line_seq RESTART WITH 0;')
            # Advance the line sequence once before the UPDATE below uses it.
            cursor.execute("SELECT nextval('firehose_line_seq')")
            conn.commit()

            cursor.execute(
                "UPDATE main_jobevent SET "
                "counter=nextval('firehose_seq')::integer, "
                "start_line=nextval('firehose_line_seq')::integer, "
                "end_line=currval('firehose_line_seq')::integer + 2 "
                f"WHERE job_id={job}"
            )
            conn.commit()
        finally:
            # restore all indexes
            print(datetime.datetime.utcnow().isoformat())
            print('restoring indexes and constraints (this may take awhile)')

            # Each index definition is replayed in its own process.
            _run_in_parallel(cleanup, [(indexdef,) for indexname, indexdef in indexes])

            for conname, contype, condef in constraints:
                if contype == 'c':
                    # if there are any check constraints, don't add them back
                    # (historically, these are > 0 checks, which are basically
                    # worthless, because Ansible doesn't emit counters, line
                    # numbers, verbosity, etc... < 0)
                    continue
                sql = f'ALTER TABLE main_jobevent ADD CONSTRAINT {conname} {condef}'
                cleanup(sql)
    finally:
        # Release the connection on both the success and the failure path.
        conn.close()
    print(datetime.datetime.utcnow().isoformat())
2020-02-18 16:55:06 +03:00
if __name__ == '__main__':
    # Command-line entry point: create the requested number of jobs, then
    # attach the requested number of events to the last job created.
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--jobs', type=int, help='Number of jobs to create.',
        default=1000000)  # 1M by default
    parser.add_argument(
        '--events', type=int, help='Number of events to create.',
        default=1000000000)  # 1B by default
    params = parser.parse_args()
    jobs = params.jobs
    events = params.events
    print(datetime.datetime.utcnow().isoformat())
    created = generate_jobs(jobs)
    # The job's primary key is passed on as a string.
    generate_events(events, str(created.pk))