Building a useful and scalable Data Lake

There are many options to collect and store data.
The most common approach is to pull it directly from the source and save it in relational database tables.
This works fine for small and medium datasets with well defined schemas. But what if you also have very large datasets with a variety of mixed schemas? How can you store them once and use them later for all your analytic use cases?

In this post I will share how I implemented a Data Lake at Gett to answer these questions.
I will cover the main concerns in storing data efficiently, so that it can be easily accessed by Hadoop and other SQL query engines.

The Data Lake era

In the very early days of Gett, data analysis was quite simple. There was one relational database with one replica, and all data analyses could be easily done directly on it.
Gett’s business is growing fast, and so is its data. Gett strives to be a data driven company, and the demand for new data is rising all the time. Dozens of microservices are being created to support new business features, each with its own database.

The ability to evaluate every new feature right after its launch is critical for business success. Joining data from the mobile apps (drivers and clients) with data from the microservices is essential for drawing a clear picture of what exactly is going on with the business.
Given this growing demand, and the fact that Gett was born as a cloud company, it was clear that any data platform had to be scalable, cloud based, agile and as innovative as Gett is.
Data should be streamed quickly into scalable storage that “never” runs out, with zero disk and volume management: Storage as a Service.


The above image describes the implemented ingestion process of Gett’s Data Lake.
The data producer (driver app, client app, microservice) sends events to the relay service that quickly sends them in chunks to the Kinesis stream.
AWS Lambda functions consume the events from Kinesis and write them to S3, laid out so that SQL engines can access them easily.
Apache Spark also consumes the Kinesis events, to compute real-time business KPIs and feed advanced algorithms.
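
To make the Lambda step more concrete, here is a minimal Python sketch of what such a function might do with one batch. It assumes the standard event shape that Lambda receives from a Kinesis trigger, where each record carries its data base64-encoded under record["kinesis"]["data"]. The function names are hypothetical, and the actual upload to S3 is left out; this is not Gett's production code.

```python
import base64


def decode_kinesis_records(event):
    """Return the decoded text payload of every record in a Kinesis-triggered Lambda event."""
    lines = []
    for record in event.get("Records", []):
        # Kinesis hands each record's data to Lambda base64-encoded.
        raw = base64.b64decode(record["kinesis"]["data"])
        lines.append(raw.decode("utf-8"))
    return lines


def build_object_body(lines):
    """Join the decoded events into one newline-delimited body, to be written as a single S3 object."""
    return "".join(line.rstrip("\n") + "\n" for line in lines)
```

Writing one object per batch, rather than one object per event, keeps the number of files in S3 manageable for the query engines that read them later.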

Store it wisely once, and query it fast later

Storing data in S3 is very simple when you only need archiving or intermediate storage for your data analytics pipeline. The challenge arises when you want to store data once and query it many times later with SQL engines like Presto.
Let’s say you have a stream of events that you want to analyze with SQL.
Your table will be something like this:
CREATE TABLE EVENTS (event_at timestamp, event_name string, payload string) LOCATION 's3://your-bucket/events/'

A new data file lands in your-bucket every minute, so after 1 day you end up with 1,440 files and after 1 week with more than 10,000 files.
A simple query, like:
SELECT * FROM EVENTS WHERE event_at BETWEEN '2018-02-25 08:00' AND '2018-02-25 09:00'

will scan every file under the table location, because nothing tells the engine which files contain the requested time range.

You will probably want to improve this by skipping irrelevant files and reading only the files that match the filter. This is where partitioning comes in:
Files will be saved to S3 under the following prefix: event_date=YYYY-MM-DD/event_hour=HH/
The table definition changes to declare the partition columns:
CREATE TABLE EVENTS (event_at timestamp, event_name string, payload string) PARTITIONED BY (event_date date, event_hour int) LOCATION 's3://your-bucket/events/'
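
On the ingestion side, the writer has to derive this prefix from each event's timestamp. A minimal Python sketch of that step follows; the function names and the "events/" base path are assumptions for illustration.

```python
from datetime import datetime


def partition_prefix(event_at, base="events/"):
    """Build the event_date=YYYY-MM-DD/event_hour=HH/ prefix for a timestamp."""
    return f"{base}event_date={event_at:%Y-%m-%d}/event_hour={event_at:%H}/"


def object_key(event_at, file_name):
    """Full S3 key for a data file that holds events from the given hour."""
    return partition_prefix(event_at) + file_name


# A file written at 08:30 on 2018-02-25 lands in the 08 hour partition.
key = object_key(datetime(2018, 2, 25, 8, 30), "part-0001.csv")
```

With Hive-compatible metastores, a new partition usually also has to be registered (for example with ALTER TABLE ... ADD PARTITION or MSCK REPAIR TABLE) before query engines can see it.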

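With this layout, the earlier query scans only 2 prefixes, provided it also filters on the partition columns (for example event_date = date '2018-02-25' AND event_hour BETWEEN 8 AND 9):
s3://your-bucket/events/event_date=2018-02-25/event_hour=08/
s3://your-bucket/events/event_date=2018-02-25/event_hour=09/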

Reading only the relevant files is called partition pruning, and it is one of the key features that make your data lake useful and ready for analytics.
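
The effect of pruning can be illustrated with a short Python sketch that lists which hourly prefixes overlap a requested time range. It only models the idea; it is not how Presto or Hive implement pruning internally, and the function name is hypothetical.

```python
from datetime import datetime, timedelta


def prefixes_for_range(start, end, base="s3://your-bucket/events/"):
    """List the hourly partition prefixes that overlap [start, end], both ends inclusive."""
    current = start.replace(minute=0, second=0, microsecond=0)
    prefixes = []
    while current <= end:
        prefixes.append(f"{base}event_date={current:%Y-%m-%d}/event_hour={current:%H}/")
        current += timedelta(hours=1)
    return prefixes


# The one-hour window from the example query touches two hourly partitions.
selected = prefixes_for_range(datetime(2018, 2, 25, 8, 0), datetime(2018, 2, 25, 9, 0))
```

For a table that holds a week of data (168 hourly partitions), this query would read 2 of them and skip the rest.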

Do your analytics directly on raw data

One of the most important KPIs for Gett is the utilization of cars (supply).
The better the cars are utilized, the more money the drivers make.
The Driver app sends data events with GPS location and status. This data is stored in S3 as described in the previous section.
Let’s use Presto as the SQL query engine to compute the car utilization KPI.
A sample event that is sent from the Driver app looks like this:
DriverInfo,2018-03-03 20:12:04,"{\"driver_id\":123, \"status\": \"in_ride\", \"location\": ,….}"

It is a CSV stream: the first element is the event_name, the second is the event timestamp and the last is the JSON payload.
Our EVENTS table is defined with a simple CSV parser and points to the relevant S3 location.
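
To show how such a line breaks down into its three fields, here is a small Python sketch using the standard csv and json modules. It assumes the quotes inside the payload are escaped with a backslash, as in the sample above. The sample line used here leaves out the elided location attribute, and parse_event is a hypothetical helper name.

```python
import csv
import json


def parse_event(line):
    """Split one raw event line into (event_name, event_at, payload dict)."""
    # The payload is a quoted CSV field whose inner quotes are backslash-escaped.
    reader = csv.reader([line], escapechar="\\", doublequote=False)
    event_name, event_at, payload = next(reader)
    return event_name, event_at, json.loads(payload)


sample_line = r'DriverInfo,2018-03-03 20:12:04,"{\"driver_id\":123, \"status\": \"in_ride\"}"'
name, event_at, payload = parse_event(sample_line)
```

In the Data Lake itself this parsing is done by the table's CSV parser and by the JSON functions of the SQL engine, so no separate ETL step is required.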
Now we can run this simple query to calculate the relevant KPI:
WITH sample AS (
  -- Extract the attributes we need from the JSON payload
  SELECT event_date, event_hour, minute(event_at) AS event_minute,
         json_extract_scalar(payload, '$["status"]') AS driver_status,
         cast(json_extract_scalar(payload, '$["driver_id"]') AS int) AS driver_id
  FROM events
  WHERE event_name = 'DriverInfo'
    AND event_date >= date '2018-03-01'
)
SELECT event_date, event_hour, event_minute,
       -- cast to double so the ratio is not truncated by integer division
       cast(count(DISTINCT CASE WHEN driver_status = 'in_ride' THEN driver_id END) AS double)
         / count(DISTINCT driver_id) AS getting_hour
FROM sample
GROUP BY event_date, event_hour, event_minute

First, extract the sample data in the WITH clause, using the json_extract_scalar function to pull attributes out of the JSON payload. Then use the extracted attributes to calculate the required measure (getting_hour): the share of distinct drivers who are in a ride in each minute.
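
For readers who prefer to see the aggregation spelled out, the same measure can be sketched in plain Python. This only mirrors the logic of the query on a handful of in-memory tuples; the function name and the "free" status value are assumptions made for illustration.

```python
from collections import defaultdict


def utilization_per_minute(events):
    """events: iterable of (minute_key, driver_id, status) tuples.

    Returns {minute_key: distinct drivers in a ride / distinct drivers seen}.
    """
    all_drivers = defaultdict(set)
    in_ride = defaultdict(set)
    for minute_key, driver_id, status in events:
        all_drivers[minute_key].add(driver_id)
        if status == "in_ride":
            in_ride[minute_key].add(driver_id)
    # Sets make the counts distinct, like count(distinct ...) in the query.
    return {m: len(in_ride[m]) / len(drivers) for m, drivers in all_drivers.items()}


ratios = utilization_per_minute([
    ("20:12", 1, "in_ride"),
    ("20:12", 1, "in_ride"),  # a repeated event from the same driver counts once
    ("20:12", 2, "free"),
    ("20:13", 2, "free"),
])
```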

As you can see, ingesting data in time based partitions can bring much value with minimum effort.
The same partitions that were created in the ingestion process can be accessed later with various SQL engines (like Presto) to calculate business KPIs.
You don’t need any ETL to extract mixed schemas in advance, and you don’t need to worry about disk space when using S3.

Use the “magic” of partitions to store your raw data once and enjoy a ready to go, scalable analytics platform for almost any use case.
