Wednesday, May 29, 2024

Use Amazon Athena with Spark SQL for your open-source transactional table formats


AWS-powered data lakes, supported by the unrivaled availability of Amazon Simple Storage Service (Amazon S3), can handle the scale, agility, and flexibility required to combine different data and analytics approaches. As data lakes have grown in size and matured in usage, a significant amount of effort can be spent keeping the data consistent with business events. To ensure files are updated in a transactionally consistent manner, a growing number of customers are using open-source transactional table formats such as Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake that help you store data with high compression rates, natively interface with your applications and frameworks, and simplify incremental data processing in data lakes built on Amazon S3. These formats enable ACID (atomicity, consistency, isolation, durability) transactions, upserts, and deletes, and advanced features such as time travel and snapshots that were previously only available in data warehouses. Each storage format implements this functionality in slightly different ways; for a comparison, refer to Choosing an open table format for your transactional data lake on AWS.

In 2023, AWS announced general availability for Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake in Amazon Athena for Apache Spark, which removes the need to install a separate connector or associated dependencies and manage versions, and simplifies the configuration steps required to use these frameworks.

In this post, we show you how to use Spark SQL in Amazon Athena notebooks and work with the Iceberg, Hudi, and Delta Lake table formats. We demonstrate common operations such as creating databases and tables, inserting data into the tables, querying data, and looking at snapshots of the tables in Amazon S3 using Spark SQL in Athena.

Prerequisites

Complete the following prerequisites:

Download and import example notebooks from Amazon S3

To follow along, download the notebooks discussed in this post from the following locations:

After you download the notebooks, import them into your Athena Spark environment by following the To import a notebook section in Managing notebook files.

Navigate to a specific open table format section

If you're interested in the Iceberg table format, navigate to the Working with Apache Iceberg tables section.

If you're interested in the Hudi table format, navigate to the Working with Apache Hudi tables section.

If you're interested in the Delta Lake table format, navigate to the Working with Linux Foundation Delta Lake tables section.

Working with Apache Iceberg tables

When using Spark notebooks in Athena, you can run SQL queries directly without having to use PySpark. We do this by using cell magics, which are special headers in a notebook cell that change the cell's behavior. For SQL, we can add the %%sql magic, which will interpret the entire cell contents as a SQL statement to be run on Athena.
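Conceptually, a cell magic is a dispatcher: the %%sql header names a handler, and the rest of the cell body becomes that handler's input (in Athena's case, the Spark SQL engine). A minimal stand-in sketch of that dispatch — run_cell and fake_spark_sql are illustrative names, not anything Athena exposes:

```python
def run_cell(cell_text, handlers):
    """Dispatch a notebook cell to a handler based on its %%magic header."""
    lines = cell_text.strip().splitlines()
    if lines and lines[0].startswith("%%"):
        magic = lines[0][2:].strip()          # e.g. "sql"
        body = "\n".join(lines[1:])           # entire remaining cell contents
        return handlers[magic](body)
    raise ValueError("cell has no %%magic header")

# Stub handler standing in for the real Spark SQL engine.
executed = []
def fake_spark_sql(statement):
    executed.append(statement)
    return f"ran: {statement}"

result = run_cell("%%sql\nSELECT 1", {"sql": fake_spark_sql})
```

The point is only that the header changes how the whole cell body is interpreted; the real notebook runtime does the equivalent of passing the body to spark.sql.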

In this section, we show how you can use SQL on Apache Spark for Athena to create, analyze, and manage Apache Iceberg tables.

Set up a notebook session

To use Apache Iceberg in Athena, while creating or editing a session, select the Apache Iceberg option by expanding the Apache Spark properties section. It will pre-populate the properties as shown in the following screenshot.

This image shows the Apache Iceberg properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

The code used in this section is available in the SparkSQL_iceberg.ipynb file to follow along.

Create a database and Iceberg table

First, we create a database in the AWS Glue Data Catalog. With the following SQL, we can create a database called icebergdb:

%%sql
CREATE DATABASE icebergdb

Next, in the database icebergdb, we create an Iceberg table called noaa_iceberg pointing to a location in Amazon S3 where we'll load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE icebergdb.noaa_iceberg(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string)
USING iceberg
PARTITIONED BY (year string)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaaiceberg/'
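The year values used for partitioning typically surface in the table's S3 layout as key=value prefixes under the LOCATION. A hypothetical sketch of how such an object key is composed — the data/ subdirectory and file name are illustrative only; Iceberg's actual file names include UUIDs and sequence numbers:

```python
# Illustrative base location; the placeholders mirror the DDL above.
base = "s3://<your-S3-bucket>/<prefix>/noaaiceberg/"

def data_file_key(base, partition_col, partition_val, file_name):
    """Compose a partitioned data-file key under the table's base location."""
    return f"{base}data/{partition_col}={partition_val}/{file_name}"

key = data_file_key(base, "year", "2023", "00000-0-abc.parquet")
```

This layout is why partition pruning works: a query filtered on year only has to list and read objects under the matching prefix.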

Insert data into the table

To populate the noaa_iceberg Iceberg table, we insert data from the Parquet table sparkblogdb.noaa_pq that was created as part of the prerequisites. You can do this using an INSERT INTO statement in Spark:

%%sql
INSERT INTO icebergdb.noaa_iceberg select * from sparkblogdb.noaa_pq

Alternatively, you can use CREATE TABLE AS SELECT with the USING iceberg clause to create an Iceberg table and insert data from a source table in one step:

%%sql
CREATE TABLE icebergdb.noaa_iceberg
USING iceberg
PARTITIONED BY (year)
AS SELECT * FROM sparkblogdb.noaa_pq

Query the Iceberg table

Now that the data is inserted in the Iceberg table, we can start analyzing it. Let's run a Spark SQL query to find the minimum recorded temperature by year for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

We get the following output.

Image shows output of first select query

Update data in the Iceberg table

Let's look at how to update data in our table. We want to update the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. Using Spark SQL, we can run an UPDATE statement against the Iceberg table:

%%sql
UPDATE icebergdb.noaa_iceberg
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We can then run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'Sea-Tac'
group by 1,2

We get the following output.

Image shows output of second select query

Compact data files

Open table formats like Iceberg work by writing delta changes to file storage and tracking row versions through manifest files. More data files means more metadata stored in manifest files, and small data files often cause an unnecessary amount of metadata, resulting in less efficient queries and higher Amazon S3 access costs. Running Iceberg's rewrite_data_files procedure in Spark for Athena compacts data files, combining many small delta change files into a smaller set of read-optimized Parquet files. Compacting files speeds up read operations when the table is queried. To run compaction on our table, run the following Spark SQL:

%%sql
CALL spark_catalog.system.rewrite_data_files
(table => 'icebergdb.noaa_iceberg', strategy => 'sort', sort_order => 'zorder(name)')

rewrite_data_files offers options to specify your sort strategy, which can help reorganize and compact data.
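The zorder(name) sort order interleaves the bits of the clustered columns' values so rows that are close on any of those columns land near each other in the rewritten files. A two-column Morton-code sketch of the underlying idea (Iceberg additionally maps non-numeric columns such as name onto integers first; that step is omitted here):

```python
def interleave_bits(x, y, bits=16):
    """Build a z-order (Morton) value by alternating the bits of x and y."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)      # even bit positions come from x
        z |= ((y >> i) & 1) << (2 * i + 1)  # odd bit positions come from y
    return z
```

Sorting rows by this interleaved value keeps multi-dimensional locality, so a filter on either column can skip most files via their min/max statistics.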

List table snapshots

Every write, update, delete, upsert, and compaction operation on an Iceberg table creates a new snapshot of the table while keeping the old data and metadata around for snapshot isolation and time travel. To list the snapshots of an Iceberg table, run the following Spark SQL statement:

%%sql
SELECT *
FROM spark_catalog.icebergdb.noaa_iceberg.snapshots

Expire old snapshots

Regularly expiring snapshots is recommended to delete data files that are no longer needed, and to keep the size of table metadata small. Expiring snapshots will never remove files that are still required by a non-expired snapshot. In Spark for Athena, run the following SQL to expire snapshots for the table icebergdb.noaa_iceberg that are older than a specific timestamp:

%%sql
CALL spark_catalog.system.expire_snapshots
('icebergdb.noaa_iceberg', TIMESTAMP '2023-11-30 00:00:00.000')

Note that the timestamp value is specified as a string in the format yyyy-MM-dd HH:mm:ss.fff. The output will give a count of the number of data and metadata files deleted.
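If you build that timestamp string in Python, note that strftime's %f emits six microsecond digits, so it needs trimming to the three-digit millisecond form shown above. A small sketch:

```python
from datetime import datetime

def iceberg_timestamp(dt):
    """Format a datetime as yyyy-MM-dd HH:mm:ss.fff (milliseconds)."""
    return dt.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]  # drop trailing 3 of 6 µs digits

ts = iceberg_timestamp(datetime(2023, 11, 30, 0, 0, 0))
```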

Drop the table and database

You can run the following Spark SQL to clean up the Iceberg table and associated data in Amazon S3 from this exercise:

%%sql
DROP TABLE icebergdb.noaa_iceberg PURGE

Run the following Spark SQL to remove the database icebergdb:

%%sql
DROP DATABASE icebergdb

To learn more about all the operations you can perform on Iceberg tables using Spark for Athena, refer to Spark Queries and Spark Procedures in the Iceberg documentation.

Working with Apache Hudi tables

Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Apache Hudi tables.

Set up a notebook session

To use Apache Hudi in Athena, while creating or editing a session, select the Apache Hudi option by expanding the Apache Spark properties section.

This image shows the Apache Hudi properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

The code used in this section is available in the SparkSQL_hudi.ipynb file to follow along.

Create a database and Hudi table

First, we create a database called hudidb that will be stored in the AWS Glue Data Catalog, followed by the Hudi table creation:

%%sql
CREATE DATABASE hudidb

We create a Hudi table pointing to a location in Amazon S3 where we'll load the data. Note that the table is of copy-on-write type, defined by type = 'cow' in the table DDL. We have defined station and date as the multiple primary keys and preCombineField as year. Also, the table is partitioned on year. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE hudidb.noaa_hudi(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string,
year string)
USING HUDI
PARTITIONED BY (year)
TBLPROPERTIES(
primaryKey = 'station, date',
preCombineField = 'year',
type = 'cow'
)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaahudi/'
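The primaryKey and preCombineField settings above drive Hudi's upsert semantics: when two incoming records share a key, the one with the larger preCombine value wins. A plain-Python sketch of that rule — the record shapes here are illustrative, not Hudi's internal representation:

```python
def precombine(records):
    """Keep one record per (station, date), preferring the larger 'year'."""
    latest = {}
    for rec in records:
        key = (rec["station"], rec["date"])  # composite primary key
        if key not in latest or rec["year"] > latest[key]["year"]:
            latest[key] = rec                # preCombine field decides the winner
    return list(latest.values())

rows = [
    {"station": "A", "date": "01-01", "year": "2022", "temp": "40"},
    {"station": "A", "date": "01-01", "year": "2023", "temp": "42"},
]
deduped = precombine(rows)
```

Declaring the right preCombine field matters: it is what makes replays and duplicate deliveries deterministic rather than last-write-wins by arrival order.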

Insert data into the table

As with Iceberg, we use the INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:

%%sql
INSERT INTO hudidb.noaa_hudi select * from sparkblogdb.noaa_pq

Query the Hudi table

Now that the table is created, let's run a query to find the maximum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

Update data in the Hudi table

Let's change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_hudi table:

%%sql
UPDATE hudidb.noaa_hudi
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We run the previous SELECT query to find the maximum recorded temperature for the 'Sea-Tac' location:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'Sea-Tac'
group by 1,2

Run time travel queries

We can use time travel queries in SQL on Athena to analyze past data snapshots. For example:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi timestamp as of '2023-12-01 23:53:43.100'
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

This query checks the Seattle airport temperature data as of a specific time in the past. The timestamp clause lets us travel back without altering current data. Note that the timestamp value is specified as a string in the format yyyy-MM-dd HH:mm:ss.fff.
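Conceptually, a timestamp-as-of query replays the table's commit history and reads the latest commit at or before the requested time. A sketch with an illustrative in-memory commit list (real Hudi resolves this against commit metadata on the timeline, not a Python list):

```python
def as_of(commits, ts):
    """commits: list of (commit_time, table_state), sorted by commit_time.
    Return the state of the latest commit at or before ts."""
    state = None
    for commit_time, snapshot in commits:
        if commit_time <= ts:          # timestamps compare lexicographically here
            state = snapshot
    return state

history = [
    ("2023-12-01 10:00:00", {"name": "SEATTLE TACOMA AIRPORT, WA US"}),
    ("2023-12-02 09:00:00", {"name": "Sea-Tac"}),   # the UPDATE landed later
]
old = as_of(history, "2023-12-01 23:53:43.100")
```

This is why the query above still sees the original station name even after the UPDATE: the update's commit time is after the requested timestamp.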

Optimize query speed with clustering

To improve query performance, you can perform clustering on Hudi tables using SQL in Spark for Athena:

%%sql
CALL run_clustering(table => 'hudidb.noaa_hudi', order => 'name')

Compact tables

Compaction is a table service employed by Hudi, specifically in merge-on-read (MOR) tables, to periodically merge updates from row-based log files into the corresponding columnar base file to produce a new version of the base file. Compaction is not applicable to copy-on-write (COW) tables and only applies to MOR tables. You can run the following query in Spark for Athena to perform compaction on MOR tables:

%%sql
CALL run_compaction(op => 'run', table => 'hudi_table_mor');
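At a high level, compaction merges the updates accumulated in log files over the base file's rows by record key, producing a new base file version. A plain-Python sketch of that merge — record shapes and keys are illustrative:

```python
def compact(base_rows, log_rows):
    """Merge log-file updates over base-file rows, keyed by record key."""
    merged = {row["key"]: row for row in base_rows}
    for row in log_rows:
        merged[row["key"]] = row  # a later log entry overrides the base row
    return sorted(merged.values(), key=lambda r: r["key"])

base = [{"key": 1, "name": "SEATTLE TACOMA AIRPORT, WA US"}]
log = [{"key": 1, "name": "Sea-Tac"}, {"key": 2, "name": "PORTLAND"}]
new_base = compact(base, log)
```

This is the trade-off MOR makes: writes are cheap appends to log files, and the merge cost is paid either at read time or, after compaction, once per base-file rewrite.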

Drop the table and database

Run the following Spark SQL to remove the Hudi table you created and the associated data from the Amazon S3 location:

%%sql
DROP TABLE hudidb.noaa_hudi PURGE

Run the following Spark SQL to remove the database hudidb:

%%sql
DROP DATABASE hudidb

To learn all the operations you can perform on Hudi tables using Spark for Athena, refer to SQL DDL and Procedures in the Hudi documentation.

Working with Linux Foundation Delta Lake tables

Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Delta Lake tables.

Set up a notebook session

To use Delta Lake in Spark for Athena, while creating or editing a session, select the Linux Foundation Delta Lake option by expanding the Apache Spark properties section.

This image shows the Delta Lake properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

The code used in this section is available in the SparkSQL_delta.ipynb file to follow along.

Create a database and Delta Lake table

In this section, we create a database in the AWS Glue Data Catalog. Using the following SQL, we can create a database called deltalakedb:

%%sql
CREATE DATABASE deltalakedb

Next, in the database deltalakedb, we create a Delta Lake table called noaa_delta pointing to a location in Amazon S3 where we'll load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE deltalakedb.noaa_delta(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string)
USING delta
PARTITIONED BY (year string)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaadelta/'

Insert data into the table

We use an INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:

%%sql
INSERT INTO deltalakedb.noaa_delta select * from sparkblogdb.noaa_pq

You can also use CREATE TABLE AS SELECT to create a Delta Lake table and insert data from a source table in one query.

Query the Delta Lake table

Now that the data is inserted in the Delta Lake table, we can start analyzing it. Let's run a Spark SQL query to find the minimum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

Update data in the Delta Lake table

Let's change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_delta table:

%%sql
UPDATE deltalakedb.noaa_delta
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We can run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location, and the result should be the same as earlier:

%%sql
select name, year, min(MIN) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'Sea-Tac'
group by 1,2

Compact data files

In Spark for Athena, you can run OPTIMIZE on the Delta Lake table, which compacts the small files into larger files, so queries are not burdened by small file overhead. To perform the compaction operation, run the following query:

%%sql
OPTIMIZE deltalakedb.noaa_delta

Refer to Optimizations in the Delta Lake documentation for the different options available while running OPTIMIZE.
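Under the hood, OPTIMIZE-style compaction is essentially a bin-packing problem: group many small files so each output file approaches a target size. A greedy sketch — the 128 MB target is illustrative, not Delta's configured default:

```python
def plan_compaction(file_sizes_mb, target_mb=128):
    """Greedily group small files into bins of roughly target_mb each."""
    bins, current, current_size = [], [], 0
    for size in sorted(file_sizes_mb):
        if current and current_size + size > target_mb:
            bins.append(current)              # close the bin and start a new one
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins

groups = plan_compaction([10, 20, 30, 100, 5])
```

Each resulting group would be rewritten as one larger file, so query planning touches far fewer objects.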

Remove files no longer referenced by a Delta Lake table

You can remove files stored in Amazon S3 that are no longer referenced by a Delta Lake table and are older than the retention threshold by running the VACUUM command on the table using Spark for Athena:

%%sql
VACUUM deltalakedb.noaa_delta

Refer to Remove files no longer referenced by a Delta table in the Delta Lake documentation for the options available with VACUUM.
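VACUUM can only delete files that no live table version references, which Delta determines from its JSON transaction log of add and remove actions. A stdlib sketch of replaying such a log to compute the active file set — the log format here is simplified relative to Delta's real action schema:

```python
import json

def active_files(log_lines):
    """Replay add/remove actions to find the files a reader still needs."""
    active = set()
    for line in log_lines:
        action = json.loads(line)
        if "add" in action:
            active.add(action["add"]["path"])
        elif "remove" in action:
            active.discard(action["remove"]["path"])  # now a VACUUM candidate
    return active

log = [
    '{"add": {"path": "year=2023/part-000.parquet"}}',
    '{"add": {"path": "year=2023/part-001.parquet"}}',
    '{"remove": {"path": "year=2023/part-000.parquet"}}',
]
live = active_files(log)
```

Anything removed from the log and older than the retention threshold is what VACUUM actually deletes from S3.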

Drop the table and database

Run the following Spark SQL to remove the Delta Lake table you created:

%%sql
DROP TABLE deltalakedb.noaa_delta

Run the following Spark SQL to remove the database deltalakedb:

%%sql
DROP DATABASE deltalakedb

Running the DROP TABLE DDL on the Delta Lake table and database deletes the metadata for these objects, but doesn't automatically delete the data files in Amazon S3. You can run the following Python code in a notebook cell to delete the data from the S3 location:

import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('<your-S3-bucket>')
bucket.objects.filter(Prefix='<prefix>/noaadelta/').delete()

To learn more about the SQL statements that you can run on a Delta Lake table using Spark for Athena, refer to the quickstart in the Delta Lake documentation.

Conclusion

This post demonstrated how to use Spark SQL in Athena notebooks to create databases and tables, insert and query data, and perform common operations like updates, compactions, and time travel on Hudi, Delta Lake, and Iceberg tables. Open table formats add ACID transactions, upserts, and deletes to data lakes, overcoming limitations of raw object storage. By removing the need to install separate connectors, Spark on Athena's built-in integration reduces configuration steps and management overhead when using these popular frameworks for building reliable data lakes on Amazon S3. To learn more about selecting an open table format for your data lake workloads, refer to Choosing an open table format for your transactional data lake on AWS.


About the Authors

Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has been focusing on the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Raj Devnath is a Product Manager at AWS on Amazon Athena. He is passionate about building products customers love and helping customers extract value from their data. His background is in delivering solutions for multiple end markets, such as finance, retail, smart buildings, home automation, and data communication systems.
