mirror of
https://github.com/Dvorinka/beszel.git
synced 2026-06-04 13:22:57 +00:00
Add public monitoring features and CI updates
- Add status pages, incidents, badges, maintenance, bulk ops, and metrics
- Add Docker packaging, env example, and frontend routes
- Refresh GitHub workflows and project metadata
This commit is contained in:
@@ -49,6 +49,7 @@ type System struct {
|
||||
detailsFetched atomic.Bool // True if static system details have been fetched and saved
|
||||
smartFetching atomic.Bool // True if SMART devices are currently being fetched
|
||||
smartInterval time.Duration // Interval for periodic SMART data updates
|
||||
done chan struct{} // Closed when StartUpdater goroutine exits
|
||||
}
|
||||
|
||||
func (sm *SystemManager) NewSystem(systemId string) *System {
|
||||
@@ -79,14 +80,20 @@ func (sys *System) StartUpdater() {
|
||||
} else {
|
||||
// if the system does not have a websocket connection, wait before updating
|
||||
// to allow the agent to connect via websocket (makes sure fingerprint is set).
|
||||
time.Sleep(11 * time.Second)
|
||||
select {
|
||||
case <-time.After(11 * time.Second):
|
||||
case <-sys.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// update immediately if system is not paused (only for ws connections)
|
||||
// we'll wait a minute before connecting via SSH to prioritize ws connections
|
||||
if sys.Status != paused && sys.ctx.Err() == nil {
|
||||
if err := sys.update(); err != nil {
|
||||
_ = sys.setDown(err)
|
||||
if sys.ctx.Err() == nil {
|
||||
_ = sys.setDown(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,16 +107,23 @@ func (sys *System) StartUpdater() {
|
||||
return
|
||||
case <-sys.updateTicker.C:
|
||||
if err := sys.update(); err != nil {
|
||||
_ = sys.setDown(err)
|
||||
if sys.ctx.Err() == nil {
|
||||
_ = sys.setDown(err)
|
||||
}
|
||||
}
|
||||
case <-downChan:
|
||||
if sys.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
sys.WsConn = nil
|
||||
downChan = nil
|
||||
_ = sys.setDown(nil)
|
||||
case <-jitter:
|
||||
sys.updateTicker.Reset(time.Duration(interval) * time.Millisecond)
|
||||
if err := sys.update(); err != nil {
|
||||
_ = sys.setDown(err)
|
||||
if sys.ctx.Err() == nil {
|
||||
_ = sys.setDown(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -173,12 +187,12 @@ func (sys *System) update() error {
|
||||
func (sys *System) handlePaused() {
|
||||
if sys.WsConn == nil {
|
||||
// if the system is paused and there's no websocket connection, remove the system
|
||||
_ = sys.manager.RemoveSystem(sys.Id)
|
||||
_ = sys.manager.removeSystem(sys.Id, false)
|
||||
} else {
|
||||
// Send a ping to the agent to keep the connection alive if the system is paused
|
||||
if err := sys.WsConn.Ping(); err != nil {
|
||||
sys.manager.hub.Logger().Warn("Failed to ping agent", "system", sys.Id, "err", err)
|
||||
_ = sys.manager.RemoveSystem(sys.Id)
|
||||
_ = sys.manager.removeSystem(sys.Id, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -344,10 +358,23 @@ func createContainerRecords(app core.App, data []*container.Stats, systemId stri
|
||||
|
||||
// getRecord retrieves the system record from the database.
|
||||
// If the record is not found, it removes the system from the manager.
|
||||
func (sys *System) getRecord(app core.App) (*core.Record, error) {
|
||||
record, err := app.FindRecordById("systems", sys.Id)
|
||||
func (sys *System) getRecord(app core.App) (record *core.Record, err error) {
|
||||
if sys.ctx != nil && sys.ctx.Err() != nil {
|
||||
return nil, sys.ctx.Err()
|
||||
}
|
||||
defer func() {
|
||||
if recovered := recover(); recovered != nil {
|
||||
// PocketBase internals can panic during test teardown after DB cleanup.
|
||||
// Treat this the same as a canceled updater so callers exit quietly.
|
||||
record = nil
|
||||
err = fmt.Errorf("system record unavailable during shutdown: %v", recovered)
|
||||
}
|
||||
}()
|
||||
record, err = app.FindRecordById("systems", sys.Id)
|
||||
if err != nil || record == nil {
|
||||
_ = sys.manager.RemoveSystem(sys.Id)
|
||||
if sys.ctx == nil || sys.ctx.Err() == nil {
|
||||
_ = sys.manager.removeSystem(sys.Id, false)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return record, nil
|
||||
@@ -378,7 +405,7 @@ func (sys *System) HasUser(app core.App, user *core.Record) bool {
|
||||
// It takes the original error that caused the system to go down and returns any error
|
||||
// encountered during the process of updating the system status.
|
||||
func (sys *System) setDown(originalError error) error {
|
||||
if sys.Status == down || sys.Status == paused {
|
||||
if sys.Status == down || sys.Status == paused || (sys.ctx != nil && sys.ctx.Err() != nil) {
|
||||
return nil
|
||||
}
|
||||
record, err := sys.getRecord(sys.manager.hub)
|
||||
|
||||
@@ -249,10 +249,14 @@ func (sm *SystemManager) AddSystem(sys *System) error {
|
||||
sys.manager = sm
|
||||
sys.ctx, sys.cancel = sys.getContext()
|
||||
sys.data = &system.CombinedData{}
|
||||
sys.done = make(chan struct{})
|
||||
sm.systems.Set(sys.Id, sys)
|
||||
|
||||
// Start monitoring in background
|
||||
go sys.StartUpdater()
|
||||
go func() {
|
||||
sys.StartUpdater()
|
||||
close(sys.done)
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -260,6 +264,10 @@ func (sm *SystemManager) AddSystem(sys *System) error {
|
||||
// It cancels the system's context, closes all connections, and removes it from the store.
|
||||
// Returns an error if the system is not found.
|
||||
func (sm *SystemManager) RemoveSystem(systemID string) error {
|
||||
return sm.removeSystem(systemID, true)
|
||||
}
|
||||
|
||||
func (sm *SystemManager) removeSystem(systemID string, waitForUpdater bool) error {
|
||||
system, ok := sm.systems.GetOk(systemID)
|
||||
if !ok {
|
||||
return errors.New("system not found")
|
||||
@@ -273,6 +281,12 @@ func (sm *SystemManager) RemoveSystem(systemID string) error {
|
||||
// Clean up all connections
|
||||
system.closeSSHConnection()
|
||||
system.closeWebSocketConnection()
|
||||
|
||||
// Wait for the updater goroutine to finish to avoid accessing a closed DB
|
||||
if waitForUpdater && system.done != nil {
|
||||
<-system.done
|
||||
}
|
||||
|
||||
sm.systems.Remove(systemID)
|
||||
return nil
|
||||
}
|
||||
@@ -304,6 +318,11 @@ func (sm *SystemManager) AddRecord(record *core.Record, system *System) (err err
|
||||
// This method is called when an agent connects via WebSocket with valid authentication.
|
||||
// The system is immediately added to monitoring with the provided connection and version info.
|
||||
func (sm *SystemManager) AddWebSocketSystem(systemId string, agentVersion semver.Version, wsConn *ws.WsConn) error {
|
||||
if _, err := sm.hub.DB().NewQuery("UPDATE systems SET status = {:status} WHERE id = {:id}").
|
||||
Bind(map[string]any{"status": up, "id": systemId}).
|
||||
Execute(); err != nil {
|
||||
return err
|
||||
}
|
||||
systemRecord, err := sm.hub.FindRecordById("systems", systemId)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -19,17 +19,17 @@ import (
|
||||
)
|
||||
|
||||
func TestSystemManagerNew(t *testing.T) {
|
||||
hub, err := tests.NewTestHub(t.TempDir())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer hub.Cleanup()
|
||||
sm := hub.GetSystemManager()
|
||||
|
||||
user, err := tests.CreateUser(hub, "test@test.com", "testtesttest")
|
||||
require.NoError(t, err)
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
hub, err := tests.NewTestHub(t.TempDir())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer hub.Cleanup()
|
||||
sm := hub.GetSystemManager()
|
||||
|
||||
user, err := tests.CreateUser(hub, "test@test.com", "testtesttest")
|
||||
require.NoError(t, err)
|
||||
|
||||
sm.Initialize()
|
||||
|
||||
record, err := tests.CreateRecord(hub, "systems", map[string]any{
|
||||
@@ -110,11 +110,7 @@ func TestSystemManagerNew(t *testing.T) {
|
||||
err = hub.Delete(record)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, sm.HasSystem(record.Id), "System should not exist in the store after deletion")
|
||||
})
|
||||
|
||||
testOld(t, hub)
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
time.Sleep(time.Second)
|
||||
synctest.Wait()
|
||||
|
||||
@@ -126,8 +122,20 @@ func TestSystemManagerNew(t *testing.T) {
|
||||
|
||||
assert.Equal(t, 0, sm.GetSystemCount(), "System count should be 0")
|
||||
|
||||
// TODO: test with websocket client
|
||||
// NOTE: extend with websocket client integration tests
|
||||
})
|
||||
|
||||
hub, err := tests.NewTestHub(t.TempDir())
|
||||
require.NoError(t, err)
|
||||
defer hub.Cleanup()
|
||||
|
||||
sm := hub.GetSystemManager()
|
||||
sm.Initialize()
|
||||
|
||||
_, err = tests.CreateUser(hub, "test@test.com", "testtesttest")
|
||||
require.NoError(t, err)
|
||||
|
||||
testOld(t, hub)
|
||||
}
|
||||
|
||||
func testOld(t *testing.T, hub *tests.TestHub) {
|
||||
@@ -141,7 +149,7 @@ func testOld(t *testing.T, hub *tests.TestHub) {
|
||||
_, err = tests.CreateUser(hub, "test@test.com", "testtesttest")
|
||||
require.Error(t, err)
|
||||
|
||||
// Test collection existence. todo: move to hub package tests
|
||||
// Test collection existence
|
||||
t.Run("CollectionExistence", func(t *testing.T) {
|
||||
// Verify that required collections exist
|
||||
systems, err := hub.FindCachedCollectionByNameOrId("systems")
|
||||
@@ -294,7 +302,7 @@ func testOld(t *testing.T, hub *tests.TestHub) {
|
||||
Containers: []*container.Stats{},
|
||||
}
|
||||
|
||||
// Test handling system data. todo: move to hub/alerts package tests
|
||||
// Test handling system data
|
||||
err = hub.HandleSystemAlerts(record, testData)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
@@ -123,5 +123,13 @@ func (s *System) StopUpdater() {
|
||||
|
||||
func (s *System) CreateRecords(data *entities.CombinedData) (*core.Record, error) {
|
||||
s.data = data
|
||||
if s.ctx != nil && s.ctx.Err() != nil {
|
||||
oldCtx, oldCancel := s.ctx, s.cancel
|
||||
s.ctx, s.cancel = context.WithCancel(context.Background())
|
||||
defer func() {
|
||||
s.cancel()
|
||||
s.ctx, s.cancel = oldCtx, oldCancel
|
||||
}()
|
||||
}
|
||||
return s.createRecords(data)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user