s3.go

v1.1.0
Doc Versions Source
1
package storage
2
3
import (
4
	"bytes"
5
	"context"
6
	"errors"
7
	"io"
8
9
	"github.com/aws/aws-sdk-go-v2/aws"
10
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
11
	"github.com/aws/aws-sdk-go-v2/service/s3"
12
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
13
14
	"example.com/curator/internal/config"
15
	"example.com/curator/internal/metrics"
16
)
17
18
type S3Storage struct {
19
	client *s3.Client
20
	bucket string
21
}
22
23
func NewS3Storage(ctx context.Context, cfg config.S3Config) (*S3Storage, error) {
24
	opts := []func(*awsconfig.LoadOptions) error{
25
		awsconfig.WithRegion(cfg.Region),
26
	}
27
28
	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
29
	if err != nil {
30
		return nil, err
31
	}
32
33
	client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
34
		if cfg.Endpoint != "" {
35
			o.BaseEndpoint = aws.String(cfg.Endpoint)
36
		}
37
		o.UsePathStyle = cfg.UsePathStyle
38
	})
39
40
	return &S3Storage{client: client, bucket: cfg.Bucket}, nil
41
}
42
43
func (s *S3Storage) Get(ctx context.Context, key string) ([]byte, string, error) {
44
	out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
45
		Bucket: &s.bucket,
46
		Key:    &key,
47
	})
48
	if err != nil {
49
		var noKey *types.NoSuchKey
50
		if errors.As(err, &noKey) {
51
			metrics.S3OperationsTotal.WithLabelValues("get", "miss").Inc()
52
			return nil, "", ErrNotFound
53
		}
54
55
		metrics.S3OperationsTotal.WithLabelValues("get", "error").Inc()
56
		return nil, "", err
57
	}
58
	defer out.Body.Close()
59
60
	data, err := io.ReadAll(out.Body)
61
	if err != nil {
62
		metrics.S3OperationsTotal.WithLabelValues("get", "error").Inc()
63
		return nil, "", err
64
	}
65
66
	ct := ""
67
	if out.ContentType != nil {
68
		ct = *out.ContentType
69
	}
70
71
	metrics.S3OperationsTotal.WithLabelValues("get", "hit").Inc()
72
	return data, ct, nil
73
}
74
75
func (s *S3Storage) Put(ctx context.Context, key string, data []byte, contentType string) error {
76
	input := &s3.PutObjectInput{
77
		Bucket: &s.bucket,
78
		Key:    &key,
79
		Body:   bytes.NewReader(data),
80
	}
81
82
	if contentType != "" {
83
		input.ContentType = &contentType
84
	}
85
86
	_, err := s.client.PutObject(ctx, input)
87
	if err != nil {
88
		metrics.S3OperationsTotal.WithLabelValues("put", "error").Inc()
89
		return err
90
	}
91
92
	metrics.S3OperationsTotal.WithLabelValues("put", "ok").Inc()
93
	return nil
94
}
95
96
func (s *S3Storage) BucketStats(ctx context.Context) (objects int64, sizeBytes int64, err error) {
97
	paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
98
		Bucket: &s.bucket,
99
	})
100
101
	for paginator.HasMorePages() {
102
		page, err := paginator.NextPage(ctx)
103
		if err != nil {
104
			return 0, 0, err
105
		}
106
107
		for _, obj := range page.Contents {
108
			objects++
109
			if obj.Size != nil {
110
				sizeBytes += *obj.Size
111
			}
112
		}
113
	}
114
115
	return objects, sizeBytes, nil
116
}
117

Source Files