It's possible to load data directly from s3 using Glue:
sourceDyf = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://bucket/folder"]
    },
    format_options={
        "withHeader": True,
        "separator": ","
    })
You can also do that just with spark (as you already tried):
# Parentheses make the multi-line method chain valid Python
sourceDf = (spark.read
    .option("header", "true")
    .option("delimiter", ",")
    .csv(r"C:\inputs\TEST.txt"))
However, in this case Glue doesn't guarantee that the appropriate Spark readers are provided. So if your error is related to a missing data source for CSV, you should add the spark-csv library to the Glue job by providing the S3 path to its location via the --extra-jars parameter. (Answer from Yuriy Bondaruk on Stack Overflow.)
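For illustration, here is roughly how such a parameter could be passed when creating the job with boto3; this is a sketch, not part of the original answer, and the job name, role, script location, and jar path are all placeholders:
import boto3

glue = boto3.client("glue")
glue.create_job(
    Name="csv-import-job",       # placeholder job name
    Role="MyGlueServiceRole",    # placeholder IAM role
    Command={"Name": "glueetl", "ScriptLocation": "s3://my-bucket/scripts/job.py"},
    # extra jars are supplied as a Glue job argument
    DefaultArguments={"--extra-jars": "s3://my-bucket/jars/spark-csv_2.11-1.5.0.jar"})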
Below are 2 cases I tested that work fine:
To load a file from S3 into Glue.
dfnew = glueContext.create_dynamic_frame_from_options(
    "s3",
    {'paths': ["s3://MyBucket/path/"]},
    format="csv")
dfnew.show(2)
To load data from a Glue database and tables that have already been generated through Glue Crawlers.
DynFr = glueContext.create_dynamic_frame.from_catalog(database="test_db", table_name="test_table")
DynFr is a DynamicFrame, so if we want to work with Spark code in Glue, we need to convert it into a normal DataFrame like below.
df1 = DynFr.toDF()
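Once converted, any regular Spark DataFrame operation is available, and you can convert back if you still need Glue transforms or writers. A minimal sketch, where the column name "some_column" is hypothetical:
from awsglue.dynamicframe import DynamicFrame

# Regular Spark operations work on the converted DataFrame
df_filtered = df1.filter(df1["some_column"].isNotNull())

# Convert back to a DynamicFrame for Glue writers/transforms
DynFr2 = DynamicFrame.fromDF(df_filtered, glueContext, "DynFr2")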
Since the source CSV files do not necessarily come with the right date, you could add additional information to them about the collection date and time (or use any date field if one is already available):
{"collectDateTime": {
"timestamp": 1518091828,
"timestampMs": 1518091828116,
"day": 8,
"month": 2,
"year": 2018
}}
Then your job could carry this information into the output DynamicFrame and ultimately use the fields as partitions. Some sample code of how to achieve this:
from awsglue.transforms import *
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
import sys
import datetime

###
# CREATE THE NEW SIMPLIFIED LINE
##
def create_simplified_line(event_dict):
    # collect date time
    collect_date_time_dict = event_dict["collectDateTime"]
    new_line = {
        # TODO: COPY YOUR DATA HERE
        "myData": event_dict["myData"],
        "someOtherData": event_dict["someOtherData"],
        "timestamp": collect_date_time_dict["timestamp"],
        # int() instead of Python 2's long(); seconds -> milliseconds
        "timestampmilliseconds": int(collect_date_time_dict["timestamp"]) * 1000,
        "year": collect_date_time_dict["year"],
        "month": collect_date_time_dict["month"],
        "day": collect_date_time_dict["day"]
    }
    return new_line

###
# MAIN FUNCTION
##
# context
glueContext = GlueContext(SparkContext.getOrCreate())

# fetch from previous day source bucket
previous_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)

# build s3 paths
s3_path = "s3://source-bucket/path/year={}/month={}/day={}/".format(
    previous_date.year, previous_date.month, previous_date.day)

# create dynamic_frame
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [s3_path]},
    format="json",
    format_options={},
    transformation_ctx="dynamic_frame")

# resolve choices (optional)
dynamic_frame_resolved = ResolveChoice.apply(
    frame=dynamic_frame,
    choice="project:double",
    transformation_ctx="dynamic_frame_resolved")

# transform the source dynamic frame into a simplified version
result_frame = Map.apply(frame=dynamic_frame_resolved, f=create_simplified_line)

# write to simple storage service in parquet format
glueContext.write_dynamic_frame.from_options(
    frame=result_frame,
    connection_type="s3",
    connection_options={"path": "s3://target-bucket/path/", "partitionKeys": ["year", "month", "day"]},
    format="parquet")
I did not test it, but the script is just a sample of how to achieve this, and it is fairly straightforward.
UPDATE
1) As for having specific file sizes/numbers in output partitions,
Spark's coalesce and repartition features are not yet implemented in Glue's Python API (only in Scala).
You can convert your dynamic frame into a data frame and leverage Spark's partition capabilities.
from awsglue.dynamicframe import DynamicFrame

# Convert to a DataFrame and repartition (here into a single partition;
# you could instead repartition by a column such as "partition_col")
partitioned_dataframe = datasource0.toDF().repartition(1)

# Convert back to a DynamicFrame for further processing
partitioned_dynamicframe = DynamicFrame.fromDF(partitioned_dataframe, glueContext, "partitioned_df")
The good news is that Glue has an interesting feature: if you have more than 50,000 input files per partition, it'll automatically group them for you.
If you want to set this behavior explicitly regardless of the number of input files (your case), you may set the following connection_options while creating a dynamic frame from options:
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [s3_path],
        # per the Glue docs, groupSize is a target size in bytes,
        # passed as a string; 1048576 = 1 MB
        "groupFiles": "inPartition",
        "groupSize": "1048576"
    },
    format="json",
    format_options={},
    transformation_ctx="dynamic_frame")
In the previous example, it would attempt to group files into 1MB groups.
It is worth mentioning that this is not the same as coalesce, but it may help if your goal is to reduce the number of files per partition.
2) If files already exist in the destination, will it just safely add them (not overwrite or delete)?
Glue's default SaveMode for write_dynamic_frame.from_options is to append.
When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
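If you need overwrite semantics instead, one option (a sketch, not something from the original answer) is to drop down to the Spark writer, which exposes save modes directly; result_frame and the target path are reused from the sample script above:
# Convert to a DataFrame and overwrite the target path explicitly
(result_frame.toDF()
    .write
    .mode("overwrite")
    .partitionBy("year", "month", "day")
    .parquet("s3://target-bucket/path/"))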
3) Given each source partition may be 30-100GB, what's a guideline for the number of DPUs?
I'm afraid I won't be able to answer that. It depends on how fast it'll load your input files (size/number), your script's transformations, etc.
Import the datetime library
import datetime
Split the timestamp based on partition conditions
now = datetime.datetime.now()
year = str(now.year)
month = str(now.month)
day = str(now.day)
currdate = "s3://Destination/" + year + "/" + month + "/" + day
Add the variable currdate to the path in the writer. The results will be partitioned Parquet files.
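For illustration, this is roughly how currdate could be plugged into the writer; the frame name below is a placeholder for whatever DynamicFrame you are writing:
glueContext.write_dynamic_frame.from_options(
    frame=my_dynamic_frame,  # placeholder DynamicFrame
    connection_type="s3",
    connection_options={"path": currdate},
    format="parquet")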
Part 1: identifying the problem
The way to find what was causing the problem was to switch the output from .parquet to .csv and drop ResolveChoice or DropNullFields (which Glue automatically suggests for .parquet):
datasink2 = glueContext.write_dynamic_frame.from_options(
    frame = applymapping1,
    connection_type = "s3",
    connection_options = {"path": "s3://xxxx"},
    format = "csv",
    transformation_ctx = "datasink2")
job.commit()
It produced a more detailed error message:
An error occurred while calling o120.pyWriteDynamicFrame. Job aborted due to stage failure: Task 5 in stage 0.0 failed 4 times, most recent failure: Lost task 5.3 in stage 0.0 (TID 182, ip-172-31-78-99.ec2.internal, executor 15): com.amazonaws.services.glue.util.FatalException: Unable to parse file: xxxx1.csv.gz
The file xxxx1.csv.gz mentioned in the error message appeared to be too big for Glue (~100 MB as .gzip and ~350 MB as uncompressed .csv).
Part 2: true source of the problem and fix
As mentioned in the first part, thanks to the export to .csv it was possible to identify the offending file.
Further investigation by loading the .csv into R revealed that one of the columns contained a single string record, while all other values of this column were long or NULL.
After dropping this value in R and re-uploading the data to S3, the problem vanished.
Note #1: the column was declared string in Athena, so I consider this behaviour a bug.
Note #2: the nature of the problem was not the size of the data. I have successfully processed files of up to 200 MB .csv.gz, which correspond to roughly 600 MB of .csv.
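If you would rather drop such rows inside the Glue job than fix the file in R, a hedged sketch (assuming df is the DataFrame loaded from the CSV, and the column name "amount" is hypothetical) is to cast the column and filter out values that fail the cast:
from pyspark.sql.functions import col

# Cast the suspect column to long; bad strings become NULL after the cast,
# so keep rows where the original was NULL or the cast succeeded
cleaned = (df
    .withColumn("amount_long", col("amount").cast("long"))
    .filter(col("amount").isNull() | col("amount_long").isNotNull())
    .drop("amount")
    .withColumnRenamed("amount_long", "amount"))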
Please use the updated table schema from the data catalog.
I have gone through this same error. In my case, the crawler had created another table for the same file in the database, and I was referencing the old one. This can happen if the crawler crawls the same path repeatedly and creates tables with different schemas in the data catalog, so the Glue job couldn't find the right table name and schema, thereby giving this error.
Moreover, you can change DeleteBehavior: "LOG" to DeleteBehavior: "DELETE_IN_DATABASE", as sketched below.
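For reference, that setting lives in the crawler's SchemaChangePolicy; a sketch of changing it with boto3 (the crawler name is a placeholder):
import boto3

glue = boto3.client("glue")
glue.update_crawler(
    Name="my-crawler",  # placeholder crawler name
    SchemaChangePolicy={
        "UpdateBehavior": "UPDATE_IN_DATABASE",
        "DeleteBehavior": "DELETE_IN_DATABASE",
    })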
You can try the withHeader param, e.g.
dyF = glueContext.create_dynamic_frame.from_options(
    's3',
    {'paths': ['s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv']},
    'csv',
    {'withHeader': True})
The documentation for this can be found here.
I know this post is old, but I just ran into a similar issue and spent way too long figuring out what the problem was. Wanted to share my solution in case it's helpful to others!
I was using the GUI on AWS and forgot to actually add the correct classifier to the crawler before running it. This resulted in AWS Glue incorrectly detecting data types (they mostly came out as strings) and failing to detect the column names (they came out as col1, col2, etc.). You can create the classifier in "classifiers" under "crawlers". Then, when setting up the crawler, add your classifier to the "selected classifiers" section at the bottom.
Documentation: https://docs.aws.amazon.com/glue/latest/dg/add-classifier.html
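If you prefer setting this up outside the console, here is a hedged boto3 sketch; the classifier name, crawler name, role, database, S3 path, and header columns are all placeholders:
import boto3

glue = boto3.client("glue")

# Create a custom CSV classifier with explicit column names
glue.create_classifier(
    CsvClassifier={
        "Name": "my-csv-classifier",        # placeholder name
        "Delimiter": ",",
        "QuoteSymbol": '"',
        "ContainsHeader": "PRESENT",
        "Header": ["id", "name", "value"],  # placeholder column names
    })

# Attach the classifier so the crawler tries it before the built-in ones
glue.create_crawler(
    Name="my-crawler",                      # placeholder name
    Role="MyGlueServiceRole",               # placeholder IAM role
    DatabaseName="my_db",
    Targets={"S3Targets": [{"Path": "s3://my-bucket/path/"}]},
    Classifiers=["my-csv-classifier"])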