Files
gotgt/pkg/scsi/backingstore/s3store/s3store.go
Lei Xue 76ab15b0df feat: add S3-compatible object storage backend
Add a new backend store that enables iSCSI targets backed by
S3-compatible object storage (AWS S3, MinIO, Ceph RGW, etc.).

The implementation uses a chunked storage strategy where the virtual
block device is divided into fixed-size chunks (default 4 MiB), each
stored as an independent S3 object. This enables efficient random
read/write access on top of object storage.

Key features:
- Chunked storage with configurable chunk size
- Sparse device support (unwritten chunks treated as zeros)
- Concurrent multi-chunk I/O via errgroup
- Per-chunk locking for safe read-modify-write
- AWS SDK v2 with default credential chain
- In-process gofakes3 test server (no Docker needed)
- 12 unit tests + 2 integration tests

Also updates CI workflow to run S3 backend tests and updates
README with S3 backend documentation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-16 16:22:57 +08:00

471 lines
13 KiB
Go

/*
Copyright 2024 The GoStor Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package s3store
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"sync"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"github.com/gostor/gotgt/pkg/api"
"github.com/gostor/gotgt/pkg/config"
"github.com/gostor/gotgt/pkg/scsi"
)
const (
S3BackingStorage = "s3"
DefaultChunkSize = 4 * 1024 * 1024 // 4 MiB
metadataKey = "_metadata"
)
func init() {
scsi.RegisterBackingStore(S3BackingStorage, newS3)
}
// s3Client is the minimal S3 interface used by this package, enabling test injection.
type s3Client interface {
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
}
// deviceMetadata stores device configuration in S3.
type deviceMetadata struct {
DeviceSize uint64 `json:"deviceSize"`
ChunkSize int64 `json:"chunkSize"`
}
// S3BackingStore implements the BackingStore interface using S3-compatible object storage.
// The virtual block device is divided into fixed-size chunks, each stored as a separate S3 object.
type S3BackingStore struct {
scsi.BaseBackingStore
client s3Client
bucket string
prefix string
chunkSize int64
chunkLocks sync.Map // map[int64]*sync.Mutex
}
func newS3() (api.BackingStore, error) {
return &S3BackingStore{
BaseBackingStore: scsi.BaseBackingStore{
Name: S3BackingStorage,
DataSize: 0,
OflagsSupported: 0,
},
}, nil
}
func (bs *S3BackingStore) lockChunk(idx int64) *sync.Mutex {
val, _ := bs.chunkLocks.LoadOrStore(idx, &sync.Mutex{})
mu := val.(*sync.Mutex)
mu.Lock()
return mu
}
func (bs *S3BackingStore) chunkKey(idx int64) string {
return fmt.Sprintf("%s/chunk_%010d", bs.prefix, idx)
}
func (bs *S3BackingStore) metadataObjKey() string {
return fmt.Sprintf("%s/%s", bs.prefix, metadataKey)
}
func (bs *S3BackingStore) Open(dev *api.SCSILu, path string) error {
// Parse path: bucket/prefix
idx := strings.Index(path, "/")
if idx <= 0 {
return fmt.Errorf("invalid S3 path %q, expected bucket/prefix", path)
}
bs.bucket = path[:idx]
bs.prefix = path[idx+1:]
if bs.prefix == "" {
return fmt.Errorf("invalid S3 path %q, prefix cannot be empty", path)
}
// Read backend-specific config
var (
chunkSize int64
deviceSize uint64
endpoint string
region string
forcePathStyle bool
)
if cfg, ok := dev.BackendConfig.(*config.BackendStorage); ok && cfg != nil {
chunkSize = cfg.S3ChunkSize
deviceSize = cfg.DeviceSize
endpoint = cfg.S3Endpoint
region = cfg.S3Region
forcePathStyle = cfg.S3ForcePathStyle
}
if chunkSize <= 0 {
chunkSize = DefaultChunkSize
}
bs.chunkSize = chunkSize
// Create S3 client if not already set (e.g., by tests)
if bs.client == nil {
ctx := context.Background()
var opts []func(*awsconfig.LoadOptions) error
if region != "" {
opts = append(opts, awsconfig.WithRegion(region))
}
cfg, err := awsconfig.LoadDefaultConfig(ctx, opts...)
if err != nil {
return fmt.Errorf("failed to load AWS config: %w", err)
}
var s3Opts []func(*s3.Options)
if endpoint != "" {
s3Opts = append(s3Opts, func(o *s3.Options) {
o.BaseEndpoint = aws.String(endpoint)
})
}
if forcePathStyle {
s3Opts = append(s3Opts, func(o *s3.Options) {
o.UsePathStyle = true
})
}
// Disable response checksum validation for compatibility with
// S3-compatible backends (e.g., MinIO) and range read requests.
s3Opts = append(s3Opts, func(o *s3.Options) {
o.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired
})
bs.client = s3.NewFromConfig(cfg, s3Opts...)
}
// Try to load existing metadata
ctx := context.Background()
meta, err := bs.loadMetadata(ctx)
if err != nil {
var nsk *types.NoSuchKey
if !errors.As(err, &nsk) {
return fmt.Errorf("failed to load S3 metadata: %w", err)
}
// Metadata does not exist - create new device
if deviceSize == 0 {
return fmt.Errorf("S3 device metadata not found and deviceSize not configured")
}
meta = &deviceMetadata{
DeviceSize: deviceSize,
ChunkSize: chunkSize,
}
if err := bs.saveMetadata(ctx, meta); err != nil {
return fmt.Errorf("failed to save S3 metadata: %w", err)
}
log.Infof("S3 backing store: created new device %s/%s, size=%d, chunkSize=%d",
bs.bucket, bs.prefix, deviceSize, chunkSize)
} else {
bs.chunkSize = meta.ChunkSize
log.Infof("S3 backing store: opened existing device %s/%s, size=%d, chunkSize=%d",
bs.bucket, bs.prefix, meta.DeviceSize, meta.ChunkSize)
}
bs.DataSize = meta.DeviceSize
return nil
}
func (bs *S3BackingStore) loadMetadata(ctx context.Context) (*deviceMetadata, error) {
key := bs.metadataObjKey()
out, err := bs.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bs.bucket),
Key: aws.String(key),
})
if err != nil {
return nil, err
}
defer out.Body.Close()
var meta deviceMetadata
if err := json.NewDecoder(out.Body).Decode(&meta); err != nil {
return nil, fmt.Errorf("failed to decode metadata: %w", err)
}
return &meta, nil
}
func (bs *S3BackingStore) saveMetadata(ctx context.Context, meta *deviceMetadata) error {
data, err := json.Marshal(meta)
if err != nil {
return err
}
key := bs.metadataObjKey()
_, err = bs.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(bs.bucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
})
return err
}
func (bs *S3BackingStore) Close(dev *api.SCSILu) error {
bs.client = nil
return nil
}
func (bs *S3BackingStore) Init(dev *api.SCSILu, Opts string) error {
return nil
}
func (bs *S3BackingStore) Exit(dev *api.SCSILu) error {
return nil
}
func (bs *S3BackingStore) Size(dev *api.SCSILu) uint64 {
return bs.DataSize
}
func (bs *S3BackingStore) Read(offset, tl int64) ([]byte, error) {
if bs.client == nil {
return nil, fmt.Errorf("S3 backend store is not open")
}
result := make([]byte, tl)
startChunk := offset / bs.chunkSize
endChunk := (offset + tl - 1) / bs.chunkSize
ctx := context.Background()
eg, ctx := errgroup.WithContext(ctx)
for ci := startChunk; ci <= endChunk; ci++ {
ci := ci
eg.Go(func() error {
chunkStart := ci * bs.chunkSize
readStart := max(offset, chunkStart) - chunkStart
readEnd := min(offset+tl, chunkStart+bs.chunkSize) - chunkStart
data, err := bs.getChunkRange(ctx, ci, readStart, readEnd)
if err != nil {
return err
}
destStart := max(offset, chunkStart) - offset
copy(result[destStart:], data)
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
return result, nil
}
// ReadAt reads directly into the provided buffer to avoid allocation.
func (bs *S3BackingStore) ReadAt(buf []byte, offset int64) (int, error) {
data, err := bs.Read(offset, int64(len(buf)))
if err != nil {
return 0, err
}
copy(buf, data)
return len(buf), nil
}
// getChunkRange reads a byte range from a chunk. Returns zeros if the chunk does not exist.
func (bs *S3BackingStore) getChunkRange(ctx context.Context, chunkIdx, start, end int64) ([]byte, error) {
key := bs.chunkKey(chunkIdx)
rangeStr := fmt.Sprintf("bytes=%d-%d", start, end-1)
out, err := bs.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bs.bucket),
Key: aws.String(key),
Range: aws.String(rangeStr),
})
if err != nil {
var nsk *types.NoSuchKey
if errors.As(err, &nsk) {
// Chunk doesn't exist - return zeros (sparse)
return make([]byte, end-start), nil
}
return nil, fmt.Errorf("failed to read chunk %d: %w", chunkIdx, err)
}
defer out.Body.Close()
data, err := io.ReadAll(out.Body)
if err != nil {
return nil, fmt.Errorf("failed to read chunk %d body: %w", chunkIdx, err)
}
// Pad with zeros if chunk is shorter than expected range
expected := int(end - start)
if len(data) < expected {
padded := make([]byte, expected)
copy(padded, data)
return padded, nil
}
return data, nil
}
func (bs *S3BackingStore) Write(wbuf []byte, offset int64) error {
if bs.client == nil {
return fmt.Errorf("S3 backend store is not open")
}
tl := int64(len(wbuf))
startChunk := offset / bs.chunkSize
endChunk := (offset + tl - 1) / bs.chunkSize
ctx := context.Background()
eg, ctx := errgroup.WithContext(ctx)
for ci := startChunk; ci <= endChunk; ci++ {
ci := ci
eg.Go(func() error {
chunkStart := ci * bs.chunkSize
writeStart := max(offset, chunkStart) - chunkStart
writeEnd := min(offset+tl, chunkStart+bs.chunkSize) - chunkStart
srcStart := max(offset, chunkStart) - offset
srcEnd := srcStart + (writeEnd - writeStart)
if writeStart == 0 && writeEnd == bs.chunkSize {
// Full chunk write - direct upload
return bs.putChunk(ctx, ci, wbuf[srcStart:srcEnd])
}
// Partial chunk - read-modify-write
return bs.readModifyWriteChunk(ctx, ci, writeStart, writeEnd, wbuf[srcStart:srcEnd])
})
}
return eg.Wait()
}
func (bs *S3BackingStore) putChunk(ctx context.Context, chunkIdx int64, data []byte) error {
key := bs.chunkKey(chunkIdx)
_, err := bs.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(bs.bucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
})
if err != nil {
return fmt.Errorf("failed to write chunk %d: %w", chunkIdx, err)
}
return nil
}
func (bs *S3BackingStore) readModifyWriteChunk(ctx context.Context, chunkIdx, writeStart, writeEnd int64, data []byte) error {
mu := bs.lockChunk(chunkIdx)
defer mu.Unlock()
// Read existing chunk
chunk, err := bs.getFullChunk(ctx, chunkIdx)
if err != nil {
return err
}
// Modify
copy(chunk[writeStart:writeEnd], data)
// Write back
return bs.putChunk(ctx, chunkIdx, chunk)
}
// getFullChunk reads the full chunk, returning a zero-filled buffer if it doesn't exist.
func (bs *S3BackingStore) getFullChunk(ctx context.Context, chunkIdx int64) ([]byte, error) {
key := bs.chunkKey(chunkIdx)
out, err := bs.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bs.bucket),
Key: aws.String(key),
})
if err != nil {
var nsk *types.NoSuchKey
if errors.As(err, &nsk) {
return make([]byte, bs.chunkSize), nil
}
return nil, fmt.Errorf("failed to read full chunk %d: %w", chunkIdx, err)
}
defer out.Body.Close()
data, err := io.ReadAll(out.Body)
if err != nil {
return nil, fmt.Errorf("failed to read full chunk %d body: %w", chunkIdx, err)
}
if int64(len(data)) < bs.chunkSize {
padded := make([]byte, bs.chunkSize)
copy(padded, data)
return padded, nil
}
return data, nil
}
func (bs *S3BackingStore) DataSync(offset, tl int64) error {
// S3 PutObject is durable once acknowledged
return nil
}
func (bs *S3BackingStore) DataAdvise(offset, length int64, advise uint32) error {
return nil
}
func (bs *S3BackingStore) Unmap(descriptors []api.UnmapBlockDescriptor) error {
if bs.client == nil {
return fmt.Errorf("S3 backend store is not open")
}
ctx := context.Background()
for _, desc := range descriptors {
startChunk := int64(desc.Offset) / bs.chunkSize
endChunk := int64(desc.Offset+desc.TL-1) / bs.chunkSize
for ci := startChunk; ci <= endChunk; ci++ {
chunkStart := ci * bs.chunkSize
unmapStart := max(int64(desc.Offset), chunkStart) - chunkStart
unmapEnd := min(int64(desc.Offset+desc.TL), chunkStart+bs.chunkSize) - chunkStart
if unmapStart == 0 && unmapEnd == bs.chunkSize {
// Full chunk unmap - delete the object
key := bs.chunkKey(ci)
if _, err := bs.client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(bs.bucket),
Key: aws.String(key),
}); err != nil {
return fmt.Errorf("failed to delete chunk %d: %w", ci, err)
}
}
// Partial chunk unmap - ignore (sparse zeros on next read is fine
// since missing chunk data is treated as zeros)
}
}
return nil
}
func max(a, b int64) int64 {
if a > b {
return a
}
return b
}
func min(a, b int64) int64 {
if a < b {
return a
}
return b
}