Data visualization and anomaly detection using Amazon Athena and Pandas from Amazon SageMaker

Many organizations use Amazon SageMaker for their machine learning (ML) requirements and source data from a data lake stored on Amazon Simple Storage Service (Amazon S3). The petabyte-scale source data on Amazon S3 isn't always clean, because data lakes ingest data from several source systems, such as flat files, external feeds, databases, and Hadoop. The data may contain extreme values in source attributes, which are considered outliers. Outliers arise from changes in system behavior, fraudulent behavior, human error, instrument error, missing data, or simply natural deviations in populations. Outliers in training data can easily reduce the accuracy of many ML models, such as linear and logistic regression, leaving ML scientists and analysts with skewed results. They can dramatically change the fitted model and lead to poor predictions or estimations.

Data scientists and analysts need a way to remove these outliers. Many analysts come from a strong data background and are fluent in writing SQL queries as well as in programming languages. The following tools are a natural choice for ML scientists to remove outliers and carry out data visualization:

  • Amazon Athena – An interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL
  • Pandas – An open-source, high-performance, easy-to-use library that provides data structures and data analysis tools for the Python programming language and works with plotting libraries such as matplotlib
  • Amazon SageMaker – A fully managed service that provides you with the ability to build, train, and deploy ML models quickly

To illustrate how to use Athena with Pandas for anomaly detection and visualization from Amazon SageMaker, we clean a set of New York City Taxi and Limousine Commission (TLC) Trip Record Data by removing outlier records. In this dataset, outliers are trips whose duration spans multiple days, is 0 seconds, or is negative. Then we use the matplotlib library to plot graphs that visualize trip duration values.

Solution overview

To implement this solution, you perform the following high-level steps:

  1. Create an AWS Glue Data Catalog and browse the data on the Athena console.
  2. Create an Amazon SageMaker Jupyter notebook and install PyAthena.
  3. Identify anomalies using Athena SQL and Pandas from the Jupyter notebook.
  4. Visualize data and remove outliers using Athena SQL and Pandas.

The following diagram illustrates the architecture of this solution.

Prerequisites

To follow this post, you should be familiar with the following:

  • The Amazon S3 file upload process
  • AWS Glue crawlers and the Data Catalog
  • Basic SQL queries
  • Jupyter notebooks
  • Assigning a basic AWS Identity and Access Management (IAM) policy to a role

Preparing the data

For this post, we use New York City Taxi and Limousine Commission (TLC) Trip Record Data, which is a publicly available dataset.

  1. Download the file yellow_tripdata_2019-01.csv to your local machine.
  2. Create the S3 bucket s3-yellow-cab-trip-details (your bucket name will be different).
  3. Upload the file to your bucket using the Amazon S3 console (or script these steps as shown in the sketch that follows).
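If you prefer to script the bucket creation and upload instead of using the console, a minimal boto3 sketch could look like the following (the bucket name matches this walkthrough and must be unique in your account, and the local file path is an assumption):

import boto3

s3 = boto3.client('s3', region_name='us-east-1')

# Create the bucket (bucket names must be globally unique; adjust as needed)
s3.create_bucket(Bucket='s3-yellow-cab-trip-details')

# Upload the TLC trip data file from your local machine (path is an assumption)
s3.upload_file(
    Filename='yellow_tripdata_2019-01.csv',
    Bucket='s3-yellow-cab-trip-details',
    Key='yellow_tripdata_2019-01.csv'
)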

Creating the Data Catalog and browsing the data

After you upload the data to Amazon S3, you create the Data Catalog in AWS Glue. This allows you to run SQL queries using Athena.

  1. On the AWS Glue console, create a new database.
  2. For Database name, enter db_yellow_cab_trip_details.

  3. Create an AWS Glue crawler to gather the metadata in the file and catalog it.

For this post, I use the database db_yellow_cab_trip_details and save the tables with the prefix src_.

  4. Run the crawler.

The crawler can take 2–3 minutes to complete. You can check the status on Amazon CloudWatch.
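If you'd rather poll the crawler from code than watch CloudWatch, a short boto3 check like the following reports its state (the crawler name here is a placeholder; use the name you gave your crawler):

import boto3

glue = boto3.client('glue', region_name='us-east-1')

# State is RUNNING, STOPPING, or READY; the crawler name is a placeholder
response = glue.get_crawler(Name='yellow-cab-trip-details-crawler')
print(response['Crawler']['State'])
print(response['Crawler'].get('LastCrawl', {}).get('Status'))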

The following screenshot shows the crawler details on the AWS Glue console.

When the crawler is complete, the table is available in the Data Catalog. All the metadata and column-level information is displayed with corresponding data types.

We can now check the data on the Athena console to make sure we can read the file as a table and run a SQL query.

Run your query with the following code:

SELECT * FROM db_yellow_cab_trip_details.src_yellow_cab_trip_details limit 10;

The following screenshot shows your output on the Athena console.

Creating a Jupyter notebook and installing PyAthena

You now create a new notebook instance from Amazon SageMaker and install PyAthena using Jupyter.

Amazon SageMaker has managed built-in Jupyter notebooks that allow you to write code in Python, Julia, R, or Scala to explore, analyze, and do modeling with a small set of data.

Make sure the role used for your notebook has access to Athena and Amazon S3 (use IAM policies to verify, and attach the AmazonS3FullAccess and AmazonAthenaFullAccess managed policies if needed).
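As a sketch, you could attach these managed policies to the notebook's execution role with boto3 (the role name below is a placeholder for your notebook's IAM role):

import boto3

iam = boto3.client('iam')

# Placeholder role name; use the execution role attached to your notebook instance
role_name = 'AmazonSageMaker-ExecutionRole-example'

for policy_arn in [
    'arn:aws:iam::aws:policy/AmazonS3FullAccess',
    'arn:aws:iam::aws:policy/AmazonAthenaFullAccess',
]:
    iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)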

To create your notebook, complete the following steps:

  1. On the Amazon SageMaker console, under Notebook, choose Notebook instances.
  2. Choose Create notebook instance.

  3. On the Create notebook instance page, enter a name and choose an instance type.

We recommend using an ml.m4.10xlarge instance, due to the size of the dataset. You should choose an appropriate instance depending on your data; costs vary for different instances.

Wait until the Notebook instance status shows as InService (this step can take up to 5 minutes).

  4. When the instance is ready, choose Open Jupyter.

  5. Open the conda_python3 kernel from the notebook instance.
  6. Enter the following commands to install PyAthena:
! pip install --upgrade pip
! pip install PyAthena

The following screenshot shows the output.

You can also install PyAthena when you create the notebook instance by using lifecycle configurations. See the following code:

#!/bin/bash
sudo -u ec2-user -i <<'EOF'
source /home/ec2-user/anaconda3/bin/activate python3
pip install --upgrade pip
pip install --upgrade  PyAthena
source /home/ec2-user/anaconda3/bin/deactivate
EOF

The following screenshot shows where you enter the preceding code in the Scripts section when creating a lifecycle configuration.

You can run a SQL query from the notebook to validate connectivity to Athena and pull data for visualization.

To import the libraries, enter the following code:

from pyathena import connect
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

To connect to Athena, enter the following code:

conn = connect(s3_staging_dir='s3://<your-Query-result-location>',region_name='us-east-1')

To check the sample data, enter the following query:

df_sample = pd.read_sql("SELECT * FROM db_yellow_cab_trip_details.src_yellow_cab_trip_details limit 10", conn)
df_sample

The following screenshot shows the output.

Detecting anomalies with Athena, Pandas, and Amazon SageMaker

Now that we can connect to Athena, we can run SQL queries to find the records that have unusual trip_duration values.

The following Athena query checks anomalies in the trip_duration data to find the top 50 records with the maximum duration:

df_anomaly_duration= pd.read_sql("select tpep_dropoff_datetime,tpep_pickup_datetime, 
date_diff('second', cast(tpep_pickup_datetime as timestamp), cast(tpep_dropoff_datetime as timestamp)) as duration_second, 
date_diff('minute', cast(tpep_pickup_datetime as timestamp),cast(tpep_dropoff_datetime as timestamp)) as duration_minute, 
date_diff('hour',cast(tpep_pickup_datetime as timestamp),cast(tpep_dropoff_datetime as timestamp)) as duration_hour, 
date_diff('day',cast(tpep_pickup_datetime as timestamp),cast(tpep_dropoff_datetime as timestamp)) as duration_day 
from db_yellow_cab_trip_details.src_yellow_cab_trip_details 
order by 3 desc limit 50", conn)

df_anomaly_duration

The following screenshot shows the output; there are many outliers (trips with a duration greater than 1 day).

The output shows the duration in seconds, minutes, hours, and days.

The following query checks for anomalies and shows the top 50 records with the lowest minimum duration (negative value or 0 seconds):

df_anomaly_duration= pd.read_sql("select tpep_dropoff_datetime,tpep_pickup_datetime, 
date_diff('second', cast(tpep_pickup_datetime as timestamp), cast(tpep_dropoff_datetime as timestamp)) as duration_second, 
date_diff('minute', cast(tpep_pickup_datetime as timestamp),cast(tpep_dropoff_datetime as timestamp)) as duration_minute, 
date_diff('hour',cast(tpep_pickup_datetime as timestamp),cast(tpep_dropoff_datetime as timestamp)) as duration_hour, 
date_diff('day',cast(tpep_pickup_datetime as timestamp),cast(tpep_dropoff_datetime as timestamp)) as duration_day 
from db_yellow_cab_trip_details.src_yellow_cab_trip_details 
order by 3 asc limit 50", conn)

df_anomaly_duration

The following screenshot shows the output; multiple trips have a negative value or duration of 0.

Similarly, we can use different SQL queries to analyze the data and find other outliers. We can also clean the data using SQL queries and, if needed, save the results in Amazon S3 with CTAS (CREATE TABLE AS SELECT) queries.
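For example, a CTAS query along the following lines could persist the cleaned records back to Amazon S3 as a new table. The target table name, output location, and format here are assumptions for illustration, not part of the original walkthrough:

# Run a CTAS statement through the existing PyAthena connection
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE db_yellow_cab_trip_details.clean_yellow_cab_trip_details
WITH (
  external_location = 's3://<your-bucket>/clean_yellow_cab_trip_details/',  -- assumed output location
  format = 'PARQUET'
) AS
SELECT *
FROM db_yellow_cab_trip_details.src_yellow_cab_trip_details
WHERE date_diff('second', cast(tpep_pickup_datetime as timestamp), cast(tpep_dropoff_datetime as timestamp)) > 0
  AND date_diff('day', cast(tpep_pickup_datetime as timestamp), cast(tpep_dropoff_datetime as timestamp)) < 1
""")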

Visualizing the data and removing outliers

Pull the data using the following Athena query in a Pandas DataFrame, and use matplotlib.pyplot to create a visual graph to see the outliers:

df_full = pd.read_sql("SELECT date_diff('second', cast(tpep_pickup_datetime as timestamp), cast(tpep_dropoff_datetime as timestamp)) as duration_second 
from db_yellow_cab_trip_details.src_yellow_cab_trip_details ", conn)
plt.figure(figsize=(12,12))
plt.scatter(range(len(df_full["duration_second"])), np.sort(df_full["duration_second"]))
plt.xlabel('index')
plt.ylabel('duration_second')

plt.show()

The process of plotting the full dataset can take 7–10 minutes. To reduce time, add a limit to the number of records in the query:

SELECT date_diff('second', cast(tpep_pickup_datetime as timestamp), cast(tpep_dropoff_datetime as timestamp)) as duration_second 
from db_yellow_cab_trip_details.src_yellow_cab_trip_details limit 100000 

The following screenshot shows the output.

To ignore outliers, run the following query. You replot the graph after removing the outlier records in which the duration is 0 seconds or less, or 1 day or more:

df_clean_data = pd.read_sql("SELECT date_diff('second', cast(tpep_pickup_datetime as timestamp), cast(tpep_dropoff_datetime as timestamp)) as duration_second from db_yellow_cab_trip_details.src_yellow_cab_trip_details where 
date_diff('second', cast(tpep_pickup_datetime as timestamp), cast(tpep_dropoff_datetime as timestamp))  > 0 and date_diff('day',cast(tpep_pickup_datetime as timestamp),cast(tpep_dropoff_datetime as timestamp)) < 1 ", conn)
plt.figure(figsize=(12,12))
plt.scatter(range(len(df_clean_data["duration_second"])), np.sort(df_clean_data["duration_second"]))
plt.xlabel('index')
plt.ylabel('duration_second')
plt.show()

The following screenshot shows the output.
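As an optional extra check (not part of the original steps), you can also plot a histogram of the cleaned durations with the same DataFrame to inspect the distribution:

# Histogram of cleaned trip durations, reusing df_clean_data from the previous query
plt.figure(figsize=(12, 6))
plt.hist(df_clean_data["duration_second"], bins=100)
plt.xlabel('duration_second')
plt.ylabel('number of trips')
plt.show()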

Cleaning up

When you’re done, delete the notebook instance to avoid recurring deployment costs.

  1. On the Amazon SageMaker console, choose your notebook instance.
  2. Choose Stop.

  3. When the status shows as Stopped, choose Delete.

Conclusion

This post walked you through finding and removing outliers from your dataset and visualizing the data. We used an Amazon SageMaker notebook to run analytical queries with Athena SQL, and used Athena to read the dataset, which is stored in Amazon S3 with its metadata cataloged in AWS Glue. We ran queries in Athena to find anomalies in the data and exclude those outliers, and used the notebook instance to plot graphs with the matplotlib.pyplot library.

You can try this solution for your own use cases to remove outliers using Athena SQL and an Amazon SageMaker notebook. If you have comments or feedback, please leave them below.


About the Authors

Rahul Sonawane is a Senior Consultant, Big Data at the Shared Delivery Teams at Amazon Web Services.

Behram Irani is a Senior Solutions Architect, Data & Analytics at Amazon Web Services.
