2019-11-02 01:51:22 +03:00
// Copyright 2019 The Gitea Authors. All rights reserved.
2022-11-27 21:20:29 +03:00
// SPDX-License-Identifier: MIT
2019-11-02 01:51:22 +03:00
package webhook
import (
2019-12-15 12:51:28 +03:00
"context"
2021-06-27 22:21:09 +03:00
"crypto/hmac"
"crypto/sha1"
2019-11-02 01:51:22 +03:00
"crypto/tls"
2021-06-27 22:21:09 +03:00
"encoding/hex"
2019-11-02 01:51:22 +03:00
"fmt"
2021-06-27 22:21:09 +03:00
"io"
2019-11-02 01:51:22 +03:00
"net/http"
"net/url"
"strings"
2019-11-09 00:25:53 +03:00
"sync"
2019-11-02 01:51:22 +03:00
"time"
2021-11-10 08:13:16 +03:00
webhook_model "code.gitea.io/gitea/models/webhook"
2019-12-15 12:51:28 +03:00
"code.gitea.io/gitea/modules/graceful"
2021-11-20 12:34:05 +03:00
"code.gitea.io/gitea/modules/hostmatcher"
2019-11-02 01:51:22 +03:00
"code.gitea.io/gitea/modules/log"
2022-11-23 17:10:04 +03:00
"code.gitea.io/gitea/modules/process"
2021-08-18 16:10:39 +03:00
"code.gitea.io/gitea/modules/proxy"
2022-04-25 21:03:01 +03:00
"code.gitea.io/gitea/modules/queue"
2019-11-02 01:51:22 +03:00
"code.gitea.io/gitea/modules/setting"
2023-05-04 02:53:43 +03:00
"code.gitea.io/gitea/modules/timeutil"
2023-01-01 18:23:15 +03:00
webhook_module "code.gitea.io/gitea/modules/webhook"
2021-11-10 08:13:16 +03:00
2019-11-09 00:25:53 +03:00
"github.com/gobwas/glob"
2023-02-22 22:21:46 +03:00
"github.com/minio/sha256-simd"
2019-11-02 01:51:22 +03:00
)
// Deliver deliver hook task
2022-03-31 20:01:43 +03:00
func Deliver ( ctx context . Context , t * webhook_model . HookTask ) error {
2021-11-10 08:13:16 +03:00
w , err := webhook_model . GetWebhookByID ( t . HookID )
2021-06-27 22:21:09 +03:00
if err != nil {
return err
}
2020-05-15 03:06:00 +03:00
defer func ( ) {
err := recover ( )
if err == nil {
return
}
// There was a panic whilst delivering a hook...
2022-11-23 17:10:04 +03:00
log . Error ( "PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s" , t . ID , w . URL , err , log . Stack ( 2 ) )
2020-05-15 03:06:00 +03:00
} ( )
2021-06-27 22:21:09 +03:00
2019-11-02 01:51:22 +03:00
t . IsDelivered = true
var req * http . Request
2021-06-27 22:21:09 +03:00
switch w . HTTPMethod {
2019-11-02 01:51:22 +03:00
case "" :
2022-11-23 17:10:04 +03:00
log . Info ( "HTTP Method for webhook %s empty, setting to POST as default" , w . URL )
2019-11-02 01:51:22 +03:00
fallthrough
case http . MethodPost :
2021-06-27 22:21:09 +03:00
switch w . ContentType {
2021-11-10 08:13:16 +03:00
case webhook_model . ContentTypeJSON :
2021-06-27 22:21:09 +03:00
req , err = http . NewRequest ( "POST" , w . URL , strings . NewReader ( t . PayloadContent ) )
2019-11-02 01:51:22 +03:00
if err != nil {
return err
}
req . Header . Set ( "Content-Type" , "application/json" )
2021-11-10 08:13:16 +03:00
case webhook_model . ContentTypeForm :
2022-01-20 20:46:10 +03:00
forms := url . Values {
2019-11-02 01:51:22 +03:00
"payload" : [ ] string { t . PayloadContent } ,
}
2021-06-27 22:21:09 +03:00
req , err = http . NewRequest ( "POST" , w . URL , strings . NewReader ( forms . Encode ( ) ) )
2019-11-02 01:51:22 +03:00
if err != nil {
return err
}
req . Header . Set ( "Content-Type" , "application/x-www-form-urlencoded" )
}
case http . MethodGet :
2021-06-27 22:21:09 +03:00
u , err := url . Parse ( w . URL )
2019-11-02 01:51:22 +03:00
if err != nil {
2022-11-23 17:10:04 +03:00
return fmt . Errorf ( "unable to deliver webhook task[%d] as cannot parse webhook url %s: %w" , t . ID , w . URL , err )
2019-11-02 01:51:22 +03:00
}
vals := u . Query ( )
vals [ "payload" ] = [ ] string { t . PayloadContent }
u . RawQuery = vals . Encode ( )
req , err = http . NewRequest ( "GET" , u . String ( ) , nil )
if err != nil {
2022-11-23 17:10:04 +03:00
return fmt . Errorf ( "unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w" , t . ID , w . URL , err )
2019-11-02 01:51:22 +03:00
}
2020-07-31 01:04:19 +03:00
case http . MethodPut :
2021-06-27 22:21:09 +03:00
switch w . Type {
2023-01-01 18:23:15 +03:00
case webhook_module . MATRIX :
2022-11-03 21:23:20 +03:00
txnID , err := getMatrixTxnID ( [ ] byte ( t . PayloadContent ) )
if err != nil {
return err
}
url := fmt . Sprintf ( "%s/%s" , w . URL , url . PathEscape ( txnID ) )
req , err = http . NewRequest ( "PUT" , url , strings . NewReader ( t . PayloadContent ) )
2020-07-31 01:04:19 +03:00
if err != nil {
2022-11-23 17:10:04 +03:00
return fmt . Errorf ( "unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w" , t . ID , w . URL , err )
2020-07-31 01:04:19 +03:00
}
default :
2022-11-23 17:10:04 +03:00
return fmt . Errorf ( "invalid http method for webhook task[%d] in webhook %s: %v" , t . ID , w . URL , w . HTTPMethod )
2020-07-31 01:04:19 +03:00
}
2019-11-02 01:51:22 +03:00
default :
2022-11-23 17:10:04 +03:00
return fmt . Errorf ( "invalid http method for webhook task[%d] in webhook %s: %v" , t . ID , w . URL , w . HTTPMethod )
2021-06-27 22:21:09 +03:00
}
var signatureSHA1 string
var signatureSHA256 string
if len ( w . Secret ) > 0 {
sig1 := hmac . New ( sha1 . New , [ ] byte ( w . Secret ) )
sig256 := hmac . New ( sha256 . New , [ ] byte ( w . Secret ) )
_ , err = io . MultiWriter ( sig1 , sig256 ) . Write ( [ ] byte ( t . PayloadContent ) )
if err != nil {
log . Error ( "prepareWebhooks.sigWrite: %v" , err )
}
signatureSHA1 = hex . EncodeToString ( sig1 . Sum ( nil ) )
signatureSHA256 = hex . EncodeToString ( sig256 . Sum ( nil ) )
2019-11-02 01:51:22 +03:00
}
2021-10-05 20:12:17 +03:00
event := t . EventType . Event ( )
eventType := string ( t . EventType )
2019-11-02 01:51:22 +03:00
req . Header . Add ( "X-Gitea-Delivery" , t . UUID )
2021-10-05 20:12:17 +03:00
req . Header . Add ( "X-Gitea-Event" , event )
req . Header . Add ( "X-Gitea-Event-Type" , eventType )
2021-06-27 22:21:09 +03:00
req . Header . Add ( "X-Gitea-Signature" , signatureSHA256 )
2019-11-02 01:51:22 +03:00
req . Header . Add ( "X-Gogs-Delivery" , t . UUID )
2021-10-05 20:12:17 +03:00
req . Header . Add ( "X-Gogs-Event" , event )
req . Header . Add ( "X-Gogs-Event-Type" , eventType )
2021-06-27 22:21:09 +03:00
req . Header . Add ( "X-Gogs-Signature" , signatureSHA256 )
req . Header . Add ( "X-Hub-Signature" , "sha1=" + signatureSHA1 )
req . Header . Add ( "X-Hub-Signature-256" , "sha256=" + signatureSHA256 )
2019-11-02 01:51:22 +03:00
req . Header [ "X-GitHub-Delivery" ] = [ ] string { t . UUID }
2021-10-05 20:12:17 +03:00
req . Header [ "X-GitHub-Event" ] = [ ] string { event }
req . Header [ "X-GitHub-Event-Type" ] = [ ] string { eventType }
2019-11-02 01:51:22 +03:00
2022-11-03 21:23:20 +03:00
// Add Authorization Header
authorization , err := w . HeaderAuthorization ( )
if err != nil {
log . Error ( "Webhook could not get Authorization header [%d]: %v" , w . ID , err )
return err
}
if authorization != "" {
req . Header [ "Authorization" ] = [ ] string { authorization }
}
2019-11-02 01:51:22 +03:00
// Record delivery information.
2021-11-10 08:13:16 +03:00
t . RequestInfo = & webhook_model . HookRequest {
2021-06-27 22:21:09 +03:00
URL : req . URL . String ( ) ,
HTTPMethod : req . Method ,
Headers : map [ string ] string { } ,
2019-11-02 01:51:22 +03:00
}
for k , vals := range req . Header {
t . RequestInfo . Headers [ k ] = strings . Join ( vals , "," )
}
2021-11-10 08:13:16 +03:00
t . ResponseInfo = & webhook_model . HookResponse {
2019-11-02 01:51:22 +03:00
Headers : map [ string ] string { } ,
}
2022-11-23 17:10:04 +03:00
// OK We're now ready to attempt to deliver the task - we must double check that it
// has not been delivered in the meantime
updated , err := webhook_model . MarkTaskDelivered ( ctx , t )
if err != nil {
log . Error ( "MarkTaskDelivered[%d]: %v" , t . ID , err )
return fmt . Errorf ( "unable to mark task[%d] delivered in the db: %w" , t . ID , err )
}
if ! updated {
// This webhook task has already been attempted to be delivered or is in the process of being delivered
log . Trace ( "Webhook Task[%d] already delivered" , t . ID )
return nil
}
// All code from this point will update the hook task
2019-11-02 01:51:22 +03:00
defer func ( ) {
2023-05-04 02:53:43 +03:00
t . Delivered = timeutil . TimeStampNanoNow ( )
2019-11-02 01:51:22 +03:00
if t . IsSucceed {
log . Trace ( "Hook delivered: %s" , t . UUID )
2022-03-28 06:17:21 +03:00
} else if ! w . IsActive {
log . Trace ( "Hook delivery skipped as webhook is inactive: %s" , t . UUID )
2019-11-02 01:51:22 +03:00
} else {
log . Trace ( "Hook delivery failed: %s" , t . UUID )
}
2021-11-10 08:13:16 +03:00
if err := webhook_model . UpdateHookTask ( t ) ; err != nil {
2019-11-02 01:51:22 +03:00
log . Error ( "UpdateHookTask [%d]: %v" , t . ID , err )
}
// Update webhook last delivery status.
if t . IsSucceed {
2023-01-01 18:23:15 +03:00
w . LastStatus = webhook_module . HookStatusSucceed
2019-11-02 01:51:22 +03:00
} else {
2023-01-01 18:23:15 +03:00
w . LastStatus = webhook_module . HookStatusFail
2019-11-02 01:51:22 +03:00
}
2021-11-10 08:13:16 +03:00
if err = webhook_model . UpdateWebhookLastStatus ( w ) ; err != nil {
2019-11-02 01:51:22 +03:00
log . Error ( "UpdateWebhookLastStatus: %v" , err )
return
}
} ( )
2021-02-11 20:34:34 +03:00
if setting . DisableWebhooks {
2021-11-20 12:34:05 +03:00
return fmt . Errorf ( "webhook task skipped (webhooks disabled): [%d]" , t . ID )
2021-02-11 20:34:34 +03:00
}
2022-03-28 06:17:21 +03:00
if ! w . IsActive {
2022-11-23 17:10:04 +03:00
log . Trace ( "Webhook %s in Webhook Task[%d] is not active" , w . URL , t . ID )
2022-03-28 06:17:21 +03:00
return nil
}
2022-03-31 20:01:43 +03:00
resp , err := webhookHTTPClient . Do ( req . WithContext ( ctx ) )
2019-11-02 01:51:22 +03:00
if err != nil {
t . ResponseInfo . Body = fmt . Sprintf ( "Delivery: %v" , err )
2022-11-23 17:10:04 +03:00
return fmt . Errorf ( "unable to deliver webhook task[%d] in %s due to error in http client: %w" , t . ID , w . URL , err )
2019-11-02 01:51:22 +03:00
}
defer resp . Body . Close ( )
// Status code is 20x can be seen as succeed.
t . IsSucceed = resp . StatusCode / 100 == 2
t . ResponseInfo . Status = resp . StatusCode
for k , vals := range resp . Header {
t . ResponseInfo . Headers [ k ] = strings . Join ( vals , "," )
}
2021-09-22 08:38:34 +03:00
p , err := io . ReadAll ( resp . Body )
2019-11-02 01:51:22 +03:00
if err != nil {
t . ResponseInfo . Body = fmt . Sprintf ( "read body: %s" , err )
2022-11-23 17:10:04 +03:00
return fmt . Errorf ( "unable to deliver webhook task[%d] in %s as unable to read response body: %w" , t . ID , w . URL , err )
2019-11-02 01:51:22 +03:00
}
t . ResponseInfo . Body = string ( p )
return nil
}
2019-11-09 00:25:53 +03:00
var (
webhookHTTPClient * http . Client
once sync . Once
hostMatchers [ ] glob . Glob
)
func webhookProxy ( ) func ( req * http . Request ) ( * url . URL , error ) {
if setting . Webhook . ProxyURL == "" {
2021-08-18 16:10:39 +03:00
return proxy . Proxy ( )
2019-11-09 00:25:53 +03:00
}
once . Do ( func ( ) {
for _ , h := range setting . Webhook . ProxyHosts {
if g , err := glob . Compile ( h ) ; err == nil {
hostMatchers = append ( hostMatchers , g )
} else {
log . Error ( "glob.Compile %s failed: %v" , h , err )
}
}
} )
return func ( req * http . Request ) ( * url . URL , error ) {
for _ , v := range hostMatchers {
if v . Match ( req . URL . Host ) {
return http . ProxyURL ( setting . Webhook . ProxyURLFixed ) ( req )
}
}
return http . ProxyFromEnvironment ( req )
}
}
2019-11-02 01:51:22 +03:00
2022-04-25 21:03:01 +03:00
// Init starts the hooks delivery thread
func Init ( ) error {
2019-11-02 01:51:22 +03:00
timeout := time . Duration ( setting . Webhook . DeliverTimeout ) * time . Second
2021-11-20 12:34:05 +03:00
allowedHostListValue := setting . Webhook . AllowedHostList
if allowedHostListValue == "" {
allowedHostListValue = hostmatcher . MatchBuiltinExternal
}
allowedHostMatcher := hostmatcher . ParseHostMatchList ( "webhook.ALLOWED_HOST_LIST" , allowedHostListValue )
2019-11-02 01:51:22 +03:00
webhookHTTPClient = & http . Client {
2021-11-01 11:39:52 +03:00
Timeout : timeout ,
2019-11-02 01:51:22 +03:00
Transport : & http . Transport {
TLSClientConfig : & tls . Config { InsecureSkipVerify : setting . Webhook . SkipTLSVerify } ,
2019-11-09 00:25:53 +03:00
Proxy : webhookProxy ( ) ,
2021-11-20 12:34:05 +03:00
DialContext : hostmatcher . NewDialContext ( "webhook" , allowedHostMatcher , nil ) ,
2019-11-02 01:51:22 +03:00
} ,
}
Rewrite queue (#24505)
# ⚠️ Breaking
Many deprecated queue config options are removed (actually, they should
have been removed in 1.18/1.19).
If you see the fatal message when starting Gitea: "Please update your
app.ini to remove deprecated config options", please follow the error
messages to remove these options from your app.ini.
Example:
```
2023/05/06 19:39:22 [E] Removed queue option: `[indexer].ISSUE_INDEXER_QUEUE_TYPE`. Use new options in `[queue.issue_indexer]`
2023/05/06 19:39:22 [E] Removed queue option: `[indexer].UPDATE_BUFFER_LEN`. Use new options in `[queue.issue_indexer]`
2023/05/06 19:39:22 [F] Please update your app.ini to remove deprecated config options
```
Many options in `[queue]` are are dropped, including:
`WRAP_IF_NECESSARY`, `MAX_ATTEMPTS`, `TIMEOUT`, `WORKERS`,
`BLOCK_TIMEOUT`, `BOOST_TIMEOUT`, `BOOST_WORKERS`, they can be removed
from app.ini.
# The problem
The old queue package has some legacy problems:
* complexity: I doubt few people could tell how it works.
* maintainability: Too many channels and mutex/cond are mixed together,
too many different structs/interfaces depends each other.
* stability: due to the complexity & maintainability, sometimes there
are strange bugs and difficult to debug, and some code doesn't have test
(indeed some code is difficult to test because a lot of things are mixed
together).
* general applicability: although it is called "queue", its behavior is
not a well-known queue.
* scalability: it doesn't seem easy to make it work with a cluster
without breaking its behaviors.
It came from some very old code to "avoid breaking", however, its
technical debt is too heavy now. It's a good time to introduce a better
"queue" package.
# The new queue package
It keeps using old config and concept as much as possible.
* It only contains two major kinds of concepts:
* The "base queue": channel, levelqueue, redis
* They have the same abstraction, the same interface, and they are
tested by the same testing code.
* The "WokerPoolQueue", it uses the "base queue" to provide "worker
pool" function, calls the "handler" to process the data in the base
queue.
* The new code doesn't do "PushBack"
* Think about a queue with many workers, the "PushBack" can't guarantee
the order for re-queued unhandled items, so in new code it just does
"normal push"
* The new code doesn't do "pause/resume"
* The "pause/resume" was designed to handle some handler's failure: eg:
document indexer (elasticsearch) is down
* If a queue is paused for long time, either the producers blocks or the
new items are dropped.
* The new code doesn't do such "pause/resume" trick, it's not a common
queue's behavior and it doesn't help much.
* If there are unhandled items, the "push" function just blocks for a
few seconds and then re-queue them and retry.
* The new code doesn't do "worker booster"
* Gitea's queue's handlers are light functions, the cost is only the
go-routine, so it doesn't make sense to "boost" them.
* The new code only use "max worker number" to limit the concurrent
workers.
* The new "Push" never blocks forever
* Instead of creating more and more blocking goroutines, return an error
is more friendly to the server and to the end user.
There are more details in code comments: eg: the "Flush" problem, the
strange "code.index" hanging problem, the "immediate" queue problem.
Almost ready for review.
TODO:
* [x] add some necessary comments during review
* [x] add some more tests if necessary
* [x] update documents and config options
* [x] test max worker / active worker
* [x] re-run the CI tasks to see whether any test is flaky
* [x] improve the `handleOldLengthConfiguration` to provide more
friendly messages
* [x] fine tune default config values (eg: length?)
## Code coverage:
![image](https://user-images.githubusercontent.com/2114189/236620635-55576955-f95d-4810-b12f-879026a3afdf.png)
2023-05-08 14:49:59 +03:00
hookQueue = queue . CreateUniqueQueue ( "webhook_sender" , handler )
2022-04-25 21:03:01 +03:00
if hookQueue == nil {
return fmt . Errorf ( "Unable to create webhook_sender Queue" )
}
go graceful . GetManager ( ) . RunWithShutdownFns ( hookQueue . Run )
2022-11-23 17:10:04 +03:00
go graceful . GetManager ( ) . RunWithShutdownContext ( populateWebhookSendingQueue )
return nil
}
func populateWebhookSendingQueue ( ctx context . Context ) {
ctx , _ , finished := process . GetManager ( ) . AddContext ( ctx , "Webhook: Populate sending queue" )
defer finished ( )
2022-10-21 19:21:56 +03:00
2022-11-23 17:10:04 +03:00
lowerID := int64 ( 0 )
for {
taskIDs , err := webhook_model . FindUndeliveredHookTaskIDs ( ctx , lowerID )
if err != nil {
log . Error ( "Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v" , err )
return
}
if len ( taskIDs ) == 0 {
return
}
lowerID = taskIDs [ len ( taskIDs ) - 1 ]
for _ , taskID := range taskIDs {
select {
case <- ctx . Done ( ) :
log . Warn ( "Shutdown before Webhook Sending queue finishing being populated" )
return
default :
}
if err := enqueueHookTask ( taskID ) ; err != nil {
log . Error ( "Unable to push HookTask[%d] to the Webhook Sending queue: %v" , taskID , err )
}
2022-10-21 19:21:56 +03:00
}
}
2019-11-02 01:51:22 +03:00
}