From 954966db580a298fcba1c5948ee1f42ff4708d07 Mon Sep 17 00:00:00 2001 From: restitux Date: Fri, 7 Apr 2023 17:40:49 -0600 Subject: [PATCH] Start/restart poll job when created or updated This currently contains the logic for restarting updated jobs, but nothing exercises this logic. The logic for starting polling for a newly created pipeline is implemented. --- admin_api/admin_api.go | 8 +++++--- listen/listen.go | 7 ++++++- main.go | 3 ++- poll/poll.go | 46 ++++++++++++++++++++++++++++++++++-------- 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/admin_api/admin_api.go b/admin_api/admin_api.go index 27847fe..ad294da 100644 --- a/admin_api/admin_api.go +++ b/admin_api/admin_api.go @@ -13,7 +13,7 @@ import ( var log = logging.MustGetLogger("cursorius-server") -func createSchema(db database.Database) (graphql.Schema, error) { +func createSchema(db database.Database, pollChan chan uuid.UUID) (graphql.Schema, error) { runnerType := graphql.NewObject(graphql.ObjectConfig{ Name: "Runner", Description: "A runner available for use inside of a pipeline.", @@ -454,6 +454,8 @@ func createSchema(db database.Database) (graphql.Schema, error) { return nil, err } + pollChan <- pipeline.Id + return pipeline, nil }, }, @@ -681,9 +683,9 @@ func createSchema(db database.Database) (graphql.Schema, error) { return schema, nil } -func CreateHandler(db database.Database, mux *http.ServeMux) error { +func CreateHandler(db database.Database, pollChan chan uuid.UUID, mux *http.ServeMux) error { - schema, err := createSchema(db) + schema, err := createSchema(db, pollChan) if err != nil { return err } diff --git a/listen/listen.go b/listen/listen.go index 4b1ba56..0970640 100644 --- a/listen/listen.go +++ b/listen/listen.go @@ -10,6 +10,8 @@ import ( "git.ohea.xyz/cursorius/server/pipeline_api" "git.ohea.xyz/cursorius/server/runnermanager" "git.ohea.xyz/cursorius/server/webhook" + + "github.com/google/uuid" "github.com/op/go-logging" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" @@ -23,13 +25,14 @@ func setupHTTPServer( conf config.PipelineConf, db database.Database, runnerManagerChans runnermanager.RunnerManagerChans, + pollChan chan uuid.UUID, ) error { webhook.CreateWebhookHandler(db, conf, mux) pipeline_api.CreateHandler(runnerManagerChans.Allocation, runnerManagerChans.Release, mux) - err := admin_api.CreateHandler(db, mux) + err := admin_api.CreateHandler(db, pollChan, mux) if err != nil { return fmt.Errorf("Could not create admin api handler: %w", err) } @@ -52,6 +55,7 @@ func Listen( conf config.PipelineConf, db database.Database, runnerManagerChans runnermanager.RunnerManagerChans, + pollChan chan uuid.UUID, ) error { err := setupHTTPServer( @@ -59,6 +63,7 @@ func Listen( conf, db, runnerManagerChans, + pollChan, ) if err != nil { return fmt.Errorf("Could not setup http endpoints: %w", err) diff --git a/main.go b/main.go index e6298da..e91a556 100644 --- a/main.go +++ b/main.go @@ -46,7 +46,7 @@ func main() { return } - _ = poll.StartPolling(configData.Config.PipelineConf, db) + pollChan := poll.StartPolling(configData.Config.PipelineConf, db) mux := http.NewServeMux() @@ -57,5 +57,6 @@ func main() { configData.Config.PipelineConf, db, runnerManagerChans, + pollChan, )) } diff --git a/poll/poll.go b/poll/poll.go index 27170f7..7d8a28a 100644 --- a/poll/poll.go +++ b/poll/poll.go @@ -1,17 +1,18 @@ package poll import ( + "context" "time" - "github.com/google/uuid" - "github.com/op/go-logging" - "git.ohea.xyz/cursorius/server/config" "git.ohea.xyz/cursorius/server/database" "git.ohea.xyz/cursorius/server/pipeline_executor" + "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/storage/memory" + "github.com/google/uuid" + "github.com/op/go-logging" ) var log = logging.MustGetLogger("cursorius-server") @@ -26,13 +27,27 @@ type tag struct { commitHash string } -func pollJob(pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) { +func pollJob(ctx context.Context, pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) { firstScan := true for { // Don't sleep on first scan to ease testing // TODO: this should be replaced with a script that mocks a webhook if !firstScan { - time.Sleep(time.Duration(pipeline.PollInterval) * time.Second) + + ctx, cancel := context.WithTimeout(ctx, time.Duration(pipeline.PollInterval)*time.Second) + + select { + case <-ctx.Done(): + switch ctx.Err() { + case context.Canceled: + log.Infof("Polling for pipeline %v canceled, stopping", pipeline.Name) + cancel() + return + } + } + + cancel() + log.Infof("Polling repo %v", pipeline.Name) } else { firstScan = false @@ -136,11 +151,17 @@ func launchPollJobs(conf config.PipelineConf, db database.Database, pollChan cha return } + pipelineCancelations := make(map[uuid.UUID]context.CancelFunc) + for _, pipeline := range pipelines { if pipeline.PollInterval == 0 { continue } else { - go pollJob(pipeline, conf, db) + log.Infof("Starting polling for pipeline %v with id %v", pipeline.Name, pipeline.Id) + ctx, cancel := context.WithCancel(context.Background()) + pipelineCancelations[pipeline.Id] = cancel + + go pollJob(ctx, pipeline, conf, db) } } @@ -151,8 +172,17 @@ func launchPollJobs(conf config.PipelineConf, db database.Database, pollChan cha log.Errorf("Could not get pipeline with id \"%v\" from database: %v", err) continue } - // TODO: stop existing polling process for given uuid - go pollJob(pipeline, conf, db) + + // Cancel existing polling job if it exists + if cancelFunc, ok := pipelineCancelations[pipeline.Id]; ok { + cancelFunc() + } + + // Start new polling job + log.Infof("Starting polling for pipeline %v with id %v", pipeline.Name, pipeline.Id) + ctx, cancel := context.WithCancel(context.Background()) + pipelineCancelations[pipeline.Id] = cancel + go pollJob(ctx, pipeline, conf, db) } }