DynamicFrame is safer when handling memory-intensive jobs. Per the AWS docs, "The executor memory with AWS Glue dynamic frames never exceeds the safe threshold," whereas a Spark DataFrame can hit an "Out of memory" error on the executors. (https://docs.aws.amazon.com/glue/latest/dg/monitor-profile-debug-oom-abnormalities.html)
DynamicFrames are designed to provide maximum flexibility when dealing with messy data that may lack a declared schema. Records are represented in a flexible self-describing way that preserves information about schema inconsistencies in the data.
For example, with changing requirements, an address column stored as a string in some records might be stored as a struct in later rows. Rather than failing or falling back to a string, DynamicFrames track both types and give users a number of options for resolving these inconsistencies, providing fine-grained resolution options via the ResolveChoice transform.
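For instance, a minimal sketch, assuming a DynamicFrame dyf whose address column carries both a string and a struct type:
from awsglue.transforms import ResolveChoice
# Cast the ambiguous column to a single type...
resolved = ResolveChoice.apply(frame = dyf, specs = [("address", "cast:string")])
# ...or keep both variants as separate address_string and address_struct columns
resolved = ResolveChoice.apply(frame = dyf, choice = "make_cols")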
DynamicFrames also provide a number of powerful high-level ETL operations that are not found in DataFrames. For example, the Relationalize transform can be used to flatten and pivot complex nested data into tables suitable for transfer to a relational database. In addition, the ApplyMapping transform supports complex renames and casting in a declarative fashion.
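As a hedged sketch (the staging path and field names are placeholders, and dyf is assumed to be an existing DynamicFrame):
from awsglue.transforms import Relationalize, ApplyMapping
# Relationalize returns a collection of flat tables keyed by name
tables = Relationalize.apply(frame = dyf, staging_path = "s3://my-bucket/tmp/", name = "root")
root = tables.select("root")
# ApplyMapping takes (source field, source type, target field, target type) tuples
mapped = ApplyMapping.apply(frame = root, mappings = [("address.city", "string", "city", "string")])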
DynamicFrames are also integrated with the AWS Glue Data Catalog, so creating frames from tables is a simple operation. Writing to databases can be done through connections without specifying the password. Moreover, DynamicFrames are integrated with job bookmarks, so running these scripts in the job system allows the script to implicitly keep track of what was read and written. (https://github.com/aws-samples/aws-glue-samples/blob/master/FAQ_and_How_to.md)
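A sketch of both integrations (the database, table, and connection names are placeholders):
# Read a catalog table; transformation_ctx is what job bookmarks key on
dyf = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "datasource0")
# Write through a catalog connection, so no password appears in the script
glueContext.write_dynamic_frame.from_jdbc_conf(frame = dyf, catalog_connection = "my-jdbc-connection", connection_options = {"dbtable": "my_table", "database": "my_db"})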
Answer from Fang Zhang on Stack Overflow: amazon web services - DynamicFrame vs DataFrame - Stack Overflow
pyspark - convert spark dataframe to aws glue dynamic frame - Stack Overflow
pyspark - How to convert Dataframe to dynamic frame - Stack Overflow
Glue DynamicFrame show method yields nothing
You can refer to the documentation here: DynamicFrame Class. It says,
A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially.
You want to use a DynamicFrame when:
- Your data does not conform to a fixed schema.
Note: You can also convert the DynamicFrame to DataFrame using toDF()
- Refer here: def toDF
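For example, a quick sketch (dyf and the column name are illustrative):
# Convert a DynamicFrame to a Spark DataFrame to use the full DataFrame API
df = dyf.toDF()
df.filter(df["passenger_count"] > 1).show()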
fromDF is a class function. Here is how you can convert a DataFrame to a DynamicFrame:
from awsglue.dynamicframe import DynamicFrame
DynamicFrame.fromDF(test_df, glueContext, "test_nest")
AWS Docs
Just to consolidate the answers for Scala users too, here's how to transform a Spark DataFrame to a DynamicFrame (the method fromDF doesn't exist in the Scala API of DynamicFrame):
import com.amazonaws.services.glue.DynamicFrame
val dynamicFrame = DynamicFrame(df, glueContext)
I hope it helps!
I don't think AWS Glue provides any mapping method for it. After some struggling, I found the transformation was relatively easy in plain PySpark. Here is the pseudo code:
from pyspark.sql.functions import explode, col
from awsglue.dynamicframe import DynamicFrame
# Retrieve the datasource from the Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = ...)
# Convert it to a DataFrame and transform it in Spark
mapped_df = datasource0.toDF().select(explode(col("Datapoints")).alias("collection")).select("collection.*")
# Convert back to a DynamicFrame and continue the rest of the ETL process
mapped_datasource0 = DynamicFrame.fromDF(mapped_df, glueContext, "mapped_datasource0")
Thanks to this reference
Check the function split_rows in the following link:
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-split_rows
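As a hedged sketch (the column name and threshold are made up), records that satisfy the comparison should land in the first frame and the rest in the second:
# Split one DynamicFrame into two based on a per-column comparison
split = dyf.split_rows({"price": {">": 100}}, "expensive", "cheap")
expensive = split.select("expensive")
cheap = split.select("cheap")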
Here is also an example of how to transform the DynamicFrame directly, in case you need to do that first:
datasource = glueContext.create_dynamic_frame.from_catalog(database = ...)
# Function to modify a single record
def process_record(record):
    # Change existing fields or add new ones
    record["timestamp"] = record["Datapoints"] + '_suffix'  # Any change you need
    ...
    return record
processed_datasource = datasource.map(process_record)
Hi everyone! I'm trying to follow this tutorial https://aws.amazon.com/blogs/big-data/harmonize-query-and-visualize-data-from-various-providers-using-aws-glue-amazon-athena-and-amazon-quicksight/ to understand AWS Glue a bit better, but I'm having a hard time with one of the steps.
In the job generation, they have this step
Let’s now convert that to a DataFrame. Please replace the <DYNAMIC_FRAME_NAME> with the name generated in the script.
And this snippet
##----------------------------------
#convert to a Spark DataFrame...
customDF = <DYNAMIC_FRAME_NAME>.toDF()
But I can't seem to find where the <DYNAMIC_FRAME_NAME> can be found. I thought it was customDF = resolvechoice2.toDF(), but it didn't run correctly.
Here's my entire code (with the edited names of the buckets, of course)
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit
from awsglue.dynamicframe import DynamicFrame
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "nycitytaxianalysis", table_name = "blog_yellow", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "nycitytaxianalysis", table_name = "blog_yellow", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "pickup_datetime", "timestamp"), ("tpep_dropoff_datetime", "string", "dropoff_datetime", "timestamp"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "pickup_datetime", "timestamp"), ("tpep_dropoff_datetime", "string", "dropoff_datetime", "timestamp"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
##----------------------------------
#convert to a Spark DataFrame...
customDF = resolvechoice2.toDF() # <<---- HERE'S MY CODE
#add a new column for "type"
customDF = customDF.withColumn("type", lit('yellow'))
# Convert back to a DynamicFrame for further processing.
customDynamicFrame = DynamicFrame.fromDF(customDF, glueContext, "customDF_df")
##----------------------------------
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://<<s3-bucket>>/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = customDynamicFrame]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = customDynamicFrame, connection_type = "s3", connection_options = {"path": "s3://<<s3-bucket>>/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
Where can I find the <DYNAMIC_FRAME_NAME>?
Thanks!
Why do you want to convert from DataFrame to DynamicFrame? You can't do unit testing using the Glue APIs - there are no mocks for them.
I prefer following approach:
- Write two files per Glue job - job_glue.py and job_pyspark.py
- Write Glue-API-specific code in job_glue.py
- Write non-Glue-API-specific code in job_pyspark.py
- Write pytest test cases to test job_pyspark.py (see the sketch after this list)
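A minimal sketch of that split (the file names, function name, and "type" column are illustrative):
# job_pyspark.py -- pure Spark logic, no Glue imports, so plain pytest can run it
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
def add_type_column(df: DataFrame, value: str) -> DataFrame:
    # Tag every row with a constant "type" column
    return df.withColumn("type", lit(value))
# test_job_pyspark.py -- runs against a local SparkSession, no Glue environment needed
from pyspark.sql import SparkSession
from job_pyspark import add_type_column
def test_add_type_column():
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(1,)], ["id"])
    out = add_type_column(df, "yellow")
    assert out.columns == ["id", "type"]
    assert out.first()["type"] == "yellow"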
I think at present there is no alternative for us other than using Glue. For reference: Can I test AWS Glue code locally?
At a minimum you need pyspark.context, awsglue.context, and awsglue.dynamicframe. Here is an example:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
sc = SparkContext()
glueContext = GlueContext(sc)
NewDynamicFrame = DynamicFrame.fromDF(persons, glueContext, "nested")
"persons" is your DataFrame
Please check the following links:
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-samples-medicaid.html
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-fromDF
You can create a DynamicFrame from a DataFrame using the fromDF function.
Basic Syntax
dyf = DynamicFrame.fromDF(dataframe, glue_ctx, name)
where,
dataframe – The Apache Spark SQL DataFrame to convert (required).
glue_ctx – The GlueContext Class object that specifies the context for this transform (required).
name – The name of the resulting DynamicFrame (required).
Reference: Dynamic frame from dataframe