// Package pipeline_api exposes the v2 pipeline API (runner allocation,
// runner release and remote command execution) as connect-go handlers.
package pipeline_api

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"sync"
	"time"

	apiv2 "git.ohea.xyz/cursorius/pipeline-api/go/api/v2"
	"git.ohea.xyz/cursorius/pipeline-api/go/api/v2/apiv2connect"
	"git.ohea.xyz/cursorius/server/runnermanager"
	"git.ohea.xyz/cursorius/server/util"
	"github.com/bufbuild/connect-go"
	"github.com/google/uuid"
	"github.com/op/go-logging"
)

var log = logging.MustGetLogger("cursorius-server")

// errRunnerReleased is returned by RunnerWrapper.RunCommand when the wrapped
// runner has already been handed back through Release.
var errRunnerReleased = errors.New("runner has already been released")

// ApiServer implements the GetRunner, ReleaseRunner and RunCommand service
// handlers. It forwards allocation and release requests over channels and
// keeps track of the runners it has handed out, keyed by a generated UUID.
type ApiServer struct {
	allocationCh          chan runnermanager.RunnerAllocationRequest
	releaseCh             chan runnermanager.RunnerReleaseRequest
	allocatedRunners      map[uuid.UUID]*RunnerWrapper
	allocatedRunnersMutex sync.RWMutex
}

// RunnerWrapper serializes access to a single allocated runner. The runner
// field is set to nil once the runner has been released.
type RunnerWrapper struct {
	runner *runnermanager.Runner
	mutex  sync.Mutex
}

// RunCommand runs cmd with args on the wrapped runner while holding the
// wrapper's mutex, so only one command runs on a given runner at a time.
// It returns the command's return code, stdout and stderr, or
// errRunnerReleased if the runner was released before the call got the lock.
func (r *RunnerWrapper) RunCommand(cmd string, args []string) (int64, string, string, error) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	// A caller may have fetched this wrapper from the map just before it was
	// released; without this check the call below would use a nil runner.
	if r.runner == nil {
		return 0, "", "", errRunnerReleased
	}
	return r.runner.RunCommand(cmd, args)
}

// Release sends the wrapped runner on releaseCh and clears it from the
// wrapper. It waits for any in-flight RunCommand to finish first. Calling
// Release on an already released wrapper is a no-op.
func (r *RunnerWrapper) Release(releaseCh chan runnermanager.RunnerReleaseRequest) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	if r.runner == nil {
		return
	}
	releaseCh <- runnermanager.RunnerReleaseRequest{
		Runner: r.runner,
	}
	r.runner = nil
}

// GetRunnerFromMap looks up the wrapper registered under u. The boolean
// reports whether an entry exists.
func (s *ApiServer) GetRunnerFromMap(u uuid.UUID) (*RunnerWrapper, bool) {
	s.allocatedRunnersMutex.RLock()
	defer s.allocatedRunnersMutex.RUnlock()

	runner, ok := s.allocatedRunners[u]
	return runner, ok
}

// AddRunnerToMap registers runner under u, wrapped in a fresh RunnerWrapper.
// An existing entry for u is overwritten.
func (s *ApiServer) AddRunnerToMap(u uuid.UUID, runner *runnermanager.Runner) {
	s.allocatedRunnersMutex.Lock()
	defer s.allocatedRunnersMutex.Unlock()

	s.allocatedRunners[u] = &RunnerWrapper{runner: runner}
}

// waitOrDone blocks for d or until ctx is done, whichever comes first.
// It reports true when the full duration elapsed and false when ctx ended
// the wait early.
func waitOrDone(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}

// GetRunner allocates a runner matching the requested tags and returns the
// UUID under which it was registered.
//
// Without a timeout option a single allocation attempt is made. With a
// non-zero timeout (seconds), failed attempts are retried every
// RetryInterval seconds until the timeout expires or the request context is
// cancelled. A CodeNotFound error is returned when no runner was obtained.
func (s *ApiServer) GetRunner(
	ctx context.Context,
	req *connect.Request[apiv2.GetRunnerRequest],
) (*connect.Response[apiv2.GetRunnerResponse], error) {
	var (
		response      runnermanager.RunnerAllocationResponse
		retryInterval int64
		retry         bool
	)

	respChan := make(chan runnermanager.RunnerAllocationResponse)
	tagsStr := util.FormatTags(req.Msg.Tags)

	if opts := req.Msg.Options; opts != nil {
		if opts.Timeout != 0 {
			// Derive from the request context so that a client cancel or
			// disconnect also stops the retry loop.
			var cancel context.CancelFunc
			ctx, cancel = context.WithTimeout(ctx, time.Duration(opts.Timeout)*time.Second)
			defer cancel()
			retry = true
		}
		retryInterval = opts.RetryInterval
	}

	for {
		allocReq := runnermanager.RunnerAllocationRequest{
			Tags:     req.Msg.Tags,
			RespChan: respChan,
		}

		// Nothing has been handed over yet, so giving up here is safe.
		select {
		case s.allocationCh <- allocReq:
		case <-ctx.Done():
			return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not get runner"))
		}

		// Once the request is submitted the reply must always be consumed:
		// abandoning it could leave the sender blocked on respChan or leak
		// an allocated runner.
		response = <-respChan
		if response.Err == nil {
			break
		}

		log.Infof("Could not get runner with tags \"%v\": %v", tagsStr, response.Err)

		// If no timeout is specified, stop after one attempt.
		if !retry {
			break
		}

		// If the timeout has expired, stop trying to allocate a runner.
		if ctx.Err() != nil {
			break
		}

		log.Infof("Sleeping for %v seconds before retry...", retryInterval)
		if !waitOrDone(ctx, time.Duration(retryInterval)*time.Second) {
			break
		}
	}

	if response.Runner == nil {
		return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not get runner"))
	}

	log.Infof("Got runner with tags: %v", tagsStr)

	runnerID := uuid.New()
	s.AddRunnerToMap(runnerID, response.Runner)

	res := connect.NewResponse(&apiv2.GetRunnerResponse{
		Id: runnerID.String(),
	})
	res.Header().Set("GetRunner-Version", "v2")
	return res, nil
}

// ReleaseRunner removes the runner registered under the requested id and
// hands it back over the release channel. It returns CodeInvalidArgument
// when the id cannot be parsed or does not refer to an allocated runner.
func (s *ApiServer) ReleaseRunner(
	ctx context.Context,
	req *connect.Request[apiv2.ReleaseRunnerRequest],
) (*connect.Response[apiv2.ReleaseRunnerResponse], error) {
	runnerID, err := uuid.Parse(req.Msg.Id)
	if err != nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
	}

	log.Infof("Releasing runner with ID \"%v\"", runnerID)

	// Only the map update happens under the map lock. Release may block on
	// a running command and on the release channel, and must not stall
	// every other request that needs the map in the meantime.
	s.allocatedRunnersMutex.Lock()
	runner, ok := s.allocatedRunners[runnerID]
	delete(s.allocatedRunners, runnerID)
	s.allocatedRunnersMutex.Unlock()

	if !ok || runner == nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
	}

	runner.Release(s.releaseCh)

	res := connect.NewResponse(&apiv2.ReleaseRunnerResponse{})
	res.Header().Set("ReleaseRunner-Version", "v2")
	return res, nil
}

// RunCommand runs the requested command on the runner registered under the
// requested id and returns its return code, stdout and stderr. It returns
// CodeInvalidArgument for an unparsable, unknown or already released id and
// CodeInternal when the command could not be run.
func (s *ApiServer) RunCommand(
	ctx context.Context,
	req *connect.Request[apiv2.RunCommandRequest],
) (*connect.Response[apiv2.RunCommandResponse], error) {
	runnerID, err := uuid.Parse(req.Msg.Id)
	if err != nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
	}

	runner, ok := s.GetRunnerFromMap(runnerID)
	if !ok {
		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
	}

	returnCode, stdout, stderr, err := runner.RunCommand(req.Msg.Command, req.Msg.Args)
	if errors.Is(err, errRunnerReleased) {
		// The runner was released between the map lookup and the call; to
		// the client this is the same as an id that no longer exists.
		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
	}
	if err != nil {
		// Log the allocation id rather than reaching into runner.runner,
		// which is guarded by the wrapper's mutex and may already be nil.
		log.Errorf("Could not run command on runner \"%v\", %v", runnerID, err)
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Could not run command"))
	}

	res := connect.NewResponse(&apiv2.RunCommandResponse{
		ReturnCode: returnCode,
		Stdout:     stdout,
		Stderr:     stderr,
	})
	res.Header().Set("RunCommand-Version", "v2")
	return res, nil
}

// CreateHandler builds an ApiServer on top of the given allocation and
// release channels and registers its three service handlers on mux.
func CreateHandler(allocationCh chan runnermanager.RunnerAllocationRequest, releaseCh chan runnermanager.RunnerReleaseRequest, mux *http.ServeMux) {
	server := &ApiServer{
		allocationCh:     allocationCh,
		releaseCh:        releaseCh,
		allocatedRunners: make(map[uuid.UUID]*RunnerWrapper),
	}

	path, handler := apiv2connect.NewGetRunnerServiceHandler(server)
	mux.Handle(path, handler)

	path, handler = apiv2connect.NewReleaseRunnerServiceHandler(server)
	mux.Handle(path, handler)

	path, handler = apiv2connect.NewRunCommandServiceHandler(server)
	mux.Handle(path, handler)
}