// File: cloud-services/pkg/remotefileupload/backup.go (319 lines, 10 KiB, Go)

package remotefileupload
import (
"context"
"fmt"
"net/url"
"strings"
"sync"
"time"
"github.com/fiskerinc/cloud-services/pkg/logger"
"github.com/fiskerinc/cloud-services/pkg/utils/elptr"
"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
)
// Storage settings. Each value is read from the named environment variable
// and falls back to the default given as the second argument.
var (
	// backupContainerName is the container Move copies blobs into and
	// ToParquet reads its CSV input from.
	backupContainerName = envtool.GetEnv("AZURE_STORAGE_BACKUP_CONTAINER", "raw-can-archive")

	// ttl is the lifetime of a backed-up blob, expressed in hours (setTTL
	// multiplies it by time.Hour). The default of 60*24 hours is 60 days.
	ttl = envtool.GetEnvInt64("AZURE_STORAGE_BACKUP_TTL", 60*24)

	// azureRawCompressedContainerName is the container ToParquet writes the
	// converted Parquet blob to.
	azureRawCompressedContainerName = envtool.GetEnv("AZURE_STORAGE_RAW_COMPRESSED_CONTAINER", "raw-can-compressed")
)
// Backup moves blobs out of a source container into the backup container and
// converts backed-up CSV blobs to Parquet.
type Backup struct {
	// azureAccount is the storage account name; it forms the host part of
	// every blob URL built by getContainerPath.
	azureAccount string
	// azureAccountKey is the shared key of the storage account.
	azureAccountKey string
	// containerName is the source container that remove and Move operate on.
	containerName string
	// cred is the shared-key credential built from the account name and key
	// in NewBackup and handed to every blob client.
	cred *azblob.SharedKeyCredential
}
// Log message formats used by this package, plus the service error text that
// identifies a missing blob.
var (
	errTTL    = "Failed to set ttl %s"
	errCopy   = "Failed to copy %s"
	errDelete = "Failed to delete %s"
	errClient = "Failed to create client"
	// Spelling fixed: the message previously read "wirter".
	errParquetWriter = "Failed to create parquet writer %s"
	errDownload      = "Failed to download file %s"
	// blobNotExists is matched as a substring of err.Error() to recognise
	// requests that failed because the blob is not there.
	blobNotExists = "The specified blob does not exist."
)
// NewBackup builds a Backup for the given storage account, account key and
// source container.
//
// Parameters:
//   - azureAccount: storage account name.
//   - azureAccountKey: shared key of the storage account.
//   - containerName: source container the Backup operates on.
//
// Returns:
//   - *Backup: always non-nil.
func NewBackup(azureAccount string, azureAccountKey string, containerName string) *Backup {
	// NOTE(review): the credential error is deliberately discarded here because
	// the signature has no error result. If the key is rejected, cred holds
	// whatever NewSharedKeyCredential returned alongside the error and later
	// blob operations are the first place the problem becomes visible.
	cred, _ := azblob.NewSharedKeyCredential(azureAccount, azureAccountKey)

	return &Backup{
		azureAccount:    azureAccount,
		azureAccountKey: azureAccountKey,
		containerName:   containerName,
		cred:            cred,
	}
}
// remove deletes the blob at filePath from the source container
// (b.containerName).
//
// Parameters:
//   - ctx: governs cancellation and deadline of the delete request.
//   - filePath: path of the blob inside the source container.
//
// Returns:
//   - error: the client-construction or delete error, nil on success.
//
// The blob is addressed through an append-blob client. Nothing is logged
// here; the caller decides how to report a failure.
func (b *Backup) remove(ctx context.Context, filePath string) error {
	// Full URL of the blob in the source container.
	fullPath := b.azureBlobURL(b.getContainerPath(b.containerName), filePath)

	client, err := appendblob.NewClientWithSharedKeyCredential(fullPath, b.cred, nil)
	if err != nil {
		return err
	}

	// Use the caller's context rather than context.Background() so that
	// cancellation and deadlines reach the delete request.
	_, err = client.Delete(ctx, nil)
	return err
}
// Move copies a blob from the source container into the backup container as
// a cool-tier block blob, sets a TTL on the copy and then deletes the source.
//
// Parameters:
//   - ctx: governs cancellation and deadline of the storage requests.
//   - filePath: path of the blob inside the source container; the copy keeps
//     the same path inside the backup container.
//
// Returns:
//   - error: the SAS-generation, client-construction, copy or delete error.
//     Copy and delete errors whose text contains blobNotExists are ignored.
//     A TTL failure is only logged and does not stop the move.
func (b *Backup) Move(ctx context.Context, filePath string) error {
	srcPath := b.azureBlobURL(b.getContainerPath(b.containerName), filePath)
	destPath := b.azureBlobURL(b.getContainerPath(backupContainerName), filePath)

	// The copy is performed by the storage service, which needs a read SAS
	// to fetch the source blob. generateSASToken logs its own failures.
	srcSAS, err := b.generateSASToken(filePath, sas.BlobPermissions{Read: true}, b.containerName)
	if err != nil {
		return err
	}

	client, err := blockblob.NewClientWithSharedKeyCredential(destPath, b.cred, &blockblob.ClientOptions{
		ClientOptions: policy.ClientOptions{
			Retry: policy.RetryOptions{
				MaxRetries:    1,
				MaxRetryDelay: 1 * time.Minute,
			},
		},
	})
	if err != nil {
		return err
	}

	tier := blob.AccessTierCool // Set cool tier type as cold tier not supported for this version of sdk
	_, err = client.UploadBlobFromURL(ctx, fmt.Sprintf("%s?%s", srcPath, srcSAS), &blockblob.UploadBlobFromURLOptions{
		Tier: &tier,
	})
	if err != nil && !strings.Contains(err.Error(), blobNotExists) {
		logger.Err(err).Msg(fmt.Sprintf(errCopy, srcPath))
		return err
	}

	// Best effort: a TTL failure is logged but the source is still removed.
	err = b.setTTL(ctx, destPath)
	if err != nil && !strings.Contains(err.Error(), blobNotExists) {
		logger.Err(err).Msg(fmt.Sprintf(errTTL, destPath))
	}

	if err = b.remove(ctx, filePath); err != nil {
		if strings.Contains(err.Error(), blobNotExists) {
			return nil
		}
		// The blob being deleted is the source, so report the source path.
		logger.Err(err).Msg(fmt.Sprintf(errDelete, srcPath))
		return err
	}
	return nil
}
// setTTL sets a relative expiry of ttl hours on the blob at fileUrl and then
// reads the blob properties back to confirm that an expiry is present.
//
// Parameters:
//   - ctx: governs cancellation and deadline of both storage requests.
//   - fileUrl: full URL of the blob that receives the expiry.
//
// Returns:
//   - error: the client-construction, SetExpiry or GetProperties error, or an
//     error when the properties report no expiry; nil on success.
func (b *Backup) setTTL(ctx context.Context, fileUrl string) error {
	blockBlobClient, err := blockblob.NewClientWithSharedKeyCredential(fileUrl, b.cred, &blockblob.ClientOptions{
		ClientOptions: policy.ClientOptions{
			Retry: policy.RetryOptions{
				MaxRetries:    1,
				MaxRetryDelay: 1 * time.Minute,
			},
		},
	})
	if err != nil {
		return err
	}

	// ttl is a number of hours (default 60*24, i.e. 60 days), relative to now.
	expiry := blockblob.ExpiryTypeRelativeToNow(time.Duration(ttl) * time.Hour)
	// Use the caller's context rather than context.Background() so that
	// cancellation and deadlines reach the request.
	if _, err = blockBlobClient.SetExpiry(ctx, expiry, nil); err != nil {
		return err
	}

	// Validate the set-expiry operation: a blob without an expiry after a
	// successful SetExpiry is reported instead of being silently accepted.
	resp, err := blockBlobClient.GetProperties(ctx, nil)
	if err != nil {
		return err
	}
	if resp.ExpiresOn == nil {
		return fmt.Errorf("expiry not set on %s", fileUrl)
	}
	return nil
}
// generateSASToken generates a Shared Access Signature (SAS) token for a
// single blob, signed with the account's shared key.
//
// Parameters:
//   - blobName: name of the blob the token is issued for, e.g.
//     19UUA56873A044568/2023/01/11/raw.log.
//   - permission: permissions granted by the token.
//   - containerName: container that holds the blob.
//
// Returns:
//   - token: the encoded SAS query string; empty when err is non-nil.
//   - err: the credential or signing error; it is logged before returning.
//
// The token is HTTPS-only, valid from one hour in the past until roughly
// three years ahead.
// NOTE(review): three years is long for a token that Move only needs for one
// server-side copy — confirm whether other callers rely on this lifetime.
func (b *Backup) generateSASToken(blobName string, permission sas.BlobPermissions, containerName string) (token string, err error) {
	cred, err := azblob.NewSharedKeyCredential(b.azureAccount, b.azureAccountKey)
	if err != nil {
		logger.Err(err).Msg("[backup]:[NewSharedKeyCredential]")
		return "", err
	}

	// Sample the clock once so start and expiry share one reference instant.
	now := time.Now().UTC()
	sasQueryParams, err := sas.BlobSignatureValues{
		Protocol:      sas.ProtocolHTTPS,
		StartTime:     now.Add(-1 * time.Hour),           // reduce an hour from current time to avoid signature issue
		ExpiryTime:    now.Add(3 * 365 * 24 * time.Hour), // 3 years-ish
		Permissions:   elptr.ElPtr(permission).String(),
		ContainerName: containerName,
		BlobName:      blobName,
	}.SignWithSharedKey(cred)
	if err != nil {
		logger.Err(err).Msg("Failed to sas.BlobSignatureValues")
		return "", err
	}

	return sasQueryParams.Encode(), nil
}
// azureBlobURL joins a container base URL and a blob path into one URL.
//
// Parameters:
//   - basePath: base URL, normally the result of getContainerPath.
//   - filePath: blob path to append to the base URL.
//
// Returns:
//   - string: the joined URL as produced by url.JoinPath.
func (b *Backup) azureBlobURL(basePath string, filePath string) string {
	// NOTE(review): the JoinPath error is discarded; when basePath cannot be
	// parsed the caller receives whatever JoinPath returned with that error
	// (an empty string in current Go releases).
	joined, _ := url.JoinPath(basePath, filePath)
	return joined
}
// getContainerPath returns the base URL of a container in this Backup's
// storage account, with a trailing slash.
//
// Parameters:
//   - containerName: name of the container.
//
// Returns:
//   - string: https://<account>.blob.core.windows.net/<container>/
func (b *Backup) getContainerPath(containerName string) string {
	const containerURLFormat = "https://%s.blob.core.windows.net/%s/"
	return fmt.Sprintf(containerURLFormat, b.azureAccount, containerName)
}
// ToParquet streams a CSV blob from the backup container through a
// CSV-to-Parquet converter that targets the raw-compressed container.
//
// Parameters:
//   - blobName: name of the CSV blob in the backup container. The output
//     blob keeps the same name with its extension changed to "parquet".
//   - guard: channel used as a concurrency limiter. One slot is taken for the
//     reader goroutine and one for the writer goroutine; each slot is given
//     back when its goroutine finishes. Both sends block while the channel is
//     full.
//
// Returns:
//   - error: the client-construction or download error (logged here), or the
//     error returned by the converter's Write (also logged here). Errors
//     whose text contains blobNotExists yield nil without any conversion.
//
// The function waits for the writer goroutine only; the reader goroutine is
// not awaited.
// NOTE(review): presumably Write returns once Read has delivered all rows —
// confirm against NewCSVtoParquet.
func (b *Backup) ToParquet(blobName string, guard chan struct{}) error {
	csvURL := b.azureBlobURL(b.getContainerPath(backupContainerName), blobName)
	parquetName := b.changeFileExt(blobName, "parquet")
	parquetURL := b.azureBlobURL(b.getContainerPath(azureRawCompressedContainerName), parquetName)

	srcClient, clientErr := blockblob.NewClientWithSharedKeyCredential(csvURL, b.cred, nil)
	if clientErr != nil {
		if !strings.Contains(clientErr.Error(), blobNotExists) {
			logger.Err(clientErr).Msg(errClient)
			return clientErr
		}
		return nil
	}

	stream, downloadErr := srcClient.DownloadStream(context.Background(), nil)
	if downloadErr != nil {
		if !strings.Contains(downloadErr.Error(), blobNotExists) {
			logger.Err(downloadErr).Msg(fmt.Sprintf(errDownload, csvURL))
			return downloadErr
		}
		return nil
	}
	// Runs when ToParquet returns, i.e. after the writer goroutine is done.
	defer stream.Body.Close()

	converter := NewCSVtoParquet(b.azureAccount, b.azureAccountKey, parquetURL)

	// Reader: take a slot, feed the downloaded body to the converter.
	guard <- struct{}{}
	go func() {
		defer func() { <-guard }()
		converter.Read(stream.Body)
	}()

	var (
		writeErr   error
		writerDone sync.WaitGroup
	)
	writerDone.Add(1)

	// Writer: take a slot, run the converter's Write and record its result.
	guard <- struct{}{}
	go func() {
		// Deferred calls run last-in-first-out: Done fires first, then the
		// slot is released.
		defer func() { <-guard }()
		defer writerDone.Done()

		if writeErr = converter.Write(); writeErr != nil {
			logger.Err(writeErr).Msg(fmt.Sprintf(errParquetWriter, parquetURL))
		}
	}()

	writerDone.Wait()
	return writeErr
}
// changeFileExt returns blobName with its file extension replaced by fileExt.
//
// Parameters:
//   - blobName: blob name, optionally containing "/"-separated path segments.
//   - fileExt: the new extension; a single leading dot is tolerated and
//     stripped ("parquet" and ".parquet" are equivalent).
//
// Returns:
//   - string: blobName with everything after the last dot of its final path
//     segment replaced by fileExt. When the final segment has no dot,
//     "."+fileExt is appended. An empty blobName yields "."+fileExt.
//
// Only the final path segment is searched for an extension, so a dot in a
// directory name (e.g. "v1.2/raw") is never mistaken for one.
func (b *Backup) changeFileExt(blobName, fileExt string) string {
	fileExt = strings.TrimPrefix(fileExt, ".")

	// A dot counts as the extension separator only when it comes after the
	// last "/" — i.e. when it belongs to the file name itself.
	lastSlash := strings.LastIndex(blobName, "/")
	lastDot := strings.LastIndex(blobName, ".")
	if lastDot > lastSlash {
		blobName = blobName[:lastDot]
	}

	return blobName + "." + fileExt
}