4.2 Write ETL Script

In the Python (Spark) script:

  • Read data from the sales_raw table
  • Rename columns if needed (e.g., order_id → id)
  • Cast data types appropriately (e.g., date, float); a short rename/cast sketch follows this list
  • Write output to s3://sales-data-bucket-2025/processed/ in Parquet or CSV format
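Inside the script, the rename and cast steps might look like this (a minimal sketch; the column names order_id, order_date, and price are assumptions, so adjust them to your actual schema):

df = df.withColumnRenamed("order_id", "id")                        # rename example column
df = df.withColumn("order_date", col("order_date").cast("date"))   # string -> date
df = df.withColumn("price", col("price").cast("float"))            # string -> float

The complete job script for this step is shown below.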
import sys
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, trim, regexp_replace, round as spark_round  # alias avoids shadowing the built-in round
from awsglue.dynamicframe import DynamicFrame

# Standard Glue job initialization
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the raw sales table from the Glue Data Catalog
input_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="sales_analysis_db",
    table_name="sales_dataset_raw"
)

# Convert to a Spark DataFrame for column-level transformations
df = input_dyf.toDF()

# Clean string columns: strip surrounding quotes, collapse repeated whitespace, and trim
for col_name in df.columns:
    if df.schema[col_name].dataType.simpleString() == 'string':
        df = df.withColumn(
            col_name,
            trim(
                regexp_replace(
                    regexp_replace(col(col_name), '^"|"$', ''),
                    r'\s+', ' '
                )
            )
        )

# Derive a revenue column (cast defensively in case the crawler inferred these columns as strings)
if 'price' in df.columns and 'quantity' in df.columns:
    df = df.withColumn(
        "revenue",
        spark_round(col("price").cast("double") * col("quantity").cast("double"), 2)
    )

# Convert back to a DynamicFrame and write the processed data to S3
output_dyf = DynamicFrame.fromDF(df, glueContext, "output_dyf")

output_path = "s3://sales-data-bucket-2025/processed/"

glueContext.write_dynamic_frame.from_options(
    frame=output_dyf,
    connection_type="s3",
    connection_options={
        "path": output_path,
        "partitionKeys": []
    },
    format="csv"
)

# Mark the Glue job run as complete
job.commit()

Prefer Parquet over CSV: its columnar layout reduces the data Athena has to scan, which improves query performance and lowers cost.
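For example, the CSV write above can be switched to Parquet simply by changing the format argument; a minimal sketch reusing the same output_dyf and output_path:

glueContext.write_dynamic_frame.from_options(
    frame=output_dyf,
    connection_type="s3",
    connection_options={"path": output_path, "partitionKeys": []},
    format="parquet"   # columnar output: smaller files, faster and cheaper Athena scans
)

AWS Glue also offers an optimized Parquet writer (format="glueparquet"), which may be worth evaluating for larger datasets.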