Compare commits

...

2 Commits

Author SHA1 Message Date
restitux 6fc9af8280 Implement skeleton of RunCommand 2023-01-01 13:44:09 -07:00
restitux ff78167325 Cleanup runnermanager code 2023-01-01 13:43:45 -07:00
5 changed files with 190 additions and 136 deletions
+39 -6
View File
@@ -19,7 +19,8 @@ var log = logging.MustGetLogger("cursorius-server")
type ApiServer struct { type ApiServer struct {
getRunnerCh chan runnermanager.GetRunnerRequest getRunnerCh chan runnermanager.GetRunnerRequest
allocatedRunners map[uuid.UUID]RunnerWrapper allocatedRunners map[uuid.UUID]*RunnerWrapper
allocatedRunnersMutex sync.RWMutex
} }
type RunnerWrapper struct { type RunnerWrapper struct {
@@ -27,6 +28,23 @@ type RunnerWrapper struct {
mutex sync.Mutex mutex sync.Mutex
} }
func (r *RunnerWrapper) RunCommand(cmd string) (int64, string, string, error) {
r.mutex.Unlock()
defer r.mutex.Lock()
return_code, stdout, stderr, err := r.runner.RunCommand(cmd)
// TODO: run command by sending websocket packet
// TODO: get stdout and stderr response
return return_code, stdout, stderr, err
}
func (s *ApiServer) GetRunnerFromMap(uuid uuid) *RunnerWrapper {
s.allocatedRunnersMutex.RLock()
defer s.allocatedRunnersMutex.RUnlock()
return s.allocatedRunners[uuid]
}
func (s *ApiServer) GetRunner( func (s *ApiServer) GetRunner(
ctx context.Context, ctx context.Context,
req *connect.Request[apiv1.GetRunnerRequest], req *connect.Request[apiv1.GetRunnerRequest],
@@ -54,7 +72,9 @@ func (s *ApiServer) GetRunner(
runnerUuid := uuid.New() runnerUuid := uuid.New()
s.allocatedRunners[runnerUuid] = RunnerWrapper{runner: response.Runner} s.allocatedRunnersMutex.Lock()
s.allocatedRunners[runnerUuid] = &RunnerWrapper{runner: response.Runner}
s.allocatedRunnersMutex.Unlock()
res := connect.NewResponse(&apiv1.GetRunnerResponse{ res := connect.NewResponse(&apiv1.GetRunnerResponse{
Id: runnerUuid.String(), Id: runnerUuid.String(),
@@ -68,10 +88,23 @@ func (s *ApiServer) RunCommand(
req *connect.Request[apiv1.RunCommandRequest], req *connect.Request[apiv1.RunCommandRequest],
) (*connect.Response[apiv1.RunCommandResponse], error) { ) (*connect.Response[apiv1.RunCommandResponse], error) {
uuid, err := uuid.Parse(req.Msg.Id)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
}
runner := s.GetRunnerFromMap(uuid)
return_code, stdout, stderr, err := runner.RunCommand(req.Msg.Command)
if err != nil {
log.Errorf("Could not run command on runner \"%v\", %v", runner.runner.Id(), err)
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Could not run command"))
}
res := connect.NewResponse(&apiv1.RunCommandResponse{ res := connect.NewResponse(&apiv1.RunCommandResponse{
ReturnCode: 0, ReturnCode: return_code,
Stdout: "", Stdout: stdout,
Stderr: "", Stderr: stderr,
}) })
res.Header().Set("RunCommand-Version", "v1") res.Header().Set("RunCommand-Version", "v1")
return res, nil return res, nil
@@ -80,7 +113,7 @@ func (s *ApiServer) RunCommand(
func CreateHandler(mux *http.ServeMux, getRunnerCh chan runnermanager.GetRunnerRequest) { func CreateHandler(mux *http.ServeMux, getRunnerCh chan runnermanager.GetRunnerRequest) {
api_server := &ApiServer{ api_server := &ApiServer{
getRunnerCh: getRunnerCh, getRunnerCh: getRunnerCh,
allocatedRunners: make(map[uuid.UUID]RunnerWrapper), allocatedRunners: make(map[uuid.UUID]*RunnerWrapper),
} }
path, handler := apiv1connect.NewGetRunnerServiceHandler(api_server) path, handler := apiv1connect.NewGetRunnerServiceHandler(api_server)
mux.Handle(path, handler) mux.Handle(path, handler)
+1 -1
View File
@@ -5,7 +5,7 @@ package api.v1;
option go_package = "git.ohea.xyz/cursorius/server/proto/gen/api/v1;apiv1"; option go_package = "git.ohea.xyz/cursorius/server/proto/gen/api/v1;apiv1";
message RunCommandRequest { message RunCommandRequest {
int64 runner_id = 1; string id = 1;
string command = 2; string command = 2;
} }
+25 -26
View File
@@ -25,7 +25,7 @@ type RunCommandRequest struct {
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
RunnerId int64 `protobuf:"varint,1,opt,name=runner_id,json=runnerId,proto3" json:"runner_id,omitempty"` Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"`
} }
@@ -61,11 +61,11 @@ func (*RunCommandRequest) Descriptor() ([]byte, []int) {
return file_api_v1_run_command_proto_rawDescGZIP(), []int{0} return file_api_v1_run_command_proto_rawDescGZIP(), []int{0}
} }
func (x *RunCommandRequest) GetRunnerId() int64 { func (x *RunCommandRequest) GetId() string {
if x != nil { if x != nil {
return x.RunnerId return x.Id
} }
return 0 return ""
} }
func (x *RunCommandRequest) GetCommand() string { func (x *RunCommandRequest) GetCommand() string {
@@ -143,28 +143,27 @@ var File_api_v1_run_command_proto protoreflect.FileDescriptor
var file_api_v1_run_command_proto_rawDesc = []byte{ var file_api_v1_run_command_proto_rawDesc = []byte{
0x0a, 0x18, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x75, 0x6e, 0x5f, 0x63, 0x6f, 0x6d, 0x0a, 0x18, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x75, 0x6e, 0x5f, 0x63, 0x6f, 0x6d,
0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x61, 0x70, 0x69, 0x2e, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x61, 0x70, 0x69, 0x2e,
0x76, 0x31, 0x22, 0x4a, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x76, 0x31, 0x22, 0x3d, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x72, 0x75, 0x6e, 0x6e, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20,
0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x72, 0x75, 0x6e, 0x6e, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61,
0x65, 0x72, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x22, 0x65, 0x64, 0x22, 0x65, 0x0a, 0x12, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
0x0a, 0x12, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x75, 0x72,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x5f, 0x63, 0x6e, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65,
0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x75, 0x72, 0x74, 0x75, 0x72, 0x6e, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f,
0x6e, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x32, 0x5a, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43,
0x74, 0x64, 0x65, 0x72, 0x72, 0x32, 0x5a, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x45, 0x0a,
0x61, 0x6e, 0x64, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x45, 0x0a, 0x0a, 0x52, 0x75, 0x0a, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x19, 0x2e, 0x61, 0x70,
0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x19, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
0x31, 0x2e, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e,
0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x73, 0x65, 0x22, 0x00, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x2e, 0x6f, 0x68, 0x65, 0x61,
0x00, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x2e, 0x6f, 0x68, 0x65, 0x61, 0x2e, 0x78, 0x79, 0x2e, 0x78, 0x79, 0x7a, 0x2f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x69, 0x75, 0x73, 0x2f, 0x73,
0x7a, 0x2f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x69, 0x75, 0x73, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f,
0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x61, 0x70, 0x69, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x3b, 0x61, 0x70, 0x69, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72,
0x2f, 0x76, 0x31, 0x3b, 0x61, 0x70, 0x69, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x6f, 0x74, 0x6f, 0x33,
0x33,
} }
var ( var (
+26
View File
@@ -0,0 +1,26 @@
package runnermanager
import "nhooyr.io/websocket"
//var log = logging.MustGetLogger("cursorius-server")
type RunnerData struct {
msgType websocket.MessageType
data []byte
}
type Runner struct {
id string
tags []string
conn *websocket.Conn
receiveChan chan RunnerData
running bool
}
func (r *Runner) Id() string {
return r.id
}
func (r *Runner) RunCommand() (int64, string, string, error) {
return 0, "", "", nil
}
+32 -36
View File
@@ -21,18 +21,6 @@ type RunnerRegistration struct {
conn *websocket.Conn conn *websocket.Conn
} }
type RunnerData struct {
msgType websocket.MessageType
data []byte
}
type Runner struct {
id string
tags []string
conn *websocket.Conn
receiveChan chan RunnerData
running bool
}
type runnerManager struct { type runnerManager struct {
getRunnerCh chan GetRunnerRequest getRunnerCh chan GetRunnerRequest
registerCh chan RunnerRegistration registerCh chan RunnerRegistration
@@ -55,15 +43,10 @@ type runnerJob struct {
URL string URL string
} }
func runRunnerManager(r runnerManager) { func (r *runnerManager) processRequest(req GetRunnerRequest) {
for {
msgCase:
select {
case request := <-r.getRunnerCh:
var runnerTagsStr strings.Builder var runnerTagsStr strings.Builder
fmt.Fprintf(&runnerTagsStr, "%v", request.Tags[0]) fmt.Fprintf(&runnerTagsStr, "%v", req.Tags[0])
for _, tag := range request.Tags[1:] { for _, tag := range req.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, " %v", tag) fmt.Fprintf(&runnerTagsStr, " %v", tag)
} }
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String()) log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
@@ -83,6 +66,7 @@ func runRunnerManager(r runnerManager) {
case _, ok := <-runner.receiveChan: case _, ok := <-runner.receiveChan:
if ok { if ok {
// this should never happen // this should never happen
// TODO: should we disconnect the runner?
log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id) log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
continue continue
} }
@@ -92,11 +76,11 @@ func runRunnerManager(r runnerManager) {
r.connectedRunners = r.connectedRunners[:len(r.connectedRunners)-1] r.connectedRunners = r.connectedRunners[:len(r.connectedRunners)-1]
default: default:
runner.running = true runner.running = true
request.RespChan <- GetRunnerResponse{ req.RespChan <- GetRunnerResponse{
Runner: runner, Runner: runner,
Err: nil, Err: nil,
} }
break msgCase return
} }
} }
errorMsg := "could not find valid runner" errorMsg := "could not find valid runner"
@@ -104,20 +88,22 @@ func runRunnerManager(r runnerManager) {
errorMsg = "no connected runners" errorMsg = "no connected runners"
} }
log.Errorf("Could not allocate runner with tags \"%v\": %v", runnerTagsStr.String(), errorMsg) log.Errorf("Could not allocate runner with tags \"%v\": %v", runnerTagsStr.String(), errorMsg)
request.RespChan <- GetRunnerResponse{ req.RespChan <- GetRunnerResponse{
Runner: Runner{}, Runner: Runner{},
Err: fmt.Errorf("Could not allocate runner: %v", errorMsg), Err: fmt.Errorf("Could not allocate runner: %v", errorMsg),
} }
case registration := <-r.registerCh: }
log.Debugf("New runner appeared with id: %v and secret: %v", registration.Id, registration.Secret)
if configuredRunner, doesExist := r.configuredRunners[registration.Id]; doesExist { func (r *runnerManager) processRegistration(reg RunnerRegistration) {
if configuredRunner.Secret == registration.Secret { log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
log.Infof("Registering runner \"%v\" with tags %v", registration.Id, registration.Tags) if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
if configuredRunner.Secret == reg.Secret {
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
runner := Runner{ runner := Runner{
id: registration.Id, id: reg.Id,
tags: registration.Tags, tags: reg.Tags,
conn: registration.conn, conn: reg.conn,
receiveChan: make(chan RunnerData), receiveChan: make(chan RunnerData),
running: false, running: false,
} }
@@ -126,7 +112,7 @@ func runRunnerManager(r runnerManager) {
// this is required to keep the connection functioning // this is required to keep the connection functioning
go func() { go func() {
for { for {
msgType, data, err := registration.conn.Read(context.Background()) msgType, data, err := reg.conn.Read(context.Background())
if err != nil { if err != nil {
// TODO: this is still racy, since a runner could be alloctade between the // TODO: this is still racy, since a runner could be alloctade between the
// connection returning an err and the channel closing // connection returning an err and the channel closing
@@ -143,14 +129,24 @@ func runRunnerManager(r runnerManager) {
}() }()
} else { } else {
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", registration.Id, registration.Secret) log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, registration.Secret)
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid") reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
} }
} else { } else {
log.Errorf("Disconnecting runner with invalid id: %v", registration.Id) log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid") reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
} }
} }
func runRunnerManager(r runnerManager) {
for {
select {
case request := <-r.getRunnerCh:
r.processRequest(request)
case registration := <-r.registerCh:
r.processRegistration(registration)
}
} }
} }