group.go

v0.2.0
Doc Versions Source
1
package async
2
3
import (
4
	"context"
5
	"errors"
6
	"sync"
7
)
8
9
// ErrNotStarted is returned when operating on a group that hasn't been started.
10
var ErrNotStarted = errors.New("async: group not started")
11
12
type contextIDKey struct{}
13
14
// WithContextID stores an integer ID in the context.
15
func WithContextID(ctx context.Context, id int) context.Context {
16
	return context.WithValue(ctx, contextIDKey{}, id)
17
}
18
19
// ContextID retrieves the integer ID from the context. Panics if absent.
20
func ContextID(ctx context.Context) int {
21
	return ctx.Value(contextIDKey{}).(int)
22
}
23
24
// Group manages a set of async tasks with collective start/stop/wait semantics.
25
type Group[T any] struct {
26
	executors []Func[T]
27
	cancelFn  context.CancelFunc
28
29
	mu    sync.Mutex
30
	tasks []*Task[T]
31
}
32
33
// NewGroup creates an empty [Group].
34
func NewGroup[T any]() *Group[T] {
35
	return &Group[T]{}
36
}
37
38
// NewGroupFixed creates a [Group] pre-populated with the given executors.
39
func NewGroupFixed[T any](executors ...Func[T]) *Group[T] {
40
	return &Group[T]{executors: executors}
41
}
42
43
// NewGroupRepeated creates a [Group] with count copies of the same executor.
44
func NewGroupRepeated[T any](executor Func[T], count int) *Group[T] {
45
	g := &Group[T]{}
46
	for i := 0; i < count; i++ {
47
		g.executors = append(g.executors, executor)
48
	}
49
	return g
50
}
51
52
// AddExecutor adds an executor to the group. Panics if the group is already started.
53
func (g *Group[T]) AddExecutor(executor Func[T]) {
54
	g.mu.Lock()
55
	defer g.mu.Unlock()
56
57
	if g.tasks != nil {
58
		panic("async: cannot add executor to started group")
59
	}
60
61
	g.executors = append(g.executors, executor)
62
}
63
64
// Start launches all executors concurrently. Each task receives a context
65
// with its index stored via [WithContextID]. Panics on double start.
66
func (g *Group[T]) Start(ctx context.Context) error {
67
	g.mu.Lock()
68
	defer g.mu.Unlock()
69
70
	if len(g.tasks) != 0 {
71
		panic("async: group already started")
72
	}
73
74
	ctx, g.cancelFn = context.WithCancel(ctx)
75
	for id, executor := range g.executors {
76
		g.tasks = append(g.tasks, Start[T](WithContextID(ctx, id), executor))
77
	}
78
	return nil
79
}
80
81
// Any blocks until any one task in the group completes or ctx is cancelled.
82
func (g *Group[T]) Any(ctx context.Context) error {
83
	var chans []<-chan struct{}
84
85
	{
86
		g.mu.Lock()
87
88
		if len(g.tasks) == 0 {
89
			g.mu.Unlock()
90
			return ErrNotStarted
91
		}
92
93
		chans = make([]<-chan struct{}, 0, len(g.tasks))
94
		for _, t := range g.tasks {
95
			chans = append(chans, t.C())
96
		}
97
98
		g.mu.Unlock()
99
	}
100
101
	if pos, _, _ := SelectSlice(ctx, chans); pos == -1 {
102
		return ctx.Err()
103
	}
104
	return nil
105
}
106
107
// All blocks until all tasks in the group complete or ctx is cancelled.
108
func (g *Group[T]) All(ctx context.Context) error {
109
	{
110
		g.mu.Lock()
111
		if g.tasks == nil {
112
			g.mu.Unlock()
113
			return ErrNotStarted
114
		}
115
		g.mu.Unlock()
116
	}
117
118
	for _, t := range g.tasks {
119
		_, ok, err := t.Wait(ctx)
120
		if !ok {
121
			return err
122
		}
123
	}
124
125
	return nil
126
}
127
128
// FirstError polls all tasks and returns the first non-nil error found, or nil.
129
func (g *Group[T]) FirstError() error {
130
	g.mu.Lock()
131
	defer g.mu.Unlock()
132
133
	for _, t := range g.tasks {
134
		_, ok, err := t.Poll()
135
		if ok && err != nil {
136
			return err
137
		}
138
	}
139
	return nil
140
}
141
142
// Stop cancels the group's shared context and waits for all tasks to finish.
143
func (g *Group[T]) Stop(ctx context.Context) error {
144
	g.cancelFn()
145
	return g.All(ctx)
146
}
147

Source Files