package pipeline_api

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"sync"

	apiv2 "git.ohea.xyz/cursorius/pipeline-api/go/api/v2"
	"git.ohea.xyz/cursorius/pipeline-api/go/api/v2/apiv2connect"
	"git.ohea.xyz/cursorius/server/runnermanager"
	"github.com/bufbuild/connect-go"
	"github.com/google/uuid"
	"github.com/op/go-logging"
)

var log = logging.MustGetLogger("cursorius-server")

// ApiServer implements the GetRunner, ReleaseRunner and RunCommand connect
// service handlers. It keeps track of every runner it has handed out, keyed
// by a server-generated UUID.
type ApiServer struct {
	// getRunnerCh carries runner allocation requests to the runner manager.
	getRunnerCh chan runnermanager.GetRunnerRequest
	// allocatedRunners maps the ID returned to the client to its runner.
	allocatedRunners map[uuid.UUID]*RunnerWrapper
	// allocatedRunnersMutex guards allocatedRunners.
	allocatedRunnersMutex sync.RWMutex
}

// RunnerWrapper serializes access to a single runner: every method takes the
// wrapper's mutex before touching the underlying runner.
type RunnerWrapper struct {
	runner *runnermanager.Runner
	mutex  sync.Mutex
}

// RunCommand runs cmd with args on the wrapped runner while holding the
// wrapper's mutex, and returns the runner's return code, stdout, stderr and
// error unchanged.
func (r *RunnerWrapper) RunCommand(cmd string, args []string) (int64, string, string, error) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	// TODO: run command by sending websocket packet
	// TODO: get stdout and stderr response
	return r.runner.RunCommand(cmd, args)
}

// Release calls Release on the wrapped runner while holding the wrapper's
// mutex, so it waits for any in-flight RunCommand on this wrapper to finish.
func (r *RunnerWrapper) Release() {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	r.runner.Release()
}

// GetRunnerFromMap looks up the runner allocated under u. The boolean result
// reports whether such a runner exists.
func (s *ApiServer) GetRunnerFromMap(u uuid.UUID) (*RunnerWrapper, bool) {
	s.allocatedRunnersMutex.RLock()
	defer s.allocatedRunnersMutex.RUnlock()

	runner, ok := s.allocatedRunners[u]
	return runner, ok
}

// GetRunner asks the runner manager for a runner matching the requested tags,
// registers it under a freshly generated UUID and returns that UUID to the
// caller. If the manager reports an error the call fails with CodeNotFound.
func (s *ApiServer) GetRunner(
	ctx context.Context,
	req *connect.Request[apiv2.GetRunnerRequest],
) (*connect.Response[apiv2.GetRunnerResponse], error) {
	// Render the tags as "[a, b, c]" for logging. Ranging with an index
	// (instead of reading Tags[0] and Tags[1:]) keeps a request with no tags
	// from panicking; it renders as "[]".
	var runnerTagsStr strings.Builder
	runnerTagsStr.WriteString("[")
	for i, tag := range req.Msg.Tags {
		if i > 0 {
			runnerTagsStr.WriteString(", ")
		}
		fmt.Fprintf(&runnerTagsStr, "%v", tag)
	}
	runnerTagsStr.WriteString("]")

	respChan := make(chan runnermanager.GetRunnerResponse)
	s.getRunnerCh <- runnermanager.GetRunnerRequest{
		Tags:     req.Msg.Tags,
		RespChan: respChan,
	}

	response := <-respChan
	if response.Err != nil {
		log.Errorf("Could not get runner with tags \"%v\": %v", runnerTagsStr.String(), response.Err)
		return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("Could not get runner"))
	}

	log.Infof("Got runner with tags: %v", runnerTagsStr.String())

	runnerID := uuid.New()

	s.allocatedRunnersMutex.Lock()
	s.allocatedRunners[runnerID] = &RunnerWrapper{runner: response.Runner}
	s.allocatedRunnersMutex.Unlock()

	res := connect.NewResponse(&apiv2.GetRunnerResponse{
		Id: runnerID.String(),
	})
	res.Header().Set("GetRunner-Version", "v2")
	return res, nil
}

// ReleaseRunner removes the runner identified by the request ID from the set
// of allocated runners and releases it. A malformed ID, or an ID that does
// not refer to an allocated runner, fails with CodeInvalidArgument.
func (s *ApiServer) ReleaseRunner(
	ctx context.Context,
	req *connect.Request[apiv2.ReleaseRunnerRequest],
) (*connect.Response[apiv2.ReleaseRunnerResponse], error) {
	runnerID, err := uuid.Parse(req.Msg.Id)
	if err != nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
	}

	log.Infof("Releasing runner with ID \"%v\"", runnerID)

	s.allocatedRunnersMutex.Lock()
	runner, ok := s.allocatedRunners[runnerID]
	delete(s.allocatedRunners, runnerID)
	s.allocatedRunnersMutex.Unlock()

	// An unknown ID yields a nil wrapper; calling Release on it would
	// dereference nil, so reject it the same way RunCommand does.
	if !ok {
		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
	}

	// Release outside the map lock: it waits on the wrapper's own mutex, which
	// a running command may hold, and must not stall lookups for other runners.
	runner.Release()

	res := connect.NewResponse(&apiv2.ReleaseRunnerResponse{})
	res.Header().Set("ReleaseRunner-Version", "v2")
	return res, nil
}

// RunCommand runs the requested command on the runner identified by the
// request ID and returns its return code, stdout and stderr. A malformed or
// unknown ID fails with CodeInvalidArgument; an error from the runner is
// logged and reported as CodeInternal.
func (s *ApiServer) RunCommand(
	ctx context.Context,
	req *connect.Request[apiv2.RunCommandRequest],
) (*connect.Response[apiv2.RunCommandResponse], error) {
	runnerID, err := uuid.Parse(req.Msg.Id)
	if err != nil {
		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
	}

	runner, ok := s.GetRunnerFromMap(runnerID)
	if !ok {
		return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
	}

	returnCode, stdout, stderr, err := runner.RunCommand(req.Msg.Command, req.Msg.Args)
	if err != nil {
		log.Errorf("Could not run command on runner \"%v\", %v", runner.runner.Id(), err)
		return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Could not run command"))
	}

	res := connect.NewResponse(&apiv2.RunCommandResponse{
		ReturnCode: returnCode,
		Stdout:     stdout,
		Stderr:     stderr,
	})
	res.Header().Set("RunCommand-Version", "v2")
	return res, nil
}

// CreateHandler builds an ApiServer that sends allocation requests on
// getRunnerCh and registers its GetRunner, ReleaseRunner and RunCommand
// service handlers on mux.
func CreateHandler(getRunnerCh chan runnermanager.GetRunnerRequest, mux *http.ServeMux) {
	server := &ApiServer{
		getRunnerCh:      getRunnerCh,
		allocatedRunners: make(map[uuid.UUID]*RunnerWrapper),
	}

	path, handler := apiv2connect.NewGetRunnerServiceHandler(server)
	mux.Handle(path, handler)

	path, handler = apiv2connect.NewReleaseRunnerServiceHandler(server)
	mux.Handle(path, handler)

	path, handler = apiv2connect.NewRunCommandServiceHandler(server)
	mux.Handle(path, handler)
}