From c731e560afab484a14b7fdcd4260c1456acd1c67 Mon Sep 17 00:00:00 2001 From: restitux Date: Sat, 24 Dec 2022 00:10:06 -0700 Subject: [PATCH] Change launch logic to receive config in scheduler --- jobscheduler/jobscheduler.go | 82 +++++++++++++++++------------------- listen/listen.go | 24 +++++++---- main.go | 4 +- poll/poll.go | 5 ++- util/util.go | 14 ------ 5 files changed, 59 insertions(+), 70 deletions(-) delete mode 100644 util/util.go diff --git a/jobscheduler/jobscheduler.go b/jobscheduler/jobscheduler.go index 31b8e6f..60d0831 100644 --- a/jobscheduler/jobscheduler.go +++ b/jobscheduler/jobscheduler.go @@ -37,12 +37,12 @@ type jobScheduler struct { registerCh chan RunnerRegistration connectedRunners []Runner configuredRunners map[string]config.Runner - jobs map[string]config.Job } type Run struct { - Name string - Ref string + JobName string + JobConfig config.Job + Ref string } type runnerJob struct { @@ -54,49 +54,46 @@ func runJobScheduler(j jobScheduler) { for { select { case run := <-j.runCh: - log.Infof("Got run: %v", run) - if job, exists := j.jobs[run.Name]; exists { - log.Debugf("Finding runner for job \"%v\"", run.Name) - rJ := runnerJob{ - Id: run.Name, - URL: job.URL, - } - launched := false - for i, runner := range j.connectedRunners { - // don't send job to runner that is already occupied - if !runner.running { - // don't send job to runner with closed receiveChan (is defunct) - // there should never be messages to read on an inactive runner, - // so we aren't losing any data here - select { - case <-runner.receiveChan: - // if the receive channel is closed, swap delete the runner as it's defunct - j.connectedRunners[i] = j.connectedRunners[len(j.connectedRunners)-1] - j.connectedRunners = j.connectedRunners[:len(j.connectedRunners)-1] - default: - err := wsjson.Write(context.Background(), runner.conn, rJ) - if err != nil { - log.Debugf("Could not launch run: %v", err) - } else { - log.Infof("Launched run for job %v on runner %v", run.Name, runner.id) - launched = true - j.connectedRunners[i].running = true - break - } + log.Infof("Launching run for job \"%v\" on ref \"%v\"", run.JobName, run.Ref) + log.Debugf("Finding runner for job \"%v\"", run.JobName) + rJ := runnerJob{ + Id: run.JobName, + URL: run.JobConfig.URL, + } + launched := false + for i, runner := range j.connectedRunners { + // don't send job to runner that is already occupied + if !runner.running { + // don't send job to runner with closed receiveChan (is defunct) + // there should never be messages to read on an inactive runner, + // so we aren't losing any data here + select { + case <-runner.receiveChan: + // if the receive channel is closed, swap delete the runner as it's defunct + j.connectedRunners[i] = j.connectedRunners[len(j.connectedRunners)-1] + j.connectedRunners = j.connectedRunners[:len(j.connectedRunners)-1] + default: + err := wsjson.Write(context.Background(), runner.conn, rJ) + if err != nil { + log.Errorf("Could not launch run: %v", err) + break + } else { + log.Infof("Launched run for job %v on runner %v", run.JobName, runner.id) + launched = true + j.connectedRunners[i].running = true + break } - } else { - log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id) } + } else { + log.Debugf("Skipping runner %v, as runner is activly running another job", runner.id) } - if !launched { - errorMsg := "could not find valid runner" - if len(j.connectedRunners) == 0 { - errorMsg = "no connected runners" - } - log.Errorf("Could not launch run for job \"%v\": %v", run.Name, errorMsg) + } + if !launched { + errorMsg := "could not find valid runner" + if len(j.connectedRunners) == 0 { + errorMsg = "no connected runners" } - } else { - log.Errorf("No configured job with name %v", run.Name) + log.Errorf("Could not launch run for job \"%v\": %v", run.JobName, errorMsg) } case registration := <-j.registerCh: log.Debugf("New runner appeared with id: %v and secret: %v", registration.Id, registration.Secret) @@ -149,7 +146,6 @@ func StartJobScheduler(jobs map[string]config.Job, configuredRunners map[string] registerCh: make(chan RunnerRegistration), connectedRunners: make([]Runner, 0), configuredRunners: configuredRunners, - jobs: jobs, } go runJobScheduler(scheduler) diff --git a/listen/listen.go b/listen/listen.go index e5d5254..7a22c16 100644 --- a/listen/listen.go +++ b/listen/listen.go @@ -15,7 +15,7 @@ import ( var log = logging.MustGetLogger("cursorius-server") func setupHTTPServer(runCh chan jobscheduler.Run, registerCh chan jobscheduler.RunnerRegistration, - webhookConfig map[string]config.Webhook) { + jobs map[string]config.Job) { http.HandleFunc("/webhook/", func(w http.ResponseWriter, r *http.Request) { switch r.Method { case "POST": @@ -29,11 +29,15 @@ func setupHTTPServer(runCh chan jobscheduler.Run, registerCh chan jobscheduler.R // TODO: verify that this handles all valid URL formats webhookJobName := splitUrl[2] - for jobName, jobConfig := range webhookConfig { + for jobName, jobConfig := range jobs { if webhookJobName == jobName { - switch jobConfig.Sender { + if jobConfig.Webhook == nil { + log.Errorf("Matching job does not have webhook configuration, ignoring....") + return + } + switch jobConfig.Webhook.Sender { case config.Gitea: - hook, err := gitea.New(gitea.Options.Secret(jobConfig.Secret)) + hook, err := gitea.New(gitea.Options.Secret(jobConfig.Webhook.Secret)) if err != nil { log.Errorf("Could not create Gitea webhook handler: %v", err) return @@ -48,10 +52,14 @@ func setupHTTPServer(runCh chan jobscheduler.Run, registerCh chan jobscheduler.R return } log.Infof("Got webhook with payload %v", payload) - runCh <- jobscheduler.Run{Name: "test"} + runCh <- jobscheduler.Run{ + JobName: jobName, + JobConfig: jobConfig, + Ref: "master", + } return default: - log.Errorf("Job configured with unkown webhook sender \"%v\", igonring...", jobConfig.Sender) + log.Errorf("Job configured with unknown webhook sender \"%v\", igonring...", jobConfig.Webhook.Sender) return } @@ -75,9 +83,9 @@ func setupHTTPServer(runCh chan jobscheduler.Run, registerCh chan jobscheduler.R }) } -func Listen(address string, port int, runCh chan jobscheduler.Run, registerCh chan jobscheduler.RunnerRegistration, webhookConfig map[string]config.Webhook) { +func Listen(address string, port int, runCh chan jobscheduler.Run, registerCh chan jobscheduler.RunnerRegistration, jobs map[string]config.Job) { - setupHTTPServer(runCh, registerCh, webhookConfig) + setupHTTPServer(runCh, registerCh, jobs) connect_string := fmt.Sprintf("%v:%v", address, port) log.Noticef("Launching HTTP server on %v\n", connect_string) diff --git a/main.go b/main.go index 7d55468..77a8ec3 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,6 @@ import ( "git.ohea.xyz/cursorius/server/jobscheduler" "git.ohea.xyz/cursorius/server/listen" "git.ohea.xyz/cursorius/server/poll" - "git.ohea.xyz/cursorius/server/util" "github.com/op/go-logging" ) @@ -39,7 +38,6 @@ func main() { poll.StartPolling(configData.Config.Jobs, runCh) - webhookConfig := util.CreateWebhookConfig(configData.Config.Jobs) - listen.Listen(configData.Config.Address, configData.Config.Port, runCh, registerCh, webhookConfig) + listen.Listen(configData.Config.Address, configData.Config.Port, runCh, registerCh, configData.Config.Jobs) } diff --git a/poll/poll.go b/poll/poll.go index 835bdcc..ca1485d 100644 --- a/poll/poll.go +++ b/poll/poll.go @@ -90,8 +90,9 @@ func pollJob(repoName string, jobConfig config.Job, runCh chan jobscheduler.Run) for _, ref := range refsToRunFor { log.Debugf("Dispatching job for ref %v in repo %v", ref, repoName) runCh <- jobscheduler.Run{ - Name: repoName, - Ref: ref, + JobName: repoName, + JobConfig: jobConfig, + Ref: ref, } } } diff --git a/util/util.go b/util/util.go deleted file mode 100644 index 2df9878..0000000 --- a/util/util.go +++ /dev/null @@ -1,14 +0,0 @@ -package util - -import "git.ohea.xyz/cursorius/server/config" - -// Generated map for webhook handler config -func CreateWebhookConfig(conf map[string]config.Job) map[string]config.Webhook { - webhookConfig := make(map[string]config.Webhook, 0) - for k, v := range conf { - if v.Webhook != nil { - webhookConfig[k] = *v.Webhook - } - } - return webhookConfig -}