Add timeout and retry interval to GetRunner api

This commit is contained in:
2023-04-05 18:38:06 -06:00
parent a8e9a68f0e
commit ed7df18f83
6 changed files with 103 additions and 54 deletions
+43 -7
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"sync"
"time"
apiv2 "git.ohea.xyz/cursorius/pipeline-api/go/api/v2"
"git.ohea.xyz/cursorius/pipeline-api/go/api/v2/apiv2connect"
@@ -60,17 +61,52 @@ func (s *ApiServer) GetRunner(
req *connect.Request[apiv2.GetRunnerRequest],
) (*connect.Response[apiv2.GetRunnerResponse], error) {
var response runnermanager.RunnerAllocationResponse
var timeoutCtx *context.Context
var retryInterval int64 = 0
respChan := make(chan runnermanager.RunnerAllocationResponse)
s.allocationCh <- runnermanager.RunnerAllocationRequest{
Tags: req.Msg.Tags,
RespChan: respChan,
}
tagsStr := util.FormatTags(req.Msg.Tags)
response := <-respChan
if response.Err != nil {
log.Errorf("Could not get runner with tags \"%v\": %v", tagsStr, response.Err)
if req.Msg.Options != nil {
if req.Msg.Options.Timeout != 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(req.Msg.Options.Timeout)*time.Second)
timeoutCtx = &ctx
defer cancel()
}
retryInterval = req.Msg.Options.RetryInterval
}
for {
s.allocationCh <- runnermanager.RunnerAllocationRequest{
Tags: req.Msg.Tags,
RespChan: respChan,
}
response = <-respChan
if response.Err == nil {
break
}
log.Infof("Could not get runner with tags \"%v\": %v", tagsStr, response.Err)
// If no timeout is specified, skip after one attempt
if timeoutCtx == nil {
break
}
// If timeout is expired, stop trying to allocate runner
if (*timeoutCtx).Err() != nil {
break
}
log.Infof("Sleeping for %v seconds before retry...", retryInterval)
time.Sleep(time.Duration(retryInterval) * time.Second)
}
if response.Runner == nil {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not get runner"))
}