// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package promql

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"time"
	"unicode/utf8"

	"github.com/edsrzf/mmap-go"
)
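
// ActiveQueryTracker tracks PromQL queries that are currently executing. Each
// query is written into a fixed-size slot of an mmapped "queries.active" file
// so that queries still in flight during an unclean shutdown can be reported
// on the next start-up.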
type ActiveQueryTracker struct {
	mmappedFile   []byte
	getNextIndex  chan int
	logger        *slog.Logger
	closer        io.Closer
	maxConcurrent int
}

var _ io.Closer = &ActiveQueryTracker{}
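
// Entry is the JSON representation of a single active query as stored in the
// query log file.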
type Entry struct {
	Query     string `json:"query"`
	Timestamp int64  `json:"timestamp_sec"`
}
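
// entrySize is the number of bytes reserved in the query log file for each
// active query entry.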
const (
	entrySize int = 1000
)
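
// parseBrokenJSON strips the NUL padding from the raw file contents and
// closes the truncated JSON array so it can be logged as valid JSON. The
// returned bool reports whether any unfinished queries were found.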
func parseBrokenJSON(brokenJSON []byte) (string, bool) {
	queries := strings.ReplaceAll(string(brokenJSON), "\x00", "")
	if len(queries) > 0 {
		queries = queries[:len(queries)-1] + "]"
	}

	// Conditional because of implementation detail: len() = 1 implies file consisted of a single char: '['.
	if len(queries) <= 1 {
		return "[]", false
	}

	return queries, true
}
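
// logUnfinishedQueries reads a query log file left over from a previous run
// and logs any queries that were still active when the process stopped.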
func logUnfinishedQueries(filename string, filesize int, logger *slog.Logger) {
	if _, err := os.Stat(filename); err == nil {
		fd, err := os.Open(filename)
		if err != nil {
			logger.Error("Failed to open query log file", "err", err)
			return
		}
		defer fd.Close()

		brokenJSON := make([]byte, filesize)
		_, err = fd.Read(brokenJSON)
		if err != nil {
			logger.Error("Failed to read query log file", "err", err)
			return
		}

		queries, queriesExist := parseBrokenJSON(brokenJSON)
		if !queriesExist {
			return
		}
		logger.Info("These queries didn't finish in prometheus' last run:", "queries", queries)
	}
}
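
// mmappedFile ties an mmapped region to the file backing it so both can be
// released together.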
type mmappedFile struct {
	f io.Closer
	m mmap.MMap
}
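
// Close unmaps the region and closes the backing file, combining any errors
// encountered along the way.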
func (f *mmappedFile) Close() error {
	err := f.m.Unmap()
	if err != nil {
		err = fmt.Errorf("mmappedFile: unmapping: %w", err)
	}
	if fErr := f.f.Close(); fErr != nil {
		return errors.Join(fmt.Errorf("close mmappedFile.f: %w", fErr), err)
	}

	return err
}
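
// getMMappedFile creates (or truncates) the query log file at filename, grows
// it to filesize bytes, and mmaps it read-write. It returns the mapped bytes
// together with a closer that releases both the mapping and the file.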
func getMMappedFile(filename string, filesize int, logger *slog.Logger) ([]byte, io.Closer, error) {
	file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666)
	if err != nil {
		absPath, pathErr := filepath.Abs(filename)
		if pathErr != nil {
			absPath = filename
		}
		logger.Error("Error opening query log file", "file", absPath, "err", err)
		return nil, nil, err
	}

	err = file.Truncate(int64(filesize))
	if err != nil {
		file.Close()
		logger.Error("Error setting filesize.", "filesize", filesize, "err", err)
		return nil, nil, err
	}

	fileAsBytes, err := mmap.Map(file, mmap.RDWR, 0)
	if err != nil {
		file.Close()
		logger.Error("Failed to mmap", "file", filename, "Attempted size", filesize, "err", err)
		return nil, nil, err
	}

	return fileAsBytes, &mmappedFile{f: file, m: fileAsBytes}, err
}
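
// NewActiveQueryTracker returns an ActiveQueryTracker backed by the
// "queries.active" file under localStoragePath, sized to hold maxConcurrent
// fixed-size entries. Queries left unfinished by a previous run are logged
// before the file is truncated and re-mmapped.
//
// A minimal usage sketch (the path, query, logger and ctx below are
// illustrative placeholders, not values provided by this package):
//
//	tracker := NewActiveQueryTracker("./data", 20, logger)
//	defer tracker.Close()
//
//	idx, err := tracker.Insert(ctx, "sum(rate(http_requests_total[5m]))")
//	if err == nil {
//		defer tracker.Delete(idx)
//	}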
func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger *slog.Logger) *ActiveQueryTracker {
	err := os.MkdirAll(localStoragePath, 0o777)
	if err != nil {
		logger.Error("Failed to create directory for logging active queries")
	}

	filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxConcurrent*entrySize
	logUnfinishedQueries(filename, filesize, logger)

	fileAsBytes, closer, err := getMMappedFile(filename, filesize, logger)
	if err != nil {
		panic("Unable to create mmap-ed active query log")
	}

	copy(fileAsBytes, "[")
	activeQueryTracker := ActiveQueryTracker{
		mmappedFile:   fileAsBytes,
		closer:        closer,
		getNextIndex:  make(chan int, maxConcurrent),
		logger:        logger,
		maxConcurrent: maxConcurrent,
	}

	activeQueryTracker.generateIndices(maxConcurrent)

	return &activeQueryTracker
}
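
// trimStringByBytes truncates str to at most size bytes without splitting a
// UTF-8 rune.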
func trimStringByBytes(str string, size int) string {
	bytesStr := []byte(str)

	trimIndex := len(bytesStr)
	if size < len(bytesStr) {
		for !utf8.RuneStart(bytesStr[size]) {
			size--
		}
		trimIndex = size
	}

	return string(bytesStr[:trimIndex])
}
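
// _newJSONEntry marshals a query and timestamp into a JSON entry, returning
// an empty slice if marshalling fails.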
func _newJSONEntry(query string, timestamp int64, logger *slog.Logger) []byte {
	entry := Entry{query, timestamp}
	jsonEntry, err := json.Marshal(entry)
	if err != nil {
		logger.Error("Cannot create json of query", "query", query)
		return []byte{}
	}

	return jsonEntry
}
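
// newJSONEntry builds the JSON entry for a query, trimming the query string
// so that the resulting entry fits in an entrySize-byte slot.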
func newJSONEntry(query string, logger *slog.Logger) []byte {
	timestamp := time.Now().Unix()
	minEntryJSON := _newJSONEntry("", timestamp, logger)

	query = trimStringByBytes(query, entrySize-(len(minEntryJSON)+1))
	jsonEntry := _newJSONEntry(query, timestamp, logger)

	return jsonEntry
}
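
// generateIndices pre-fills getNextIndex with the byte offset of every entry
// slot in the mmapped file.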
func (tracker ActiveQueryTracker) generateIndices(maxConcurrent int) {
	for i := 0; i < maxConcurrent; i++ {
		tracker.getNextIndex <- 1 + (i * entrySize)
	}
}
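
// GetMaxConcurrent returns the maximum number of concurrent queries the
// tracker was configured for.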
func (tracker ActiveQueryTracker) GetMaxConcurrent() int {
	return tracker.maxConcurrent
}
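
// Delete zeroes the entry at insertIndex and returns its slot to the pool of
// available indices.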
func (tracker ActiveQueryTracker) Delete(insertIndex int) {
	copy(tracker.mmappedFile[insertIndex:], strings.Repeat("\x00", entrySize))
	tracker.getNextIndex <- insertIndex
}
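
// Insert writes the query into the next free slot and returns the slot's
// index, or the context's error if no slot becomes free before ctx is done.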
func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int, error) {
	select {
	case i := <-tracker.getNextIndex:
		fileBytes := tracker.mmappedFile
		entry := newJSONEntry(query, tracker.logger)
		start, end := i, i+entrySize

		copy(fileBytes[start:], entry)
		copy(fileBytes[end-1:], ",")
		return i, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// Close closes tracker, releasing the mmapped query log file.
func (tracker *ActiveQueryTracker) Close() error {
	if tracker == nil || tracker.closer == nil {
		return nil
	}
	if err := tracker.closer.Close(); err != nil {
		return fmt.Errorf("close ActiveQueryTracker.closer: %w", err)
	}
	return nil
}