From 2c4eaf68097a374755daceec5d5b612469901d07 Mon Sep 17 00:00:00 2001 From: Vitaliy Pavlov Date: Fri, 23 Aug 2024 20:57:09 +0700 Subject: [PATCH] stage WIP --- amqp/amqp.go | 31 +++++----- amqp/types/agent.go | 37 ++++++------ amqp/types/controller.go | 33 ++++++----- amqp/types/queues_map.go | 8 +++ database/database.go | 4 +- database/task.go | 2 + docs/docs.go | 34 +++++++++++ docs/swagger.json | 34 +++++++++++ docs/swagger.yaml | 22 ++++++++ environment/environment.go | 10 ++++ main.go | 11 +++- plugins/plugins.go | 16 +++++- router.go | 2 + services/agents/agents.go | 13 +++++ services/agents/main_queue.go | 94 +++++++++++++++++++++++++++++++ types/{ => constructor}/plugin.go | 2 +- types/constructor/task.go | 65 +++++++++++++++++++++ types/task.go | 45 --------------- 18 files changed, 358 insertions(+), 105 deletions(-) create mode 100644 amqp/types/queues_map.go create mode 100644 services/agents/agents.go create mode 100644 services/agents/main_queue.go rename types/{ => constructor}/plugin.go (85%) create mode 100644 types/constructor/task.go delete mode 100644 types/task.go diff --git a/amqp/amqp.go b/amqp/amqp.go index d6ebbb9..ba65cba 100644 --- a/amqp/amqp.go +++ b/amqp/amqp.go @@ -3,30 +3,29 @@ package amqp import ( "log" "os" - "strconv" "sync" + "system-trace/core/environment" "system-trace/core/utils" - "time" "github.com/wagslane/go-rabbitmq" ) +var Broker *rabbitmq.Conn + func InitConn() *rabbitmq.Conn { var conn *rabbitmq.Conn - if recon, err := strconv.Atoi(os.Getenv("RMQ_RECONNECT_INTERVAL")); err == nil { - conn, err = rabbitmq.NewConn( - os.Getenv("RMQ_URL"), - rabbitmq.WithConnectionOptionsLogging, - rabbitmq.WithConnectionOptionsReconnectInterval(time.Duration(recon)*time.Millisecond), - ) - if err != nil { - log.Println("[AMQP]", err) - wg := sync.WaitGroup{} - wg.Add(1) - to := time.Second * 3 - utils.WaitTimeout(&wg, to) - return InitConn() - } + conn, err := rabbitmq.NewConn( + os.Getenv("AMQP_URL"), + rabbitmq.WithConnectionOptionsLogging, + rabbitmq.WithConnectionOptionsReconnectInterval(environment.ReconnectionInterval), + ) + if err != nil { + log.Println("[AMQP]", err) + wg := sync.WaitGroup{} + wg.Add(1) + to := environment.ReconnectionInterval + utils.WaitTimeout(&wg, to) + return InitConn() } return conn diff --git a/amqp/types/agent.go b/amqp/types/agent.go index ff7723a..8c57fa9 100644 --- a/amqp/types/agent.go +++ b/amqp/types/agent.go @@ -1,17 +1,18 @@ package types const ( - AgentHello uint8 = iota - AgentError - AgentStatus - AgentPing + AGENT_HELLO uint8 = iota // 0 + AGENT_ERROR // 1 + AGENT_STATUS // 2 + AGENT_PING // 3 ) const ( - AgentStatusPreparing uint8 = iota - AgentStatusRunning - AgentStatusFinished - AgentStatusError + TASK_STATUS_PREPARING uint8 = iota // 0 + TASK_STATUS_RUNNING // 1 + TASK_STATUS_STOPPED // 2 + TASK_STATUS_FINISHED // 3 + TASK_STATUS_ERROR // 4 ) type AgentMessage struct { @@ -26,23 +27,17 @@ type agentMessageData struct { } type agentErrorData struct { - Error string + Error string `json:"error"` } type agentStatusData struct { - Error string `json:"error"` - Status uint8 `json:"status"` - TaskID int64 `json:"taskId"` - Progress uint8 `json:"progress"` // 0-100, for AgentStatusRunning + Status uint8 `json:"status"` + TaskID int64 `json:"taskId"` + Progress uint8 `json:"progress"` // 0-100, for TASK_STATUS_RUNNING } type agentHelloData struct { - Hostname string `json:"hostname"` - Version string `json:"version"` - Interfaces []interfaceData `json:"interfaces"` -} - -type interfaceData struct { - Name string `json:"name"` - IP string `json:"ip"` + Hostname string `json:"hostname"` + Version string `json:"version"` + Interfaces []string `json:"interfaces"` } diff --git a/amqp/types/controller.go b/amqp/types/controller.go index 4216f49..72b874c 100644 --- a/amqp/types/controller.go +++ b/amqp/types/controller.go @@ -1,37 +1,36 @@ package types -import "system-trace/core/types" - -const ( - ControllerTask uint8 = iota +import ( + "system-trace/core/types/constructor" ) const ( - TaskStart uint8 = iota - TaskPause - TaskStop + CONTROLLER_TASK_START uint8 = iota // 0 + CONTROLLER_TASK_PAUSE // 1 + CONTROLLER_TASK_STOP // 2 + CONTROLLER_PONG // 3 + CONTROLLER_GET_TASKS_STATUS // 4 ) type ControllerMessage struct { - Message uint8 `json:"message"` - Data *controllerMessageData `json:"data"` + Message uint8 `json:"message" validate:"required,min=0,max=255"` + Data *controllerMessageData `json:"data" validate:"required"` } type controllerMessageData struct { taskStartData - taskPauseData - taskStopData + taskStatusData + getTasksStatus } -type taskStopData struct { - TaskID int64 `json:"taskId"` +type taskStatusData struct { + TaskID int64 `json:"taskId" validate:"required"` } -type taskPauseData struct { - TaskID int64 +type getTasksStatus struct { + Tasks []int64 `json:"tasks" validate:"required"` } type taskStartData struct { - TaskID int64 - Data *types.TaskVariables `json:"data"` + Data *constructor.TaskData `json:"data" validate:"required"` } diff --git a/amqp/types/queues_map.go b/amqp/types/queues_map.go new file mode 100644 index 0000000..69066c4 --- /dev/null +++ b/amqp/types/queues_map.go @@ -0,0 +1,8 @@ +package types + +import "github.com/wagslane/go-rabbitmq" + +type QueuesPair struct { + Publisher *rabbitmq.Publisher + Consumer *rabbitmq.Consumer +} diff --git a/database/database.go b/database/database.go index e0cfbd6..9d9ad2c 100644 --- a/database/database.go +++ b/database/database.go @@ -19,7 +19,7 @@ import ( var PG *bun.DB var ctx = context.Background() -func Connect() { +func Connect() *bun.DB { pgconn := pgdriver.NewConnector( pgdriver.WithNetwork("tcp"), pgdriver.WithAddr(fmt.Sprintf("%s:%s", os.Getenv("DB_HOST"), os.Getenv("DB_PORT"))), @@ -51,7 +51,7 @@ func Connect() { } } - PG = db + return db } func createSchema(db *bun.DB) error { diff --git a/database/task.go b/database/task.go index 59e91ed..7b7defb 100644 --- a/database/task.go +++ b/database/task.go @@ -11,6 +11,8 @@ type Task struct { ID int64 `bun:",pk,autoincrement"` IssuerID int32 `bun:",notnull" json:"issuerID"` Issuer *User `bun:"rel:belongs-to,join:issuer_id=id" json:"issuer"` + Status int8 `bun:"default:0" json:"status"` + Error string `json:"error"` Configuration map[string]interface{} `bun:"type:jsonb" json:"configuration"` CreatedAt time.Time `bun:",notnull,default:current_timestamp" json:"createdAt"` FinishedAt time.Time `json:"finishedAt"` diff --git a/docs/docs.go b/docs/docs.go index 33753af..b40f023 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -236,6 +236,26 @@ const docTemplate = `{ } } }, + "/plugins": { + "get": { + "description": "Returns key-value map with plugins", + "produces": [ + "application/json" + ], + "summary": "Get list of available plugins", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/constructor.Plugin" + } + } + } + } + } + }, "/users": { "get": { "description": "Returns array of users and count", @@ -498,6 +518,20 @@ const docTemplate = `{ } }, "definitions": { + "constructor.Plugin": { + "type": "object", + "properties": { + "ID": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "onlyUdp": { + "type": "boolean" + } + } + }, "database.Group": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index 4fbc02e..e019c96 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -228,6 +228,26 @@ } } }, + "/plugins": { + "get": { + "description": "Returns key-value map with plugins", + "produces": [ + "application/json" + ], + "summary": "Get list of available plugins", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/constructor.Plugin" + } + } + } + } + } + }, "/users": { "get": { "description": "Returns array of users and count", @@ -490,6 +510,20 @@ } }, "definitions": { + "constructor.Plugin": { + "type": "object", + "properties": { + "ID": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "onlyUdp": { + "type": "boolean" + } + } + }, "database.Group": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 448eb59..1f45724 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -1,5 +1,14 @@ basePath: /v1 definitions: + constructor.Plugin: + properties: + ID: + type: integer + name: + type: string + onlyUdp: + type: boolean + type: object database.Group: properties: createdAt: @@ -253,6 +262,19 @@ paths: type: integer type: object summary: Get list of permissions + /plugins: + get: + description: Returns key-value map with plugins + produces: + - application/json + responses: + "200": + description: OK + schema: + items: + $ref: '#/definitions/constructor.Plugin' + type: array + summary: Get list of available plugins /users: delete: description: Delete users by ID diff --git a/environment/environment.go b/environment/environment.go index 2d92c29..4ed8eff 100644 --- a/environment/environment.go +++ b/environment/environment.go @@ -3,15 +3,25 @@ package environment import ( "log" "os" + "strconv" + "time" "github.com/joho/godotenv" ) +var ReconnectionInterval time.Duration + func Load() { err := godotenv.Load() if err != nil { log.Fatal("Error loading .env file") } + + if recon, err := strconv.Atoi(os.Getenv("RECONNECT_INTERVAL")); err == nil { + ReconnectionInterval = time.Duration(recon) * time.Millisecond + } else { + log.Fatal("[AMQP]", err) + } } func IsDebug() bool { diff --git a/main.go b/main.go index be05153..d7bd3fd 100644 --- a/main.go +++ b/main.go @@ -1,9 +1,11 @@ package main import ( + "system-trace/core/amqp" "system-trace/core/database" "system-trace/core/environment" "system-trace/core/plugins" + "system-trace/core/services/agents" "system-trace/core/validators" ) @@ -19,6 +21,13 @@ func main() { environment.Load() plugins.LoadPlugins() validators.RegisterValidators() - database.Connect() + + database.PG = database.Connect() + + amqp.Broker = amqp.InitConn() + { + go agents.CreateMainQueueConsumer(amqp.Broker) + } + serveApp() } diff --git a/plugins/plugins.go b/plugins/plugins.go index 57a652c..c90af45 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -4,10 +4,12 @@ import ( "encoding/json" "log" "os" - "system-trace/core/types" + "system-trace/core/types/constructor" + + "github.com/gofiber/fiber/v2" ) -var plugins *[]types.Plugin +var plugins *[]constructor.Plugin func LoadPlugins() { dictpath := "plugins/dictionary/dict.json" @@ -20,3 +22,13 @@ func LoadPlugins() { log.Fatal(err) } } + +// MARK: GetPlugins godoc +// @Summary Get list of available plugins +// @Description Returns array of plugins +// @Produce json +// @Success 200 {object} []constructor.Plugin +// @Router /plugins [get] +func GetPlugins(c *fiber.Ctx) error { + return c.Status(fiber.StatusOK).JSON(plugins) +} diff --git a/router.go b/router.go index 261c2bf..0da253c 100644 --- a/router.go +++ b/router.go @@ -2,6 +2,7 @@ package main import ( _ "system-trace/core/docs" + "system-trace/core/plugins" "system-trace/core/services" "system-trace/core/services/auth" "system-trace/core/services/groups" @@ -17,6 +18,7 @@ func initRouter(app *fiber.App) { { v1.Get("/permissions", services.GetPermissions) + v1.Get("/plugins", plugins.GetPlugins) ag := v1.Group("/auth") { diff --git a/services/agents/agents.go b/services/agents/agents.go new file mode 100644 index 0000000..8863411 --- /dev/null +++ b/services/agents/agents.go @@ -0,0 +1,13 @@ +package agents + +import "system-trace/core/amqp/types" + +var queues = make(map[string]*types.QueuesPair) + +func getQueuePair(hostname string) *types.QueuesPair { + if pair, ok := queues[hostname]; ok { + return pair + } + + return nil +} diff --git a/services/agents/main_queue.go b/services/agents/main_queue.go new file mode 100644 index 0000000..dfc298b --- /dev/null +++ b/services/agents/main_queue.go @@ -0,0 +1,94 @@ +package agents + +import ( + "encoding/json" + "fmt" + "log" + agent "system-trace/core/amqp/types" + + "github.com/wagslane/go-rabbitmq" +) + +var mainQueueName string = "SYSTEM_TRACE_MAIN" + +func CreateMainQueueConsumer(broker *rabbitmq.Conn) { + consumer, err := rabbitmq.NewConsumer( + broker, + mainQueueName, + rabbitmq.WithConsumerOptionsRoutingKey(""), + rabbitmq.WithConsumerOptionsExchangeName("controller"), + rabbitmq.WithConsumerOptionsExchangeDeclare, + ) + if err != nil { + log.Fatal(err) + } + + if err = consumer.Run(handleMessageFromMainQueue); err != nil { + log.Fatal(err) + } +} + +func handleMessageFromMainQueue(message rabbitmq.Delivery) rabbitmq.Action { + am := new(agent.AgentMessage) + if err := json.Unmarshal(message.Body, &am); err != nil { + // TODO error log + return rabbitmq.NackDiscard + } + action := handleAgentMessage(am) + return action +} + +func handleAgentMessage(message *agent.AgentMessage) rabbitmq.Action { + switch message.Message { + case agent.AGENT_HELLO: + { + if message.Data == nil { + fmt.Println("No data in received message hello") + // TODO error log + return rabbitmq.NackDiscard + } + if len(message.Data.Hostname) <= 0 { + fmt.Println("Hostname field are not presented in received message hello") + // TODO error log + return rabbitmq.NackDiscard + } + if len(message.Data.Interfaces) <= 0 { + fmt.Println("Interfaces field are not presented in received message hello") + // TODO error log + return rabbitmq.NackDiscard + } + if len(message.Data.Version) <= 0 { + fmt.Println("Version field are not presented in received message hello") + // TODO error log + return rabbitmq.NackDiscard + } + // TODO create or update agent from hello message + return rabbitmq.Ack + } + case agent.AGENT_ERROR: + { + if message.Data == nil { + fmt.Println("No data in received message hello") + // TODO error log + return rabbitmq.NackDiscard + } + if len(message.Data.Error) <= 0 { + fmt.Println("Error field are not presented in received message hello") + // TODO error log + return rabbitmq.NackDiscard + } + // TODO save log + return rabbitmq.Ack + } + // case agent.AGENT_STATUS: + // { + + // } + // case agent.AGENT_PING: + // { + + // } + } + + return rabbitmq.NackDiscard +} diff --git a/types/plugin.go b/types/constructor/plugin.go similarity index 85% rename from types/plugin.go rename to types/constructor/plugin.go index 489ce30..d95255f 100644 --- a/types/plugin.go +++ b/types/constructor/plugin.go @@ -1,4 +1,4 @@ -package types +package constructor type Plugin struct { ID uint32 `json:"ID"` diff --git a/types/constructor/task.go b/types/constructor/task.go new file mode 100644 index 0000000..2036a79 --- /dev/null +++ b/types/constructor/task.go @@ -0,0 +1,65 @@ +package constructor + +const ( + MODE_THROUGHPUT_BPS uint8 = iota // 0 + MODE_THROUGHPUT_PPS // 1 + MODE_TCP_CONNECTIONS // 2 + MODE_USERS // 3 +) + +const ( + TYPE_VFIO uint8 = iota // 0 + TYPE_POSIX // 1 +) + +const ( + MAC_MODE_ROUND_ROBIN uint8 = iota // 0 // every packet receives round-robin balancing mac + MAC_MODE_IP_HASH // 1 // ip hash, divide and bind to every IPv4/IPv6. MAC-адресов не может быть больше IP-адресов + MAC_MODE_RANDOM // 2 // random MAC every packet + MAC_MODE_GENERATE // 3 // generate mac for every IPv4/IPv6 and bind it +) + +type TaskData struct { + Type uint8 `json:"type" validate:"required,min=0,max=255"` + Mode uint8 `json:"mode" validate:"required,min=0,max=255"` + Time uint32 `json:"time" validate:"required,min=0,max=4294967295"` // in seconds + SourceClient map[string]sourceData `json:"sourceClient" validate:"required"` + SourceReceiver map[string]sourceData `json:"sourceReceiver" validate:"required"` + Plugins []pluginData `json:"plugins" validate:"required"` + Performance performanceData `json:"performance" validate:"required"` + Tweaks tweaksData `json:"tweaks" validate:"required"` // [[time_in_seconds, value]]: [[10, 6570]] +} + +type performanceData struct { // Depends on selected mode + // MODE_THROUGHPUT_BPS + MODE_TCP_CONNECTIONS + MaxBPS uint64 `json:"maxBps" validate:"min=0,max=18446744073709551615"` + // MODE_THROUGHPUT_PPS + MODE_TCP_CONNECTIONS + MaxPPS uint32 `json:"maxPps" validate:"min=0,max=4294967295"` + // MODE_TCP_CONNECTIONS + MaxTCPConnections uint32 `json:"maxTcpConnections" validate:"min=0,max=4294967295"` + // MODE_USERS + MaxUsers uint32 `json:"maxUsers" validate:"min=0,max=4294967295"` +} + +type tweaksData [][]uint8 + +type pluginData struct { + Plugin uint32 `json:"plugin" validate:"min=0,max=4294967295"` + Weight uint8 `json:"weight" validate:"min=0,max=100"` // 0-100 +} + +type sourceData struct { + IPs []string `json:"IPs" validate:"required"` // SRC-IP mask cannot be duplicated into two or more entities within 1 network interface + MACs macData `json:"MACs" validate:"required"` + NextHops macData `json:"nextHops" validate:"required"` +} + +type macData struct { + Mode uint8 `json:"mode" validate:"required,min=0,max=255"` + Addresses []macAddressData `json:"addresses"` +} + +type macAddressData struct { + Address string `json:"address" validate:"required"` + Weight uint8 `json:"weight" validate:"min=0,max=100"` // 0-100 +} diff --git a/types/task.go b/types/task.go deleted file mode 100644 index 248d57a..0000000 --- a/types/task.go +++ /dev/null @@ -1,45 +0,0 @@ -package types - -const ( - MODE_THROUGHPUT_BPS uint8 = iota - MODE_THROUGHPUT_PPS - MODE_TCP_CONNECTIONS - MODE_USERS -) - -type TaskVariables struct { - Mode uint8 `json:"mode" validate:"min=0,max=255"` - Time uint32 `json:"time" validate:"min=0,max=4294967295"` // in seconds - Plugins []pluginData `json:"plugins" validate:"required"` - Performance performanceData `json:"performance" validate:"required"` - Tweaks performanceTweaks `json:"tweaks" validate:"required"` // [[time_in_seconds, value]]: [[10, 6570]] -} - -type performanceData struct { // Depends on selected mode - // MODE_THROUGHPUT_BPS - MaxBPS uint64 `json:"maxBps" validate:"min=0,max=18446744073709551615"` - // MODE_THROUGHPUT_PPS - MaxPPS uint64 `json:"maxPps" validate:"min=0,max=18446744073709551615"` - // MODE_TCP_CONNECTIONS - MaxTCPConnections uint32 `json:"maxTcpConn" validate:"min=0,max=4294967295"` - // MODE_USERS - MaxUsers uint32 `json:"maxUsers" validate:"min=0,max=4294967295"` -} - -type performanceTweaks struct { - // MODE_THROUGHPUT_BPS - BPS tweaksData `json:"bps"` - // MODE_THROUGHPUT_PPS - PPS tweaksData `json:"pps"` - // MODE_TCP_CONNECTIONS - TCPConnections tweaksData `json:"tcpConn"` - // MODE_USERS - Users tweaksData `json:"users"` -} - -type tweaksData [][]uint8 - -type pluginData struct { - Plugin uint32 `json:"plugin" validate:"min=0,max=4294967295"` - Weight uint8 `json:"weight" validate:"min=0,max=100"` // 0-100 -}