
Azure Functions + Azure Batch: An MP3 Audio Transcoding Solution


Customer Requirements

The customer operates an online music streaming system that serves MP3 tracks to mobile users at different bitrates depending on network conditions, and offers a preview clip of no more than thirty seconds for songs the user has not purchased. One requirement of such a system is very clear: batches of high-bitrate audio files obtained from the music labels must periodically be converted into MP3 files at several lower bitrates, plus preview files. Because the volume and timing of deliveries from the labels are unpredictable, keeping a large fleet of transcoding servers deployed all the time would waste resources; but without enough transcoding capacity on hand, a large batch of files cannot be converted quickly, which is unacceptable in an internet era where time matters even more than money. A public cloud platform that is highly elastic and billed on demand is therefore an excellent fit.

Technology Selection

The solution uses Azure Functions + Azure Batch + Azure Blob Storage. Everything runs on PaaS services, so there are no servers to manage and none of the routine patching and security maintenance that servers require.

Solution architecture:


Solution implementation:

Azure Functions is used to watch for blob changes. One of the big advantages of Azure Functions is the variety of trigger types it offers (HTTP Trigger, Blob Trigger, Timer Trigger, Queue Trigger, and so on); here the Blob Trigger is exactly what we need to monitor changes to blob files, as sketched below.
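
As a quick illustration of the binding model (the function names below are invented for this sketch and are not part of the solution), the most common trigger types are declared with attributes like these:

using System.IO;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;

public static class TriggerSamples
{
    // Runs whenever a blob is added or updated in the "source" container
    [FunctionName("BlobSample")]
    public static void BlobSample(
        [BlobTrigger("source/{name}", Connection = "sourceStorageConnection")] Stream blob,
        string name, TraceWriter log) => log.Info($"Blob {name}: {blob.Length} bytes");

    // Runs on a CRON schedule, here every five minutes
    [FunctionName("TimerSample")]
    public static void TimerSample([TimerTrigger("0 */5 * * * *")] TimerInfo timer, TraceWriter log)
        => log.Info("Timer fired");

    // Runs when a message appears on the "jobs" storage queue
    [FunctionName("QueueSample")]
    public static void QueueSample([QueueTrigger("jobs")] string message, TraceWriter log)
        => log.Info(message);
}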

First, create an Azure Functions project.


Then specify that the function uses a Blob Trigger.


Create the ListeningBlob function:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Queue;

namespace MS.CSU.mp3encoder
{
    public static class ListeningBlob
    {
        static string key_Convert = Environment.GetEnvironmentVariable("KeyConvert") ?? "-i \"{0}\" -codec:a libmp3lame -b:a {1} \"{2}\" -y";
        static string work_Dir = Path.GetTempPath();
        static string targetStorageConnection = Environment.GetEnvironmentVariable("targetStorageConnection");
        static string sourceStorageConnection = Environment.GetEnvironmentVariable("sourceStorageConnection");
        static string bitRates = Environment.GetEnvironmentVariable("bitRates") ?? "192k;128k;64k";
        static string keyPreview = Environment.GetEnvironmentVariable("keyPreview") ?? "-ss 0 -t 29 -i \"{0}\" \"{1}\"";
        static CloudBlobClient blobOutputClient;
        static string blobOutputContainerName = Environment.GetEnvironmentVariable("outputContainer") ?? "output";
        static CloudBlobContainer blobOutputContainer;
        static CloudBlobClient blobInputClient;
        static CloudBlobContainer blobInputContainer;

        [FunctionName("ListeningBlob")]
        [return: Queue("jobs")]
        public static void Run([BlobTrigger("source/{name}", Connection = "sourceStorageConnection")]Stream myBlob, string name, Uri uri, TraceWriter log)
        {
            AzureBatch batch = new AzureBatch(sourceStorageConnection);
            // Give every audio file its own job id (and working folder) to avoid conflicts
            Guid jobId = Guid.NewGuid();
            log.Info($"Job:{jobId},C# Blob trigger function Processed blob\n Name:{name} \n Size: {myBlob.Length} Bytes,Path:{uri.ToString()}");
            // Move the source blob into the target container, taking it out of the monitored
            // container so it cannot trigger the function a second time
            try
            {
                initBlobClient();
                CloudBlockBlob sourceBlob = blobInputContainer.GetBlockBlobReference($"{name}");
                name = Path.GetFileNameWithoutExtension(name);
                CloudBlockBlob targetBlob = blobOutputContainer.GetBlockBlobReference($"{name}_{jobId}/{name}.mp3");
                targetBlob.StartCopy(sourceBlob);
                sourceBlob.Delete();
                uri = targetBlob.Uri;
            }
            catch (Exception err)
            {
                log.Error($"Failed to delete the source blob! Err:{err}");
                return;
            }
            List<EncodeJob> jobs = new List<EncodeJob>();
            string url = Uri.EscapeUriString(uri.ToString());
            log.Info($"Bitrates to convert:{bitRates}");
            string[] bitsRateNames = bitRates.Split(';');
            Dictionary<string, bool> status = new Dictionary<string, bool>();
            foreach (var s in bitsRateNames)
            {
                if (string.IsNullOrWhiteSpace(s))
                    continue;
                var job = new EncodeJob()
                {
                    OutputName = $"{name}{s}.mp3",
                    Name = name,
                    Command = string.Format(key_Convert, name, s, $"{name}{s}.mp3"),
                    id = jobId,
                    InputUri = uri
                };
                batch.QueueTask(job);
            }
            var previewJob = new EncodeJob()
            {
                Name = name,
                OutputName = $"{name}preview.mp3",
                Command = string.Format(keyPreview, name, $"{name}preview.mp3"),
                InputUri = uri,
                id = jobId,
            };
            batch.QueueTask(previewJob);
            //Directory.Delete($"{work_Dir}\\{jobId}",true);
        }

        static void initBlobClient()
        {
            CloudStorageAccount storageOutputAccount = CloudStorageAccount.Parse(targetStorageConnection);
            // Create a blob client for interacting with the blob service.
            blobOutputClient = storageOutputAccount.CreateCloudBlobClient();
            blobOutputContainer = blobOutputClient.GetContainerReference(blobOutputContainerName);
            blobOutputContainer.CreateIfNotExists();
            // Initialize the input storage container
            CloudStorageAccount storageInputAccount = CloudStorageAccount.Parse(sourceStorageConnection);
            blobInputClient = storageInputAccount.CreateCloudBlobClient();
            blobInputContainer = blobInputClient.GetContainerReference("source");
        }
    }
}
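
The EncodeJob type used above (and by the AzureBatch class later) is not shown in the original post; the following is only a minimal sketch inferred from how it is used, so the exact definition is an assumption.

using System;

namespace MS.CSU.mp3encoder
{
    // Assumed shape of EncodeJob, reconstructed from its usage in this article
    public class EncodeJob
    {
        public Guid id { get; set; }           // Transcoding batch (job) this file belongs to
        public string Name { get; set; }       // Source file name without extension
        public string OutputName { get; set; } // Output file name, e.g. "song128k.mp3"
        public string Command { get; set; }    // ffmpeg argument string for this task
        public Uri InputUri { get; set; }      // Blob URI of the source audio file
    }
}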


Create a Batch account and note down the related account information (the account name, account URL, and key that the code reads from its settings).


Download the latest ffmpeg build from https://ffmpeg.zeranoe.com/. After extracting it, compress ffmpeg.exe on its own into a zip file and upload it to the Batch account as an application package so the tasks can invoke it.

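To make the task command lines concrete, the short sketch below expands the key_Convert and keyPreview templates from the ListeningBlob function for a hypothetical source file called song.mp3; the resulting strings are the arguments each Batch task passes to ffmpeg.exe.

using System;

class CommandPreview
{
    static void Main()
    {
        // Templates copied from the ListeningBlob function above
        string key_Convert = "-i \"{0}\" -codec:a libmp3lame -b:a {1} \"{2}\" -y";
        string keyPreview = "-ss 0 -t 29 -i \"{0}\" \"{1}\"";

        string name = "song"; // hypothetical source file name, extension already stripped

        // Re-encode at 128 kbit/s:
        //   -i "song" -codec:a libmp3lame -b:a 128k "song128k.mp3" -y
        Console.WriteLine(string.Format(key_Convert, name, "128k", $"{name}128k.mp3"));

        // 29-second preview clip:
        //   -ss 0 -t 29 -i "song" "songpreview.mp3"
        Console.WriteLine(string.Format(keyPreview, name, $"{name}preview.mp3"));
    }
}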

Build an AzureBatch class that submits the ffmpeg conversions to Azure Batch.

using Microsoft.Azure.Batch;
using Microsoft.Azure.Batch.Auth;
using Microsoft.Azure.Batch.Common;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MS.CSU.mp3encoder
{
    public class AzureBatch
    {
        // ffmpeg application package information
        string env_appPackageInfo = Environment.GetEnvironmentVariable("ffmpegversion")??"ffmpeg 3.4";
        string appPackageId = "ffmpeg";
        string appPackageVersion = "3.4";
        // Pool and Job constants
        private const string PoolId = "WinFFmpegPool";
        private const int DedicatedNodeCount = 0;
        private const int LowPriorityNodeCount = 5;
        // VM size of the compute nodes that run the transcoding tasks
        private const string PoolVMSize = "Standard_F2";
        private const string JobName = "WinFFmpegJob";
        string outputStorageConnection;
        string outputContainerName = "output";
        string batchAccount = Environment.GetEnvironmentVariable("batchAccount");
        string batchKey = Environment.GetEnvironmentVariable("batchKey");
        string batchAccountUrl = Environment.GetEnvironmentVariable("batchAccountUrl");
        string strMaxTaskPerNode = Environment.GetEnvironmentVariable("MaxTaskPerNode") ?? "4";
        // Number of tasks each compute node can run concurrently; adjust for the chosen VM size and workload
        int maxTaskPerNode = 4;
        public AzureBatch(string storageConnection)
        {
            outputStorageConnection = storageConnection;
            initSettings();
        }
        // Constructor used by unit tests to supply the Batch credentials explicitly
        public AzureBatch(string storageConnection, string _batchAccount, string _batchAccountUrl, string _batchKey)
        {
            outputStorageConnection = storageConnection;
            batchAccount = _batchAccount;
            batchAccountUrl = _batchAccountUrl;
            batchKey = _batchKey;
            initSettings();
        }
        // Read optional settings from environment variables, falling back to the defaults
        private void initSettings()
        {
            maxTaskPerNode = int.TryParse(strMaxTaskPerNode, out maxTaskPerNode) ? maxTaskPerNode : 4;
            var packageInfo = env_appPackageInfo.Split(' ');
            appPackageId = packageInfo.Length > 0 ? packageInfo[0] : "ffmpeg";
            appPackageVersion = packageInfo.Length > 1 ? packageInfo[1] : "3.4";
        }




        /// <summary>
        /// Returns a shared access signature (SAS) URL providing the specified
        ///  permissions to the specified container. The SAS URL provided is valid for 2 hours from
        ///  the time this method is called. The container must already exist in Azure Storage.
        /// </summary>
        /// <param name="blobClient">A <see cref="CloudBlobClient"/>.</param>
        /// <param name="containerName">The name of the container for which a SAS URL will be obtained.</param>
        /// <param name="permissions">The permissions granted by the SAS URL.</param>
        /// <returns>A SAS URL providing the specified access to the container.</returns>
        private string GetContainerSasUrl(CloudBlobClient blobClient, string containerName, SharedAccessBlobPermissions permissions)
        {
            // Set the expiry time and permissions for the container access signature. In this case, no start time is specified,
            // so the shared access signature becomes valid immediately. Expiration is in 2 hours.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(2),
                Permissions = permissions
            };

            // Generate the shared access signature on the container, setting the constraints directly on the signature
            CloudBlobContainer container = blobClient.GetContainerReference(containerName);
            string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);

            // Return the URL string for the container, including the SAS token
            return String.Format("{0}{1}", container.Uri, sasContainerToken);
        }


        // BATCH CLIENT OPERATIONS - FUNCTION IMPLEMENTATIONS

        /// <summary>
        /// Creates the Batch pool.
        /// </summary>
        /// <param name="batchClient">A BatchClient object</param>
        /// <param name="poolId">ID of the CloudPool object to create.</param>
        private void CreatePoolIfNotExist(BatchClient batchClient, string poolId)
       {
        //    if (batchClient.PoolOperations.GetPool(poolId) != null)
        //    {
        //        return;
        //    }
            CloudPool pool = null;
            try
            {
                ImageReference imageReference = new ImageReference(
                        publisher: "MicrosoftWindowsServer",
                        offer: "WindowsServer",
                        sku: "2012-R2-Datacenter-smalldisk",
                        version: "latest");

                //ImageReference imageReference = new ImageReference(
                //        publisher: "MicrosoftWindowsServer",
                //        offer: "WindowsServer",
                //        sku: "2016-Datacenter-smalldisk",
                //        version: "latest");

                VirtualMachineConfiguration virtualMachineConfiguration =
                new VirtualMachineConfiguration(
                    imageReference: imageReference,
                    nodeAgentSkuId: "batch.node.windows amd64");

                // Create an unbound pool. No pool is actually created in the Batch service until we call
                // CloudPool.Commit(). This CloudPool instance is therefore considered "unbound," and we can
                // modify its properties.
                pool = batchClient.PoolOperations.CreatePool(
                    poolId: poolId,
                    targetDedicatedComputeNodes: DedicatedNodeCount,
                    targetLowPriorityComputeNodes: LowPriorityNodeCount,
                    virtualMachineSize: PoolVMSize,
                    
                    virtualMachineConfiguration: virtualMachineConfiguration);
                pool.MaxTasksPerComputeNode = maxTaskPerNode;

                // Specify the application and version to install on the compute nodes
                // This assumes that a Windows 64-bit zipfile of ffmpeg has been added to Batch account
                // with Application Id of "ffmpeg" and Version of "3.4".
                // Download the zipfile https://ffmpeg.zeranoe.com/builds/win64/static/ffmpeg-3.4-win64-static.zip
                // to upload as application package
                pool.ApplicationPackageReferences = new List<ApplicationPackageReference>
                {
                    new ApplicationPackageReference
                    {
                    ApplicationId = appPackageId,
                    Version = appPackageVersion
                    }
                };


                pool.Commit();
            }
            catch (BatchException be)
            {
                // Accept the specific error code PoolExists as that is expected if the pool already exists
                if (be.RequestInformation?.BatchError?.Code == BatchErrorCodeStrings.PoolExists)
                {
             //       Console.WriteLine("The pool {0} already existed when we tried to create it", poolId);
                }
                else
                {
                    throw; // Any other exception is unexpected
                }
            }
        }

        /// <summary>
        /// Creates a job in the specified pool.
        /// </summary>
        /// <param name="batchClient">A BatchClient object.</param>
        /// <param name="jobId">ID of the job to create.</param>
        /// <param name="poolId">ID of the CloudPool object in which to create the job.</param>
        private void CreateJobIfNotExist(BatchClient batchClient, string jobId, string poolId)
        {
            //if (batchClient.JobOperations.GetJob(jobId) != null)
            //    return;
            try
            {
                Console.WriteLine("Creating job [{0}]...", jobId);

                CloudJob job = batchClient.JobOperations.CreateJob();
                job.Id = $"{JobName}";
                job.PoolInformation = new PoolInformation { PoolId = poolId };

                job.Commit();
            }
            catch (BatchException be)
            {
                // Accept the specific error code JobExists as that is expected if the job already exists
                if (be.RequestInformation?.BatchError?.Code == BatchErrorCodeStrings.JobExists)
                {
                    Console.WriteLine("The job {0} already existed when we tried to create it", jobId);
                }
                else
                {
                    throw; // Any other exception is unexpected
                }
            }
        }

        /// <summary>
        /// Creates a task for the specified encode job and submits it to the Batch job for execution.
        /// </summary>
        /// <param name="batchClient">A BatchClient object.</param>
        /// <param name="job">The EncodeJob describing the input file and the ffmpeg command to run.</param>
        /// <param name="outputContainerSasUrl">The shared access signature URL for the Azure Storage
        /// container that will hold the output files that the task creates.</param>
        /// <returns>A collection of the submitted cloud tasks.</returns>
        private List<CloudTask> AddTasks(BatchClient batchClient,EncodeJob job, string outputContainerSasUrl)
        {


            // Create a collection to hold the tasks added to the job:
            List<CloudTask> tasks = new List<CloudTask>();


            // Assign a task ID for each iteration
            var taskId = String.Format("Task{0}", Guid.NewGuid());

            // Define the task command line that runs ffmpeg with the arguments prepared for this
            // encode job (either a re-encode at the requested bitrate or a 29-second preview clip).
            string appPath = String.Format("%AZ_BATCH_APP_PACKAGE_{0}#{1}%", appPackageId, appPackageVersion);
            string inputMediaFile = job.Name;
            string outputMediaFile = job.OutputName;

            string taskCommandLine = String.Format("cmd /c {0}\\ffmpeg.exe {1}", appPath, job.Command);
            // Create a cloud task (with the task ID and command line) and add it to the task list
            CloudTask task = new CloudTask(taskId, taskCommandLine);
            task.ApplicationPackageReferences = new List<ApplicationPackageReference>
                {
                    new ApplicationPackageReference
                    {
                    ApplicationId = appPackageId,
                    Version = appPackageVersion
                    }
                };
            task.ResourceFiles = new List<ResourceFile>();
            task.ResourceFiles.Add(new ResourceFile(Uri.EscapeUriString(job.InputUri.ToString()), inputMediaFile));

            // Task output file will be uploaded to the output container in Storage.

            List<OutputFile> outputFileList = new List<OutputFile>();
            OutputFileBlobContainerDestination outputContainer = new OutputFileBlobContainerDestination(outputContainerSasUrl,$"{job.Name}_{job.id}/{job.OutputName}");
            OutputFile outputFile = new OutputFile(outputMediaFile,
                                                   new OutputFileDestination(outputContainer),
                                                   new OutputFileUploadOptions(OutputFileUploadCondition.TaskSuccess));
            outputFileList.Add(outputFile);
            task.OutputFiles = outputFileList;
            tasks.Add(task);

            // Call BatchClient.JobOperations.AddTask() to add the tasks as a collection rather than making a
            // separate call for each. Bulk task submission helps to ensure efficient underlying API
            // calls to the Batch service. 
            batchClient.JobOperations.AddTask($"{JobName}", tasks);

            return tasks;
        }
        private CloudBlobClient initBlobClient()
        {
            CloudStorageAccount storageOutputAccount = CloudStorageAccount.Parse(outputStorageConnection);
            // Create a blob client for interacting with the blob service.

            var blobOutputClient = storageOutputAccount.CreateCloudBlobClient();
            return blobOutputClient;
            //blobOutputContainer = blobOutputClient.GetContainerReference(blobOutputContainerName);
            //blobOutputContainer.CreateIfNotExists();


        }
        public void QueueTask(EncodeJob job)
        {
            BatchSharedKeyCredentials sharedKeyCredentials = new BatchSharedKeyCredentials(batchAccountUrl, batchAccount, batchKey);
            var blobClient = initBlobClient();
            var outputContainerSasUrl = GetContainerSasUrl(blobClient, outputContainerName, SharedAccessBlobPermissions.Write);
            using (BatchClient batchClient = BatchClient.Open(sharedKeyCredentials))
            {
                // Create the Batch pool, which contains the compute nodes that execute the tasks.
                CreatePoolIfNotExist(batchClient, PoolId);

                // Create the job that runs the tasks.
                CreateJobIfNotExist(batchClient, $"{JobName}", PoolId);

                // Create a collection of tasks and add them to the Batch job. 
                // Provide a shared access signature for the tasks so that they can upload their output
                // to the Storage container.
                AddTasks(batchClient,job,outputContainerSasUrl);
                             
            }
        }


        public async Task<Tuple<string,int>>  GetStatus()
        {
            BatchSharedKeyCredentials sharedKeyCredentials = new BatchSharedKeyCredentials(batchAccountUrl, batchAccount, batchKey);
            string result = "正在獲取任務信息...";
            int total = 0;
            using (BatchClient batchClient = BatchClient.Open(sharedKeyCredentials))
            {
                var counts =await batchClient.JobOperations.GetJobTaskCountsAsync(JobName);
                total = counts.Active + counts.Running + counts.Completed;

                result = $"總任務:{total},等待的任務:{counts.Active},運行中的任務:{counts.Running},成功的任務:{counts.Succeeded},失敗的任務:{counts.Failed}";
            }
            return new Tuple<string,int>(result,total);
        }
    }
}
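
For local testing, a minimal sketch of driving the class directly might look like the following; the storage connection string, Batch account name, URL, and key are placeholders, and the EncodeJob values are purely illustrative (in the deployed solution these values come from the Function App's application settings).

using System;
using System.Threading.Tasks;

namespace MS.CSU.mp3encoder
{
    public static class AzureBatchSmokeTest
    {
        public static async Task RunAsync()
        {
            // All values below are hypothetical placeholders
            var batch = new AzureBatch(
                "DefaultEndpointsProtocol=https;AccountName=mystorage;AccountKey=<key>",
                "mybatchaccount",
                "https://mybatchaccount.westeurope.batch.azure.com",
                "<batchKey>");

            // Queue a single 128 kbit/s encode for a hypothetical file "song.mp3"
            batch.QueueTask(new EncodeJob
            {
                Name = "song",
                OutputName = "song128k.mp3",
                Command = "-i \"song\" -codec:a libmp3lame -b:a 128k \"song128k.mp3\" -y",
                id = Guid.NewGuid(),
                InputUri = new Uri("https://mystorage.blob.core.windows.net/output/song.mp3")
            });

            var status = await batch.GetStatus();
            Console.WriteLine(status.Item1); // formatted summary of task counts
        }
    }
}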

Because an Azure Function has a maximum timeout of 10 minutes (on the Consumption plan), converting large files synchronously would frequently hit timeout errors. Instead, the function returns as soon as the Batch tasks have been queued and lets them run in the background. To keep an eye on how those tasks are progressing, we build a timer-triggered function that checks the task status and writes the result to status.html in the output blob container.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

namespace MS.CSU.mp3encoder
{
    /// <summary>
    /// Timer-triggered function that updates the task processing status page
    /// </summary>
    public static class StatusUpdate
    {
        static int lastTotal=0;
        static DateTime lastSubmitTime;
        static string targetStorageConnection = Environment.GetEnvironmentVariable("targetStorageConnection");
        static CloudBlobClient blobOutputClient;

        static string blobOutputContainerName = Environment.GetEnvironmentVariable("outputContainer") ?? "output";

        static CloudBlobContainer blobOutputContainer;
        [FunctionName("StatusUpdate")]
        public async static Task Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer, TraceWriter log)
        {
            string strStatus = "";
            int jobCount = 0;
            try
            {
                AzureBatch batch = new AzureBatch(targetStorageConnection);
                var result = await batch.GetStatus();
                strStatus = result.Item1;
                jobCount = result.Item2 - lastTotal;
                if (lastTotal != result.Item2)
                {
                    lastTotal = result.Item2;
                    
                    lastSubmitTime = DateTime.Now;
                }
                   
            }
            catch (Exception err)
            {
                strStatus = Uri.EscapeDataString(err.ToString());
            };
            initBlobContainer();
            var statusBlob = blobOutputContainer.GetBlockBlobReference("status.html");
            string htmlStatus =$@"<html>
                                    <head>
                                        <meta http-equiv=""refresh"" content=""5"">
                                        < meta charset=""utf-8"">
                                    </head>
                                    <body>
                                    <h1>{strStatus}</h1><br/>
                                    <h1>最後更新 :{DateTime.Now.AddHours(8)}</h1>
                                    <h1>上次任務提交時間:{lastSubmitTime.AddHours(8)}<h1>
                                    <h2>上次任務最後五分鐘內提交了{jobCount}<h2>
                                    </body>
                                </html>";
            await statusBlob.UploadTextAsync(htmlStatus);
            
        }
        private static void initBlobContainer()
        {
            CloudStorageAccount storageOutputAccount = CloudStorageAccount.Parse(targetStorageConnection);
            // Create a blob client for interacting with the blob service.

            blobOutputClient = storageOutputAccount.CreateCloudBlobClient();

            blobOutputContainer = blobOutputClient.GetContainerReference(blobOutputContainerName);
            blobOutputContainer.CreateIfNotExists();
         
        }
    }
}

The final result:



References:

Azure Blob storage bindings for Azure Functions

Timer trigger for Azure Functions

Azure Batch .NET File Processing with ffmpeg

FFmpeg MP3 Encoding Guide
