It's possible to load data directly from S3 using Glue:

sourceDyf = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://bucket/folder"]
    },
    format_options={
        "withHeader": True,
        "separator": ","
    })

You can also do that with plain Spark (as you already tried):

sourceDf = spark.read \
    .option("header", "true") \
    .option("delimiter", ",") \
    .csv(r"C:\inputs\TEST.txt")

However, in this case Glue doesn't guarantee that an appropriate Spark CSV reader is available. So if your error is about a missing data source for CSV, you should add the spark-csv library to the Glue job by providing the S3 path to its JARs via the --extra-jars parameter.

Answer from Yuriy Bondaruk on Stack Overflow
DynamicFrame class - AWS Glue
A DynamicRecord represents a logical record in a DynamicFrame. It is similar to a row in a Spark DataFrame, except that it is self-describing and can be used for data that does not conform to a fixed schema. options – A list of ResolveOption objects that specify how to resolve choice types during the conversion. This parameter is used to handle schema inconsistencies, not for format options like CSV parsing.
Using the CSV format in AWS Glue - AWS Glue
For this example, use the create_dynamic_frame.from_options method.

# Example: Read CSV from S3
# For show, we handle a CSV with a header row. Set the withHeader option.
# Consider whether optimizePerformance is right for your workflow.
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

dynamicFrame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://s3path"]},
    format="csv",
    format_options={
        "withHeader": True,
        # "optimizePerformance": True,
    },
)

You can also use DataFrames in a script (pyspark.sql.DataFrame).
AWS Glue create dynamic frame – SQL & Hadoop
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

# creating dynamic frame from S3 data
dyn_frame_s3 = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://<bucket name>/data/sales/"],
        "inferSchema": "true"
    },
    format="csv",
    format_options={"separator": "\t"},
    transformation_ctx="")
print(dyn_frame_s3.count())

# creating dynamic frame from Glue catalog table
dyn_frame_catalog = glueContext.create_dynamic_frame_from_catalog(
    database="db_readfile",
    table_name="sales",
    transformation_ctx="")
print(dyn_frame_catalog.count())
amazon web services - How can we apply encoding in dynamic frame from option while reading csv file from s3 location? - Stack Overflow
csv_dynamicframe = glueContext.create_dynamic_frame.from_options(
    "s3",
    connection_options={"paths": [root_path]},
    format="csv",
    format_options={
        "withHeader": True,
        # "quoteChar": -1,
        "separator": ",",
        "encoding": "utf-8"
    },
    transformation_ctx="csv_dynamicframe",
    schema=dynamic_frame_catalog.schema()
)
AWS Glue DynamicFrame transformations with example code and output | by Swapnil Bhoite | Medium
April 28, 2022 -

dyF = glueContext.create_dynamic_frame.from_options(
    's3',
    {'paths': ['s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv']},
    'csv',
    {'withHeader': True})
dyF.printSchema()

root
 |-- DRG Definition: string
 |-- Provider Id: string
 |-- Provider Name: string
 |-- Provider Street Address: string
 |-- Provider City: string
 |-- Provider State: string
 |-- Provider Zip Code: string
 |-- Hospital Referral Region Description: string
 |-- Total Discharges: string
 |-- Average Covered Charges: string
 |-- Average Total Payments: string
 |-- Average Medicare Payments: string
Top answer
1 of 2
13

Since the source CSV files don't necessarily carry the right date, you could add additional information to them about the collection date and time (or use any date field if one is already available):

{"collectDateTime": {
    "timestamp": 1518091828,
    "timestampMs": 1518091828116,
    "day": 8,
    "month": 2,
    "year": 2018
}}
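For illustration, such a field can be derived from a Unix timestamp in plain Python; the field names follow the JSON above, and the sample timestamp is the one shown there:

```python
import datetime

def collect_date_time(timestamp_ms):
    """Build the collectDateTime structure from a millisecond Unix timestamp."""
    ts = timestamp_ms // 1000
    dt = datetime.datetime.utcfromtimestamp(ts)
    return {
        "collectDateTime": {
            "timestamp": ts,
            "timestampMs": timestamp_ms,
            "day": dt.day,
            "month": dt.month,
            "year": dt.year,
        }
    }

record = collect_date_time(1518091828116)
print(record["collectDateTime"]["year"])  # 2018
```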

Then your job could use this information in the output DynamicFrame and ultimately use them as partitions. Some sample code of how to achieve this:

from awsglue.transforms import *
from pyspark.sql.types import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions

import sys
import datetime

###
# CREATE THE NEW SIMPLIFIED LINE
##
def create_simplified_line(event_dict):

    # collect date time
    collect_date_time_dict = event_dict["collectDateTime"]

    new_line = {
        # TODO: COPY YOUR DATA HERE
        "myData": event_dict["myData"],
        "someOtherData": event_dict["someOtherData"],
        "timestamp": collect_date_time_dict["timestamp"],
        "timestampmilliseconds": long(collect_date_time_dict["timestamp"]) * 1000,
        "year": collect_date_time_dict["year"],
        "month": collect_date_time_dict["month"],
        "day": collect_date_time_dict["day"]
    }

    return new_line


###
# MAIN FUNCTION
##

# context
glueContext = GlueContext(SparkContext.getOrCreate())

# fetch from previous day source bucket
previous_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)

# build s3 paths
s3_path = "s3://source-bucket/path/year={}/month={}/day={}/".format(previous_date.year, previous_date.month, previous_date.day)

# create dynamic_frame
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [s3_path]},
    format="json",
    format_options={},
    transformation_ctx="dynamic_frame")

# resolve choices (optional)
dynamic_frame_resolved = ResolveChoice.apply(frame=dynamic_frame,choice="project:double",transformation_ctx="dynamic_frame_resolved")

# transform the source dynamic frame into a simplified version
result_frame = Map.apply(frame=dynamic_frame_resolved, f=create_simplified_line)

# write to simple storage service in parquet format
glueContext.write_dynamic_frame.from_options(
    frame=result_frame,
    connection_type="s3",
    connection_options={"path": "s3://target-bucket/path/",
                        "partitionKeys": ["year", "month", "day"]},
    format="parquet")

I did not test it, but the script is just a sample of how to achieve this and is fairly straightforward.
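Since Map.apply simply calls the supplied function once per record, the transformation above can be sanity-checked in plain Python without a Glue environment; the sample record below is made up:

```python
# Local sanity check of the per-record function passed to Map.apply.
def create_simplified_line(event_dict):
    collect_date_time_dict = event_dict["collectDateTime"]
    return {
        "myData": event_dict["myData"],
        "someOtherData": event_dict["someOtherData"],
        "timestamp": collect_date_time_dict["timestamp"],
        "timestampmilliseconds": int(collect_date_time_dict["timestamp"]) * 1000,
        "year": collect_date_time_dict["year"],
        "month": collect_date_time_dict["month"],
        "day": collect_date_time_dict["day"],
    }

sample = {
    "myData": "abc",
    "someOtherData": 42,
    "collectDateTime": {"timestamp": 1518091828, "timestampMs": 1518091828116,
                        "day": 8, "month": 2, "year": 2018},
}

result = create_simplified_line(sample)
print(result["timestampmilliseconds"])  # 1518091828000
```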

UPDATE

1) As for having specific file sizes/numbers in output partitions,

Spark's coalesce and repartition features are not yet implemented in Glue's Python API (only in Scala).

You can convert your dynamic frame into a data frame and leverage Spark's partition capabilities.

Convert to a DataFrame and repartition (the example below collapses everything into a single partition; you could instead repartition based on a column such as "partition_col"):

partitioned_dataframe = datasource0.toDF().repartition(1)

Convert back to a DynamicFrame for further processing.

from awsglue.dynamicframe import DynamicFrame
partitioned_dynamicframe = DynamicFrame.fromDF(partitioned_dataframe, glueContext, "partitioned_df")

The good news is that Glue has an interesting feature: if you have more than 50,000 input files per partition, it will automatically group them for you.

In case you want to set this behavior explicitly regardless of the input file count (your case), you may set the following connection_options while creating a dynamic frame from options:

dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [s3_path],
                        "groupFiles": "inPartition",
                        "groupSize": 1024 * 1024},
    format="json",
    format_options={},
    transformation_ctx="dynamic_frame")

In the previous example, it would attempt to group files into 1MB groups.

It is worth mentioning that this is not the same as coalesce, but it may help if your goal is to reduce the number of files per partition.

2) If files already exist in the destination, will it just safely add them (not overwrite or delete)?

Glue's default SaveMode for write_dynamic_frame.from_options is to append.

When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

3) Given each source partition may be 30-100GB, what's a guideline for # of DPUs

I'm afraid I won't be able to answer that. It depends on how fast it'll load your input files (size/number), your script's transformations, etc.

2 of 2
-1

Import the datetime library:

import datetime

Split the timestamp into the partition components:

now = datetime.datetime.now()
year = str(now.year)
month = str(now.month)
day = str(now.day)

currdate = "s3://Destination/" + year + "/" + month + "/" + day

Add the variable currdate to the path in the writer class. The results will be partitioned parquet files.
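One caveat with this concatenation: str(now.month) and str(now.day) are not zero-padded, so "2018/2/8" sorts after "2018/10/1" lexicographically. strftime produces fixed-width components instead (a sketch with a fixed date; the bucket name is a placeholder):

```python
import datetime

now = datetime.datetime(2018, 2, 8)  # fixed date for illustration
currdate = "s3://Destination/" + now.strftime("%Y/%m/%d")
print(currdate)  # s3://Destination/2018/02/08
```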

Data format options for inputs and outputs in AWS Glue for Spark - AWS Glue
Certain AWS Glue connection types support multiple format types, requiring you to specify information about your data format with a format_options object when using methods like GlueContext.write_dynamic_frame.from_options.
Top answer
1 of 2
6

Part 1: identifying the problem

The way to find what was causing the problem was to switch the output from .parquet to .csv and drop ResolveChoice and DropNullFields (both are automatically suggested by Glue for .parquet):

datasink2 = glueContext.write_dynamic_frame.from_options(
    frame=applymapping1,
    connection_type="s3",
    connection_options={"path": "s3://xxxx"},
    format="csv",
    transformation_ctx="datasink2")
job.commit()

This produced a more detailed error message:

An error occurred while calling o120.pyWriteDynamicFrame. Job aborted due to stage failure: Task 5 in stage 0.0 failed 4 times, most recent failure: Lost task 5.3 in stage 0.0 (TID 182, ip-172-31-78-99.ec2.internal, executor 15): com.amazonaws.services.glue.util.FatalException: Unable to parse file: xxxx1.csv.gz

The file xxxx1.csv.gz mentioned in the error message appeared to be too big for Glue (~100 MB as .gzip, ~350 MB as uncompressed .csv).

Part 2: true source of the problem and fix

As mentioned in the 1st part, exporting to .csv made it possible to identify the problematic file.

Further investigation, by loading the .csv into R, revealed that one of the columns contained a single string record, while all other values in that column were long or NULL.

After dropping this value in R and re-uploading the data to S3, the problem vanished.

Note #1: the column was declared string in Athena, so I consider this behaviour a bug.

Note #2: the nature of the problem was not the size of the data. I have successfully processed files up to 200 MB .csv.gz, which corresponds to roughly 600 MB of uncompressed .csv.
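The bad value could also have been located without R: a short pure-Python pass over the uncompressed CSV flags rows where a supposedly numeric column is neither empty/NULL nor parseable as a long. The column name and sample data below are hypothetical:

```python
import csv
import io

def find_non_numeric(csv_text, column):
    """Yield (line_number, value) for values that are neither NULL/empty nor integers."""
    reader = csv.DictReader(io.StringIO(csv_text))
    for i, row in enumerate(reader, start=2):  # line 1 is the header
        value = row[column]
        if value in ("", "NULL"):
            continue
        try:
            int(value)
        except ValueError:
            yield i, value

data = "id,amount\n1,100\n2,NULL\n3,oops\n4,250\n"
print(list(find_non_numeric(data, "amount")))  # [(4, 'oops')]
```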

2 of 2
1

Please use the updated table schema from the data catalog.

I have run into this same error. In my case, the crawler had created another table for the same file in the database, and I was referencing the old one. This can happen when the crawler repeatedly crawls the same path and creates tables with different schemas in the data catalog, so the Glue job couldn't find the expected table name and schema, giving this error.

Moreover, you can change DeleteBehavior: "LOG" to DeleteBehavior: "DELETE_FROM_DATABASE"
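For context, this setting is part of the crawler's SchemaChangePolicy in the Glue API (where, to the best of my knowledge, the delete option is spelled DELETE_FROM_DATABASE). A minimal sketch of that fragment, with the rest of the crawler definition omitted:

```python
# Hypothetical fragment of a Glue crawler definition: DeleteBehavior lives in
# the crawler's SchemaChangePolicy (e.g. the dict passed when updating a crawler).
schema_change_policy = {
    "UpdateBehavior": "UPDATE_IN_DATABASE",
    "DeleteBehavior": "DELETE_FROM_DATABASE",
}
print(schema_change_policy["DeleteBehavior"])
```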

DynamicFrameWriter class - AWS Glue
glueContext.write_dynamic_frame.from_options(
    frame=dyf_splitFields,
    connection_options={"path": "/home/glue/GlueLocalOutput/"},
    connection_type="s3",
    format="json")
AWS_Glue_3: Glue(DynamicFrame). GlueContext is the entry point for… | by Kundan Singh | Medium
February 12, 2025 -

# Import required libraries
from awsglue.context import GlueContext
from pyspark.context import SparkContext

# Create a GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)

# Read data from the data source
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table"
)

# Apply data transformations using PySpark
transformed_data = dynamic_frame.apply_mapping([
    ("column_name", "string", "new_column_name", "string"),
    # Add more transformations as needed
])

df = dynamic_frame.toDF()
df.show()
print("Dataframe converted")
# convert column names
aws-glue-samples/examples/data_cleaning_and_lambda.md at master · aws-samples/aws-glue-samples
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

First, let's see what the schema looks like using Spark DataFrames:

medicare = spark.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
medicare.printSchema()
GlueContext class - AWS Glue
Methods of GlueContext: __init__; creating: getSource, create_dynamic_frame_from_rdd, create_dynamic_frame_from_catalog, create_dynamic_frame_from_options, create_sample_dynamic_frame_from_catalog, create_sample_dynamic_frame_from_options, add_ingestion_time_columns, create_data_frame_from_catalog, create_data_frame_from_options, forEachBatch; Amazon S3 datasets: purge_table, purge_s3_path, transition_table, transition_s3_path; extracting: extract_jdbc_conf; transactions: start_transaction, commit_transaction, cancel_transaction; writing: getSink, write_dynamic_frame_from_options, write_from_options, write_dynamic_frame_from_catalog, write_data_frame_from_catalog, write_dynamic_frame_from_jdbc_conf, write_from_jdbc_conf
Work with CSV data in AWS Glue - Amazon Athena
glueContext.write_dynamic_frame.from_options(
    frame=applymapping1,
    connection_type="s3",
    connection_options={"path": "s3://amzn-s3-demo-bucket/MYTABLEDATA/"},
    format="csv",
    format_options={"writeHeader": False},
    transformation_ctx="datasink2")
AWS Glue PySpark Extensions Reference - Spark By {Examples}
March 27, 2024 -

# Writing a DynamicFrame to an S3 bucket in CSV format
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="s3",
    connection_options={"path": "s3://mybucket/output"},
    format="csv")
aws glue create_dynamic_frame from data in PostgreSQL with custom bookmark key | AWS re:Post
March 18, 2023 - If you look into the documentation: ...rame.from_catalog uses additional_options; create_dynamic_frame.from_options uses connection_options. You can try giving the same in "connection_options" as mentioned and check if it works.
GlueContext.write_dynamic_frame.from_options · Issue #108 · awslabs/aws-glue-libs
December 9, 2021 -

glueContext.write_dynamic_frame.from_options(
    frame=dynamicFrame,
    connection_type="s3",
    connection_options={"path": s3PathLatest, "StorageClass": "STANDARD_IA"},
    format="csv",
    format_options={"separator": ",", "writeHeader": True, "optimizePerformance": True},
    transformation_ctx=f"{table['Name']}_dataSink")

There doesn't seem to be any actual documentation on what the connection_options dict supports, and looking over the library code, it doesn't really care what you throw in there.
csv - AWS Glue write_dynamic_frame_from_options encounters schema exception - Stack Overflow
September 17, 2020 - It looks like your header might have an additional comma or column. Can you post the header and a record in your question? Also, try reading with the header disabled:

dyF = glueContext.create_dynamic_frame.from_options(
    's3',
    {'paths': ['s3://path']},
    'csv',
    {'withHeader': False})