// Package poll periodically clones the git repositories of pipelines and
// dispatches pipeline runs for every branch or tag whose hash has changed
// since the previous scan.
package poll

import (
	"context"
	"time"

	"git.ohea.xyz/cursorius/server/config"
	"git.ohea.xyz/cursorius/server/database"
	"git.ohea.xyz/cursorius/server/pipeline_executor"
	"github.com/go-git/go-git/v5"
	"github.com/go-git/go-git/v5/plumbing"
	"github.com/go-git/go-git/v5/storage/memory"
	"github.com/google/uuid"
	"github.com/op/go-logging"
)

var log = logging.MustGetLogger("cursorius-server")

// branch pairs a ref name with a commit hash.
// NOTE(review): not referenced anywhere in this file; kept in case another
// file in the package uses it.
type branch struct {
	ref        string
	commitHash string
}

// tag pairs a ref name with a commit hash.
// NOTE(review): not referenced anywhere in this file; kept in case another
// file in the package uses it.
type tag struct {
	ref        string
	commitHash string
}

// waitForNextPoll blocks until interval has elapsed or ctx is done, whichever
// happens first. It reports true when the caller should go on polling and
// false when ctx was canceled.
func waitForNextPoll(ctx context.Context, interval time.Duration) bool {
	timer := time.NewTimer(interval)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		// Both channels may become ready at the same time; cancellation wins.
		return ctx.Err() == nil
	}
}

// pollOnce performs a single scan of the pipeline's repository: it loads the
// previously stored ref hashes, clones the repository into memory, compares
// every branch and tag against the stored hashes, persists the updated hashes
// and starts one pipeline execution per new or changed ref.
//
// It returns false when polling for this pipeline must stop (the stored refs
// could not be read or written) and true otherwise, including when the scan
// was abandoned because the repository could not be cloned or enumerated.
func pollOnce(ctx context.Context, pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) bool {
	prevRefs, err := db.GetPipelineRefs(pipeline.Id)
	if err != nil {
		log.Errorf("Could not get pipeline refs from db: %v", err)
		return false
	}

	log.Debugf("Got pipeline hashs for repo %v (id: %v)", pipeline.Name, pipeline.Id)
	for refName, hash := range prevRefs {
		log.Debugf("%v: %v: %v", pipeline.Id, refName, hash)
	}

	// CloneContext lets a cancellation of ctx abort an in-flight clone, so a
	// polling job that has been replaced does not go on to dispatch runs.
	repo, err := git.CloneContext(ctx, memory.NewStorage(), nil, &git.CloneOptions{
		URL: pipeline.Url,
	})
	if err != nil {
		log.Errorf("Could not clone repo %v from url %v: %v", pipeline.Name, pipeline.Url, err)
		return true
	}

	var refsToRunFor []string

	// get branches
	branches, err := repo.Branches()
	if err != nil {
		log.Errorf("Could not enumerate branches in repo %v: %v", pipeline.Name, err)
		return true
	}
	err = branches.ForEach(func(ref *plumbing.Reference) error {
		name, hash := ref.Name().String(), ref.Hash().String()
		log.Debugf("Processing branch %v from repo %v (id: %v)", name, pipeline.Name, pipeline.Id)

		prevHash, seen := prevRefs[name]
		switch {
		case !seen:
			log.Debugf("Queuing job for newly discovered branch %v in repo %v (id: %v) with hash %v", name, pipeline.Name, pipeline.Id, hash)
		case hash != prevHash:
			log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which does not match the previously seen hash of %v", name, pipeline.Name, pipeline.Id, hash, prevHash)
			log.Debugf("Queuing job for branch %v in repo %v (id: %v) with hash %v", name, pipeline.Name, pipeline.Id, hash)
		default:
			log.Debugf("Branch %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", name, pipeline.Name, pipeline.Id, hash, prevHash)
			return nil
		}

		prevRefs[name] = hash
		refsToRunFor = append(refsToRunFor, name)
		return nil
	})
	if err != nil {
		log.Errorf("Could not iterate branches in repo %v: %v", pipeline.Name, err)
		return true
	}

	// get tags
	tags, err := repo.Tags()
	if err != nil {
		log.Errorf("Could not enumerate tags in repo %v: %v", pipeline.Name, err)
		return true
	}
	err = tags.ForEach(func(ref *plumbing.Reference) error {
		name, hash := ref.Name().String(), ref.Hash().String()
		log.Debugf("Processing tag %v from repo %v (id: %v)", name, pipeline.Name, pipeline.Id)

		prevHash, seen := prevRefs[name]
		switch {
		case !seen:
			log.Debugf("Queuing job for newly discovered tag %v in repo %v (id: %v) with hash %v", name, pipeline.Name, pipeline.Id, hash)
		case hash != prevHash:
			log.Debugf("Queuing job for tag %v in repo %v (id: %v) with hash %v", name, pipeline.Name, pipeline.Id, hash)
		default:
			log.Debugf("Tag %v in repo %v (id: %v) has hash %v, which matches the previously seen hash of %v", name, pipeline.Name, pipeline.Id, hash, prevHash)
			return nil
		}

		prevRefs[name] = hash
		refsToRunFor = append(refsToRunFor, name)
		return nil
	})
	if err != nil {
		log.Errorf("Could not iterate tags in repo %v: %v", pipeline.Name, err)
		return true
	}

	// Persist the new hashes before dispatching so the next scan does not
	// queue the same refs again.
	if err := db.UpdatePipelineRefs(pipeline.Id, prevRefs); err != nil {
		log.Errorf("Could not update pipeline refs: %v", err)
		return false
	}

	for _, ref := range refsToRunFor {
		log.Debugf("Dispatching job for ref %v in repo %v (id: %v)", ref, pipeline.Name, pipeline.Id)

		run, err := db.CreateRun(pipeline.Id)
		if err != nil {
			log.Errorf("Could not create run for pipeline with id \"%v\": %v", pipeline.Id, err)
			continue
		}

		pe := pipeline_executor.PipelineExecution{
			Pipeline: pipeline,
			Ref:      ref,
			Run:      run,
		}
		go pipeline_executor.ExecutePipeline(pe, db, pipelineConf)
	}

	return true
}

// pollJob scans the pipeline's repository in a loop, waiting
// pipeline.PollInterval seconds between scans, until ctx is canceled or a
// scan reports that polling must stop (see pollOnce).
func pollJob(ctx context.Context, pipeline database.Pipeline, pipelineConf config.PipelineConf, db database.Database) {
	interval := time.Duration(pipeline.PollInterval) * time.Second

	for firstScan := true; ; firstScan = false {
		// Don't sleep on first scan to ease testing
		// TODO: this should be replaced with a script that mocks a webhook
		if !firstScan {
			if !waitForNextPoll(ctx, interval) {
				log.Infof("Polling for pipeline %v canceled, stopping", pipeline.Name)
				return
			}
			log.Infof("Polling repo %v", pipeline.Name)
		}

		if !pollOnce(ctx, pipeline, pipelineConf, db) {
			return
		}
	}
}

// launchPollJobs starts a polling goroutine for every pipeline in the
// database that has a non-zero PollInterval, then services pollChan: each
// pipeline ID received cancels that pipeline's current polling job (if any)
// and starts a fresh one from the pipeline's current database record.
// Pipelines whose PollInterval is zero are not polled.
//
// The function returns only when pollChan is closed.
func launchPollJobs(conf config.PipelineConf, db database.Database, pollChan chan uuid.UUID) {
	// Cancel functions of the running polling jobs, keyed by pipeline ID.
	// Only this goroutine touches the map.
	pipelineCancelations := make(map[uuid.UUID]context.CancelFunc)

	startJob := func(pipeline database.Pipeline) {
		log.Infof("Starting polling for pipeline %v with id %v", pipeline.Name, pipeline.Id)
		ctx, cancel := context.WithCancel(context.Background())
		pipelineCancelations[pipeline.Id] = cancel
		go pollJob(ctx, pipeline, conf, db)
	}

	// A failure here is logged but does not stop the loop below; otherwise
	// every later send on the unbuffered pollChan would block forever.
	if pipelines, err := db.GetPipelines(); err != nil {
		log.Errorf("Could not get pipelines from database: %v", err)
	} else {
		for _, pipeline := range pipelines {
			if pipeline.PollInterval == 0 {
				continue
			}
			startJob(pipeline)
		}
	}

	for jobUUID := range pollChan {
		pipeline, err := db.GetPipelineById(jobUUID)
		if err != nil {
			log.Errorf("Could not get pipeline with id \"%v\" from database: %v", jobUUID, err)
			continue
		}

		// Cancel existing polling job if it exists
		if cancelFunc, ok := pipelineCancelations[pipeline.Id]; ok {
			cancelFunc()
			delete(pipelineCancelations, pipeline.Id)
		}

		// A zero interval means polling is disabled, matching the startup
		// loop above; starting a job would scan in a tight loop.
		if pipeline.PollInterval == 0 {
			log.Infof("Polling disabled for pipeline %v with id %v", pipeline.Name, pipeline.Id)
			continue
		}

		// Start new polling job
		startJob(pipeline)
	}
}

// StartPolling launches the polling manager in a background goroutine and
// returns the unbuffered channel it listens on. Sending a pipeline ID on the
// channel (re)starts polling for that pipeline.
func StartPolling(conf config.PipelineConf, db database.Database) chan uuid.UUID {
	pollChan := make(chan uuid.UUID)
	go launchPollJobs(conf, db, pollChan)
	return pollChan
}