// Copyright 2019 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package issues

import (
	"context"
	"errors"
	"fmt"
	"net"
	"strconv"
	"sync"
	"time"

	"code.gitea.io/gitea/modules/graceful"
	"code.gitea.io/gitea/modules/log"

	"github.com/olivere/elastic/v7"
)

var _ Indexer = &ElasticSearchIndexer{}

// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
	client      *elastic.Client
	indexerName string
	available   bool
	stopTimer   chan struct{}
	lock        sync.RWMutex
}
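
// elasticLogger adapts Gitea's leveled logger to the logger interface expected
// by the olivere/elastic client, so client trace/info/error output is routed
// through Gitea's logging.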
type elasticLogger struct {
	log.LevelLogger
}

func (l elasticLogger) Printf(format string, args ...interface{}) {
	_ = l.Log(2, l.GetLevel(), format, args...)
}
// NewElasticSearchIndexer creates a new elasticsearch indexer
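//
// A minimal usage sketch for illustration; the URL and index name here are
// placeholders, not values defined in this package:
//
//	indexer, err := NewElasticSearchIndexer("http://localhost:9200", "gitea_issues")
//	if err != nil {
//		return err
//	}
//	if _, err := indexer.Init(); err != nil {
//		return err
//	}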
func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, error) {
	opts := []elastic.ClientOptionFunc{
		elastic.SetURL(url),
		elastic.SetSniff(false),
		elastic.SetHealthcheckInterval(10 * time.Second),
		elastic.SetGzip(false),
	}

	logger := elasticLogger{log.GetLogger(log.DEFAULT)}

	if logger.GetLevel() == log.TRACE || logger.GetLevel() == log.DEBUG {
		opts = append(opts, elastic.SetTraceLog(logger))
	} else if logger.GetLevel() == log.ERROR || logger.GetLevel() == log.CRITICAL || logger.GetLevel() == log.FATAL {
		opts = append(opts, elastic.SetErrorLog(logger))
	} else if logger.GetLevel() == log.INFO || logger.GetLevel() == log.WARN {
		opts = append(opts, elastic.SetInfoLog(logger))
	}

	client, err := elastic.NewClient(opts...)
	if err != nil {
		return nil, err
	}

	indexer := &ElasticSearchIndexer{
		client:      client,
		indexerName: indexerName,
		available:   true,
		stopTimer:   make(chan struct{}),
	}
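
	// Re-check Elasticsearch availability every 10 seconds so Ping reflects
	// the current state; the goroutine exits once Close closes stopTimer.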
	ticker := time.NewTicker(10 * time.Second)
	go func() {
		for {
			select {
			case <-ticker.C:
				indexer.checkAvailability()
			case <-indexer.stopTimer:
				ticker.Stop()
				return
			}
		}
	}()

	return indexer, nil
}
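
// defaultMapping is the index mapping used when the issue index has to be
// created: numeric id/repo_id fields plus full-text title, content and
// comments fields.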
const (
	defaultMapping = `{
		"mappings": {
			"properties": {
				"id": {
					"type": "integer",
					"index": true
				},
				"repo_id": {
					"type": "integer",
					"index": true
				},
				"title": {
					"type": "text",
					"index": true
				},
				"content": {
					"type": "text",
					"index": true
				},
				"comments": {
					"type": "text",
					"index": true
				}
			}
		}
	}`
)

// Init initializes the indexer; the returned bool reports whether the index
// already existed (true) or had to be created (false).
func (b *ElasticSearchIndexer) Init() (bool, error) {
	ctx := graceful.GetManager().HammerContext()
	exists, err := b.client.IndexExists(b.indexerName).Do(ctx)
	if err != nil {
		return false, b.checkError(err)
	}

	if !exists {
		mapping := defaultMapping

		createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx)
		if err != nil {
			return false, b.checkError(err)
		}
		if !createIndex.Acknowledged {
			return false, errors.New("init failed")
		}

		return false, nil
	}
	return true, nil
}

// Ping checks if elastic is available
func (b *ElasticSearchIndexer) Ping() bool {
	b.lock.RLock()
	defer b.lock.RUnlock()
	return b.available
}

// Index will save the index data
func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
	if len(issues) == 0 {
		return nil
	} else if len(issues) == 1 {
		issue := issues[0]
		_, err := b.client.Index().
			Index(b.indexerName).
			Id(fmt.Sprintf("%d", issue.ID)).
			BodyJson(map[string]interface{}{
				"id":       issue.ID,
				"repo_id":  issue.RepoID,
				"title":    issue.Title,
				"content":  issue.Content,
				"comments": issue.Comments,
			}).
			Do(graceful.GetManager().HammerContext())
		return b.checkError(err)
	}
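
	// Multiple issues: index them with a single bulk request.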
	reqs := make([]elastic.BulkableRequest, 0)
	for _, issue := range issues {
		reqs = append(reqs,
			elastic.NewBulkIndexRequest().
				Index(b.indexerName).
				Id(fmt.Sprintf("%d", issue.ID)).
				Doc(map[string]interface{}{
					"id":       issue.ID,
					"repo_id":  issue.RepoID,
					"title":    issue.Title,
					"content":  issue.Content,
					"comments": issue.Comments,
				}),
		)
	}

	_, err := b.client.Bulk().
		Index(b.indexerName).
		Add(reqs...).
		Do(graceful.GetManager().HammerContext())
	return b.checkError(err)
}

// Delete removes the indexed data for the given issue ids
func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
	if len(ids) == 0 {
		return nil
	} else if len(ids) == 1 {
		_, err := b.client.Delete().
			Index(b.indexerName).
			Id(fmt.Sprintf("%d", ids[0])).
			Do(graceful.GetManager().HammerContext())
		return b.checkError(err)
	}
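
	// Multiple ids: delete them with a single bulk request.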
	reqs := make([]elastic.BulkableRequest, 0)
	for _, id := range ids {
		reqs = append(reqs,
			elastic.NewBulkDeleteRequest().
				Index(b.indexerName).
				Id(fmt.Sprintf("%d", id)),
		)
	}

	_, err := b.client.Bulk().
		Index(b.indexerName).
		Add(reqs...).
		Do(graceful.GetManager().HammerContext())
	return b.checkError(err)
}

// Search searches for issues by given conditions.
// Returns the matching issue IDs
func (b *ElasticSearchIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
	kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments")
	query := elastic.NewBoolQuery()
	query = query.Must(kwQuery)
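
	// Restrict the search to the given repositories when repo IDs are provided.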
	if len(repoIDs) > 0 {
		repoStrs := make([]interface{}, 0, len(repoIDs))
		for _, repoID := range repoIDs {
			repoStrs = append(repoStrs, repoID)
		}
		repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
		query = query.Must(repoQuery)
	}
	searchResult, err := b.client.Search().
		Index(b.indexerName).
		Query(query).
		Sort("_score", false).
		From(start).Size(limit).
		Do(ctx)
	if err != nil {
		return nil, b.checkError(err)
	}

	hits := make([]Match, 0, limit)
	for _, hit := range searchResult.Hits.Hits {
		id, _ := strconv.ParseInt(hit.Id, 10, 64)
		hits = append(hits, Match{
			ID: id,
		})
	}

	return &SearchResult{
		Total: searchResult.TotalHits(),
		Hits:  hits,
	}, nil
}

// Close implements indexer
func (b *ElasticSearchIndexer) Close() {
	select {
	case <-b.stopTimer:
	default:
		close(b.stopTimer)
	}
}
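
// checkError marks the indexer as unavailable when err looks like a
// connectivity problem (an elastic connection error, or a net.OpError from a
// failed "dial" or "read"), then returns err unchanged.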
func (b *ElasticSearchIndexer) checkError(err error) error {
	var opErr *net.OpError
	if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
		return err
	}

	b.setAvailability(false)

	return err
}
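
// checkAvailability runs on the background ticker: if the indexer is currently
// marked unavailable, it probes the cluster state and marks the indexer
// available again when the probe succeeds.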
func (b *ElasticSearchIndexer) checkAvailability() {
	if b.Ping() {
		return
	}

	// Request cluster state to check if elastic is available again
	_, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext())
	if err != nil {
		b.setAvailability(false)
		return
	}

	b.setAvailability(true)
}

func (b *ElasticSearchIndexer) setAvailability(available bool) {
	b.lock.Lock()
	defer b.lock.Unlock()

	if b.available == available {
		return
	}

	b.available = available
}