stage WIP

This commit is contained in:
Vitaliy Pavlov 2024-08-23 20:57:09 +07:00
parent 327208b333
commit 2c4eaf6809
18 changed files with 358 additions and 105 deletions

View File

@ -3,31 +3,30 @@ package amqp
import (
"log"
"os"
"strconv"
"sync"
"system-trace/core/environment"
"system-trace/core/utils"
"time"
"github.com/wagslane/go-rabbitmq"
)
var Broker *rabbitmq.Conn
func InitConn() *rabbitmq.Conn {
var conn *rabbitmq.Conn
if recon, err := strconv.Atoi(os.Getenv("RMQ_RECONNECT_INTERVAL")); err == nil {
conn, err = rabbitmq.NewConn(
os.Getenv("RMQ_URL"),
conn, err := rabbitmq.NewConn(
os.Getenv("AMQP_URL"),
rabbitmq.WithConnectionOptionsLogging,
rabbitmq.WithConnectionOptionsReconnectInterval(time.Duration(recon)*time.Millisecond),
rabbitmq.WithConnectionOptionsReconnectInterval(environment.ReconnectionInterval),
)
if err != nil {
log.Println("[AMQP]", err)
wg := sync.WaitGroup{}
wg.Add(1)
to := time.Second * 3
to := environment.ReconnectionInterval
utils.WaitTimeout(&wg, to)
return InitConn()
}
}
return conn
}

View File

@ -1,17 +1,18 @@
package types
const (
AgentHello uint8 = iota
AgentError
AgentStatus
AgentPing
AGENT_HELLO uint8 = iota // 0
AGENT_ERROR // 1
AGENT_STATUS // 2
AGENT_PING // 3
)
const (
AgentStatusPreparing uint8 = iota
AgentStatusRunning
AgentStatusFinished
AgentStatusError
TASK_STATUS_PREPARING uint8 = iota // 0
TASK_STATUS_RUNNING // 1
TASK_STATUS_STOPPED // 2
TASK_STATUS_FINISHED // 3
TASK_STATUS_ERROR // 4
)
type AgentMessage struct {
@ -26,23 +27,17 @@ type agentMessageData struct {
}
type agentErrorData struct {
Error string
Error string `json:"error"`
}
type agentStatusData struct {
Error string `json:"error"`
Status uint8 `json:"status"`
TaskID int64 `json:"taskId"`
Progress uint8 `json:"progress"` // 0-100, for AgentStatusRunning
Progress uint8 `json:"progress"` // 0-100, for TASK_STATUS_RUNNING
}
type agentHelloData struct {
Hostname string `json:"hostname"`
Version string `json:"version"`
Interfaces []interfaceData `json:"interfaces"`
}
type interfaceData struct {
Name string `json:"name"`
IP string `json:"ip"`
Interfaces []string `json:"interfaces"`
}

View File

@ -1,37 +1,36 @@
package types
import "system-trace/core/types"
const (
ControllerTask uint8 = iota
import (
"system-trace/core/types/constructor"
)
const (
TaskStart uint8 = iota
TaskPause
TaskStop
CONTROLLER_TASK_START uint8 = iota // 0
CONTROLLER_TASK_PAUSE // 1
CONTROLLER_TASK_STOP // 2
CONTROLLER_PONG // 3
CONTROLLER_GET_TASKS_STATUS // 4
)
type ControllerMessage struct {
Message uint8 `json:"message"`
Data *controllerMessageData `json:"data"`
Message uint8 `json:"message" validate:"required,min=0,max=255"`
Data *controllerMessageData `json:"data" validate:"required"`
}
type controllerMessageData struct {
taskStartData
taskPauseData
taskStopData
taskStatusData
getTasksStatus
}
type taskStopData struct {
TaskID int64 `json:"taskId"`
type taskStatusData struct {
TaskID int64 `json:"taskId" validate:"required"`
}
type taskPauseData struct {
TaskID int64
type getTasksStatus struct {
Tasks []int64 `json:"tasks" validate:"required"`
}
type taskStartData struct {
TaskID int64
Data *types.TaskVariables `json:"data"`
Data *constructor.TaskData `json:"data" validate:"required"`
}

8
amqp/types/queues_map.go Normal file
View File

@ -0,0 +1,8 @@
package types
import "github.com/wagslane/go-rabbitmq"
type QueuesPair struct {
Publisher *rabbitmq.Publisher
Consumer *rabbitmq.Consumer
}

View File

@ -19,7 +19,7 @@ import (
var PG *bun.DB
var ctx = context.Background()
func Connect() {
func Connect() *bun.DB {
pgconn := pgdriver.NewConnector(
pgdriver.WithNetwork("tcp"),
pgdriver.WithAddr(fmt.Sprintf("%s:%s", os.Getenv("DB_HOST"), os.Getenv("DB_PORT"))),
@ -51,7 +51,7 @@ func Connect() {
}
}
PG = db
return db
}
func createSchema(db *bun.DB) error {

View File

@ -11,6 +11,8 @@ type Task struct {
ID int64 `bun:",pk,autoincrement"`
IssuerID int32 `bun:",notnull" json:"issuerID"`
Issuer *User `bun:"rel:belongs-to,join:issuer_id=id" json:"issuer"`
Status int8 `bun:"default:0" json:"status"`
Error string `json:"error"`
Configuration map[string]interface{} `bun:"type:jsonb" json:"configuration"`
CreatedAt time.Time `bun:",notnull,default:current_timestamp" json:"createdAt"`
FinishedAt time.Time `json:"finishedAt"`

View File

@ -236,6 +236,26 @@ const docTemplate = `{
}
}
},
"/plugins": {
"get": {
"description": "Returns key-value map with plugins",
"produces": [
"application/json"
],
"summary": "Get list of available plugins",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/constructor.Plugin"
}
}
}
}
}
},
"/users": {
"get": {
"description": "Returns array of users and count",
@ -498,6 +518,20 @@ const docTemplate = `{
}
},
"definitions": {
"constructor.Plugin": {
"type": "object",
"properties": {
"ID": {
"type": "integer"
},
"name": {
"type": "string"
},
"onlyUdp": {
"type": "boolean"
}
}
},
"database.Group": {
"type": "object",
"properties": {

View File

@ -228,6 +228,26 @@
}
}
},
"/plugins": {
"get": {
"description": "Returns key-value map with plugins",
"produces": [
"application/json"
],
"summary": "Get list of available plugins",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/constructor.Plugin"
}
}
}
}
}
},
"/users": {
"get": {
"description": "Returns array of users and count",
@ -490,6 +510,20 @@
}
},
"definitions": {
"constructor.Plugin": {
"type": "object",
"properties": {
"ID": {
"type": "integer"
},
"name": {
"type": "string"
},
"onlyUdp": {
"type": "boolean"
}
}
},
"database.Group": {
"type": "object",
"properties": {

View File

@ -1,5 +1,14 @@
basePath: /v1
definitions:
constructor.Plugin:
properties:
ID:
type: integer
name:
type: string
onlyUdp:
type: boolean
type: object
database.Group:
properties:
createdAt:
@ -253,6 +262,19 @@ paths:
type: integer
type: object
summary: Get list of permissions
/plugins:
get:
description: Returns key-value map with plugins
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/constructor.Plugin'
type: array
summary: Get list of available plugins
/users:
delete:
description: Delete users by ID

View File

@ -3,15 +3,25 @@ package environment
import (
"log"
"os"
"strconv"
"time"
"github.com/joho/godotenv"
)
var ReconnectionInterval time.Duration
func Load() {
err := godotenv.Load()
if err != nil {
log.Fatal("Error loading .env file")
}
if recon, err := strconv.Atoi(os.Getenv("RECONNECT_INTERVAL")); err == nil {
ReconnectionInterval = time.Duration(recon) * time.Millisecond
} else {
log.Fatal("[AMQP]", err)
}
}
func IsDebug() bool {

11
main.go
View File

@ -1,9 +1,11 @@
package main
import (
"system-trace/core/amqp"
"system-trace/core/database"
"system-trace/core/environment"
"system-trace/core/plugins"
"system-trace/core/services/agents"
"system-trace/core/validators"
)
@ -19,6 +21,13 @@ func main() {
environment.Load()
plugins.LoadPlugins()
validators.RegisterValidators()
database.Connect()
database.PG = database.Connect()
amqp.Broker = amqp.InitConn()
{
go agents.CreateMainQueueConsumer(amqp.Broker)
}
serveApp()
}

View File

@ -4,10 +4,12 @@ import (
"encoding/json"
"log"
"os"
"system-trace/core/types"
"system-trace/core/types/constructor"
"github.com/gofiber/fiber/v2"
)
var plugins *[]types.Plugin
var plugins *[]constructor.Plugin
func LoadPlugins() {
dictpath := "plugins/dictionary/dict.json"
@ -20,3 +22,13 @@ func LoadPlugins() {
log.Fatal(err)
}
}
// MARK: GetPlugins godoc
// @Summary Get list of available plugins
// @Description Returns array of plugins
// @Produce json
// @Success 200 {object} []constructor.Plugin
// @Router /plugins [get]
func GetPlugins(c *fiber.Ctx) error {
return c.Status(fiber.StatusOK).JSON(plugins)
}

View File

@ -2,6 +2,7 @@ package main
import (
_ "system-trace/core/docs"
"system-trace/core/plugins"
"system-trace/core/services"
"system-trace/core/services/auth"
"system-trace/core/services/groups"
@ -17,6 +18,7 @@ func initRouter(app *fiber.App) {
{
v1.Get("/permissions", services.GetPermissions)
v1.Get("/plugins", plugins.GetPlugins)
ag := v1.Group("/auth")
{

13
services/agents/agents.go Normal file
View File

@ -0,0 +1,13 @@
package agents
import "system-trace/core/amqp/types"
var queues = make(map[string]*types.QueuesPair)
func getQueuePair(hostname string) *types.QueuesPair {
if pair, ok := queues[hostname]; ok {
return pair
}
return nil
}

View File

@ -0,0 +1,94 @@
package agents
import (
"encoding/json"
"fmt"
"log"
agent "system-trace/core/amqp/types"
"github.com/wagslane/go-rabbitmq"
)
var mainQueueName string = "SYSTEM_TRACE_MAIN"
func CreateMainQueueConsumer(broker *rabbitmq.Conn) {
consumer, err := rabbitmq.NewConsumer(
broker,
mainQueueName,
rabbitmq.WithConsumerOptionsRoutingKey(""),
rabbitmq.WithConsumerOptionsExchangeName("controller"),
rabbitmq.WithConsumerOptionsExchangeDeclare,
)
if err != nil {
log.Fatal(err)
}
if err = consumer.Run(handleMessageFromMainQueue); err != nil {
log.Fatal(err)
}
}
func handleMessageFromMainQueue(message rabbitmq.Delivery) rabbitmq.Action {
am := new(agent.AgentMessage)
if err := json.Unmarshal(message.Body, &am); err != nil {
// TODO error log
return rabbitmq.NackDiscard
}
action := handleAgentMessage(am)
return action
}
func handleAgentMessage(message *agent.AgentMessage) rabbitmq.Action {
switch message.Message {
case agent.AGENT_HELLO:
{
if message.Data == nil {
fmt.Println("No data in received message hello")
// TODO error log
return rabbitmq.NackDiscard
}
if len(message.Data.Hostname) <= 0 {
fmt.Println("Hostname field are not presented in received message hello")
// TODO error log
return rabbitmq.NackDiscard
}
if len(message.Data.Interfaces) <= 0 {
fmt.Println("Interfaces field are not presented in received message hello")
// TODO error log
return rabbitmq.NackDiscard
}
if len(message.Data.Version) <= 0 {
fmt.Println("Version field are not presented in received message hello")
// TODO error log
return rabbitmq.NackDiscard
}
// TODO create or update agent from hello message
return rabbitmq.Ack
}
case agent.AGENT_ERROR:
{
if message.Data == nil {
fmt.Println("No data in received message hello")
// TODO error log
return rabbitmq.NackDiscard
}
if len(message.Data.Error) <= 0 {
fmt.Println("Error field are not presented in received message hello")
// TODO error log
return rabbitmq.NackDiscard
}
// TODO save log
return rabbitmq.Ack
}
// case agent.AGENT_STATUS:
// {
// }
// case agent.AGENT_PING:
// {
// }
}
return rabbitmq.NackDiscard
}

View File

@ -1,4 +1,4 @@
package types
package constructor
type Plugin struct {
ID uint32 `json:"ID"`

65
types/constructor/task.go Normal file
View File

@ -0,0 +1,65 @@
package constructor
const (
MODE_THROUGHPUT_BPS uint8 = iota // 0
MODE_THROUGHPUT_PPS // 1
MODE_TCP_CONNECTIONS // 2
MODE_USERS // 3
)
const (
TYPE_VFIO uint8 = iota // 0
TYPE_POSIX // 1
)
const (
MAC_MODE_ROUND_ROBIN uint8 = iota // 0 // every packet receives round-robin balancing mac
MAC_MODE_IP_HASH // 1 // ip hash, divide and bind to every IPv4/IPv6. MAC-адресов не может быть больше IP-адресов
MAC_MODE_RANDOM // 2 // random MAC every packet
MAC_MODE_GENERATE // 3 // generate mac for every IPv4/IPv6 and bind it
)
type TaskData struct {
Type uint8 `json:"type" validate:"required,min=0,max=255"`
Mode uint8 `json:"mode" validate:"required,min=0,max=255"`
Time uint32 `json:"time" validate:"required,min=0,max=4294967295"` // in seconds
SourceClient map[string]sourceData `json:"sourceClient" validate:"required"`
SourceReceiver map[string]sourceData `json:"sourceReceiver" validate:"required"`
Plugins []pluginData `json:"plugins" validate:"required"`
Performance performanceData `json:"performance" validate:"required"`
Tweaks tweaksData `json:"tweaks" validate:"required"` // [[time_in_seconds, value]]: [[10, 6570]]
}
type performanceData struct { // Depends on selected mode
// MODE_THROUGHPUT_BPS + MODE_TCP_CONNECTIONS
MaxBPS uint64 `json:"maxBps" validate:"min=0,max=18446744073709551615"`
// MODE_THROUGHPUT_PPS + MODE_TCP_CONNECTIONS
MaxPPS uint32 `json:"maxPps" validate:"min=0,max=4294967295"`
// MODE_TCP_CONNECTIONS
MaxTCPConnections uint32 `json:"maxTcpConnections" validate:"min=0,max=4294967295"`
// MODE_USERS
MaxUsers uint32 `json:"maxUsers" validate:"min=0,max=4294967295"`
}
type tweaksData [][]uint8
type pluginData struct {
Plugin uint32 `json:"plugin" validate:"min=0,max=4294967295"`
Weight uint8 `json:"weight" validate:"min=0,max=100"` // 0-100
}
type sourceData struct {
IPs []string `json:"IPs" validate:"required"` // SRC-IP mask cannot be duplicated into two or more entities within 1 network interface
MACs macData `json:"MACs" validate:"required"`
NextHops macData `json:"nextHops" validate:"required"`
}
type macData struct {
Mode uint8 `json:"mode" validate:"required,min=0,max=255"`
Addresses []macAddressData `json:"addresses"`
}
type macAddressData struct {
Address string `json:"address" validate:"required"`
Weight uint8 `json:"weight" validate:"min=0,max=100"` // 0-100
}

View File

@ -1,45 +0,0 @@
package types
const (
MODE_THROUGHPUT_BPS uint8 = iota
MODE_THROUGHPUT_PPS
MODE_TCP_CONNECTIONS
MODE_USERS
)
type TaskVariables struct {
Mode uint8 `json:"mode" validate:"min=0,max=255"`
Time uint32 `json:"time" validate:"min=0,max=4294967295"` // in seconds
Plugins []pluginData `json:"plugins" validate:"required"`
Performance performanceData `json:"performance" validate:"required"`
Tweaks performanceTweaks `json:"tweaks" validate:"required"` // [[time_in_seconds, value]]: [[10, 6570]]
}
type performanceData struct { // Depends on selected mode
// MODE_THROUGHPUT_BPS
MaxBPS uint64 `json:"maxBps" validate:"min=0,max=18446744073709551615"`
// MODE_THROUGHPUT_PPS
MaxPPS uint64 `json:"maxPps" validate:"min=0,max=18446744073709551615"`
// MODE_TCP_CONNECTIONS
MaxTCPConnections uint32 `json:"maxTcpConn" validate:"min=0,max=4294967295"`
// MODE_USERS
MaxUsers uint32 `json:"maxUsers" validate:"min=0,max=4294967295"`
}
type performanceTweaks struct {
// MODE_THROUGHPUT_BPS
BPS tweaksData `json:"bps"`
// MODE_THROUGHPUT_PPS
PPS tweaksData `json:"pps"`
// MODE_TCP_CONNECTIONS
TCPConnections tweaksData `json:"tcpConn"`
// MODE_USERS
Users tweaksData `json:"users"`
}
type tweaksData [][]uint8
type pluginData struct {
Plugin uint32 `json:"plugin" validate:"min=0,max=4294967295"`
Weight uint8 `json:"weight" validate:"min=0,max=100"` // 0-100
}