That syntax is valid for pandas DataFrames, but the attribute doesn't exist on DataFrames created by PySpark. See the PySpark DataFrame documentation for what is available.
Usually, the collect() method or the .rdd attribute would help you with these tasks.
You can use the following snippet to produce the desired result:
import numpy as np
import pandas as pd
http_path = sdf.rdd.map(lambda row: row['http_path'].split('?', 1))
api_param_df = pd.DataFrame([[row[0], np.nan] if len(row) == 1 else row for row in http_path.collect()], columns=["api", "param"])
sdf = pd.concat([sdf.toPandas()['raw'], api_param_df], axis=1)
Note that I removed the comments to make it more readable and I've also substituted the regex with a simple split.
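The split-and-pad step can be checked without Spark. Here is a minimal sketch in plain Python, assuming only the first '?' separates the API path from its parameters. split_http_path is a hypothetical helper name, not part of the original answer, and it uses None where the snippet above uses np.nan:

```python
from typing import Optional, Tuple

def split_http_path(http_path: str) -> Tuple[str, Optional[str]]:
    """Split 'api?param' into (api, param); param is None when there is no '?'."""
    # partition() splits on the first '?' only, so later '?' characters
    # stay inside the parameter string.
    api, sep, param = http_path.partition("?")
    return (api, param if sep else None)

# Hypothetical sample paths, for illustration only
rows = [split_http_path(p) for p in ["/v1/users?id=7", "/health", "/q?a=1?b=2"]]
```

Each tuple has exactly two elements, so the result always fits the two columns "api" and "param".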
You can't reference a second spark DataFrame inside a function, unless you're using a join. IIUC, you can do the following to achieve your desired result.
Suppose that means is the following:
#means.show()
#+---+---------+
#| id|avg(col1)|
#+---+---------+
#|  1|     12.0|
#|  3|    300.0|
#|  2|     21.0|
#+---+---------+
Join df and means on the id column, then apply your when condition:
from pyspark.sql.functions import when
df.join(means, on="id")\
    .withColumn(
        "col1",
        when(
            (df["col1"].isNull()),
            means["avg(col1)"]
        ).otherwise(df["col1"])
    )\
    .select(*df.columns)\
    .show()
#+---+-----+
#| id| col1|
#+---+-----+
#|  1| 12.0|
#|  1| 12.0|
#|  1| 14.0|
#|  1| 10.0|
#|  3|300.0|
#|  3|300.0|
#|  2| 21.0|
#|  2| 22.0|
#|  2| 20.0|
#+---+-----+
But in this case, I'd actually recommend using a Window with pyspark.sql.functions.mean:
from pyspark.sql import Window
from pyspark.sql.functions import col, mean, when
df.withColumn(
    "col1",
    when(
        col("col1").isNull(),
        mean("col1").over(Window.partitionBy("id"))
    ).otherwise(col("col1"))
).show()
#+---+-----+
#| id| col1|
#+---+-----+
#|  1| 12.0|
#|  1| 10.0|
#|  1| 12.0|
#|  1| 14.0|
#|  3|300.0|
#|  3|300.0|
#|  2| 22.0|
#|  2| 20.0|
#|  2| 21.0|
#+---+-----+
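To sanity-check what either approach should produce, here is a plain-Python model of the same logic: each null is replaced by the mean of the non-null values that share its id. The input rows are an assumption reconstructed from the outputs shown above, since the original df is not given, and fill_with_group_mean is a hypothetical helper name.

```python
from collections import defaultdict
from statistics import mean

def fill_with_group_mean(rows):
    """rows: list of (id, value-or-None). Replace None with the mean of that id."""
    groups = defaultdict(list)
    for key, value in rows:
        if value is not None:  # nulls are ignored when averaging
            groups[key].append(value)
    means = {key: mean(values) for key, values in groups.items()}
    # An id with no non-null values has no mean, so its nulls stay None
    return [(key, means.get(key) if value is None else value) for key, value in rows]

# Assumed input: one null per id, chosen to be consistent with the outputs above
rows = [(1, 12.0), (1, None), (1, 14.0), (1, 10.0),
        (3, 300.0), (3, None),
        (2, None), (2, 22.0), (2, 20.0)]
filled = fill_with_group_mean(rows)
```

Under that assumed input, the filled values are 12.0 for id 1, 300.0 for id 3, and 21.0 for id 2, which agrees with the means table.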
I think you are using the Scala API, where you use (). In PySpark, use [] instead.
A short, clean, scalable solution
Change some columns, leave the rest untouched
import pyspark.sql.functions as F
# That's not part of the solution, just a creation of a sample dataframe
# df = spark.createDataFrame([(10, 1,2,3,4),(20, 5,6,7,8)],'Id int, Revenue int ,GROSS_PROFIT int ,Net_Income int ,Enterprise_Value int')
cols_to_cast = ["Revenue" ,"GROSS_PROFIT" ,"Net_Income" ,"Enterprise_Value"]
df = df.select([F.col(c).cast('double') if c in cols_to_cast else c for c in df.columns])
df.printSchema()
root
|-- Id: integer (nullable = true)
|-- Revenue: double (nullable = true)
|-- GROSS_PROFIT: double (nullable = true)
|-- Net_Income: double (nullable = true)
|-- Enterprise_Value: double (nullable = true)
In case this helps:
from pyspark.sql.functions import col
df = spark.createDataFrame([(1, 0),
(2, 1),
(3 ,1),
(4, 1),
(5, 0),
(6 ,0),
(7, 1),
(8 ,1),
(9 ,1),
(10, 1),
(11, 0),
(12, 0)],
('Time' ,'Tag1'))
df = df.withColumn('a', col('Time').cast('integer')).withColumn('a1', col('Tag1').cast('double'))
df.printSchema()
df.show()
I figured it out. It looks like it has to do with our Spark version: it worked with 1.6.
If you are working with Spark 1.6, use this code to convert an RDD into a DataFrame:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(rdd)
If you want to name the columns, map each record to a Row before creating the DataFrame:
df = sqlContext.createDataFrame(rdd.map(lambda p: Row(ip=p[0], time=p[1], zone=p[2])))
ip, time, and zone are the column names in this example.
The book you're referring to describes the Scala / Java API. In PySpark, use []:
df["count"]
The book combines the Scala and PySpark APIs.
In the Scala / Java API, df.col("column_name") or df.apply("column_name") returns the Column.
In PySpark, use either of the following to get a column from a DataFrame:
df.colName
df["colName"]
The syntax you are using is for a pandas DataFrame. To achieve this for a Spark DataFrame, you should use the withColumn() method. This works great for a wide range of well-defined DataFrame functions, but it's a little more complicated for user-defined mapping functions.
General Case
In order to define a udf, you need to specify the output data type. For instance, if you wanted to apply a function my_func that returned a string, you could create a udf as follows:
import pyspark.sql.functions as f
from pyspark.sql.types import StringType
my_udf = f.udf(my_func, StringType())
Then you can use my_udf to create a new column like:
df = df.withColumn('new_column', my_udf(f.col("some_column_name")))
Another option is to use select:
df = df.select("*", my_udf(f.col("some_column_name")).alias("new_column"))
Specific Problem
Using a udf
In your specific case, you want to use a dictionary to translate the values of your DataFrame.
Here is a way to define a udf for this purpose:
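from pyspark.sql.types import IntegerType
some_map_udf = f.udf(lambda x: some_map.get(x, None), IntegerType())
Notice that I used dict.get(), which returns None for keys missing from some_map instead of raising a KeyError, because you want your udf to be robust to bad inputs.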
df = df.withColumn('new_column', some_map_udf(f.col("some_column_name")))
Using DataFrame functions
Sometimes using a udf is unavoidable, but whenever possible, using DataFrame functions is usually preferred.
Here is one option to do the same thing without using the udf.
The trick is to iterate over the items in some_map to create a list of pyspark.sql.functions.when() functions.
some_map_func = [f.when(f.col("some_column_name") == k, v) for k, v in some_map.items()]
print(some_map_func)
#[Column<CASE WHEN (some_column_name = a) THEN 0 END>,
# Column<CASE WHEN (some_column_name = c) THEN 1 END>,
# Column<CASE WHEN (some_column_name = b) THEN 1 END>]
Now you can use pyspark.sql.functions.coalesce() inside of a select:
df = df.select("*", f.coalesce(*some_map_func).alias("new_column"))
This works because when() returns null by default if the condition is not met, and coalesce() will pick the first non-null value it encounters. Since the keys of the map are unique, at most one column will be non-null.
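The reasoning above can be modelled in plain Python, with None standing in for SQL null. This is only a sketch of the semantics, not PySpark code: the when and coalesce functions below are hypothetical stand-ins, and the contents of some_map are an assumption inferred from the printed output above.

```python
def when(condition, value):
    # Models when() without otherwise(): no match yields None (null)
    return value if condition else None

def coalesce(*values):
    # Models coalesce(): the first non-None argument, or None if all are None
    return next((v for v in values if v is not None), None)

some_map = {"a": 0, "b": 1, "c": 1}  # assumed contents

def translate(x):
    # One when() per key; at most one of them is non-None because keys are unique
    return coalesce(*(when(x == k, v) for k, v in some_map.items()))

results = [translate(x) for x in ["a", "b", "c", "z"]]
```

Note that the check is "is not None" rather than truthiness, so a mapped value of 0 is still picked up, and an unmapped key such as "z" falls through to None.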
You have a Spark DataFrame, not a pandas DataFrame. To add a new column to a Spark DataFrame:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
df = df.withColumn('new_column', F.udf(some_map.get, IntegerType())(F.col("some_column_name")))
df.show()
It's related to the Databricks Runtime (DBR) version in use: the Spark versions shipped in DBR 12.2 and earlier rely on the pandas .iteritems function to construct a Spark DataFrame from a pandas DataFrame, and pandas 2.0 removed that function. This was fixed in Spark 3.4, which is available in DBR 13.x.
If you can't upgrade to DBR 13.x, you need to downgrade pandas to the latest 1.x version (1.5.3 right now) by running %pip install -U pandas==1.5.3 in your notebook. That said, it's better to use the pandas version shipped with your DBR, since it was tested for compatibility with the other packages in DBR.
I couldn't change package versions, but it looks like this was a name change only.
So I did
df.iteritems = df.items
and spark.createDataFrame(df) works now.
Sure, it's ugly, and it will break my notebook when I move to a cluster with a new DBR, but it works for now.
EDIT: AyoubH's answer is better because you only have to do it once. With the code above, you have to modify every data frame you display.
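One way to do it only once is to alias the method on the DataFrame class instead of on each instance. This is a minimal sketch of that idea, not a quote of the other answer, and it assumes a pandas version where DataFrame.items exists:

```python
import pandas as pd

# Patch the class once; every DataFrame created before or after sees the alias.
# The guard leaves pandas 1.x alone, where iteritems still exists.
if not hasattr(pd.DataFrame, "iteritems"):
    pd.DataFrame.iteritems = pd.DataFrame.items

# Small sample frame, for illustration only
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
column_names = [name for name, _ in df.iteritems()]
```

Like the per-instance version, this is a workaround that should be removed once the cluster moves to a runtime where the underlying issue is fixed.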