registry.go

v0.2.0
Doc Versions Source
1
package async
2
3
import (
4
	"sync"
5
	"sync/atomic"
6
	"time"
7
8
	"go.bigb.es/auxilia/goroutine"
9
)
10
11
// TaskInfo holds metadata about a running task for debugging and investigation.
12
type TaskInfo struct {
13
	ID          uint64
14
	Tags        []string
15
	StartTime   time.Time
16
	GoroutineID uint64
17
	done        <-chan struct{}
18
}
19
20
// IsRunning reports whether the task is still running.
21
func (ti *TaskInfo) IsRunning() bool {
22
	select {
23
	case <-ti.done:
24
		return false
25
	default:
26
		return true
27
	}
28
}
29
30
// Stacktrace returns the stack trace of the goroutine running this task.
31
// Returns empty string if the task has already finished.
32
func (ti *TaskInfo) Stacktrace() string {
33
	if !ti.IsRunning() {
34
		return ""
35
	}
36
	return goroutine.Stack(ti.GoroutineID)
37
}
38
39
var taskIDSeq atomic.Uint64
40
41
// Registry stores metadata about all running tasks for investigation.
42
type Registry struct {
43
	mu    sync.RWMutex
44
	tasks map[uint64]*TaskInfo
45
}
46
47
// NewRegistry creates a new [Registry].
48
func NewRegistry() *Registry {
49
	return &Registry{
50
		tasks: make(map[uint64]*TaskInfo),
51
	}
52
}
53
54
func (r *Registry) register(done <-chan struct{}, goroutineID uint64, tags []string) uint64 {
55
	id := taskIDSeq.Add(1)
56
	info := &TaskInfo{
57
		ID:          id,
58
		Tags:        tags,
59
		StartTime:   time.Now(),
60
		GoroutineID: goroutineID,
61
		done:        done,
62
	}
63
64
	r.mu.Lock()
65
	r.tasks[id] = info
66
	r.mu.Unlock()
67
68
	go func() {
69
		<-done
70
		r.mu.Lock()
71
		delete(r.tasks, id)
72
		r.mu.Unlock()
73
	}()
74
75
	return id
76
}
77
78
// Get returns the [TaskInfo] for a task by ID, or nil if not found.
79
func (r *Registry) Get(id uint64) *TaskInfo {
80
	r.mu.RLock()
81
	defer r.mu.RUnlock()
82
	return r.tasks[id]
83
}
84
85
// All returns a snapshot of all currently running tasks.
86
func (r *Registry) All() []*TaskInfo {
87
	r.mu.RLock()
88
	defer r.mu.RUnlock()
89
90
	out := make([]*TaskInfo, 0, len(r.tasks))
91
	for _, info := range r.tasks {
92
		out = append(out, info)
93
	}
94
	return out
95
}
96
97
// ByTag returns all running tasks that have the given tag.
98
func (r *Registry) ByTag(tag string) []*TaskInfo {
99
	r.mu.RLock()
100
	defer r.mu.RUnlock()
101
102
	var out []*TaskInfo
103
	for _, info := range r.tasks {
104
		for _, t := range info.Tags {
105
			if t == tag {
106
				out = append(out, info)
107
				break
108
			}
109
		}
110
	}
111
	return out
112
}
113
114
// Len returns the number of currently registered tasks.
115
func (r *Registry) Len() int {
116
	r.mu.RLock()
117
	defer r.mu.RUnlock()
118
	return len(r.tasks)
119
}
120
121
122

Source Files