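// Package remotefileupload archives uploaded blobs in Azure Blob Storage:
// it copies them into a backup container with an expiry, removes the
// originals, and converts archived CSV blobs to Parquet.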
package remotefileupload
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fiskerinc/cloud-services/pkg/logger"
|
|
"github.com/fiskerinc/cloud-services/pkg/utils/elptr"
|
|
"github.com/fiskerinc/cloud-services/pkg/utils/envtool"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
|
|
)
|
|
|
|
var (
|
|
backupContainerName = envtool.GetEnv("AZURE_STORAGE_BACKUP_CONTAINER", "raw-can-archive")
|
|
ttl = envtool.GetEnvInt64("AZURE_STORAGE_BACKUP_TTL", 60*24) // 60 days
|
|
azureRawCompressedContainerName = envtool.GetEnv("AZURE_STORAGE_RAW_COMPRESSED_CONTAINER", "raw-can-compressed")
|
|
)
|
|
|
|
type Backup struct {
|
|
azureAccount string
|
|
azureAccountKey string
|
|
containerName string
|
|
cred *azblob.SharedKeyCredential
|
|
}
|
|
|
|
// Log message templates. Where a template has a "%s" verb, callers fill it
// with the URL of the blob the operation was acting on.
// NOTE(review): "wirter" in errParquetWriter is a typo for "writer"; the text
// is left as-is here because it is emitted at runtime.
var (
	errClient        = "Failed to create client"
	errCopy          = "Failed to copy %s"
	errDelete        = "Failed to delete %s"
	errDownload      = "Failed to download file %s"
	errParquetWriter = "Failed to create parquet wirter %s"
	errTTL           = "Failed to set ttl %s"
)

// blobNotExists is the substring looked for in err.Error() to recognise a
// failure caused by a missing blob.
var blobNotExists = "The specified blob does not exist."
|
|
|
|
func NewBackup(azureAccount string, azureAccountKey string, containerName string) *Backup {
|
|
bk := &Backup{
|
|
azureAccount: azureAccount,
|
|
azureAccountKey: azureAccountKey,
|
|
containerName: containerName,
|
|
}
|
|
bk.cred, _ = azblob.NewSharedKeyCredential(azureAccount, azureAccountKey)
|
|
return bk
|
|
}
|
|
|
|
// remove deletes a file from Azure Blob Storage.
|
|
//
|
|
// Parameters:
|
|
// - context: context, Backgroud as of now.
|
|
// - filePath: The path of the file to be removed.
|
|
//
|
|
// Returns:
|
|
// - err: return err if occur otherwise nil.
|
|
//
|
|
// Deletes appendblock blob from storage. If the removal operation encounters an error, it logs
|
|
// an error message and returns error. Otherwise, it returns nil to indicate
|
|
// a successful removal.
|
|
func (b *Backup) remove(ctx context.Context, filePath string) error {
|
|
|
|
// Construct the full path of the file in Azure Blob Storage
|
|
fullPath := b.azureBlobURL(b.getContainerPath(b.containerName), filePath)
|
|
|
|
client, err := appendblob.NewClientWithSharedKeyCredential(fullPath, b.cred, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = client.Delete(context.Background(), nil)
|
|
return err
|
|
}
|
|
|
|
// Move, copy a blob from Azure Blob Storage to Azure Blob Storage as cool tier block blob type.
|
|
// Set TTL to new blob and remove the original blob
|
|
// Parameters:
|
|
// - context: context, Backgroud as of now.
|
|
// - filePath: path of the src file.
|
|
//
|
|
// Returns:
|
|
// - err: An error, if any, that occurred during the SAS token generation process.
|
|
func (b *Backup) Move(ctx context.Context, filePath string) error {
|
|
|
|
backupPath := filePath
|
|
|
|
// Construct the full path of the src file in Azure Blob Storage
|
|
srcPath := b.azureBlobURL(b.getContainerPath(b.containerName), filePath)
|
|
|
|
// Construct the full path of the dest file in Azure Blob Storage
|
|
destPath := b.azureBlobURL(b.getContainerPath(backupContainerName), backupPath)
|
|
|
|
// Generate a Shared Access Signature (SAS) token for src file with read permissions
|
|
srcSAS, _ := b.generateSASToken(filePath, sas.BlobPermissions{Read: true}, b.containerName)
|
|
|
|
client, err := blockblob.NewClientWithSharedKeyCredential(destPath, b.cred, &blockblob.ClientOptions{
|
|
ClientOptions: policy.ClientOptions{
|
|
Retry: policy.RetryOptions{
|
|
MaxRetries: 1,
|
|
MaxRetryDelay: 1 * time.Minute,
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tier := blob.AccessTierCool // Set cool tier type as cold tier not supported for this version of sdk
|
|
_, err = client.UploadBlobFromURL(ctx, fmt.Sprintf("%s?%s", srcPath, srcSAS), &blockblob.UploadBlobFromURLOptions{
|
|
Tier: &tier,
|
|
})
|
|
|
|
if err != nil && !strings.Contains(err.Error(), blobNotExists) {
|
|
logger.Err(err).Msg(fmt.Sprintf(errCopy, srcPath))
|
|
return err
|
|
}
|
|
|
|
err = b.setTTL(ctx, destPath)
|
|
if err != nil && !strings.Contains(err.Error(), blobNotExists) {
|
|
logger.Err(err).Msg(fmt.Sprintf(errTTL, destPath))
|
|
}
|
|
|
|
err = b.remove(ctx, filePath)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), blobNotExists) {
|
|
return nil
|
|
}
|
|
logger.Err(err).Msg(fmt.Sprintf(errDelete, destPath))
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// setTTL set a Time-to-Live (TTL) expiration policy to an Azure Blob Storage file.
|
|
//
|
|
// Parameters:
|
|
// - context: context, Backgroud as of now.
|
|
// - fileUrl: The URL of the Azure Blob Storage file to which the TTL policy will be added.
|
|
//
|
|
// Returns:
|
|
// - error: An error, if any, that occurred during the TTL policy addition process. It returns nil if successful.
|
|
//
|
|
// The setTTL function is responsible for adding a Time-to-Live (TTL) expiration policy
|
|
// to a specific file located in Azure Blob Storage. A TTL policy allows you to specify
|
|
// a duration after which the file will be automatically deleted from storage.
|
|
func (b *Backup) setTTL(ctx context.Context, fileUrl string) error {
|
|
|
|
blockBlobClient, err := blockblob.NewClientWithSharedKeyCredential(fileUrl, b.cred, &blockblob.ClientOptions{
|
|
ClientOptions: policy.ClientOptions{
|
|
Retry: policy.RetryOptions{
|
|
MaxRetries: 1,
|
|
MaxRetryDelay: 1 * time.Minute,
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// set expiry on block blob 4 hours relative to now
|
|
_, err = blockBlobClient.SetExpiry(context.Background(), blockblob.ExpiryTypeRelativeToNow(ttl*int64(time.Hour)), nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// validate set expiry operation
|
|
resp, err := blockBlobClient.GetProperties(ctx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resp.ExpiresOn == nil {
|
|
return nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// generateAzureSASToken generates a Shared Access Signature (SAS) token for an Azure Blob Storage blob.
|
|
//
|
|
// Parameters:
|
|
// - blobName: The name of the blob for which the SAS token is generated.
|
|
// - permission: The BlobPermissions object specifying the permissions granted by the SAS token.
|
|
// - containerName: The containerName of the blob.
|
|
//
|
|
// Returns:
|
|
// - token: The generated SAS token string.
|
|
// - err: An error, if any, that occurred during the SAS token generation process.
|
|
func (b *Backup) generateSASToken(blobName string, permission sas.BlobPermissions, containerName string) (token string, err error) {
|
|
// blob name is something like this: 19UUA56873A044568/2023/01/11/raw.log
|
|
cred, err := azblob.NewSharedKeyCredential(b.azureAccount, b.azureAccountKey)
|
|
if err != nil {
|
|
logger.Err(err).Msg("[backup]:[NewSharedKeyCredential]")
|
|
return
|
|
}
|
|
sasQueryParams, err := sas.BlobSignatureValues{
|
|
Protocol: sas.ProtocolHTTPS,
|
|
StartTime: time.Now().UTC().Add(-1 * time.Hour), // reduce an hour from current time to avoid signature issue
|
|
ExpiryTime: time.Now().UTC().Add(3 * 365 * 24 * time.Hour), // 3 years-ish
|
|
Permissions: elptr.ElPtr(permission).String(),
|
|
ContainerName: containerName,
|
|
BlobName: blobName,
|
|
}.SignWithSharedKey(cred)
|
|
|
|
if err != nil {
|
|
logger.Err(err).Msg("Failed to sas.BlobSignatureValues")
|
|
return
|
|
}
|
|
|
|
token = sasQueryParams.Encode()
|
|
return
|
|
}
|
|
|
|
func (b *Backup) azureBlobURL(basePath string, filePath string) string {
|
|
finalPath, _ := url.JoinPath(basePath, filePath)
|
|
return finalPath
|
|
}
|
|
|
|
func (b *Backup) getContainerPath(containerName string) string {
|
|
return fmt.Sprintf("https://%s.blob.core.windows.net/%s/", b.azureAccount, containerName)
|
|
}
|
|
|
|
// ToParquet converts data from an Azure Blob csv to a Parquet file and stores it in another container.
|
|
//
|
|
// This function takes an `blobName` representing the source Azure Blob csv and performs the following steps:
|
|
//
|
|
// 1. Downloads data from the source Azure Blob identified by `blobName`.
|
|
// 2. Converts the retrieved data into a Parquet file using Parquet Writer.
|
|
//
|
|
// Parameters:
|
|
// - blobName: The name of the source Azure Blob csv that contains the data to be converted to Parquet.
|
|
//
|
|
// Returns:
|
|
// - error: An error logs and returns if any step of the conversion or storage process encounters an issue. It returns nil on success.
|
|
func (b *Backup) ToParquet(blobName string, guard chan struct{}) error {
|
|
|
|
var err error
|
|
srcBlobURL := b.azureBlobURL(b.getContainerPath(backupContainerName), blobName)
|
|
parquetBlobName := b.changeFileExt(blobName, "parquet")
|
|
parquetBlobURL := b.azureBlobURL(b.getContainerPath(azureRawCompressedContainerName), parquetBlobName)
|
|
|
|
client, err := blockblob.NewClientWithSharedKeyCredential(srcBlobURL, b.cred, nil)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), blobNotExists) {
|
|
return nil
|
|
}
|
|
logger.Err(err).Msg(errClient)
|
|
return err
|
|
}
|
|
|
|
downloadResp, err := client.DownloadStream(context.Background(), nil)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), blobNotExists) {
|
|
return nil
|
|
}
|
|
logger.Err(err).Msg(fmt.Sprintf(errDownload, srcBlobURL))
|
|
return err
|
|
}
|
|
|
|
defer downloadResp.Body.Close()
|
|
|
|
csvToParquet := NewCSVtoParquet(b.azureAccount, b.azureAccountKey, parquetBlobURL)
|
|
|
|
guard <- struct{}{} // for reader
|
|
go func() {
|
|
defer func() {
|
|
<-guard
|
|
}()
|
|
csvToParquet.Read(downloadResp.Body)
|
|
}()
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
guard <- struct{}{} // for writer
|
|
go func(w *sync.WaitGroup) {
|
|
defer func() {
|
|
w.Done()
|
|
<-guard
|
|
}()
|
|
err = csvToParquet.Write()
|
|
if err != nil {
|
|
logger.Err(err).Msg(fmt.Sprintf(errParquetWriter, parquetBlobURL))
|
|
}
|
|
}(&wg)
|
|
|
|
wg.Wait()
|
|
return err
|
|
|
|
}
|
|
|
|
// changeFileExt updates the file extension of a given blob name and returns the modified blob name.
|
|
//
|
|
// This method takes an existing `blobName` and replaces its file extension with the specified `fileExt`.
|
|
// It then returns the modified blob name as a string.
|
|
//
|
|
// Parameters:
|
|
// - blobName: The original blob name, including its current file extension.
|
|
// - fileExt: The new file extension to replace the existing one. The `fileExt` should not include the dot (e.g., "txt").
|
|
//
|
|
// Returns:
|
|
// - string: The modified blob name with the updated file extension.
|
|
func (b *Backup) changeFileExt(blobName, fileExt string) string {
|
|
if len(fileExt) > 0 && string(fileExt[0]) == "." {
|
|
fileExt = fileExt[1:]
|
|
}
|
|
if len(blobName) == 0 {
|
|
return fmt.Sprintf(".%s", fileExt)
|
|
}
|
|
arr := strings.Split(blobName, ".")
|
|
if len(arr) == 1 {
|
|
return fmt.Sprintf("%s.%s", arr[0], fileExt)
|
|
}
|
|
arr[len(arr)-1] = fileExt
|
|
return strings.Join(arr, ".")
|
|
}
|