AWS S3 - Building a distributed log

In this article, I explore the possibility of building a durable, distributed, and highly available log using S3. A log is an event stream of data in which records are immutable once written.

The usage of S3

S3 is a very attractive proposition:

  1. There is no hardware to maintain, it scales without capacity planning, and it offers several storage classes.

  2. It is available in AWS regions around the world.

  3. It is reasonably priced.

Log

In this design, each record in the log consists of 3 parts: the offset, the data, and the checksum.

Offset

By storing the offset we make the record self-contained. For example, if we do compaction tomorrow and change file names, the record’s offset remains the same.

func prepareBody(offset uint64, data []byte) ([]byte, error) {
    // 8 bytes for offset, len(data) bytes for data, 2 bytes for CRC16
    bufferLen := 8 + len(data) + 2
    buf := bytes.NewBuffer(make([]byte, 0, bufferLen))
    if err := binary.Write(buf, binary.BigEndian, offset); err != nil {
        return nil, err
    }
    if _, err := buf.Write(data); err != nil {
        return nil, err
    }
    crc := crc16Fast(buf.Bytes()) // Exclude space for CRC during calculation
    if err := binary.Write(buf, binary.BigEndian, crc); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

Checksum

For the checksum we use CRC16, which is fast and easy to compute. The application is deliberately simple: its only job is to write records in a distributed way, so a lightweight checksum is a reasonable fit.

func crc16Fast(data []byte) uint16 {
    const polynomial uint16 = 0x1021 // CRC-16-CCITT polynomial
    crc := uint16(0xCACA)            // Non-standard initial value; writer and reader must use the same one
    for _, b := range data {
        crc ^= uint16(b) << 8
        for i := 0; i < 8; i++ {
            if crc&0x8000 != 0 {
                crc = (crc << 1) ^ polynomial
            } else {
                crc <<= 1
            }
        }
    }
    return crc
}

func validateChecksum(data []byte) bool {
    if len(data) < 2 {
        return false
    }

    // The last 2 bytes hold the CRC, stored big-endian by prepareBody
    storedCRC := binary.BigEndian.Uint16(data[len(data)-2:])
    // Everything before the trailer (offset + data) is covered by the CRC
    recordData := data[:len(data)-2]

    return storedCRC == crc16Fast(recordData)
}
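
To see what the checksum buys us, here is a small self-contained sketch. It re-implements the same CRC and record layout under the hypothetical names crc16, encode, and valid (they are illustrations, not part of the code above), then flips a single bit in the payload:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// crc16 mirrors crc16Fast from the article: polynomial 0x1021, initial value 0xCACA.
func crc16(data []byte) uint16 {
	crc := uint16(0xCACA)
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// encode lays out a record as offset (8 bytes) | data | CRC16 (2 bytes), all big-endian.
func encode(offset uint64, data []byte) []byte {
	buf := make([]byte, 8, 8+len(data)+2)
	binary.BigEndian.PutUint64(buf, offset)
	buf = append(buf, data...)
	c := crc16(buf) // computed over offset + data, before the trailer is appended
	return append(buf, byte(c>>8), byte(c))
}

// valid recomputes the CRC over everything except the 2-byte trailer.
func valid(record []byte) bool {
	if len(record) < 10 {
		return false
	}
	body, trailer := record[:len(record)-2], record[len(record)-2:]
	return crc16(body) == binary.BigEndian.Uint16(trailer)
}

func main() {
	rec := encode(1, []byte("hello"))
	fmt.Println("intact record valid:", valid(rec)) // true

	rec[9] ^= 0x01 // flip one bit inside the payload
	fmt.Println("corrupted record valid:", valid(rec)) // false
}
```

A CRC always catches a single flipped bit, which is why the second check fails. With only 16 bits, larger corruptions can occasionally slip through, so a wider checksum may be worth considering if that matters for your data.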

Append

The only ‘write’ operation we can do on a log is Append. Append takes a bunch of bytes and writes them to the end of the log. It returns the offset, which is the position of this record in the log.

The very first record will have offset 0000000001. For every new object we insert in the S3 bucket, we will increment it by one. Once a record is inserted, we will return its offset to the caller.
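
The article does not show how an offset becomes an object name, so the following is only a guess at what helpers like getObjectKey and getOffsetFromKey might look like. It assumes a key of the form prefix/offset with the offset zero-padded to 10 digits, as in 0000000001; the names objectKey and offsetFromKey are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// objectKey builds "<prefix>/<offset>" with the offset zero-padded to 10 digits.
func objectKey(prefix string, offset uint64) string {
	return fmt.Sprintf("%s/%010d", prefix, offset)
}

// offsetFromKey reverses objectKey and rejects keys outside the prefix.
func offsetFromKey(prefix, key string) (uint64, error) {
	p := prefix + "/"
	if !strings.HasPrefix(key, p) {
		return 0, fmt.Errorf("key %q does not start with %q", key, p)
	}
	return strconv.ParseUint(strings.TrimPrefix(key, p), 10, 64)
}

func main() {
	fmt.Println(objectKey("wal", 1))  // wal/0000000001
	fmt.Println(objectKey("wal", 10)) // wal/0000000010

	// Padded keys compare in numeric order; unpadded keys do not.
	fmt.Println(objectKey("wal", 9) < objectKey("wal", 10)) // true
	fmt.Println("wal/9" < "wal/10")                         // false
}
```

The padding matters because S3 lists keys as strings, not numbers. Ten digits keeps the ordering correct up to offset 9,999,999,999; covering the full uint64 range would need 20 digits.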

How do we prevent two writers from appending records with the same offset? This is one of the crucial properties of a log. That is why the request sets IfNoneMatch: aws.String("*"): if an object already exists for that offset, S3 rejects the write. Here is Append with the conditional write in place:

func (w *S3DAL) Append(ctx context.Context, data []byte, fileSizeLimit uint64) (uint64, error) {
    // Reject records larger than the allowed object size. w.length holds the
    // last offset (a record count, not bytes), so it is not part of this check.
    newDataSize := uint64(len(data))
    if newDataSize > fileSizeLimit {
        return 0, fmt.Errorf("record of %d bytes exceeds the size limit of %d bytes", newDataSize, fileSizeLimit)
    }

    // Calculate the next offset
    nextOffset := w.length + 1

    // Prepare the body for upload
    buf, err := prepareBody(nextOffset, data)
    if err != nil {
        return 0, fmt.Errorf("failed to prepare object body: %w", err)
    }

    input := &s3.PutObjectInput{
        Bucket:      aws.String(w.bucketName),
        Key:         aws.String(w.getObjectKey(nextOffset)),
        Body:        bytes.NewReader(buf),
        IfNoneMatch: aws.String("*"),
    }

    // Attempt to write the data to S3
    if _, err = w.client.PutObject(ctx, input); err != nil {
        return 0, fmt.Errorf("failed to put object to S3: %w", err)
    }

    // Update the current length
    w.length = nextOffset
    return nextOffset, nil
}
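
To confirm the "only one writer wins" property without touching a real bucket, here is a minimal simulation. It assumes an in-memory stand-in for S3: memStore, putIfAbsent, and race are hypothetical names, and putIfAbsent only imitates what a PutObject with If-None-Match: "*" does on the server side.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errPreconditionFailed stands in for S3 rejecting a conditional write.
var errPreconditionFailed = errors.New("precondition failed: object already exists")

// memStore is a hypothetical in-memory stand-in for a bucket.
type memStore struct {
	mu      sync.Mutex
	objects map[string][]byte
}

func newMemStore() *memStore {
	return &memStore{objects: make(map[string][]byte)}
}

// putIfAbsent stores body under key only if the key does not exist yet.
func (s *memStore) putIfAbsent(key string, body []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.objects[key]; exists {
		return errPreconditionFailed
	}
	s.objects[key] = body
	return nil
}

// race lets n writers compete for the same key and reports how many succeeded.
func race(s *memStore, key string, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	wins := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			err := s.putIfAbsent(key, []byte(fmt.Sprintf("writer-%d", id)))
			if err == nil {
				mu.Lock()
				wins++
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return wins
}

func main() {
	store := newMemStore()
	fmt.Println("successful writers:", race(store, "wal/0000000001", 8)) // 1
}
```

Against real S3, the losing writers would see the PutObject call fail instead of errPreconditionFailed. A writer that loses can refresh its view of the log and retry at the next offset.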

Read

Our log is coming along nicely! Let’s implement the read. It’s straightforward. Given an offset, we will construct the appropriate S3 object name and fetch it:

func (w *S3DAL) Read(ctx context.Context, offset uint64) (Record, error) {
    key := w.getObjectKey(offset)
    input := &s3.GetObjectInput{
        Bucket: aws.String(w.bucketName),
        Key:    aws.String(key),
    }

    result, err := w.client.GetObject(ctx, input)
    if err != nil {
        return Record{}, fmt.Errorf("failed to get object from S3: %w", err)
    }
    defer result.Body.Close()

    data, err := io.ReadAll(result.Body)
    if err != nil {
        return Record{}, fmt.Errorf("failed to read object body: %w", err)
    }
    if len(data) < 10 {
        return Record{}, fmt.Errorf("invalid record: data too short")
    }

    var storedOffset uint64
    if err = binary.Read(bytes.NewReader(data[:8]), binary.BigEndian, &storedOffset); err != nil {
        return Record{}, err
    }
    if storedOffset != offset {
        return Record{}, fmt.Errorf("offset mismatch: expected %d, got %d", offset, storedOffset)
    }
    if !validateChecksum(data) {
        return Record{}, fmt.Errorf("CRC mismatch")
    }
    return Record{
        Offset: storedOffset,
        Data:   data[8 : len(data)-2],
    }, nil
}

Failover / Crash Recovery

Now that we have our basic operations working, let’s handle failure scenarios. What if our node crashes? How do we recover? We always initialize our WAL with length 0, so after a restart new writes will try to write at offset 0000000001. This is not a catastrophic bug! S3 conditional writes protect us and reject the write. However, we will not be able to proceed with new writes. Let’s fix this by adding a method that goes through the list of keys, finds the last inserted object, and restores the length from it.

func (w *S3DAL) LastRecord(ctx context.Context) (Record, error) {
    // List every key under the prefix. ListObjectsV2 has no reverse order, so we
    // page through the ascending results and keep the last key we see.
    input := &s3.ListObjectsV2Input{
        Bucket: aws.String(w.bucketName),
        Prefix: aws.String(w.prefix + "/"),
    }

    // Initialize paginator
    paginator := s3.NewListObjectsV2Paginator(w.client, input)

    var lastKey string
    for paginator.HasMorePages() {
        output, err := paginator.NextPage(ctx)
        if err != nil {
            return Record{}, fmt.Errorf("failed to list objects from S3: %w", err)
        }

        // Get the last key in this page (keys are lexicographically sorted)
        if len(output.Contents) > 0 {
            lastKey = *output.Contents[len(output.Contents)-1].Key
        }
    }

    if lastKey == "" {
        return Record{}, fmt.Errorf("WAL is empty")
    }

    // Extract the offset from the last key
    maxOffset, err := w.getOffsetFromKey(lastKey)
    if err != nil {
        return Record{}, fmt.Errorf("failed to parse offset from key: %w", err)
    }

    w.length = maxOffset
    return w.Read(ctx, maxOffset)
}
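
The restart scenario can be reproduced end to end with a toy model. This sketch assumes a Go map in place of the bucket and a fixed "wal/" prefix; the wal type, key, append, and recoverLength are hypothetical names that only mirror the shape of Append and LastRecord.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strconv"
	"strings"
)

var errExists = errors.New("object already exists")

// wal is a toy log over a map that stands in for the bucket.
type wal struct {
	bucket map[string][]byte // shared "bucket"
	length uint64            // last offset this writer knows about
}

func key(offset uint64) string { return fmt.Sprintf("wal/%010d", offset) }

// append writes at length+1 and fails if that key is already taken.
func (w *wal) append(data []byte) (uint64, error) {
	next := w.length + 1
	if _, taken := w.bucket[key(next)]; taken {
		return 0, errExists
	}
	w.bucket[key(next)] = data
	w.length = next
	return next, nil
}

// recoverLength finds the lexicographically last key and adopts its offset.
func (w *wal) recoverLength() error {
	keys := make([]string, 0, len(w.bucket))
	for k := range w.bucket {
		keys = append(keys, k)
	}
	if len(keys) == 0 {
		w.length = 0 // an empty log is a valid starting state in this sketch
		return nil
	}
	sort.Strings(keys) // a map is unordered, so sort to mimic a sorted listing
	last := strings.TrimPrefix(keys[len(keys)-1], "wal/")
	off, err := strconv.ParseUint(last, 10, 64)
	if err != nil {
		return fmt.Errorf("parse offset from %q: %w", last, err)
	}
	w.length = off
	return nil
}

func main() {
	bucket := map[string][]byte{}
	first := &wal{bucket: bucket}
	first.append([]byte("a"))
	first.append([]byte("b"))

	// A restarted writer begins at length 0, so its first append collides.
	restarted := &wal{bucket: bucket}
	_, err := restarted.append([]byte("c"))
	fmt.Println("before recovery:", err) // object already exists

	if err := restarted.recoverLength(); err != nil {
		panic(err)
	}
	off, err := restarted.append([]byte("c"))
	fmt.Println("after recovery:", off, err) // 3 <nil>
}
```

One difference from LastRecord above: the sketch treats an empty log as length 0 rather than an error, which may be the more convenient behaviour for a brand-new bucket.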

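That’s it. The code is now just about done, and the pieces shown above (a self-contained record, conditional appends, validated reads, and recovery from the key listing) are the essence of building a distributed log on S3.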