At Khan Academy, we currently have three frameworks for extracting meaningful information from the data we collect as students and teachers work their way through our content. Every day, we collect around 8 million data points on exercise and video interactions, and a few million more around community discussion, computer science programs, and registrations. Not to mention the raw web request logs, and some client-side events we send to MixPanel.
The most mature system runs Hive on Amazon EMR. This year, we have created a few jobs that run on top of the Google Mapreduce and Pipeline frameworks directly within Appengine. And, very recently, we have started exporting backups to Google Cloud Storage, which Google BigQuery can consume and run queries over at an insane speed.
We use Hive and GCS for data warehousing and giant queries that do not need to output pristine, realtime information. The Hive system has worked well since it was created, but now that the Google technologies are catching up, I am hopeful that continuing to move in that direction will make our lives much easier.
The Mapreduce library allows us to perform queries and data maintenance over production data, on production servers. The Pipeline library is a low-level distributed computing framework that we use for advanced operations that do not fit the Mapreduce semantics. In practice, that usually means chaining several Mapreduce jobs together.
Hive and Amazon Elastic MapReduce
Moving our data out of the production datastore and into Hive gives us a safe place to iterate on queries using the powerful HiveQL features like UNION, sub-queries, and, most importantly, JOIN. With these, we can answer questions like “Based on their students’ activity, which coaches have been the most active in the past year?”. We also use this system to train our machine learning models to understand students’ knowledge state and recommend content most likely to help them improve.
This setup requires a few transformations to get the data into a format that Hive can understand. Here is an outline of what happens.
- An hourly cron job running on an Amazon EC2 machine calls an API on khanacademy.org to download any datastore entities that have changed in the last hour. Entities are transferred in protobuf format, converted to JSON, written to a local file, and compressed. Each file contains JSON objects for all of the entities changed during that hour for a particular Model. Each line has the key for the entity, and the JSON string which represents it:

      "entity key"\t"a giant JSON string"

- A daily cron job running on the same machine launches several Hadoop jobs to concatenate all of the hourly data files into a single daily partition, which we write to Amazon S3. This results in a single partition per day, per Model. Each line of these files is in the same format mentioned above.

      s3://ka/entity_store/ProblemLog/dt=2013-09-12/data.gz

- With the data in this format, we can mount rudimentary tables in Hive with:

      CREATE EXTERNAL TABLE IF NOT EXISTS ProblemLog (
          user string, json string
        )
        PARTITIONED BY (dt string)
        ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
        LOCATION 's3://ka/entity_store/ProblemLog';
      ALTER TABLE ProblemLog RECOVER PARTITIONS;

- We then use Hive to transform this data into a more tractable format.

      INSERT OVERWRITE TABLE user_exercise_summary PARTITION (dt='${dt}')
      SELECT
        parsed.user, parsed.exercise, SUM(parsed.time_taken),
        SUM(parsed.correct), SUM(parsed.wrong),
        MAX(IF(parsed.proficient, 1, 0)) = 1
      FROM (
        SELECT
          get_json_object(ProblemLog.json, '$.user') AS user,
          get_json_object(ProblemLog.json, '$.exercise') AS exercise,
          get_json_object(ProblemLog.json, '$.time_taken') AS time_taken,
          IF(get_json_object(ProblemLog.json, '$.correct') = "true", 1, 0) AS correct,
          IF(get_json_object(ProblemLog.json, '$.correct') != "true", 1, 0) AS wrong,
          get_json_object(ProblemLog.json, '$.earned_proficiency') = "true" AS proficient
        FROM ProblemLog
        WHERE ProblemLog.dt = '${dt}'
      ) parsed
      GROUP BY parsed.user, parsed.exercise;

- Finally, we shuttle any results we want to surface on a dashboard into a MongoDB instance, which can be easily accessed by JavaScript via sleepy-mongoose.
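The file handling in the first two steps can be sketched in a few lines of Python. This is only an illustration of the line format described above, not our actual cron job: the function names are made up for the example, the real daily concatenation runs as Hadoop jobs rather than a local loop, and the sketch assumes entity keys never contain a tab.

```python
import gzip
import json
import os


def write_entity_file(path, entities):
    """Write (key, entity_dict) pairs as gzip-compressed 'key<TAB>json' lines."""
    with gzip.open(path, "wt", encoding="utf-8") as out:
        for key, entity in entities:
            # json.dumps escapes tabs and newlines inside string values, so
            # each entity stays on one line with a single separating tab.
            out.write(key + "\t" + json.dumps(entity) + "\n")


def concatenate_hourly(hourly_paths, daily_path):
    """Merge hourly files into one daily partition file, line for line."""
    directory = os.path.dirname(daily_path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with gzip.open(daily_path, "wt", encoding="utf-8") as out:
        for path in hourly_paths:
            with gzip.open(path, "rt", encoding="utf-8") as src:
                for line in src:
                    out.write(line)


def read_entity_file(path):
    """Yield (key, entity_dict) pairs back out of a file in this format."""
    with gzip.open(path, "rt", encoding="utf-8") as src:
        for line in src:
            # Split on the first tab only; everything after it is the JSON.
            key, raw = line.rstrip("\n").split("\t", 1)
            yield key, json.loads(raw)
```

Because the daily file keeps the same one-entity-per-line layout, the Hive table definition only needs a tab delimiter and a partition column to read it.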
Now the data exists in a few places which provide varying levels of accessibility. You can fire up a Hive instance to do some heavy duty queries, or issue a REST call to download a compact summary.
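To make the compact summary concrete, here is a rough Python equivalent of what the user_exercise_summary query computes for one day of ProblemLog lines. It is a sketch for illustration, not code we run: the field names come from the query above, the function name is hypothetical, and a missing `correct` value is simply counted as wrong, which is a simplification of how Hive treats NULLs.

```python
import json
from collections import defaultdict


def summarize(lines):
    """Aggregate 'key<TAB>json' ProblemLog lines per (user, exercise)."""
    totals = defaultdict(
        lambda: {"time_taken": 0, "correct": 0, "wrong": 0, "proficient": False}
    )
    for line in lines:
        _key, raw = line.rstrip("\n").split("\t", 1)
        log = json.loads(raw)
        row = totals[(log["user"], log["exercise"])]
        row["time_taken"] += log["time_taken"]
        if log.get("correct") is True:
            row["correct"] += 1
        else:
            row["wrong"] += 1
        # Mirrors MAX(IF(proficient, 1, 0)) = 1: true if any log earned it.
        row["proficient"] = row["proficient"] or log.get("earned_proficiency") is True
    return dict(totals)
```

Each value in the returned dictionary corresponds to one row of the summary table for that day's partition.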
Google Mapreduce on Appengine
Our most common use of the Mapreduce library is to perform schema changes and data clean-up across many entities in the datastore. For example, removing a now-unused property from a Model, awarding time-based achievement badges, or recalculating exercise progress based on the eventually-consistent problem logs.
Another common use is calculating aggregate data to determine things like the average time spent completing an exercise.
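As a rough picture of the shape of such a job, here is the average-time calculation written as map, shuffle, and reduce steps in plain Python. It deliberately does not use the Appengine Mapreduce library, which handles sharding and the shuffle for you; the function names are hypothetical and the `exercise` and `time_taken` fields are borrowed from the problem log data described earlier.

```python
from collections import defaultdict


def map_problem_log(log):
    """Map step: emit one (exercise, time_taken) pair per problem log."""
    yield log["exercise"], log["time_taken"]


def reduce_average(exercise, times):
    """Reduce step: average every time emitted for a single exercise."""
    return exercise, sum(times) / len(times)


def run_mapreduce(logs):
    """Run map, group values by key (the shuffle), then reduce each group."""
    shuffled = defaultdict(list)
    for log in logs:
        for key, value in map_problem_log(log):
            shuffled[key].append(value)
    return dict(reduce_average(key, values) for key, values in shuffled.items())
```

The real library distributes the map and reduce calls across many requests, but the contract of each function stays the same: the mapper sees one entity at a time, and the reducer sees all values for one key.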
The Mapreduce library is powerful, but can be difficult to get working exactly as you want. I will talk in depth about how we use this library in a later post.
Google BigQuery on Cloud Storage
The Google Cloud Storage and BigQuery pipeline is conceptually very similar to the EMR and Hive pipeline, but much, much easier to set up. BigQuery provides many powerful statements, but lacks the ability to add custom map or reduce functions, which we have done in Hive. We have just started playing with these technologies, but the speed of both setup and retrieving query results has been very impressive. I hope to write more on this topic after we have used it more extensively.
Here are the steps I went through to set up and execute a simple query over one Model:
- From the Datastore Admin page, select the entities you want to back up, and click Backup Entities. Specify Google Cloud Storage and the bucket name, and then start the job. This will create a hierarchy of files, the bottom of which actually contains the data:
  - <key>.backup_info – The job-level manifest file.
  - <key>.YourModel.backup_info – The Model-level manifest file.
  - <name>_<date>_<key2>-output-<shardNumber>-retry-<retryCount>
- From the Google BigQuery console, click through to Create a New Table, and then select Google Cloud Storage on the Select Data stage. Specify the <key>.YourModel.backup_info file created in the previous step as the source of data.
- After the job is finished, you can query the table with BigQuery!
This is easy to do by clicking around in the UI, but automating the entire process is a bit more difficult, which is what we are currently working on.
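As a starting point for that automation, the sketch below builds the request body for a BigQuery load job that points at a Model-level manifest file. It reflects my understanding of the load job configuration in the BigQuery REST API (the `DATASTORE_BACKUP` source format takes a single `.backup_info` URI); it is not our finished tooling. The helper names, the project, dataset, and bucket values, and the assumption that the manifest sits at the root of the bucket are all hypothetical, and actually submitting the job requires an authenticated client, which is left out here.

```python
def backup_info_uri(bucket, backup_key, model):
    """Build the gs:// URI of the Model-level manifest file.

    Assumes the backup was written to the root of the bucket.
    """
    return "gs://{}/{}.{}.backup_info".format(bucket, backup_key, model)


def datastore_backup_load_job(project_id, dataset_id, model, source_uri):
    """Build a load job body that imports one Model's backup into a table."""
    return {
        "configuration": {
            "load": {
                # Tells BigQuery the source is a Datastore backup manifest.
                "sourceFormat": "DATASTORE_BACKUP",
                "sourceUris": [source_uri],
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": dataset_id,
                    # Naming the table after the Model is a choice made for
                    # this sketch, not a requirement.
                    "tableId": model,
                },
                # Replace the table contents with the newest backup.
                "writeDisposition": "WRITE_TRUNCATE",
            }
        }
    }
```

The part this leaves open is discovering the `<key>` of the most recent backup, since the Datastore Admin page generates it when the backup job runs.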