Compare commits
2 Commits
711a0257fc
...
6fc9af8280
| Author | SHA1 | Date | |
|---|---|---|---|
| 6fc9af8280 | |||
| ff78167325 |
@@ -19,7 +19,8 @@ var log = logging.MustGetLogger("cursorius-server")
|
||||
|
||||
type ApiServer struct {
|
||||
getRunnerCh chan runnermanager.GetRunnerRequest
|
||||
allocatedRunners map[uuid.UUID]RunnerWrapper
|
||||
allocatedRunners map[uuid.UUID]*RunnerWrapper
|
||||
allocatedRunnersMutex sync.RWMutex
|
||||
}
|
||||
|
||||
type RunnerWrapper struct {
|
||||
@@ -27,6 +28,23 @@ type RunnerWrapper struct {
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
func (r *RunnerWrapper) RunCommand(cmd string) (int64, string, string, error) {
|
||||
r.mutex.Unlock()
|
||||
defer r.mutex.Lock()
|
||||
|
||||
return_code, stdout, stderr, err := r.runner.RunCommand(cmd)
|
||||
|
||||
// TODO: run command by sending websocket packet
|
||||
// TODO: get stdout and stderr response
|
||||
return return_code, stdout, stderr, err
|
||||
}
|
||||
|
||||
func (s *ApiServer) GetRunnerFromMap(uuid uuid) *RunnerWrapper {
|
||||
s.allocatedRunnersMutex.RLock()
|
||||
defer s.allocatedRunnersMutex.RUnlock()
|
||||
return s.allocatedRunners[uuid]
|
||||
}
|
||||
|
||||
func (s *ApiServer) GetRunner(
|
||||
ctx context.Context,
|
||||
req *connect.Request[apiv1.GetRunnerRequest],
|
||||
@@ -54,7 +72,9 @@ func (s *ApiServer) GetRunner(
|
||||
|
||||
runnerUuid := uuid.New()
|
||||
|
||||
s.allocatedRunners[runnerUuid] = RunnerWrapper{runner: response.Runner}
|
||||
s.allocatedRunnersMutex.Lock()
|
||||
s.allocatedRunners[runnerUuid] = &RunnerWrapper{runner: response.Runner}
|
||||
s.allocatedRunnersMutex.Unlock()
|
||||
|
||||
res := connect.NewResponse(&apiv1.GetRunnerResponse{
|
||||
Id: runnerUuid.String(),
|
||||
@@ -68,10 +88,23 @@ func (s *ApiServer) RunCommand(
|
||||
req *connect.Request[apiv1.RunCommandRequest],
|
||||
) (*connect.Response[apiv1.RunCommandResponse], error) {
|
||||
|
||||
uuid, err := uuid.Parse(req.Msg.Id)
|
||||
if err != nil {
|
||||
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
|
||||
}
|
||||
|
||||
runner := s.GetRunnerFromMap(uuid)
|
||||
|
||||
return_code, stdout, stderr, err := runner.RunCommand(req.Msg.Command)
|
||||
if err != nil {
|
||||
log.Errorf("Could not run command on runner \"%v\", %v", runner.runner.Id(), err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Could not run command"))
|
||||
}
|
||||
|
||||
res := connect.NewResponse(&apiv1.RunCommandResponse{
|
||||
ReturnCode: 0,
|
||||
Stdout: "",
|
||||
Stderr: "",
|
||||
ReturnCode: return_code,
|
||||
Stdout: stdout,
|
||||
Stderr: stderr,
|
||||
})
|
||||
res.Header().Set("RunCommand-Version", "v1")
|
||||
return res, nil
|
||||
@@ -80,7 +113,7 @@ func (s *ApiServer) RunCommand(
|
||||
func CreateHandler(mux *http.ServeMux, getRunnerCh chan runnermanager.GetRunnerRequest) {
|
||||
api_server := &ApiServer{
|
||||
getRunnerCh: getRunnerCh,
|
||||
allocatedRunners: make(map[uuid.UUID]RunnerWrapper),
|
||||
allocatedRunners: make(map[uuid.UUID]*RunnerWrapper),
|
||||
}
|
||||
path, handler := apiv1connect.NewGetRunnerServiceHandler(api_server)
|
||||
mux.Handle(path, handler)
|
||||
|
||||
@@ -5,7 +5,7 @@ package api.v1;
|
||||
option go_package = "git.ohea.xyz/cursorius/server/proto/gen/api/v1;apiv1";
|
||||
|
||||
message RunCommandRequest {
|
||||
int64 runner_id = 1;
|
||||
string id = 1;
|
||||
string command = 2;
|
||||
}
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ type RunCommandRequest struct {
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
RunnerId int64 `protobuf:"varint,1,opt,name=runner_id,json=runnerId,proto3" json:"runner_id,omitempty"`
|
||||
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
|
||||
Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"`
|
||||
}
|
||||
|
||||
@@ -61,11 +61,11 @@ func (*RunCommandRequest) Descriptor() ([]byte, []int) {
|
||||
return file_api_v1_run_command_proto_rawDescGZIP(), []int{0}
|
||||
}
|
||||
|
||||
func (x *RunCommandRequest) GetRunnerId() int64 {
|
||||
func (x *RunCommandRequest) GetId() string {
|
||||
if x != nil {
|
||||
return x.RunnerId
|
||||
return x.Id
|
||||
}
|
||||
return 0
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *RunCommandRequest) GetCommand() string {
|
||||
@@ -143,28 +143,27 @@ var File_api_v1_run_command_proto protoreflect.FileDescriptor
|
||||
var file_api_v1_run_command_proto_rawDesc = []byte{
|
||||
0x0a, 0x18, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x75, 0x6e, 0x5f, 0x63, 0x6f, 0x6d,
|
||||
0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x61, 0x70, 0x69, 0x2e,
|
||||
0x76, 0x31, 0x22, 0x4a, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
|
||||
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x72, 0x75, 0x6e, 0x6e, 0x65,
|
||||
0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x72, 0x75, 0x6e, 0x6e,
|
||||
0x65, 0x72, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18,
|
||||
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x22, 0x65,
|
||||
0x0a, 0x12, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70,
|
||||
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x5f, 0x63,
|
||||
0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x75, 0x72,
|
||||
0x6e, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18,
|
||||
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a,
|
||||
0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73,
|
||||
0x74, 0x64, 0x65, 0x72, 0x72, 0x32, 0x5a, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d,
|
||||
0x61, 0x6e, 0x64, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x45, 0x0a, 0x0a, 0x52, 0x75,
|
||||
0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x19, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76,
|
||||
0x31, 0x2e, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75,
|
||||
0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e,
|
||||
0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
|
||||
0x00, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x2e, 0x6f, 0x68, 0x65, 0x61, 0x2e, 0x78, 0x79,
|
||||
0x7a, 0x2f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x69, 0x75, 0x73, 0x2f, 0x73, 0x65, 0x72, 0x76,
|
||||
0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x61, 0x70, 0x69,
|
||||
0x2f, 0x76, 0x31, 0x3b, 0x61, 0x70, 0x69, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x33,
|
||||
0x76, 0x31, 0x22, 0x3d, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
|
||||
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20,
|
||||
0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61,
|
||||
0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
|
||||
0x64, 0x22, 0x65, 0x0a, 0x12, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
|
||||
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x75, 0x72,
|
||||
0x6e, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65,
|
||||
0x74, 0x75, 0x72, 0x6e, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f,
|
||||
0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74,
|
||||
0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x32, 0x5a, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43,
|
||||
0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x45, 0x0a,
|
||||
0x0a, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x19, 0x2e, 0x61, 0x70,
|
||||
0x69, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
|
||||
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e,
|
||||
0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
|
||||
0x73, 0x65, 0x22, 0x00, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x2e, 0x6f, 0x68, 0x65, 0x61,
|
||||
0x2e, 0x78, 0x79, 0x7a, 0x2f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x69, 0x75, 0x73, 0x2f, 0x73,
|
||||
0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f,
|
||||
0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x3b, 0x61, 0x70, 0x69, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package runnermanager
|
||||
|
||||
import "nhooyr.io/websocket"
|
||||
|
||||
//var log = logging.MustGetLogger("cursorius-server")
|
||||
|
||||
type RunnerData struct {
|
||||
msgType websocket.MessageType
|
||||
data []byte
|
||||
}
|
||||
|
||||
type Runner struct {
|
||||
id string
|
||||
tags []string
|
||||
conn *websocket.Conn
|
||||
receiveChan chan RunnerData
|
||||
running bool
|
||||
}
|
||||
|
||||
func (r *Runner) Id() string {
|
||||
return r.id
|
||||
}
|
||||
|
||||
func (r *Runner) RunCommand() (int64, string, string, error) {
|
||||
return 0, "", "", nil
|
||||
}
|
||||
@@ -21,18 +21,6 @@ type RunnerRegistration struct {
|
||||
conn *websocket.Conn
|
||||
}
|
||||
|
||||
type RunnerData struct {
|
||||
msgType websocket.MessageType
|
||||
data []byte
|
||||
}
|
||||
type Runner struct {
|
||||
id string
|
||||
tags []string
|
||||
conn *websocket.Conn
|
||||
receiveChan chan RunnerData
|
||||
running bool
|
||||
}
|
||||
|
||||
type runnerManager struct {
|
||||
getRunnerCh chan GetRunnerRequest
|
||||
registerCh chan RunnerRegistration
|
||||
@@ -55,15 +43,10 @@ type runnerJob struct {
|
||||
URL string
|
||||
}
|
||||
|
||||
func runRunnerManager(r runnerManager) {
|
||||
for {
|
||||
msgCase:
|
||||
select {
|
||||
case request := <-r.getRunnerCh:
|
||||
|
||||
func (r *runnerManager) processRequest(req GetRunnerRequest) {
|
||||
var runnerTagsStr strings.Builder
|
||||
fmt.Fprintf(&runnerTagsStr, "%v", request.Tags[0])
|
||||
for _, tag := range request.Tags[1:] {
|
||||
fmt.Fprintf(&runnerTagsStr, "%v", req.Tags[0])
|
||||
for _, tag := range req.Tags[1:] {
|
||||
fmt.Fprintf(&runnerTagsStr, " %v", tag)
|
||||
}
|
||||
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
|
||||
@@ -83,6 +66,7 @@ func runRunnerManager(r runnerManager) {
|
||||
case _, ok := <-runner.receiveChan:
|
||||
if ok {
|
||||
// this should never happen
|
||||
// TODO: should we disconnect the runner?
|
||||
log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
|
||||
continue
|
||||
}
|
||||
@@ -92,11 +76,11 @@ func runRunnerManager(r runnerManager) {
|
||||
r.connectedRunners = r.connectedRunners[:len(r.connectedRunners)-1]
|
||||
default:
|
||||
runner.running = true
|
||||
request.RespChan <- GetRunnerResponse{
|
||||
req.RespChan <- GetRunnerResponse{
|
||||
Runner: runner,
|
||||
Err: nil,
|
||||
}
|
||||
break msgCase
|
||||
return
|
||||
}
|
||||
}
|
||||
errorMsg := "could not find valid runner"
|
||||
@@ -104,20 +88,22 @@ func runRunnerManager(r runnerManager) {
|
||||
errorMsg = "no connected runners"
|
||||
}
|
||||
log.Errorf("Could not allocate runner with tags \"%v\": %v", runnerTagsStr.String(), errorMsg)
|
||||
request.RespChan <- GetRunnerResponse{
|
||||
req.RespChan <- GetRunnerResponse{
|
||||
Runner: Runner{},
|
||||
Err: fmt.Errorf("Could not allocate runner: %v", errorMsg),
|
||||
}
|
||||
|
||||
case registration := <-r.registerCh:
|
||||
log.Debugf("New runner appeared with id: %v and secret: %v", registration.Id, registration.Secret)
|
||||
if configuredRunner, doesExist := r.configuredRunners[registration.Id]; doesExist {
|
||||
if configuredRunner.Secret == registration.Secret {
|
||||
log.Infof("Registering runner \"%v\" with tags %v", registration.Id, registration.Tags)
|
||||
}
|
||||
|
||||
func (r *runnerManager) processRegistration(reg RunnerRegistration) {
|
||||
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
|
||||
if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
|
||||
if configuredRunner.Secret == reg.Secret {
|
||||
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
|
||||
runner := Runner{
|
||||
id: registration.Id,
|
||||
tags: registration.Tags,
|
||||
conn: registration.conn,
|
||||
id: reg.Id,
|
||||
tags: reg.Tags,
|
||||
conn: reg.conn,
|
||||
receiveChan: make(chan RunnerData),
|
||||
running: false,
|
||||
}
|
||||
@@ -126,7 +112,7 @@ func runRunnerManager(r runnerManager) {
|
||||
// this is required to keep the connection functioning
|
||||
go func() {
|
||||
for {
|
||||
msgType, data, err := registration.conn.Read(context.Background())
|
||||
msgType, data, err := reg.conn.Read(context.Background())
|
||||
if err != nil {
|
||||
// TODO: this is still racy, since a runner could be alloctade between the
|
||||
// connection returning an err and the channel closing
|
||||
@@ -143,13 +129,23 @@ func runRunnerManager(r runnerManager) {
|
||||
}()
|
||||
|
||||
} else {
|
||||
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", registration.Id, registration.Secret)
|
||||
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, registration.Secret)
|
||||
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
}
|
||||
} else {
|
||||
log.Errorf("Disconnecting runner with invalid id: %v", registration.Id)
|
||||
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
|
||||
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
|
||||
}
|
||||
}
|
||||
|
||||
func runRunnerManager(r runnerManager) {
|
||||
for {
|
||||
select {
|
||||
case request := <-r.getRunnerCh:
|
||||
r.processRequest(request)
|
||||
|
||||
case registration := <-r.registerCh:
|
||||
r.processRegistration(registration)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user