AWS S3 - Building a distributed log
In this article, I explore the possibility of building a durable, distributed, and highly available log on top of S3. A log is an event stream of data; records, once written, are immutable.
Why S3?
S3 is a very attractive proposition: there is no hardware to maintain, it is scalable, it offers different storage classes, it is available globally, and it is reasonably priced.
Log
In this design, each log record consists of three parts: the offset, the data, and the checksum.
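Read, shown later in the article, returns a Record; here is a minimal sketch of that type, assuming only the two fields the code actually uses:

// Record is a single log entry as returned by Read.
// On the wire it is laid out as:
//   [ offset: 8 bytes, big-endian ][ data: n bytes ][ CRC16: 2 bytes, big-endian ]
type Record struct {
	Offset uint64 // position of this record in the log
	Data   []byte // caller-supplied payload
}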
Offset
By storing the offset inside the record, we make the record self-contained. For example, if we run compaction tomorrow and change file names, the record’s offset remains the same.
func prepareBody(offset uint64, data []byte) ([]byte, error) {
	// 8 bytes for offset, len(data) bytes for data, 2 bytes for CRC16
	bufferLen := 8 + len(data) + 2
	buf := bytes.NewBuffer(make([]byte, 0, bufferLen))
	if err := binary.Write(buf, binary.BigEndian, offset); err != nil {
		return nil, err
	}
	if _, err := buf.Write(data); err != nil {
		return nil, err
	}
	// The CRC covers offset+data; the CRC field itself is not yet in the buffer.
	crc := crc16Fast(buf.Bytes())
	if err := binary.Write(buf, binary.BigEndian, crc); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
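As a quick worked example (a sketch, not from the original code): appending the 5-byte payload "hello" at offset 1 yields a 15-byte body:

body, _ := prepareBody(1, []byte("hello"))
// len(body) == 15: 8 (offset) + 5 (payload) + 2 (CRC16)
// body[0:8]   -> 0x00 00 00 00 00 00 00 01 (big-endian offset)
// body[8:13]  -> "hello"
// body[13:15] -> CRC16 over the first 13 bytes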
Checksum
For the checksum we use CRC16, which is fast and easy to compute. This is not a complicated application; its role is just to write records in a distributed way, so a lightweight 16-bit checksum is enough.
func crc16Fast(data []byte) uint16 {
	const polynomial uint16 = 0x1021 // CRC-16-CCITT polynomial
	crc := uint16(0xFFFF)            // standard CRC-16/CCITT-FALSE initial value
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ polynomial
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}
func validateChecksum(data []byte) bool {
	if len(data) < 2 {
		return false
	}
	// Extract the stored CRC from the last two bytes (big-endian).
	storedCRC := binary.BigEndian.Uint16(data[len(data)-2:])
	// The CRC was computed over everything before it.
	calculatedCRC := crc16Fast(data[:len(data)-2])
	return storedCRC == calculatedCRC
}
Append
The only ‘write’ operation we can do on a log is Append. Append takes a bunch of bytes and writes them to the end of the log. It returns the offset, which is the position of this record in the log.
The very first record will have offset 0000000001. For every new object we insert in the S3 bucket, we will increment it by one. Once a record is inserted, we will return its offset to the caller.
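The code below calls getObjectKey and getOffsetFromKey, which are not shown in the article. Here is a plausible sketch, assuming 10-digit zero-padding to match 0000000001 above, so that S3’s lexicographic key ordering equals numeric offset ordering:

func (w *S3DAL) getObjectKey(offset uint64) string {
	// Fixed-width, zero-padded offsets keep lexicographic order == numeric order.
	return fmt.Sprintf("%s/%010d", w.prefix, offset)
}

func (w *S3DAL) getOffsetFromKey(key string) (uint64, error) {
	// Strip "<prefix>/" and parse the remaining digits.
	return strconv.ParseUint(strings.TrimPrefix(key, w.prefix+"/"), 10, 64)
}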
How do we prevent two writers from appending records with the same offset? This is one of the crucial properties of a log. That’s why we have added IfNoneMatch: aws.String("*") to the request. If an object already exists at the same record offset, the request will be rejected. Here is the Append implementation, followed by a basic test to confirm this behavior:
func (w *S3DAL) Append(ctx context.Context, data []byte, fileSizeLimit uint64) (uint64, error) {
	// Each record is stored as its own S3 object; reject payloads whose encoded
	// size (8-byte offset + data + 2-byte CRC) would exceed the allowed size.
	newDataSize := uint64(len(data))
	if 8+newDataSize+2 > fileSizeLimit {
		return 0, fmt.Errorf("appending data would exceed the file size limit of %d bytes", fileSizeLimit)
	}
	// Calculate the next offset
	nextOffset := w.length + 1
	// Prepare the body for upload
	buf, err := prepareBody(nextOffset, data)
	if err != nil {
		return 0, fmt.Errorf("failed to prepare object body: %w", err)
	}
	input := &s3.PutObjectInput{
		Bucket:      aws.String(w.bucketName),
		Key:         aws.String(w.getObjectKey(nextOffset)),
		Body:        bytes.NewReader(buf),
		IfNoneMatch: aws.String("*"),
	}
	// Attempt to write the data to S3
	if _, err = w.client.PutObject(ctx, input); err != nil {
		return 0, fmt.Errorf("failed to put object to S3: %w", err)
	}
	// Update the current length
	w.length = nextOffset
	return nextOffset, nil
}
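And the basic test promised above; a sketch that assumes a hypothetical newTestDAL helper returning an S3DAL wired to a test bucket:

func TestAppendRejectsDuplicateOffset(t *testing.T) {
	ctx := context.Background()
	w1 := newTestDAL(t) // hypothetical helper: S3DAL backed by a test bucket
	w2 := newTestDAL(t) // a second writer pointing at the same bucket

	// The first writer appends record 1.
	if _, err := w1.Append(ctx, []byte("first"), 1024); err != nil {
		t.Fatalf("first append failed: %v", err)
	}

	// The second writer still believes the log is empty, so it also targets
	// offset 1. IfNoneMatch: "*" makes S3 reject this PutObject.
	if _, err := w2.Append(ctx, []byte("second"), 1024); err == nil {
		t.Fatal("expected duplicate-offset append to be rejected")
	}
}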
Read
Our log is coming along nicely! Let’s implement the read. It’s straightforward. Given an offset, we will construct the appropriate S3 object name and fetch it:
func (w *S3DAL) Read(ctx context.Context, offset uint64) (Record, error) {
	key := w.getObjectKey(offset)
	input := &s3.GetObjectInput{
		Bucket: aws.String(w.bucketName),
		Key:    aws.String(key),
	}
	result, err := w.client.GetObject(ctx, input)
	if err != nil {
		return Record{}, fmt.Errorf("failed to get object from S3: %w", err)
	}
	defer result.Body.Close()
	data, err := io.ReadAll(result.Body)
	if err != nil {
		return Record{}, fmt.Errorf("failed to read object body: %w", err)
	}
	// Minimum valid record: 8-byte offset + 2-byte CRC (empty payload).
	if len(data) < 10 {
		return Record{}, fmt.Errorf("invalid record: data too short")
	}
	var storedOffset uint64
	if err = binary.Read(bytes.NewReader(data[:8]), binary.BigEndian, &storedOffset); err != nil {
		return Record{}, err
	}
	if storedOffset != offset {
		return Record{}, fmt.Errorf("offset mismatch: expected %d, got %d", offset, storedOffset)
	}
	if !validateChecksum(data) {
		return Record{}, fmt.Errorf("CRC mismatch")
	}
	return Record{
		Offset: storedOffset,
		Data:   data[8 : len(data)-2],
	}, nil
}
Failover / Crash Recovery
Now that we have our basic operations working, let’s handle failure scenarios. What if our node crashes? How do we recover? We always initialize our WAL with length 0, so after a restart new writes will try to write at offset 0000000001. This is not a catastrophic bug! S3 conditional writes protect us and reject those writes. However, we will not be able to proceed with new writes either. Let’s fix this by adding a method that walks the list of keys and finds the last inserted object.
func (w *S3DAL) LastRecord(ctx context.Context) (Record, error) {
	// List every object under the prefix. Keys are zero-padded, so
	// lexicographic order matches offset order.
	input := &s3.ListObjectsV2Input{
		Bucket: aws.String(w.bucketName),
		Prefix: aws.String(w.prefix + "/"),
	}
	// Initialize paginator
	paginator := s3.NewListObjectsV2Paginator(w.client, input)
	var lastKey string
	for paginator.HasMorePages() {
		output, err := paginator.NextPage(ctx)
		if err != nil {
			return Record{}, fmt.Errorf("failed to list objects from S3: %w", err)
		}
		// Keep the last key of each page; the final one is the newest record.
		if len(output.Contents) > 0 {
			lastKey = *output.Contents[len(output.Contents)-1].Key
		}
	}
	if lastKey == "" {
		return Record{}, fmt.Errorf("WAL is empty")
	}
	// Extract the offset from the last key
	maxOffset, err := w.getOffsetFromKey(lastKey)
	if err != nil {
		return Record{}, fmt.Errorf("failed to parse offset from key: %w", err)
	}
	w.length = maxOffset
	return w.Read(ctx, maxOffset)
}
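With that, crash recovery on startup is a single call; here is a sketch, assuming the S3DAL fields implied by the code above (the construction is hypothetical):

w := &S3DAL{client: client, bucketName: "my-wal-bucket", prefix: "wal"}
if rec, err := w.LastRecord(ctx); err == nil {
	fmt.Printf("recovered: last offset = %d\n", rec.Offset)
}
// If the WAL is empty, w.length stays 0 and the next Append writes offset 1.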
That’s it. The whole code is now just about done. The essence of building a distributed log is as shown: sequential, self-contained records written to S3 with conditional puts.