async.go

v0.2.0
Doc Versions Source
1
package async
2
3
import (
4
	"context"
5
	"sync/atomic"
6
7
	"go.bigb.es/auxilia/fn"
8
	"go.bigb.es/auxilia/goroutine"
9
)
10
11
// runningCount is the package-wide counter behind [RunningCount].
var runningCount atomic.Int64

// RunningCount reports how many async tasks are executing right now.
func RunningCount() int {
	current := runningCount.Load()
	return int(current)
}
17
18
// ErrFunc is a context-aware function whose only result is an error.
// The type parameter T does not appear in the signature itself; it lets an
// ErrFunc be paired with a [Func] of the same T, as [WrapErrFunc] does.
type ErrFunc[T any] func(ctx context.Context) error
20
21
// Func is a context-aware function that produces a value of type T
// together with an error.
type Func[T any] func(ctx context.Context) (T, error)
23
24
// WrapErrFunc wraps an [ErrFunc] into a [Func] that returns the zero value of T.
25
func WrapErrFunc[T any](f ErrFunc[T]) Func[T] {
26
	return func(ctx context.Context) (T, error) { return fn.Zero[T](), f(ctx) }
27
}
28
29
// Task is a generic async task that runs a function in a goroutine
30
// and provides methods to wait for, poll, or cancel it.
31
type Task[T any] struct {
32
	executor Func[T]
33
	await    chan struct{}
34
35
	val    T
36
	err    error
37
	cancel context.CancelFunc
38
39
	id       uint64
40
	registry *Registry
41
}
42
43
// Start creates and starts a new [Task] with the given executor.
44
// The task runs in a new goroutine with a derived cancellable context.
45
// Not thread-safe — do not call concurrently for the same Task instance.
46
func Start[T any](ctx context.Context, executor Func[T]) *Task[T] {
47
	ctx, cancelFn := context.WithCancel(ctx)
48
	return (&Task[T]{
49
		executor: executor,
50
		cancel:   cancelFn,
51
	}).run(ctx)
52
}
53
54
// StartWith creates and starts a new [Task], registering it in the given
55
// [Registry] with the provided tags for later investigation.
56
func StartWith[T any](ctx context.Context, registry *Registry, executor Func[T], tags ...string) *Task[T] {
57
	ctx, cancelFn := context.WithCancel(ctx)
58
	return (&Task[T]{
59
		executor: executor,
60
		cancel:   cancelFn,
61
		registry: registry,
62
	}).run(ctx, tags...)
63
}
64
65
// run starts the task's executor in a new goroutine and returns p once that
// goroutine is up and, when a registry is set, has registered itself.
//
// ctx is passed to the executor. When the executor returns, the task's
// cancel function is invoked so the derived context created by Start or
// StartWith is released even if Cancel is never called; previously the
// context stayed attached to its parent until the parent was cancelled.
//
// run panics if the task has already been started.
func (p *Task[T]) run(ctx context.Context, tags ...string) *Task[T] {
	if p.await != nil {
		panic("async: task already started")
	}
	p.await = make(chan struct{})

	runningCount.Add(1)

	// Buffered so the goroutine never blocks while handing over its ID.
	started := make(chan uint64, 1)
	go func() {
		// Deferred calls run in reverse order: release the context, signal
		// completion to waiters, then drop the running-task counter.
		defer runningCount.Add(-1)
		defer close(p.await)
		if p.cancel != nil {
			// CancelFunc is idempotent, so this is safe after Cancel too.
			defer p.cancel()
		}

		if p.registry != nil {
			// Registration happens inside the goroutine because it passes
			// this goroutine's own ID to the registry.
			goid := goroutine.ID()
			p.id = p.registry.register(p.await, goid, tags)
			started <- p.id
		} else {
			started <- 0
		}

		p.val, p.err = p.executor(ctx)
	}()

	// Receiving here orders the write to p.id before any later ID call made
	// by the caller.
	<-started
	return p
}
92
93
// ID returns the task's registry ID, or 0 if not registered.
94
func (p *Task[T]) ID() uint64 {
95
	return p.id
96
}
97
98
// IsDone reports whether the task has finished.
99
func (p *Task[T]) IsDone() bool {
100
	if p.await == nil {
101
		panic("async: task not started")
102
	}
103
104
	select {
105
	case <-p.await:
106
		return true
107
	default:
108
		return false
109
	}
110
}
111
112
// C returns the channel that is closed when the task completes.
113
func (p *Task[T]) C() <-chan struct{} {
114
	return p.await
115
}
116
117
// Poll returns the task result without blocking.
118
// The second return value indicates whether the task has completed.
119
func (p *Task[T]) Poll() (T, bool, error) {
120
	if p.await == nil {
121
		panic("async: task not started")
122
	} else if !p.IsDone() {
123
		return p.val, false, p.err
124
	}
125
126
	return p.val, true, p.err
127
}
128
129
// Wait blocks until the task completes or ctx is cancelled.
130
// The second return value indicates whether the task completed (true)
131
// or the context was cancelled (false).
132
func (p *Task[T]) Wait(ctx context.Context) (T, bool, error) {
133
	if p.await == nil {
134
		panic("async: task not started")
135
	}
136
137
	select {
138
	case <-ctx.Done():
139
		return p.val, false, ctx.Err()
140
	case <-p.await:
141
		return p.val, true, p.err
142
	}
143
}
144
145
// Cancel cancels the task's internal context.
146
func (p *Task[T]) Cancel() {
147
	if p.await == nil {
148
		panic("async: task not started")
149
	}
150
151
	p.cancel()
152
}
153
154
// CancelAndWait cancels the task and waits for it to finish.
155
func (p *Task[T]) CancelAndWait(ctx context.Context) (T, bool, error) {
156
	p.Cancel()
157
	return p.Wait(ctx)
158
}
159
160
// NewDummy creates a task that immediately completes with the zero value.
161
func NewDummy[T any](ctx context.Context) *Task[T] {
162
	return Start[T](ctx, func(ctx context.Context) (T, error) { return fn.Zero[T](), nil })
163
}
164

Source Files