Complete support for cron pipeline triggering

This commit is contained in:
2023-04-08 16:28:00 -06:00
parent fe9e1cac15
commit 4f7b315f54
8 changed files with 186 additions and 12 deletions
+121
View File
@@ -0,0 +1,121 @@
package cron
import (
"fmt"
"regexp"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"git.ohea.xyz/cursorius/server/pipeline_executor"
"github.com/google/uuid"
"github.com/op/go-logging"
"github.com/robfig/cron/v3"
)
var log = logging.MustGetLogger("cursorius-server")
func runPipeline(db database.Database, conf config.PipelineConf, pipelineId uuid.UUID, cron database.Cron) error {
run, err := db.CreateRun(pipelineId)
if err != nil {
return fmt.Errorf("Could not create run for pipeline with id %v: %w", pipelineId, err)
}
refs, err := db.GetPipelineRefs(pipelineId)
if err != nil {
return fmt.Errorf("Could not get refs for pipeline with id %v: %w", pipelineId, err)
}
useRef, err := regexp.Compile(cron.Pattern)
if err != nil {
return fmt.Errorf("Could not compile regex for cron %v for pipeline %v: %w", cron.Id, pipelineId, err)
}
for ref := range refs {
if !useRef.MatchString(ref) {
log.Debugf("Skipping ref %v for pipeline %v as regex %v in cron %v doesn't match", ref, pipelineId, cron.Pattern, cron.Id)
continue
}
pipeline, err := db.GetPipelineById(pipelineId)
if err != nil {
return fmt.Errorf("could not get pipeline with id %v from db: %w", pipelineId, err)
}
pe := pipeline_executor.PipelineExecution{
Pipeline: pipeline,
Ref: ref,
Run: run,
}
go pipeline_executor.ExecutePipeline(pe, db, conf)
}
return nil
}
func launchCrons(db database.Database, conf config.PipelineConf, updateChan chan uuid.UUID) {
pipelines, err := db.GetPipelines()
if err != nil {
log.Errorf("Could not get pipelines from database: %w", err)
return
}
cronManager := cron.New()
cronEntries := make(map[uuid.UUID]cron.EntryID)
for _, pipeline := range pipelines {
crons, err := db.GetCronsForPipeline(pipeline.Id)
if err != nil {
log.Errorf("Could not get crons for pipeline with id \"%v\": %w", pipeline.Id, err)
return
}
log.Infof("Starting crons for pipeline %v with id %v", pipeline.Name, pipeline.Id)
for _, cron := range crons {
cronEntries[cron.Id], err = cronManager.AddFunc(cron.Cron, func() {
log.Infof("Triggering cron with value \"%v\" for pipeline with id \"%v\"", cron.Cron, cron.PipelineId)
err := runPipeline(db, conf, pipeline.Id, cron)
if err != nil {
log.Errorf("Could not run pipeline with id \"%v\": %w", pipeline.Id, err)
}
})
if err != nil {
log.Errorf("Could not configure cron for pipeline with id \"%v\": %w", pipeline.Id, err)
}
}
}
cronManager.Start()
for {
cronUUID := <-updateChan
if entryId, ok := cronEntries[cronUUID]; ok {
log.Infof("Canceling cron %v", cronUUID)
cronManager.Remove(entryId)
}
cron, err := db.GetCronById(cronUUID)
// if cron no longer exists, don't try to restart it
// TODO: this squashes other DB errors
if err != nil {
continue
}
log.Infof("Starting cron %v with value %v for pipeline %v", cron.Id, cron.Cron, cron.PipelineId)
cronEntries[cron.Id], err = cronManager.AddFunc(cron.Cron, func() {
err := runPipeline(db, conf, cron.PipelineId, cron)
if err != nil {
log.Errorf("Could not setup run pipeline with id \"%v\": %w", cron.PipelineId, err)
}
})
}
}
func StartCrons(conf config.PipelineConf, db database.Database) chan uuid.UUID {
cronChan := make(chan uuid.UUID)
go launchCrons(db, conf, cronChan)
return cronChan
}