From 4f7b315f54d7d93d20171c372f9d5d417c7daf72 Mon Sep 17 00:00:00 2001 From: restitux Date: Sat, 8 Apr 2023 16:28:00 -0600 Subject: [PATCH] Complete support for cron pipeline triggering --- admin_api/admin_api.go | 14 +++-- cron/cron.go | 121 +++++++++++++++++++++++++++++++++++++++++ database/func.go | 44 +++++++++++++-- database/types.go | 7 ++- go.mod | 1 + go.sum | 2 + listen/listen.go | 5 +- main.go | 4 ++ 8 files changed, 186 insertions(+), 12 deletions(-) create mode 100644 cron/cron.go diff --git a/admin_api/admin_api.go b/admin_api/admin_api.go index 5ffc8da..aa61bc2 100644 --- a/admin_api/admin_api.go +++ b/admin_api/admin_api.go @@ -13,7 +13,7 @@ import ( var log = logging.MustGetLogger("cursorius-server") -func createSchema(db database.Database, pollChan chan uuid.UUID) (graphql.Schema, error) { +func createSchema(db database.Database, pollChan chan uuid.UUID, cronChan chan uuid.UUID) (graphql.Schema, error) { runnerType := graphql.NewObject(graphql.ObjectConfig{ Name: "Runner", Description: "A runner available for use inside of a pipeline.", @@ -806,7 +806,7 @@ func createSchema(db database.Database, pollChan chan uuid.UUID) (graphql.Schema return nil, err } - err = db.AddCronForPipeline(pipelineId, cron, pattern) + cronObj, err := db.AddCronForPipeline(pipelineId, cron, pattern) if err != nil { return nil, err } @@ -815,6 +815,9 @@ func createSchema(db database.Database, pollChan chan uuid.UUID) (graphql.Schema if err != nil { return nil, err } + + cronChan <- cronObj.Id + return pipeline, nil }, }, @@ -850,6 +853,9 @@ func createSchema(db database.Database, pollChan chan uuid.UUID) (graphql.Schema if err != nil { return nil, err } + + cronChan <- cronId + return pipeline, nil }, }, @@ -867,9 +873,9 @@ func createSchema(db database.Database, pollChan chan uuid.UUID) (graphql.Schema return schema, nil } -func CreateHandler(db database.Database, pollChan chan uuid.UUID, mux *http.ServeMux) error { +func CreateHandler(db database.Database, pollChan chan uuid.UUID, cronChan chan uuid.UUID, mux *http.ServeMux) error { - schema, err := createSchema(db, pollChan) + schema, err := createSchema(db, pollChan, cronChan) if err != nil { return err } diff --git a/cron/cron.go b/cron/cron.go new file mode 100644 index 0000000..4758ad3 --- /dev/null +++ b/cron/cron.go @@ -0,0 +1,121 @@ +package cron + +import ( + "fmt" + "regexp" + + "git.ohea.xyz/cursorius/server/config" + "git.ohea.xyz/cursorius/server/database" + "git.ohea.xyz/cursorius/server/pipeline_executor" + + "github.com/google/uuid" + "github.com/op/go-logging" + "github.com/robfig/cron/v3" +) + +var log = logging.MustGetLogger("cursorius-server") + +func runPipeline(db database.Database, conf config.PipelineConf, pipelineId uuid.UUID, cron database.Cron) error { + run, err := db.CreateRun(pipelineId) + if err != nil { + return fmt.Errorf("Could not create run for pipeline with id %v: %w", pipelineId, err) + } + + refs, err := db.GetPipelineRefs(pipelineId) + if err != nil { + return fmt.Errorf("Could not get refs for pipeline with id %v: %w", pipelineId, err) + } + + useRef, err := regexp.Compile(cron.Pattern) + if err != nil { + return fmt.Errorf("Could not compile regex for cron %v for pipeline %v: %w", cron.Id, pipelineId, err) + } + + for ref := range refs { + if !useRef.MatchString(ref) { + log.Debugf("Skipping ref %v for pipeline %v as regex %v in cron %v doesn't match", ref, pipelineId, cron.Pattern, cron.Id) + continue + } + + pipeline, err := db.GetPipelineById(pipelineId) + if err != nil { + return fmt.Errorf("could not get pipeline with id %v from db: %w", pipelineId, err) + } + + pe := pipeline_executor.PipelineExecution{ + Pipeline: pipeline, + Ref: ref, + Run: run, + } + + go pipeline_executor.ExecutePipeline(pe, db, conf) + } + return nil +} + +func launchCrons(db database.Database, conf config.PipelineConf, updateChan chan uuid.UUID) { + pipelines, err := db.GetPipelines() + if err != nil { + log.Errorf("Could not get pipelines from database: %w", err) + return + } + + cronManager := cron.New() + + cronEntries := make(map[uuid.UUID]cron.EntryID) + + for _, pipeline := range pipelines { + crons, err := db.GetCronsForPipeline(pipeline.Id) + if err != nil { + log.Errorf("Could not get crons for pipeline with id \"%v\": %w", pipeline.Id, err) + return + } + + log.Infof("Starting crons for pipeline %v with id %v", pipeline.Name, pipeline.Id) + for _, cron := range crons { + cronEntries[cron.Id], err = cronManager.AddFunc(cron.Cron, func() { + log.Infof("Triggering cron with value \"%v\" for pipeline with id \"%v\"", cron.Cron, cron.PipelineId) + err := runPipeline(db, conf, pipeline.Id, cron) + if err != nil { + log.Errorf("Could not run pipeline with id \"%v\": %w", pipeline.Id, err) + } + }) + if err != nil { + log.Errorf("Could not configure cron for pipeline with id \"%v\": %w", pipeline.Id, err) + } + } + } + cronManager.Start() + + for { + cronUUID := <-updateChan + + if entryId, ok := cronEntries[cronUUID]; ok { + log.Infof("Canceling cron %v", cronUUID) + cronManager.Remove(entryId) + } + + cron, err := db.GetCronById(cronUUID) + // if cron no longer exists, don't try to restart it + // TODO: this squashes other DB errors + if err != nil { + continue + } + + log.Infof("Starting cron %v with value %v for pipeline %v", cron.Id, cron.Cron, cron.PipelineId) + + cronEntries[cron.Id], err = cronManager.AddFunc(cron.Cron, func() { + err := runPipeline(db, conf, cron.PipelineId, cron) + if err != nil { + log.Errorf("Could not setup run pipeline with id \"%v\": %w", cron.PipelineId, err) + } + }) + } + +} + +func StartCrons(conf config.PipelineConf, db database.Database) chan uuid.UUID { + cronChan := make(chan uuid.UUID) + go launchCrons(db, conf, cronChan) + return cronChan +} diff --git a/database/func.go b/database/func.go index 818e0b5..b2e44cc 100644 --- a/database/func.go +++ b/database/func.go @@ -705,13 +705,49 @@ WHERE pipeline_id=$1;` return crons, nil } -func (db *Database) AddCronForPipeline(pipelineId uuid.UUID, cron string, pattern string) error { +func (db *Database) GetCronById(id uuid.UUID) (Cron, error) { + + query := ` +SELECT cron, pipeline_id, pattern +FROM crons +WHERE id=$1;` + + cron := Cron{ + Id: id, + } + + var pipelineIdStr string + + err := db.Conn.QueryRow(context.Background(), query, id).Scan(&cron.Cron, &pipelineIdStr, &cron.Pattern) + if err != nil { + return cron, fmt.Errorf("Could not query database for cron with id %v: %w", id.String(), err) + } + + cron.PipelineId, err = uuid.Parse(pipelineIdStr) + return cron, err +} + +func (db *Database) AddCronForPipeline(pipelineId uuid.UUID, cronStr string, pattern string) (Cron, error) { query := ` INSERT INTO crons (id, pipeline_id, cron, pattern) -VALUES (uuid_generate_v4(), $1, $2, $3);` +VALUES (uuid_generate_v4(), $1, $2, $3) +RETURNING id;` - _, err := db.Conn.Exec(context.Background(), query, pipelineId, cron, pattern) - return err + cron := Cron{ + PipelineId: pipelineId, + Cron: cronStr, + Pattern: pattern, + } + + var idStr string + + err := db.Conn.QueryRow(context.Background(), query, pipelineId, cronStr, pattern).Scan(&idStr) + if err != nil { + return cron, fmt.Errorf("Could not create cron: %w", err) + } + + cron.Id, err = uuid.Parse(idStr) + return cron, err } func (db *Database) RemoveCronForPipeline(cronId uuid.UUID) error { diff --git a/database/types.go b/database/types.go index feb9c7e..ac49fdf 100644 --- a/database/types.go +++ b/database/types.go @@ -81,7 +81,8 @@ type Runner struct { } type Cron struct { - Id uuid.UUID - Cron string - Pattern string + Id uuid.UUID + PipelineId uuid.UUID + Cron string + Pattern string } diff --git a/go.mod b/go.mod index 3442428..fe8a372 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/jackc/pgx/v5 v5.2.0 github.com/jhoonb/archivex v0.0.0-20201016144719-6a343cdae81d github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 + github.com/robfig/cron/v3 v3.0.1 golang.org/x/net v0.2.0 google.golang.org/protobuf v1.30.0 nhooyr.io/websocket v1.8.7 diff --git a/go.sum b/go.sum index 90fc856..df2639f 100644 --- a/go.sum +++ b/go.sum @@ -1210,6 +1210,8 @@ github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/listen/listen.go b/listen/listen.go index 0970640..c357562 100644 --- a/listen/listen.go +++ b/listen/listen.go @@ -26,13 +26,14 @@ func setupHTTPServer( db database.Database, runnerManagerChans runnermanager.RunnerManagerChans, pollChan chan uuid.UUID, + cronChan chan uuid.UUID, ) error { webhook.CreateWebhookHandler(db, conf, mux) pipeline_api.CreateHandler(runnerManagerChans.Allocation, runnerManagerChans.Release, mux) - err := admin_api.CreateHandler(db, pollChan, mux) + err := admin_api.CreateHandler(db, pollChan, cronChan, mux) if err != nil { return fmt.Errorf("Could not create admin api handler: %w", err) } @@ -56,6 +57,7 @@ func Listen( db database.Database, runnerManagerChans runnermanager.RunnerManagerChans, pollChan chan uuid.UUID, + cronChan chan uuid.UUID, ) error { err := setupHTTPServer( @@ -64,6 +66,7 @@ func Listen( db, runnerManagerChans, pollChan, + cronChan, ) if err != nil { return fmt.Errorf("Could not setup http endpoints: %w", err) diff --git a/main.go b/main.go index e91a556..177cf8e 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "os" "git.ohea.xyz/cursorius/server/config" + "git.ohea.xyz/cursorius/server/cron" "git.ohea.xyz/cursorius/server/database" "git.ohea.xyz/cursorius/server/listen" "git.ohea.xyz/cursorius/server/poll" @@ -48,6 +49,8 @@ func main() { pollChan := poll.StartPolling(configData.Config.PipelineConf, db) + cronChan := cron.StartCrons(configData.Config.PipelineConf, db) + mux := http.NewServeMux() log.Fatal(listen.Listen( @@ -58,5 +61,6 @@ func main() { db, runnerManagerChans, pollChan, + cronChan, )) }