The line
spark = SparkSession.builder.master("local").appName("test").getOrCreate
assigns the method getOrCreate itself to the variable spark, which is not what you want.
Instead, you want to assign the return value of getOrCreate (i.e. a SparkSession) to the variable spark, so you need to call the method with a pair of empty parentheses:
spark = SparkSession.builder.master("local").appName("test").getOrCreate()
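A quick way to spot this mistake (my own illustration, not part of the original answer) is to inspect the type of spark:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test").getOrCreate  # note: no parentheses
print(type(spark))   # <class 'method'> -- a bound method, not a SparkSession

spark = SparkSession.builder.master("local").appName("test").getOrCreate()
print(type(spark))   # <class 'pyspark.sql.session.SparkSession'>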
Answer from mck on Stack Overflow, on the question "Spark join throws 'function' object has no attribute '_get_object_id' error. How could I fix it?"
Adding comment as answer since it solved the problem: count is effectively a reserved name in the DataFrame API, because every DataFrame has a count() method, so naming a column count is dangerous. In your case you can sidestep the error by using bracket-based column access instead of dot notation, e.g.
info["count"]
Dot notation resolves info.count to the DataFrame's count() method rather than the column, which is what triggers the error. With bracket access, the failing line becomes:
movie_names_df = info.join(movies_df, info.movieId == movies_df.ID, "inner").select(movies_df.title, info.average, info.movieId, info["count"])
movie_names_df.show()
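To see why the dot notation fails, here is a minimal illustration of my own, with made-up data:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("count-demo").getOrCreate()
info = spark.createDataFrame([(1, 3.5, 42)], ["movieId", "average", "count"])

print(info.count)      # <bound method DataFrame.count of DataFrame[...]> -- the method shadows the column
print(info["count"])   # Column<'count'> -- bracket access always resolves to the column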
You can't reference a second Spark DataFrame inside a function unless you're using a join. IIUC, you can do the following to achieve your desired result.
Suppose that means is the following:
#means.show()
#+---+---------+
#| id|avg(col1)|
#+---+---------+
#| 1| 12.0|
#| 3| 300.0|
#| 2| 21.0|
#+---+---------+
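(For completeness — this is my assumption, not from the question — such a means DataFrame could come from:)
means = df.groupBy("id").mean("col1")  # GroupedData.mean names the result column avg(col1)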
Join df and means on the id column, then apply your when condition:
from pyspark.sql.functions import when
df.join(means, on="id")\
    .withColumn(
        "col1",
        when(
            df["col1"].isNull(),
            means["avg(col1)"]
        ).otherwise(df["col1"])
    )\
    .select(*df.columns)\
    .show()
#+---+-----+
#| id| col1|
#+---+-----+
#| 1| 12.0|
#| 1| 12.0|
#| 1| 14.0|
#| 1| 10.0|
#| 3|300.0|
#| 3|300.0|
#| 2| 21.0|
#| 2| 22.0|
#| 2| 20.0|
#+---+-----+
But in this case, I'd actually recommend using a Window with pyspark.sql.functions.mean, which avoids the join entirely. Since mean() ignores nulls, the per-id window average is computed from the non-null values:
from pyspark.sql import Window
from pyspark.sql.functions import col, mean, when

df.withColumn(
    "col1",
    when(
        col("col1").isNull(),
        mean("col1").over(Window.partitionBy("id"))
    ).otherwise(col("col1"))
).show()
#+---+-----+
#| id| col1|
#+---+-----+
#| 1| 12.0|
#| 1| 10.0|
#| 1| 12.0|
#| 1| 14.0|
#| 3|300.0|
#| 3|300.0|
#| 2| 22.0|
#| 2| 20.0|
#| 2| 21.0|
#+---+-----+
I think you are using the Scala API, where columns are accessed with (). In PySpark, use [] instead.
Hello community,
I am trying to collect and send the results of a pyspark query to a text file.
However, I keep on getting the error:
AttributeError: 'builtin_function_or_method' object has no attribute example8
I'm extremely new to pyspark.sql. The code is as follows:
#%%
import sys
from operator import add
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('aggs').getOrCreate()
df = spark.read.csv('/home/packt/Downloads/Spark_DataFrames/sales_info.csv',inferSchema=True,header=True)
example8 = spark.sql("""SELECT
*
FROM sales_info
ORDER BY Sales DESC""")
print.example8.collect()
example8.saveAsTextFile("/home/packt/test.txt")
read_rdd = sc.textFile("/home/packt/test.txt")
read_rdd.collect()
main()
The full error message is as follows:
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-42-714a9bbd2b92> in <module>()
     74 FROM sales_info
     75 ORDER BY Sales DESC""")
---> 76 print.example8.collect()
     77
     78 example8.saveAsTextFile("/home/packt/test.txt")

AttributeError: 'builtin_function_or_method' object has no attribute 'example8'

Any help figuring out the error will be greatly appreciated.
Thanks
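Reading the traceback, print is Python's built-in function, so the attribute access print.example8 fails before anything runs. Below is a minimal sketch of a likely fix — my own reading, not from the thread — with the added assumptions that sales_info must first be registered as a temp view, and that saveAsTextFile is an RDD method rather than a DataFrame one:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('aggs').getOrCreate()
df = spark.read.csv('/home/packt/Downloads/Spark_DataFrames/sales_info.csv', inferSchema=True, header=True)
df.createOrReplaceTempView("sales_info")  # register the table the SQL query refers to

example8 = spark.sql("""SELECT
*
FROM sales_info
ORDER BY Sales DESC""")
print(example8.collect())  # call print(...) instead of accessing print.example8

# saveAsTextFile exists on RDDs, not DataFrames; convert first
example8.rdd.map(str).saveAsTextFile("/home/packt/test.txt")  # writes a directory of part files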
The syntax you are using is for a pandas DataFrame. To achieve this with a Spark DataFrame, you should use the withColumn() method. This works great for a wide range of well-defined DataFrame functions, but it's a little more complicated for user-defined mapping functions.
General Case
In order to define a udf, you need to specify the output data type. For instance, if you wanted to apply a function my_func that returned a string, you could create a udf as follows:
import pyspark.sql.functions as f
from pyspark.sql.types import StringType

my_udf = f.udf(my_func, StringType())
Then you can use my_udf to create a new column like:
df = df.withColumn('new_column', my_udf(f.col("some_column_name")))
Another option is to use select:
df = df.select("*", my_udf(f.col("some_column_name")).alias("new_column"))
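For instance, with a made-up my_func that upper-cases strings (an illustration of mine, assuming an active spark session):
def my_func(s):
    # hypothetical mapping function: upper-case strings, pass nulls through
    return s.upper() if s is not None else None

df = spark.createDataFrame([("a",), ("b",)], ["some_column_name"])
my_udf = f.udf(my_func, StringType())
df.withColumn("new_column", my_udf(f.col("some_column_name"))).show()
#+----------------+----------+
#|some_column_name|new_column|
#+----------------+----------+
#|               a|         A|
#|               b|         B|
#+----------------+----------+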
Specific Problem
Using a udf
In your specific case, you want to use a dictionary to translate the values of your DataFrame.
Here is a way to define a udf for this purpose:
from pyspark.sql.types import IntegerType

some_map_udf = f.udf(lambda x: some_map.get(x, None), IntegerType())
Notice that I used dict.get() because you want your udf to be robust to bad inputs.
df = df.withColumn('new_column', some_map_udf(f.col("some_column_name")))
Using DataFrame functions
Sometimes using a udf is unavoidable, but whenever possible, using DataFrame functions is usually preferred.
Here is one option to do the same thing without using the udf.
The trick is to iterate over the items in some_map to create a list of pyspark.sql.functions.when() columns.
some_map_func = [f.when(f.col("some_column_name") == k, v) for k, v in some_map.items()]
print(some_map_func)
#[Column<CASE WHEN (some_column_name = a) THEN 0 END>,
# Column<CASE WHEN (some_column_name = c) THEN 1 END>,
# Column<CASE WHEN (some_column_name = b) THEN 1 END>]
Now you can use pyspark.sql.functions.coalesce() inside of a select:
df = df.select("*", f.coalesce(*some_map_func).alias("some_column_name"))
This works because when() returns null by default if the condition is not met, and coalesce() picks the first non-null value it encounters. Since the keys of the map are unique, at most one of these columns will be non-null for any given row.
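Putting the pieces together as a self-contained sketch (the map {"a": 0, "b": 1, "c": 1} and the toy data are my assumptions):
some_map = {"a": 0, "b": 1, "c": 1}   # assumed example map
df = spark.createDataFrame([("a",), ("c",), ("x",)], ["some_column_name"])

some_map_func = [f.when(f.col("some_column_name") == k, v) for k, v in some_map.items()]
df.select("*", f.coalesce(*some_map_func).alias("mapped")).show()
#+----------------+------+
#|some_column_name|mapped|
#+----------------+------+
#|               a|     0|
#|               c|     1|
#|               x|  null|
#+----------------+------+
# "x" matches no key, so every when() is null and coalesce() returns null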
You have a Spark DataFrame, not a pandas DataFrame. To add a new column to the Spark DataFrame:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
df = df.withColumn('new_column', F.udf(some_map.get, IntegerType())('some_column_name'))
df.show()
The book you're referring to describes the Scala / Java API. In PySpark, use []:
df["count"]
The book mixes the Scala and PySpark APIs.
In the Scala / Java API, df.col("column_name") or df.apply("column_name") returns the Column.
In PySpark, use either of the following to get a column from a DataFrame:
df.colName
df["colName"]