asset_service.go

v0.2.0
Doc Versions Source
1
package steward
2
3
import (
4
	"context"
5
	"fmt"
6
	"reflect"
7
	"strconv"
8
	"sync"
9
	"time"
10
11
	"github.com/soverenio/vanilla/throw"
12
13
	"go.bigb.es/auxilia/async"
14
	"go.bigb.es/auxilia/fn"
15
	"go.bigb.es/auxilia/internal/logutil"
16
)
17
18
// AssetStatus is the lifecycle stage of an asset.
//
// The values are declared in lifecycle order, so stages can be compared with
// the ordering operators (TryInjectTo relies on status < AssetStatusStarted).
type AssetStatus int

// Lifecycle stages, in the order an asset is expected to pass through them.
const (
	// AssetStatusUnknown is the zero value of AssetStatus.
	AssetStatusUnknown AssetStatus = iota
	// AssetStatusInjected is the stage CallInit expects before initializing.
	AssetStatusInjected
	// AssetStatusInitialized is set by CallInit.
	AssetStatusInitialized
	// AssetStatusStarted is set by CallStart.
	AssetStatusStarted
	// AssetStatusStopping is set by CallGracefulStop before the stop is attempted.
	AssetStatusStopping
	// AssetStatusGracefulStopped is set after GracefulStop returned without error.
	AssetStatusGracefulStopped
	// AssetStatusStopped is set by CallStop, or by CallGracefulStop after a plain Stop succeeded.
	AssetStatusStopped
	// AssetStatusDestroyed is set by CallDestroy.
	AssetStatusDestroyed
)
30
31
// callStopWarningDuration is the threshold above which CallStop logs a
// "took too long" warning for a successfully stopped component.
const callStopWarningDuration = 100 * time.Millisecond
32
33
// Compile-time check that *serviceAsset satisfies Asset.
var _ Asset = (*serviceAsset)(nil)
34
35
type serviceAsset struct {
36
	mu     sync.Mutex
37
	status AssetStatus
38
39
	dependencies   []*serviceAsset
40
	dependents     []*serviceAsset
41
	value          any
42
	reflectedValue reflect.Value
43
	roots          []assetTreeNode
44
45
	// properties
46
	root         bool        // must be started/stopped even if it is not a dependency
47
	ignoreUnused bool        // ignore unused assets
48
	singleton    interface{} // singleton assets are always root assets
49
}
50
51
func ServiceAsset(value any, opts ...ComponentOption) (Asset, error) {
52
	asset := &serviceAsset{
53
		value:          value,
54
		reflectedValue: reflect.ValueOf(value),
55
	}
56
	for _, opt := range opts {
57
		opt(asset)
58
	}
59
	return asset, nil
60
}
61
62
// NewServiceAsset is an alias of ServiceAsset: it forwards its arguments
// unchanged and returns ServiceAsset's results.
func NewServiceAsset(value any, opts ...ComponentOption) (Asset, error) {
	created, err := ServiceAsset(value, opts...)

	return created, err
}
63
64
// MustServiceAsset builds a service asset like ServiceAsset and passes the
// result through fn.Must1 (presumably panicking on a non-nil error — confirm
// against the fn package).
func MustServiceAsset(value any, opts ...ComponentOption) Asset {
	created, err := ServiceAsset(value, opts...)

	return fn.Must1(created, err)
}
67
68
// MustNewServiceAsset is an alias of MustServiceAsset: it forwards its
// arguments unchanged.
func MustNewServiceAsset(value any, opts ...ComponentOption) Asset {
	created := MustServiceAsset(value, opts...)

	return created
}
69
70
// SingletonAsset builds a service asset for value with the Singleton(key)
// option applied after opts, registers it through RegisterSingleton and
// returns the first result of that registration.
//
// Singleton(key) panics for a nil or non-comparable key. An error is returned
// only if ServiceAsset returns one.
func SingletonAsset(key any, value any, opts ...ComponentOption) (Asset, error) {
	// Build a private option list instead of appending to opts directly: when
	// the caller passes a slice with spare capacity (SingletonAsset(k, v, shared...)),
	// append(opts, ...) would write into the caller's backing array.
	allOpts := make([]ComponentOption, 0, len(opts)+1)
	allOpts = append(allOpts, opts...)
	allOpts = append(allOpts, Singleton(key))

	asset, err := ServiceAsset(value, allOpts...)
	if err != nil {
		return nil, err
	}

	// Only the first result of RegisterSingleton is used; the second one is
	// intentionally dropped here, as in the original code.
	return fn.FirstOf2(RegisterSingleton(key, asset)), nil
}
77
78
// MustSingletonAsset builds a singleton asset like SingletonAsset and passes
// the result through fn.Must1 (presumably panicking on a non-nil error —
// confirm against the fn package).
func MustSingletonAsset(key any, value any, opts ...ComponentOption) Asset {
	registered, err := SingletonAsset(key, value, opts...)

	return fn.Must1(registered, err)
}
81
82
// asset is an empty marker method; presumably it is the unexported method that
// ties serviceAsset to the Asset interface — confirm against the interface
// definition.
func (c *serviceAsset) asset() {}
83
84
// String returns the same text as Name.
func (c *serviceAsset) String() string {
	name := c.Name()

	return name
}
87
88
// TryInjectTo tries to inject the asset into the component on the given field.
89
func (c *serviceAsset) TryInjectTo(component *serviceAsset, field string) (injected bool) {
90
	objectValue := component.reflectedValue.Elem().FieldByName(field)
91
92
	c.mu.Lock()
93
	defer c.mu.Unlock()
94
95
	// if objectValue is slice then check assignable to slice element type
96
	if objectValue.Kind() == reflect.Slice {
97
		if !c.TypeAssignableTo(objectValue.Type().Elem()) {
98
			return false
99
		}
100
101
		component.dependencies = append(component.dependencies, c)
102
		// TODO: may be we need a better way to handle this
103
		if c.status < AssetStatusStarted {
104
			c.dependents = append(c.dependents, component)
105
		}
106
107
		// append to slice
108
		objectValue.Set(reflect.Append(objectValue, c.reflectedValue))
109
110
		return true
111
	}
112
113
	if !c.TypeAssignableTo(objectValue.Type()) {
114
		return false
115
	}
116
117
	component.dependencies = append(component.dependencies, c)
118
	// TODO: may be we need a better way to handle this
119
	if c.status < AssetStatusStarted {
120
		c.dependents = append(c.dependents, component)
121
	}
122
123
	objectValue.Set(c.reflectedValue)
124
125
	return true
126
}
127
128
// Name returns a human-readable type name for the wrapped value.
//
// For values whose type has an element type (pointer, slice, array, map,
// chan) it is the element type's name, so a *pkg.Service asset is reported as
// "pkg.Service". For any other kind the type's own name is returned, and a nil
// value yields "<nil>". The last two cases used to panic, which made logging
// and error reporting (Name is used by String and by panic messages) fail for
// assets built from non-pointer values.
func (c *serviceAsset) Name() string {
	t := reflect.TypeOf(c.value)
	if t == nil {
		// reflect.TypeOf returns a nil Type for a nil interface value.
		return "<nil>"
	}

	switch t.Kind() {
	case reflect.Array, reflect.Chan, reflect.Map, reflect.Pointer, reflect.Slice:
		// These are exactly the kinds for which Type.Elem does not panic.
		return t.Elem().String()
	default:
		return t.String()
	}
}
131
132
func (c *serviceAsset) populateConfiguration(ctx context.Context, configurations []*configurationAsset) {
133
	log := logutil.FromContext(ctx).With(
134
		"component", "manager",
135
		"InjectorType", "configuration",
136
	)
137
138
	structFieldIteratorFromType(c.reflectedValue).Filter(func(d structField) bool {
139
		switch {
140
		case !hasTag(d.First(), "config"):
141
			return false
142
		case !isPublic(d.First()):
143
			// better to catch that early on
144
			panic(fmt.Sprintf("we've got config tag on private field: target=%s targetField=%s",
145
				c.Name(), d.First().Name))
146
		case d.First().Type.Kind() != reflect.Struct:
147
			// better to catch that early on
148
			panic(fmt.Sprintf("we've got config tag on invalid type: target=%s targetField=%s expected=%s got=%s",
149
				c.Name(), d.First().Name, reflect.Struct.String(), d.First().Type.Kind().String()))
150
		default:
151
			return true
152
		}
153
	}).Apply(func(d structField) {
154
		for _, dep := range configurations {
155
			if !dep.TryInjectTo(c, d.First().Name) {
156
				continue
157
			}
158
159
			log.Debug("injecting configuration",
160
				"target", c.Name()+"."+d.First().Name,
161
				"dependency", d.First().Type.Name(),
162
			)
163
164
			return
165
		}
166
167
		panic(fmt.Sprintf("failed to find dependency: target=%s targetField=%s dependencyType=%s",
168
			c.Name(), d.First().Name, d.First().Type.String()))
169
	})
170
}
171
172
func (c *serviceAsset) populateDependencies(ctx context.Context, assets []*serviceAsset) {
173
	log := logutil.FromContext(ctx).With(
174
		"component", "manager",
175
		"InjectorType", "injections",
176
	)
177
178
	structFieldIteratorFromType(c.reflectedValue).Filter(func(d structField) bool {
179
		switch {
180
		case !hasTag(d.First(), "inject"):
181
			return false
182
		case !isPublic(d.First()):
183
			// better to catch that early on
184
			panic(fmt.Sprintf("we've got inject tag on private field: target=%s targetField=%s",
185
				c.Name(), d.First().Name))
186
		case !isAllowedInjectType(d.First().Type):
187
			// better to catch that early on
188
			panic(fmt.Sprintf("we've got inject tag on invalid type: target=%s targetField=%s expected=%s got=%s",
189
				c.Name(), d.First().Name, "interface/pointer to struct/list of interface or pointer to struct", d.First().Type.Kind().String()))
190
		default:
191
			return true
192
		}
193
	}).Apply(func(d structField) {
194
		var (
195
			found          bool
196
			stopAfterFirst bool
197
198
			fieldName = d.First().Name
199
			fieldType = d.First().Type
200
		)
201
202
		switch {
203
		case isSingleInjectType(fieldType):
204
			stopAfterFirst = true
205
		case isMultiInjectType(fieldType):
206
			stopAfterFirst = false
207
		default:
208
			panic(throw.IllegalState())
209
		}
210
211
		// fieldPath is used for logging and consists of component name and field name
212
		fieldPath := c.Name() + "." + fieldName
213
214
		for _, dep := range assets {
215
			// full fieldPathWithIdx looks like <component>.<field>[<idx>]
216
			fieldPathWithIdx := fieldPath
217
			if !stopAfterFirst { // if we're inserting into slice then we need to add index
218
				fieldPathWithIdx += "[" + strconv.Itoa(d.Second().Len()) + "]"
219
			}
220
221
			switch {
222
			case dep == c:
223
			case dep.TryInjectTo(c, fieldName):
224
				log.Debug("injecting component",
225
					"target", fieldPathWithIdx,
226
					"dependency", dep.reflectedValue.Type().String(),
227
				)
228
				found = true
229
230
				if stopAfterFirst {
231
					return
232
				}
233
			}
234
		}
235
236
		if !found && !hasTagBool(d.First(), "optional") {
237
			panic(fmt.Sprintf("failed to find dependency: target=%s targetField=%s dependencyType=%s",
238
				c.Name(), fieldName, fieldType.String()))
239
		}
240
	})
241
242
}
243
244
// TypeAssignableTo returns true if the asset type is assignable to the given type.
245
func (c *serviceAsset) TypeAssignableTo(r reflect.Type) bool {
246
	return c.reflectedValue.Type().AssignableTo(r)
247
}
248
249
// ComponentOption configures a service asset while it is being constructed;
// ServiceAsset applies the options in the order they are passed.
type ComponentOption func(asset *serviceAsset)
250
251
// Root marks the asset as a root asset. Root assets are always started/stopped.
252
func Root() ComponentOption {
253
	return func(asset *serviceAsset) {
254
		asset.root = true
255
	}
256
}
257
258
// IgnoreUnused returns an option that sets the asset's ignoreUnused flag.
func IgnoreUnused() ComponentOption {
	markIgnored := func(asset *serviceAsset) {
		asset.ignoreUnused = true
	}

	return markIgnored
}
263
264
// Singleton marks the asset as a singleton asset.
265
// Singleton assets are always root assets and created only once.
266
// NB: Singleton assets cant depend on other assets, for now.
267
func Singleton(singletonKey interface{}) ComponentOption {
268
	if singletonKey == nil || !isComparable(singletonKey) {
269
		panic(throw.IllegalValue())
270
	}
271
272
	return func(asset *serviceAsset) {
273
		asset.singleton = singletonKey
274
		asset.root = true
275
	}
276
}
277
278
// IsSingleton reports whether a singleton key has been set on the asset.
func (c *serviceAsset) IsSingleton() bool {
	key := c.singleton

	return key != nil
}
281
282
// NeedStart reports whether the asset takes part in the start/stop lifecycle:
// it is either a root asset or has a non-nil dependents list.
func (c *serviceAsset) NeedStart() bool {
	if c.root {
		return true
	}

	return c.dependents != nil
}
285
286
func (c *serviceAsset) CallInit(ctx context.Context) error {
287
	c.mu.Lock()
288
	defer c.mu.Unlock()
289
290
	switch {
291
	case c.status == AssetStatusInjected:
292
		c.status = AssetStatusInitialized
293
	case c.singleton == nil:
294
		panic(throw.IllegalState()) // double call of init is not allowed on non-singleton assets
295
	default:
296
		return nil
297
	}
298
299
	_, log := logutil.WithField(ctx, "component", "manager")
300
301
	converted, ok := c.value.(Initer)
302
	if ok {
303
		log.Debug("initializing component", "component", c.Name())
304
305
		return converted.Init(ctx)
306
	}
307
308
	return nil
309
}
310
311
func (c *serviceAsset) CallStart(ctx context.Context) error {
312
	c.mu.Lock()
313
	defer c.mu.Unlock()
314
315
	switch {
316
	case c.status == AssetStatusInitialized:
317
		c.status = AssetStatusStarted
318
	case c.singleton == nil:
319
		panic(throw.IllegalState()) // double call of start is not allowed on non-singleton assets
320
	default:
321
		return nil
322
	}
323
324
	if !c.NeedStart() {
325
		return nil
326
	}
327
328
	_, log := logutil.WithField(ctx, "component", "manager")
329
330
	converted, ok := c.value.(Starter)
331
	if ok {
332
		log.Debug("starting component", "component", c.Name())
333
334
		return converted.Start(ctx)
335
	}
336
337
	return nil
338
}
339
340
func (c *serviceAsset) CallStop(ctx context.Context) error {
341
	c.mu.Lock()
342
	defer c.mu.Unlock()
343
344
	startTime := time.Now()
345
346
	switch c.status {
347
	case AssetStatusStarted, AssetStatusStopping:
348
		c.status = AssetStatusStopped
349
	default:
350
		return nil
351
	}
352
353
	if !c.NeedStart() {
354
		return nil
355
	}
356
357
	_, log := logutil.WithField(ctx, "component", "manager")
358
359
	converted, ok := c.value.(Stopper)
360
	if !ok {
361
		return nil
362
	}
363
364
	log.Info("stopping component (CallStop)", "component", c.Name())
365
366
	err := converted.Stop(ctx)
367
	if err != nil {
368
		return err
369
	}
370
371
	log.Info("stopped component (CallStop)", "component", c.Name())
372
373
	dur := time.Since(startTime)
374
	if dur > callStopWarningDuration {
375
		logutil.FromContext(ctx).Warn("CallStop took too long",
376
			"component", c.Name(),
377
			"duration", dur,
378
		)
379
	}
380
381
	return nil
382
}
383
384
// CallGracefulStop calls GracefulStop on the asset if it implements GracefulStopper.
385
// It also honors the ctx timeout.
386
// If the asset does not implement GracefulStopper then it calls Stop on the asset if it implements Stopper.
387
func (c *serviceAsset) CallGracefulStop(ctx context.Context) error {
388
	c.mu.Lock()
389
	defer c.mu.Unlock()
390
391
	switch {
392
	case c.status == AssetStatusStarted:
393
		c.status = AssetStatusStopping
394
	case c.singleton == nil:
395
		panic(throw.IllegalState()) // double call of stop is not allowed on non-singleton assets
396
	default:
397
		return nil
398
	}
399
400
	if !c.NeedStart() {
401
		return nil
402
	}
403
404
	_, log := logutil.WithField(ctx, "component", "manager")
405
406
	converted, ok := c.value.(GracefulStopper)
407
	if ok {
408
		log.Info("gracefully stopping component", "component", c.Name())
409
410
		task := async.WrapErrFunc[struct{}](converted.GracefulStop)
411
		err := fn.ThirdOf3(async.Start(ctx, task).Wait(ctx))
412
		if err != nil {
413
			return err
414
		}
415
416
		c.status = AssetStatusGracefulStopped
417
		log.Info("gracefully stopped component", "component", c.Name())
418
419
		return nil
420
	}
421
422
	convertedStopper, ok := c.value.(Stopper)
423
	if ok {
424
		log.Info("stopping component (CallGracefulStop)", "component", c.Name())
425
426
		err := convertedStopper.Stop(ctx)
427
		if err != nil {
428
			return err
429
		}
430
431
		c.status = AssetStatusStopped
432
		log.Info("stopped component (CallGracefulStop)", "component", c.Name())
433
434
		return nil
435
	}
436
437
	return nil
438
}
439
440
// CallHealthCheck returns an error when the asset is not in
// AssetStatusStarted. Otherwise it returns the result of HealthCheck if the
// wrapped value implements HealthChecker, and nil if it does not.
func (c *serviceAsset) CallHealthCheck(ctx context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.status != AssetStatusStarted {
		return throw.New("asset is not running", struct{ Asset string }{Asset: c.Name()})
	}

	if checker, ok := c.value.(HealthChecker); ok {
		return checker.HealthCheck(ctx)
	}

	return nil
}
454
455
func (c *serviceAsset) CallDestroy(ctx context.Context) error {
456
	c.mu.Lock()
457
	defer c.mu.Unlock()
458
459
	switch c.status {
460
	case AssetStatusGracefulStopped, AssetStatusStopped:
461
		c.status = AssetStatusDestroyed
462
	default:
463
		// No exit here
464
	}
465
466
	if !c.NeedStart() {
467
		return nil
468
	}
469
470
	_, log := logutil.WithField(ctx, "component", "manager")
471
472
	converted, ok := c.value.(Destroyer)
473
	if !ok {
474
		c.status = AssetStatusDestroyed // For CheckStopped correctness
475
476
		return nil
477
	}
478
479
	log.Info("destroying component", "component", c.Name())
480
481
	err := converted.Destroy(ctx)
482
	if err != nil {
483
		log.Info("destroyed component", "component", c.Name())
484
	}
485
486
	return err
487
}
488

Source Files