Processing petabytes of data in a couple of hours
without spending a fortune
The idea
Well… actually, a sample of the internet, thanks to http://commoncrawl.org/
The idea is to use a Spark cluster provided by AWS EMR to calculate the average size of a sample of the internet. We’ll do it using the WARC files provided by the folks at Common Crawl.
I’ll use the Content-Length header from the metadata to crunch the numbers.
The steps
Go to https://console.aws.amazon.com/elasticmapreduce -> Create cluster. Ensure that Spark 2.4.0 is checked and then click Next.

Then, in the “Hardware” config, leave “Uniform instance groups” checked. Next, you need to define the architecture of your cluster. To save some money, I’ll use Spot Instances, bidding USD 0.070 per instance/hr. I’ll use one Master node, 2 Task nodes and 2 Core nodes, as in the following image. Then click “Next”.

Then we need to set up the General Cluster Settings. Nothing important here for us, except for this:

We basically need to run the same bootstrap script on all of our cluster nodes, in order to get the same Python packages installed on all the machines. To accomplish this, we set up a custom action in the “Bootstrap Actions” section. Beforehand, we need to upload a .sh file (to AWS S3 or some other public location) with the following commands:
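Something along these lines should do the trick (this is my reconstruction; the exact install method for gzipstream, which lives on Common Crawl’s GitHub, may differ):

    #!/bin/bash
    # Install the Python packages the Spark job needs, on every node
    sudo pip install boto warc
    # gzipstream is published by Common Crawl on GitHub
    sudo pip install https://github.com/commoncrawl/gzipstream/archive/master.zip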
That’s going to install the boto, warc (needed to open the WARC files provided by Common Crawl) and gzipstream packages.
OK, so now we need to SSH into our master node and start running/installing things. On the master node, we grab Common Crawl’s April 2017 metadata paths file from S3, unzip it, and take only the first 5 lines to run some tests. The goal of this step is to get that file into the /home dir of the “hadoop” user on our Master node:
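A sketch of those commands, assuming the April 2017 crawl ID is CC-MAIN-2017-17 and that we work with the WAT (metadata) paths listing; the exact file names may differ:

    cd /home/hadoop
    # List of metadata (WAT) file paths for the April 2017 crawl
    aws s3 cp s3://commoncrawl/crawl-data/CC-MAIN-2017-17/wat.paths.gz .
    gunzip wat.paths.gz
    # Keep only the first 5 paths for a quick test run
    head -5 wat.paths > wat.paths.sample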

Note: The default user for your AWS EMR nodes is “hadoop” and NOT “ec2-user”
We’re ready to copy our source data files to the Hadoop Task Nodes.
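In practice that means putting the sample file into HDFS, where the Task Nodes can read it. A minimal sketch (the target path /user/hadoop is my choice):

    # Copy the sample paths file from the master's local disk into HDFS
    hdfs dfs -mkdir -p /user/hadoop
    hdfs dfs -put /home/hadoop/wat.paths.sample /user/hadoop/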
Ready for the good Spark/Hadoop sauce! The Python code that does the magic:
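Here’s a sketch of that script, reconstructed from the description that follows; the S3 access details, the exact header lookup and the input/output paths are assumptions on my part:

    # content_length_job.py -- a reconstruction of the job described below
    import boto
    import warc
    from boto.s3.key import Key
    from gzipstream import GzipStreamFile
    from pyspark import SparkContext


    def get_content_length(split_index, iterator):
        """Runs on the Task Nodes: stream each WARC/WAT file of this partition
        from S3 and yield the Content-Length of every record."""
        conn = boto.connect_s3(anon=True, host='s3.amazonaws.com')
        bucket = conn.get_bucket('commoncrawl')
        for path in iterator:
            key = Key(bucket, path.strip())
            # GzipStreamFile lets warc read the gzipped stream on the fly
            for record in warc.WARCFile(fileobj=GzipStreamFile(key)):
                length = record.header.get('Content-Length')
                if length is not None:
                    yield int(length)


    if __name__ == '__main__':
        sc = SparkContext(appName='content-length-job')

        # One WARC/WAT path per line: the 5-line sample from the previous step
        paths = sc.textFile('/user/hadoop/wat.paths.sample')

        # Hand each partition (plus its index) to get_content_length
        lengths = paths.mapPartitionsWithSplit(get_content_length)

        mean = lengths.mean()

        # Wrap the single value in an RDD just to be able to write it out
        sc.parallelize([mean]).saveAsTextFile('/user/hadoop/content_length_mean')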
The get_content_length function is the portion of code that gets executed on the Task Nodes (don’t write any logging to stdout there: if you launch things from the master node, you won’t see a thing in that output).
The rest of the code is pretty much standard for a Spark application:
- create a SparkContext
- create an RDD from the text file
- use mapPartitionsWithSplit to run the get_content_length job
- for those used to working with Scala and Apache Spark, a word of advice: mapPartitionsWithSplit is deprecated (mapPartitionsWithIndex is used nowadays), but it is still available in pyspark, which is the package we use to drive Spark from Python
- we calculate the mean, and then with that mean value we create a new RDD just to be able to write it to a log file (this step isn’t really necessary)
We name the file content_length_job.py and then we upload it to the Master Node via SSH, like this:
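Something like this, with a hypothetical key file and the master node’s public DNS taken from the EMR console:

    scp -i my-key.pem content_length_job.py hadoop@<master-public-dns>:/home/hadoop/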
With the .py script in the master node, the last step is to execute spark-submit:
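A minimal invocation from the master node looks like this (EMR already configures YARN as the Spark master, so the flag is mostly for clarity):

    spark-submit --master yarn /home/hadoop/content_length_job.py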
If you get an exception trying to build the SparkContext, you probably have this issue:
https://stackoverflow.com/questions/53140852/cant-get-a-sparkcontext-in-new-aws-emr-cluster
It has to do with how the loggers are configured using the ${spark.yarn.app.container.log.dir}
system property.
After fixing that, run spark-submit again, wait for the jobs to end… and we’re done!
You can verify the results by accessing the Hadoop distributed filesystem, like this:
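For example, assuming the output path used in the script above:

    hdfs dfs -cat /user/hadoop/content_length_mean/part-*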
Cheers!