1. 程式人生 > >AWS學習筆記(七)--集成SQS和Lambda

AWS學習筆記(七)--集成SQS和Lambda

n) eip mha 調整 scope 根據 直接 ast fun

本文介紹了集成SQS和Lambda的方法,代碼基於JAVA SDK。

POM配置

<dependencies>
    <!-- SQS client from the AWS SDK for Java; no <version> here because it is
         managed by the aws-java-sdk-bom imported in <dependencyManagement> below -->
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-sqs</artifactId>
    </dependency>
    <!-- Lambda handler interfaces (RequestHandler, Context, LambdaLogger) -->
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-lambda-java-core</artifactId>
        <version>1.2.0</version>
    </dependency>
    <!-- Predefined AWS event types such as ScheduledEvent -->
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-lambda-java-events</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-lambda-java-log4j2</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <!-- Imported BOM: pins the versions of aws-java-sdk-* artifacts declared without a version -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-bom</artifactId>
            <version>1.11.272</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

SQS

中國區目前僅支持標準Queue,不支持FIFO Queue,下面代碼以標準Queue為例,演示了創建Queue、配置Dead Letter Queue、發送Message、接收Message、刪除Message、刪除Queue的方法:

import com.amazonaws.regions.Regions;  
import com.amazonaws.services.sqs.AmazonSQS;  
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;  
import com.amazonaws.services.sqs.model.*;  

import java.util.*;  

/**
 * Static helpers around a single shared {@link AmazonSQS} client bound to the
 * {@code CN_NORTH_1} region: create/configure/delete queues and send/receive messages.
 * Progress is reported on standard output.
 */
public class SqsUtil {
    private static final String ARN_ATTRIBUTE_NAME = "QueueArn";

    /** Shared client, built once when the class is initialized. */
    private static final AmazonSQS sqs =
            AmazonSQSClientBuilder.standard().withRegion(Regions.CN_NORTH_1).build();

    private SqsUtil() {
        // static utility class, never instantiated
    }

    /**
     * Creates a queue whose {@code ReceiveMessageWaitTimeSeconds} attribute is set to 5.
     *
     * @param queueName name of the queue to create
     * @return the URL of the created queue
     */
    public static String createQueue(String queueName) {
        System.out.println("Creating a new SQS queue called " + queueName);

        Map<String, String> queueSettings = new HashMap<>();
        // wait time applied to receive calls on this queue
        queueSettings.put("ReceiveMessageWaitTimeSeconds", "5");

        CreateQueueRequest request = new CreateQueueRequest(queueName).withAttributes(queueSettings);
        return sqs.createQueue(request).getQueueUrl();
    }

    /**
     * Creates a queue intended to be used as a dead-letter queue.
     *
     * @param queueName name of the queue to create
     * @return the ARN of the created queue (the ARN, not the URL, is what the
     *         redrive policy needs)
     */
    public static String createDeadLetterQueue(String queueName) {
        return getQueueArn(createQueue(queueName));
    }

    /**
     * Sets the {@code RedrivePolicy} attribute of a queue so that it targets the given
     * dead-letter queue.
     *
     * @param queueUrl           URL of the queue to configure
     * @param deadLetterQueueArn ARN of the dead-letter queue
     */
    public static void configDeadLetterQueue(String queueUrl, String deadLetterQueueArn) {
        System.out.println("Config dead letter queue for " + queueUrl);

        // Max receive count is 5: once a message has been received more than 5 times
        // without being processed and deleted, it is moved to the dead-letter queue.
        String redrivePolicy =
                "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"" + deadLetterQueueArn + "\"}";
        Map<String, String> policyAttributes = new HashMap<>();
        policyAttributes.put("RedrivePolicy", redrivePolicy);

        SetQueueAttributesRequest request = new SetQueueAttributesRequest()
                .withQueueUrl(queueUrl)
                .withAttributes(policyAttributes);
        sqs.setQueueAttributes(request);
    }

    /**
     * Sends one message carrying a fixed {@code Hello=COCO} string attribute.
     *
     * @param queueUrl URL of the destination queue
     * @param message  message body
     */
    public static void sendMessage(String queueUrl, String message) {
        System.out.println("Sending a message to " + queueUrl);

        // A message attribute must specify both a DataType and a value.
        MessageAttributeValue helloValue =
                new MessageAttributeValue().withDataType("String").withStringValue("COCO");
        Map<String, MessageAttributeValue> extraAttributes = new HashMap<>();
        extraAttributes.put("Hello", helloValue);

        SendMessageRequest request = new SendMessageRequest()
                .withQueueUrl(queueUrl)
                .withMessageBody(message)
                .withMessageAttributes(extraAttributes);
        sqs.sendMessage(request);
    }

    /**
     * Performs one receive call (at most 5 messages, wait time 10 seconds), prints each
     * message with its attributes, and deletes it from the queue.
     *
     * @param queueUrl URL of the queue to read from
     */
    public static void receiveMessages(String queueUrl) {
        System.out.println("Receiving messages from " + queueUrl);

        // The attribute names have to be requested explicitly; otherwise the
        // message attributes are not received.
        ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
                .withMaxNumberOfMessages(5)
                .withWaitTimeSeconds(10)
                .withMessageAttributeNames(Arrays.asList("Hello"));

        for (Message received : sqs.receiveMessage(request).getMessages()) {
            printMessage(received);

            System.out.println("Deleting a message.");
            sqs.deleteMessage(new DeleteMessageRequest(queueUrl, received.getReceiptHandle()));
        }
    }

    /**
     * Deletes the queue with the given URL.
     *
     * @param queueUrl URL of the queue to delete
     */
    public static void deleteQueue(String queueUrl) {
        System.out.println("Deleting the queue " + queueUrl);
        DeleteQueueRequest request = new DeleteQueueRequest(queueUrl);
        sqs.deleteQueue(request);
    }

    /**
     * Looks up the {@code QueueArn} attribute of a queue.
     *
     * @param queueUrl URL of the queue
     * @return the value of the queue's {@code QueueArn} attribute
     */
    public static String getQueueArn(String queueUrl) {
        List<String> wanted = new ArrayList<>();
        wanted.add(ARN_ATTRIBUTE_NAME);
        Map<String, String> returned = sqs.getQueueAttributes(queueUrl, wanted).getAttributes();
        return returned.get(ARN_ATTRIBUTE_NAME);
    }

    /** Prints the body of a message followed by the name and string value of each attribute. */
    private static void printMessage(Message received) {
        System.out.println("Message: " + received.getBody());
        Map<String, MessageAttributeValue> attributes = received.getMessageAttributes();
        for (Map.Entry<String, MessageAttributeValue> attribute : attributes.entrySet()) {
            System.out.println("  Attribute");
            System.out.println("    Name:  " + attribute.getKey());
            System.out.println("    Value: " + attribute.getValue().getStringValue());
        }
    }

}

在運行上面代碼前,要在 {HOME}/.aws 目錄下的 credentials 文件中配置訪問密鑰,且該用戶要有SQS權限:
[default]
aws_access_key_id = AAAAAAAAAAAAAA
aws_secret_access_key = MXXXXXXXXXXXXXXXXXXXXXX9

測試一下:

// Create the dead-letter queue; its ARN is needed for the redrive policy
String dlqArn = createDeadLetterQueue("DeadLetterQueue");
// Create the task queue
String taskQueueUrl = createQueue("TaskQueue");
// Attach the dead-letter queue to the task queue
configDeadLetterQueue(taskQueueUrl, dlqArn);
// Send six messages
int sent = 0;
while (sent < 6) {
    sendMessage(taskQueueUrl, "Hello COCO " + sent);
    sent++;
}
// Receive, print and delete messages
receiveMessages(taskQueueUrl);
// Delete the task queue
deleteQueue(taskQueueUrl);

Lambda

Function Code

Lambda函數定義支持兩種方式 :

  • 實現預定義接口RequestStreamHandler 或 RequestHandler
    import com.amazonaws.services.lambda.runtime.RequestHandler;  
    import com.amazonaws.services.lambda.runtime.Context;
    public class Hello implements RequestHandler<Request, Response> {  
    // Request,Response為自定義的類型  
    public Response handleRequest(Request request, Context context) {  
        String greetingString = String.format("Hello %s %s.", request.firstName, request.lastName);  
        return new Response(greetingString);  
    }  
    }
  • 不實現任何接口,直接定義處理程序方法
    outputType handler-name(inputType input, Context context) {  
    ...  
    }

inputType 和 outputType 可為以下類型之一:

  • Java 內置的簡單類型(如 String、Integer 或 int)。
  • aws-lambda-java-events 庫中的預定義 AWS 事件類型。 如S3Event。
  • 自己的 POJO 類。AWS Lambda 會根據該 POJO 類型自動序列化和反序列化輸入、輸出 JSON。

如不需要,可以省略處理程序方法簽名中的 Context 對象。

先編寫一個簡單的測試用例接收SQS消息,輸入參數input為Queue URL:

import com.amazonaws.services.lambda.runtime.Context;  
import com.amazonaws.services.lambda.runtime.LambdaLogger;  
import com.amazonaws.services.lambda.runtime.RequestHandler;  

/**
 * Lambda handler whose input string is used as an SQS queue URL.
 */
public class Hello implements RequestHandler<String, String> {
    /**
     * Logs the input, then receives messages from the queue it names via
     * {@code SqsUtil.receiveMessages}.
     *
     * @param input   URL of the queue to read from
     * @param context Lambda invocation context, used for its logger
     * @return always {@code "success"}
     */
    @Override
    public String handleRequest(String input, Context context) {
        context.getLogger().log("received : " + input);
        SqsUtil.receiveMessages(input);
        return "success";
    }
}

程序編寫完了,如何放入到Lambda函數中呢?需要打成jar包,且須包含依賴包,pom中增加shade插件:

<build>  
    <plugins>  
        <!-- maven-shade-plugin: builds the jar together with its dependencies,
             as needed for the jar uploaded to Lambda -->
        <plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-shade-plugin</artifactId>  
            <version>3.1.0</version>  
            <configuration>  
                <!-- do not generate a dependency-reduced POM -->
                <createDependencyReducedPom>false</createDependencyReducedPom>  
            </configuration>  
            <executions>  
                <!-- run the shade goal as part of the package phase -->
                <execution>  
                    <phase>package</phase>  
                    <goals>  
                        <goal>shade</goal>  
                    </goals>  
                </execution>  
            </executions>  
        </plugin>  
    </plugins>  
</build>

創建Lambda Function

下面通過Web Console創建Lambda Function
技術分享圖片
注意:role要有Lambda、CloudWatch Logs、SQS權限。

然後上傳jar包,配置Handler
技術分享圖片
再調整一下內存配置和超時參數,保存。
技術分享圖片
配置測試參數,測試一下先:
技術分享圖片
執行成功輸出:
技術分享圖片

Lambda觸發器

下面修改一下代碼,輸入參數類型改為ScheduledEvent,將使用觸發器CloudWatch Events調用。

import com.amazonaws.services.lambda.runtime.Context;  
import com.amazonaws.services.lambda.runtime.LambdaLogger;  
import com.amazonaws.services.lambda.runtime.RequestHandler;  
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;  

/**
 * Lambda handler for {@code ScheduledEvent} input; reads from a fixed task queue.
 */
public class Hello implements RequestHandler<ScheduledEvent, String> {
    /** URL of the queue that is read on every invocation. */
    private static final String TASK_QUEUE_URL =
            "https://sqs.cn-north-1.amazonaws.com.cn/891245999999/TaskQueue";

    /**
     * Logs the incoming event, then receives messages from the task queue via
     * {@code SqsUtil.receiveMessages}.
     *
     * @param input   the scheduled event passed to the function
     * @param context Lambda invocation context, used for its logger
     * @return always {@code "success"}
     */
    @Override
    public String handleRequest(ScheduledEvent input, Context context) {
        LambdaLogger log = context.getLogger();
        String line = "received : " + input.toString() + "\n";
        log.log(line);
        SqsUtil.receiveMessages(TASK_QUEUE_URL);
        return "success";
    }
}

上傳後,同樣先手工測試一下,這次選擇模板Scheduled Event
技術分享圖片
測試成功後,配置CloudWatch Events觸發器,Rule Type選擇Schedule expression:
技術分享圖片
保存後就可以定時調用Lambda了。

Integrate SQS and Lambda: serverless architecture for asynchronous workloads
Amazon Simple Queue Service Developer Guide
AWS Lambda Developer Guide
Programming Model for Authoring Lambda Functions in Java
AWS SDK for Java Developer Guide
Schedule Expressions Using Rate or Cron
AWS 視頻中心
AWS微服務和無服務器架構入門
快速理解AWS Lambda,輕鬆構建Serverless後台
用無服務器應用模型構建AWS Lambda應用
如何通過運行無服務器來滿足企業需求
無服務器架構設計模式和最佳實踐

AWS學習筆記(七)--集成SQS和Lambda