Currently AWS Glue doesn't support 'overwrite' mode but they are working on this feature.
As a workaround you can convert DynamicFrame object to spark's DataFrame and write it using spark instead of Glue:
table.toDF() \
    .write \
    .mode("overwrite") \
    .format("parquet") \
    .partitionBy("var_1", "var_2") \
    .save(output_dir)
— Answer from Yuriy Bondaruk on Stack Overflow
As mentioned earlier, AWS Glue doesn't support mode="overwrite". However, converting a Glue DynamicFrame back to a PySpark DataFrame can cause a lot of issues with big data.
Instead, you just need to add a single call, purge_s3_path(), before writing the DynamicFrame to S3:
glueContext.purge_s3_path(s3_path, {"retentionPeriod": 0})
glueContext.write_dynamic_frame.from_options(
    frame=table,
    connection_type="s3",
    connection_options={"path": s3_path,
                        "partitionKeys": ["var1", "var2"]},
    format="parquet")
Please refer to the AWS documentation: GlueContext.write_dynamic_frame.from_options
Part 1: identifying the problem
The way to find what was causing the problem was to switch the output from .parquet to .csv and drop ResolveChoice and DropNullFields (both are automatically suggested by Glue for .parquet):
datasink2 = glueContext.write_dynamic_frame.from_options(
    frame = applymapping1,
    connection_type = "s3",
    connection_options = {"path": "s3://xxxx"},
    format = "csv",
    transformation_ctx = "datasink2")
job.commit()
This produced a more detailed error message:
An error occurred while calling o120.pyWriteDynamicFrame. Job aborted due to stage failure: Task 5 in stage 0.0 failed 4 times, most recent failure: Lost task 5.3 in stage 0.0 (TID 182, ip-172-31-78-99.ec2.internal, executor 15): com.amazonaws.services.glue.util.FatalException: Unable to parse file: xxxx1.csv.gz
The file xxxx1.csv.gz mentioned in the error message appeared to be too big for Glue (~100 MB gzipped and ~350 MB as uncompressed .csv).
Part 2: true source of the problem and fix
As mentioned in the first part, exporting to .csv made it possible to identify the bad file.
Further investigation by loading the .csv into R revealed that one of the columns contained a single string record, while all other values of this column were long or NULL.
After dropping this value in R and re-uploading the data to S3, the problem vanished.
Note #1: the column was declared string in Athena, so I consider this behaviour a bug.
Note #2: the nature of the problem was not the size of the data. I have successfully processed files up to 200 MB .csv.gz, which corresponds to roughly 600 MB of .csv.
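The same kind of type-inconsistent record can also be spotted without R. A minimal sketch in plain Python that scans one CSV column for values that are neither NULL-like nor parseable as integers (the function name, column index, and the assumption that the column should hold longs are all hypothetical, matching the situation described above):

```python
import csv

def find_non_integer_rows(path, column, delimiter=","):
    """Yield (line_number, value) for rows whose `column` is neither
    empty (NULL-like) nor parseable as an integer."""
    with open(path, newline="") as f:
        reader = csv.reader(f, delimiter=delimiter)
        for lineno, row in enumerate(reader, start=1):
            value = row[column].strip()
            if value == "":
                continue  # NULL-like value is fine for a long column
            try:
                int(value)
            except ValueError:
                yield lineno, value  # the offending string record
```

For example, `list(find_non_integer_rows("xxxx1.csv", 3))` would list every row where column 3 holds a stray string.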
Please use the updated table schema from the Data Catalog.
I have run into this same error. In my case, the crawler had created another table for the same file in the database, and I was referencing the old one. This can happen if the crawler crawls the same path repeatedly and creates tables with different schemas in the Data Catalog, so the Glue job can't find the right table name and schema, which produces this error.
Moreover, you can change the crawler's DeleteBehavior from "LOG" to "DELETE_FROM_DATABASE" so that deleted objects are also removed from the Data Catalog.
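That setting can also be changed through the API. A minimal sketch using boto3's `update_crawler` (the crawler name is hypothetical, and actually running the call requires AWS credentials):

```python
# Schema-change policy that removes tables/partitions for deleted
# objects from the Data Catalog instead of only logging them.
schema_change_policy = {
    "UpdateBehavior": "UPDATE_IN_DATABASE",
    "DeleteBehavior": "DELETE_FROM_DATABASE",
}

def update_crawler_delete_behavior(crawler_name):
    """Apply the policy to an existing crawler (hypothetical name)."""
    import boto3  # imported here so the module loads without boto3 installed
    glue = boto3.client("glue")
    glue.update_crawler(Name=crawler_name,
                        SchemaChangePolicy=schema_change_policy)
```

The same policy can be set on crawler creation via `create_crawler`.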
You have a few options:
- DynamicFrameWriter doesn't support overwriting data in S3 yet. Instead you can use Spark's native write() with partitionOverwriteMode set to "dynamic", so only the partitions present in the new data are replaced. However, for really large datasets it can be a bit inefficient, as a single worker will be used to overwrite existing data in S3. An example is below:
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import upper, col

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Overwrite only the partitions present in the new data
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mydata", transformation_ctx = "DataSource0")
ds_df = DataSource0.toDF()
ds_df1 = ds_df.select("year", "month", upper(col('colA')), upper(col('colB')), upper(col('colC')), upper(col('colD')))
ds_df1 \
    .write.mode('overwrite') \
    .format('parquet') \
    .partitionBy('year', 'month') \
    .save('s3://<bucket>/mydata-transformed/')
job.commit()
- In a Lambda function, you could delete the data in S3 under a certain prefix. An example using Python and boto3:
import boto3

s3_res = boto3.resource('s3')
bucket = 'my-bucket-name'
# Add any logic to derive the required prefix based on year/month/day
prefix = 'mydata/year=2020/month=10/'
s3_res.Bucket(bucket).objects.filter(Prefix=prefix).delete()
- You can use Glue's purge_s3_path to delete data from a certain prefix. Link is here.
There is now a function in Glue to delete an S3 path or a Glue Catalog table:
AWS Glue doc
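A minimal sketch of both calls, assuming a job's glueContext is already set up as in the examples above (the path, database, and table names are hypothetical; retentionPeriod is in hours, so 0 purges everything):

```python
# Options shared by both purge calls: files newer than retentionPeriod
# hours are kept, so 0 removes everything.
purge_options = {"retentionPeriod": 0}

def purge_old_data(glueContext):
    """Purge an S3 prefix and a catalog table's files (hypothetical names)."""
    # Delete all objects under an S3 prefix.
    glueContext.purge_s3_path("s3://my-bucket/mydata/", options=purge_options)
    # Delete the files underlying a Data Catalog table.
    glueContext.purge_table("mydatabase", "mytable", options=purge_options)
```

Note that purge_table removes the underlying files, not the catalog entry itself.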
Hi everyone! I'm trying to follow this tutorial https://aws.amazon.com/blogs/big-data/harmonize-query-and-visualize-data-from-various-providers-using-aws-glue-amazon-athena-and-amazon-quicksight/ to understand AWS Glue a bit better, but I'm having a hard time with one of the steps.
In the job generation step, they have this:
Let’s now convert that to a DataFrame. Please replace the <DYNAMIC_FRAME_NAME> with the name generated in the script.
And this snippet
##----------------------------------
#convert to a Spark DataFrame...
customDF = <DYNAMIC_FRAME_NAME>.toDF()
But I can't seem to find where <DYNAMIC_FRAME_NAME> is defined. I thought it was customDF = resolvechoice2.toDF(), but it didn't run correctly.
Here's my entire code (with the edited names of the buckets, of course)
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit
from awsglue.dynamicframe import DynamicFrame
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "nycitytaxianalysis", table_name = "blog_yellow", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "nycitytaxianalysis", table_name = "blog_yellow", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "pickup_datetime", "timestamp"), ("tpep_dropoff_datetime", "string", "dropoff_datetime", "timestamp"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "pickup_datetime", "timestamp"), ("tpep_dropoff_datetime", "string", "dropoff_datetime", "timestamp"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
##----------------------------------
#convert to a Spark DataFrame...
customDF = resolvechoice2.toDF()  # <<---- HERE'S MY CODE
#add a new column for "type"
customDF = customDF.withColumn("type", lit('yellow'))
# Convert back to a DynamicFrame for further processing.
customDynamicFrame = DynamicFrame.fromDF(customDF, glueContext, "customDF_df")
##----------------------------------
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://<<s3-bucket>>/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = customDynamicFrame]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = customDynamicFrame, connection_type = "s3", connection_options = {"path": "s3://<<s3-bucket>>/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

Where can I find the <DYNAMIC_FRAME_NAME>?
Thanks!