2020-05-07 22:49:00 +01:00
// Copyright 2020 The Gitea Authors. All rights reserved.
2022-11-27 13:20:29 -05:00
// SPDX-License-Identifier: MIT
2020-05-07 22:49:00 +01:00
package eventsource
import (
"context"
"time"
2022-08-25 10:31:57 +08:00
activities_model "code.gitea.io/gitea/models/activities"
2022-06-13 17:37:59 +08:00
issues_model "code.gitea.io/gitea/models/issues"
2020-05-07 22:49:00 +01:00
"code.gitea.io/gitea/modules/graceful"
2022-04-25 20:45:22 +00:00
"code.gitea.io/gitea/modules/json"
2020-05-07 22:49:00 +01:00
"code.gitea.io/gitea/modules/log"
2022-03-31 18:01:43 +01:00
"code.gitea.io/gitea/modules/process"
2020-05-07 22:49:00 +01:00
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
2022-12-29 03:57:15 +01:00
"code.gitea.io/gitea/services/convert"
2020-05-07 22:49:00 +01:00
)
// Init starts this eventsource
func ( m * Manager ) Init ( ) {
2020-07-03 10:55:36 +01:00
if setting . UI . Notification . EventSourceUpdateTime <= 0 {
return
}
2020-05-07 22:49:00 +01:00
go graceful . GetManager ( ) . RunWithShutdownContext ( m . Run )
}
// Run runs the manager within a provided context
func ( m * Manager ) Run ( ctx context . Context ) {
2022-03-31 18:01:43 +01:00
ctx , _ , finished := process . GetManager ( ) . AddTypedContext ( ctx , "Service: EventSource" , process . SystemProcessType , true )
defer finished ( )
2020-05-07 22:49:00 +01:00
then := timeutil . TimeStampNow ( ) . Add ( - 2 )
timer := time . NewTicker ( setting . UI . Notification . EventSourceUpdateTime )
loop :
for {
select {
case <- ctx . Done ( ) :
timer . Stop ( )
break loop
case <- timer . C :
2021-05-15 22:46:13 +01:00
m . mutex . Lock ( )
connectionCount := len ( m . messengers )
if connectionCount == 0 {
log . Trace ( "Event source has no listeners" )
// empty the connection channel
select {
case <- m . connection :
default :
}
}
m . mutex . Unlock ( )
if connectionCount == 0 {
// No listeners so the source can be paused
log . Trace ( "Pausing the eventsource" )
select {
case <- ctx . Done ( ) :
break loop
case <- m . connection :
log . Trace ( "Connection detected - restarting the eventsource" )
// OK we're back so lets reset the timer and start again
// We won't change the "then" time because there could be concurrency issues
select {
case <- timer . C :
default :
}
continue
}
}
2020-05-07 22:49:00 +01:00
now := timeutil . TimeStampNow ( ) . Add ( - 2 )
2022-08-25 10:31:57 +08:00
uidCounts , err := activities_model . GetUIDsAndNotificationCounts ( then , now )
2020-05-07 22:49:00 +01:00
if err != nil {
log . Error ( "Unable to get UIDcounts: %v" , err )
}
for _ , uidCount := range uidCounts {
m . SendMessage ( uidCount . UserID , & Event {
Name : "notification-count" ,
Data : uidCount ,
} )
}
then = now
2022-04-25 20:45:22 +00:00
if setting . Service . EnableTimetracking {
2022-06-13 17:37:59 +08:00
usersStopwatches , err := issues_model . GetUIDsAndStopwatch ( )
2022-04-25 20:45:22 +00:00
if err != nil {
log . Error ( "Unable to get GetUIDsAndStopwatch: %v" , err )
return
}
for _ , userStopwatches := range usersStopwatches {
apiSWs , err := convert . ToStopWatches ( userStopwatches . StopWatches )
if err != nil {
2022-06-17 22:47:15 +01:00
if ! issues_model . IsErrIssueNotExist ( err ) {
log . Error ( "Unable to APIFormat stopwatches: %v" , err )
}
2022-04-25 20:45:22 +00:00
continue
}
dataBs , err := json . Marshal ( apiSWs )
if err != nil {
log . Error ( "Unable to marshal stopwatches: %v" , err )
continue
}
m . SendMessage ( userStopwatches . UserID , & Event {
Name : "stopwatches" ,
Data : string ( dataBs ) ,
} )
}
}
2020-05-07 22:49:00 +01:00
}
}
m . UnregisterAll ( )
}