
Analyze Your Data on Amazon DynamoDB with Apache Spark

Manjeet Chayel is a Solutions Architect with AWS

Every day, tons of customer data is generated, such as website logs, gaming data, advertising data, and streaming videos. Many companies capture this information as it’s generated and process it in real time to understand their customers.

Amazon DynamoDB is a fast and flexible NoSQL database service for applications that need consistent, single-digit-millisecond latency at any scale. It is a fully managed database, supporting both key-value and document data models.

Its flexible data model and reliable performance make it a great fit for mobile, web, gaming, ad tech, the Internet of Things, and many other applications, including the type of real-time data processing I’m talking about. In this blog post, I’ll show you how to use Apache Spark to process customer data in DynamoDB.

With the Amazon EMR 4.3.0 release, you can run Apache Spark 1.6.0 for your big data processing. When you launch an EMR cluster, it comes with the emr-ddb-hadoop.jar library required to let Spark interact with DynamoDB. Spark also natively supports applications written in Scala, Python, and Java and includes several tightly integrated libraries for SQL (Spark SQL), machine learning (MLlib), stream processing (Spark Streaming), and graph processing (GraphX). These tools make it easier to leverage the Spark framework for a variety of use cases.

How Spark talks to DynamoDB

Spark provides HadoopRDD for reading data stored in sources you can access through the Hadoop FileSystem interface (for example, files in HDFS, tables in HBase, or objects in Amazon S3). Spark also uses familiar Hadoop abstractions such as InputFormat, OutputFormat, and InputSplit to map the underlying dataset to a resilient distributed dataset (RDD). I recommend you brush up on these subjects. When you call the HadoopRDD API, it uses the MapReduce API (org.apache.hadoop.mapred) to read and write.

The HadoopRDD class definition is as follows:

HadoopRDD(
  sc: SparkContext,
  conf: JobConf,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int)

When a job starts and HadoopRDD reads from a data source, the MapReduce API on the back end breaks each input file into splits, and each task processes a single split. Each split is further divided into records of key-value pairs, which the Spark task processes one record at a time.
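To make that flow concrete, here is a minimal sketch, run from a Spark shell where sc is available, that builds a HadoopRDD over a hypothetical text file using the same mapred classes. The path is a placeholder; the DynamoDB connector described next plugs into exactly this mechanism.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

// Hypothetical input path; each split of the file becomes a Spark partition,
// and each record is handed to the task as a (key, value) pair.
val jobConf = new JobConf(sc.hadoopConfiguration)
FileInputFormat.setInputPaths(jobConf, "hdfs:///tmp/sample-logs")

val lines = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text])

// Print the byte offset and text of the first few records
lines.take(5).foreach { case (offset, line) => println(s"$offset: $line") }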

The simplest way for Spark to interact with DynamoDB is to build a connector that talks to DynamoDB by implementing the simple Hadoop interfaces.

Amazon EMR provides an implementation of this connector as part of emr-ddb-hadoop.jar, which contains the DynamoDBItemWritable class. Using this class, you can implement your own DynamoDBInputFormat as shown below.

public class DynamoDbInputFormat implements InputFormat<Text, DynamoDBItemWritable>, Serializable {

    private static final String NUMBER_OF_SPLITS = "dynamodb.numberOfSplits";

    @Override
    public InputSplit[] getSplits(@NonNull final JobConf job, final int numSplits) throws IOException {
        final int splits = Integer.parseInt(requireNonNull(job.get(NUMBER_OF_SPLITS),
            NUMBER_OF_SPLITS + " must be non-null"));

        // Create one input split per DynamoDB scan segment
        return IntStream
            .range(0, splits)
            .mapToObj(segmentNumber -> new DynamoDbSplit(segmentNumber, splits))
            .toArray(InputSplit[]::new);
    }

    // getRecordReader(...) must also be implemented; it scans the segment assigned to each split
}

Because the connector has no way to discover the number of splits from DynamoDB itself, you pass it explicitly through the job configuration.

public class DynamoDbSplit implements InputSplit, Serializable {
    private int segmentNumber;
    private int splits;

    // Constructor used by DynamoDbInputFormat.getSplits above
    public DynamoDbSplit(final int segmentNumber, final int splits) {
        this.segmentNumber = segmentNumber;
        this.splits = splits;
    }

    // have setter and getter methods to read and set the variables
}

Finally, in the job configuration you pass the details about the splits and also the table name.

private static JobConf createDynamoDbJobConf() {
    final JobConf conf = new JobConf();
    conf.set("dynamodb.numberOfSplits", "10000");
    conf.set("dynamodb.tableName", "myDynamoDBTable");
    return conf;
}
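Wiring these pieces together from Spark would then look roughly like the sketch below. This assumes the classes above are compiled onto the cluster classpath, that createDynamoDbJobConf() is made accessible to the caller, and that your record reader emits Text keys and DynamoDBItemWritable values.

import org.apache.hadoop.io.Text
import org.apache.hadoop.dynamodb.DynamoDBItemWritable

// Build the JobConf with the split count and table name, then hand the
// custom InputFormat to hadoopRDD exactly as with any other Hadoop source.
val conf = createDynamoDbJobConf()
val items = sc.hadoopRDD(conf,
  classOf[DynamoDbInputFormat],
  classOf[Text],
  classOf[DynamoDBItemWritable])

items.count()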

We will write a simple Scala program to do a word count operation on a DynamoDB table, using the EMR-provided implementation of these interfaces. To get started, you need to complete the following steps:

1. Launch an EMR cluster with Spark

You can launch an EMR cluster with Spark and Hive. To learn how, see the Apache Spark topic.

2. Load data into DynamoDB

On the EMR cluster you just launched, load sample data into DynamoDB from a file present on S3. To learn how, see the Using Amazon Elastic MapReduce with DynamoDB post.
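The referenced post loads the sample file through Hive. As an alternative, purely illustrative way to seed a few items, you could use the AWS SDK for Java from the Scala REPL; the table and attribute names below are placeholders, and this assumes a reasonably recent SDK (1.11+) is on the classpath (EMR ships the SDK with the cluster).

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import scala.collection.JavaConverters._

// Put a couple of hypothetical items into the table; attribute names are placeholders.
val ddb = AmazonDynamoDBClientBuilder.standard().withRegion("us-east-1").build()

Seq(
  ("1", "spark makes dynamodb analytics easy"),
  ("2", "spark streaming reads from amazon kinesis")
).foreach { case (id, review) =>
  ddb.putItem("myDynamoDBTable", Map(
    "customerId" -> new AttributeValue().withS(id),
    "review"     -> new AttributeValue().withS(review)
  ).asJava)
}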

3. Read the DynamoDB table from the Spark program

Log in to your EMR cluster using any Secure Shell (SSH) client, as shown below.

ssh -o ServerAliveInterval=10 -i <YOUR-KEY-PAIR> hadoop@<EMR-MASTER-DNS>

We will run this example using the Spark interactive shell. You can start the Spark shell and pass emr-ddb-hadoop.jar as an external jar. The example below counts the number of records in the DynamoDB table.

$ spark-shell --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar
   
import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
/* Importing DynamoDBInputFormat and DynamoDBOutputFormat */
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable
 
var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "myDynamoDBTable")   // Pointing to DynamoDB table
jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
jobConf.set("dynamodb.regionid", "us-east-1")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1")
jobConf.set("dynamodb.version", "2011-12-05")
 
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
 
var orders = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
 
// Count the items in the DynamoDB table
orders.count()
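The count confirms that the connector works end to end. To complete the word count mentioned earlier, you can pull a string attribute out of each DynamoDBItemWritable and split it into words. The attribute name below ("review") is a placeholder for whatever string attribute your table actually holds.

// Pull a hypothetical string attribute out of every item and count its words.
// DynamoDBItemWritable.getItem returns a java.util.Map of attribute names to AttributeValues.
val words = orders.values.flatMap { item =>
  Option(item.getItem.get("review"))        // placeholder attribute name
    .flatMap(v => Option(v.getS))           // getS is null for non-string values
    .toSeq
    .flatMap(_.split("\\s+"))
    .filter(_.nonEmpty)
}

val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.take(10).foreach(println)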

You can use Spark Streaming to process data coming from a live data stream, such as one from Amazon Kinesis or Kafka. Spark Streaming is an extension of the core Spark framework. When you've processed the data, you can store the results in DynamoDB, where they can back applications that show real-time metrics across many items. Spark Streaming provides a high-level abstraction called a Discretized Stream (DStream), which represents a continuous sequence of RDDs. Spark Streaming uses the Amazon Kinesis Client Library (KCL) to consume data from an Amazon Kinesis stream.

The code snippet below from a Spark Streaming example creates as many DStreams (and in turn, KCL workers) as there are shards.

// Create the Amazon Kinesis DStreams
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
    InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}
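When you have processed each batch, you can persist the results back to DynamoDB as described above. The sketch below is one way to do that, assuming a hypothetical results table named myMetricsTable, placeholder attribute names, and a recent AWS SDK for Java (1.11+) on the executor classpath; it does not batch writes or manage throughput, which you would want in production.

import java.nio.charset.StandardCharsets
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import scala.collection.JavaConverters._

// Union the shard-level DStreams, count words per batch, and write each
// partition's results to a hypothetical DynamoDB table from the executors.
val unionStream = ssc.union(kinesisStreams)
val counts = unionStream
  .flatMap(bytes => new String(bytes, StandardCharsets.UTF_8).split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One client per partition to avoid serializing it from the driver
    val ddb = AmazonDynamoDBClientBuilder.standard().build()
    partition.foreach { case (word, count) =>
      ddb.putItem("myMetricsTable", Map(
        "word"  -> new AttributeValue().withS(word),
        "count" -> new AttributeValue().withN(count.toString)
      ).asJava)
    }
  }
}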

To learn how to optimize Spark Streaming to efficiently process Amazon Kinesis streams, I recommend you read the Optimize Spark-Streaming to Efficiently Process Amazon Kinesis Streams post.

Conclusion

I’ve shown you how to use Spark to process data in DynamoDB. You can implement the Hadoop interfaces to do some complex logic and persist the information in a NoSQL system to power a variety of applications.

Happy Sparking!

If you have questions or suggestions, please leave a comment below.

————————

Related

Looking to learn more about Big Data or Streaming Data? Check out our Big Data and Streaming data educational pages.
