You can't reference a second Spark DataFrame inside a function unless you're using a join. If I understand correctly, you can do the following to achieve your desired result.
Suppose that means is the following:
#means.show()
#+---+---------+
#| id|avg(col1)|
#+---+---------+
#| 1| 12.0|
#| 3| 300.0|
#| 2| 21.0|
#+---+---------+
Join df and means on the id column, then apply your when condition
from pyspark.sql.functions import when
df.join(means, on="id")\
    .withColumn(
        "col1",
        when(
            (df["col1"].isNull()),
            means["avg(col1)"]
        ).otherwise(df["col1"])
    )\
    .select(*df.columns)\
    .show()
#+---+-----+
#| id| col1|
#+---+-----+
#| 1| 12.0|
#| 1| 12.0|
#| 1| 14.0|
#| 1| 10.0|
#| 3|300.0|
#| 3|300.0|
#| 2| 21.0|
#| 2| 22.0|
#| 2| 20.0|
#+---+-----+
But in this case, I'd actually recommend using a Window with pyspark.sql.functions.mean:
from pyspark.sql import Window
from pyspark.sql.functions import col, mean, when
df.withColumn(
    "col1",
    when(
        col("col1").isNull(),
        mean("col1").over(Window.partitionBy("id"))
    ).otherwise(col("col1"))
).show()
#+---+-----+
#| id| col1|
#+---+-----+
#| 1| 12.0|
#| 1| 10.0|
#| 1| 12.0|
#| 1| 14.0|
#| 3|300.0|
#| 3|300.0|
#| 2| 22.0|
#| 2| 20.0|
#| 2| 21.0|
#+---+-----+
Answer from pault on Stack Overflow
I think you are using the Scala API, where a column is accessed with (). In PySpark, use [] instead.
A short, clean, scalable solution
Change some columns, leave the rest untouched
import pyspark.sql.functions as F
# That's not part of the solution, just a creation of a sample dataframe
# df = spark.createDataFrame([(10, 1,2,3,4),(20, 5,6,7,8)],'Id int, Revenue int ,GROSS_PROFIT int ,Net_Income int ,Enterprise_Value int')
cols_to_cast = ["Revenue" ,"GROSS_PROFIT" ,"Net_Income" ,"Enterprise_Value"]
df = df.select([F.col(c).cast('double') if c in cols_to_cast else c for c in df.columns])
df.printSchema()
root
|-- Id: integer (nullable = true)
|-- Revenue: double (nullable = true)
|-- GROSS_PROFIT: double (nullable = true)
|-- Net_Income: double (nullable = true)
|-- Enterprise_Value: double (nullable = true)
In case this helps:
from pyspark.sql.functions import col
df = spark.createDataFrame([(1, 0),
                            (2, 1),
                            (3, 1),
                            (4, 1),
                            (5, 0),
                            (6, 0),
                            (7, 1),
                            (8, 1),
                            (9, 1),
                            (10, 1),
                            (11, 0),
                            (12, 0)],
                           ('Time', 'Tag1'))
df = df.withColumn('a', col('Time').cast('integer')).withColumn('a1', col('Tag1').cast('double'))
df.printSchema()
df.show()
It's related to the Databricks Runtime (DBR) version you use: the Spark versions in DBR up to 12.2 rely on the .iteritems function to construct a Spark DataFrame from a Pandas DataFrame. This issue was fixed in Spark 3.4, which is available as DBR 13.x.
If you can't upgrade to DBR 13.x, then you need to downgrade Pandas to the latest 1.x version (1.5.3 right now) by running the %pip install -U pandas==1.5.3 command in your notebook. That said, it's better to use the Pandas version shipped with your DBR, because it was tested for compatibility with the other packages in DBR.
I couldn't change package versions, but it looks like this was a name change only.
So I did
df.iteritems = df.items
and spark.createDataFrame(df) works now.
Sure, it's ugly, and it will break my notebook when I move to a cluster with a new DBR, but it works for now.
EDIT: AyoubH's answer is better because you only have to do it once. With the code above, you have to modify every data frame you display.
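The "only once" variant mentioned in the edit presumably patches the class instead of each instance. A minimal sketch of that idea, assuming pandas 2.x where DataFrame.iteritems no longer exists (the sample frame is hypothetical and only there to show the restored name working):

```python
import pandas as pd

# pandas 2.0 removed DataFrame.iteritems, but older PySpark still calls it.
# Restore the old name on the class once, and only if it is missing,
# so every DataFrame created afterwards picks it up.
if not hasattr(pd.DataFrame, "iteritems"):
    pd.DataFrame.iteritems = pd.DataFrame.items

# Hypothetical frame to check that the alias behaves like items().
pdf = pd.DataFrame({"a": [1, 2], "b": [3.0, 4.0]})
column_names = [name for name, _ in pdf.iteritems()]
print(column_names)
```

Like the per-instance assignment, this is a stopgap: it should be removed once the cluster runs a Spark version that no longer calls iteritems.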
As a workaround, downgrade to pandas v1.5
%pip install --upgrade pandas==1.5
The answers provided till now used to work prior to 3rd April 2023.
As of April 4, with pandas 2.0.0, you are not able to convert a Pandas DataFrame to a Spark DataFrame using the command:
spark.createDataFrame(df)
Using the above command leads to the error mentioned in the question:
AttributeError: 'DataFrame' object has no attribute 'iteritems'
The iteritems function seems to have been removed in pandas 2.0.0. From the changelog of pandas 2.0.0:
Removed deprecated Series.iteritems(), DataFrame.iteritems(), use obj.items instead
Meanwhile, the Spark code that converts a pandas DataFrame to a Spark DataFrame still calls iteritems:
/databricks/spark/python/pyspark/sql/pandas/conversion.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
308 warnings.warn(msg)
309 raise
--> 310 data = self._convert_from_pandas(data, schema, timezone)
311 return self._create_dataframe(data, schema, samplingRatio, verifySchema)
312
/databricks/spark/python/pyspark/sql/pandas/conversion.py in _convert_from_pandas(self, pdf, schema, timezone)
340 pdf[field.name] = s
341 else:
--> 342 for column, series in pdf.iteritems():
343 s = _check_series_convert_timestamps_tz_local(series, timezone)
344 if s is not series:
Looks like we will have to wait for a fix to use Pandas 2.0.0.
You just need to call the display function with the Pandas DataFrame as its argument, rather than calling it as a method of the Pandas DataFrame class.
import pyspark.sql.functions as F
pdf = spark.range(10).withColumn("rnd", F.rand()).toPandas()
display(pdf)

Or you can simply specify the name of the variable that holds the Pandas DataFrame object; it will then be printed using pandas' built-in representation.

"sklearn.datasets" is a scikit package, where it contains a method load_iris().
By default, load_iris() returns an object that holds data, target and other members. To get the actual values, you have to read the data and target attributes themselves.
In contrast, 'iris.csv' holds the features and target together.
FYI: If you set return_X_y=True in load_iris(), you get the features and target directly.
from sklearn import datasets
data,target = datasets.load_iris(return_X_y=True)
The Iris Dataset from Sklearn is in Sklearn's Bunch format:
print(type(iris))
print(iris.keys())
output:
<class 'sklearn.utils.Bunch'>
dict_keys(['data', 'target', 'target_names', 'DESCR', 'feature_names', 'filename'])
So, that's why you can access it as:
x=iris.data
y=iris.target
But when you read the CSV file as a DataFrame, as you did:
iris = pd.read_csv('iris.csv',header=None).iloc[:,2:4]
iris.head()
output is:
2 3
0 petal_length petal_width
1 1.4 0.2
2 1.4 0.2
3 1.3 0.2
4 1.5 0.2
Here the column names are the integers 2 and 3, and the real header row has become the first row of data.
First of all you should read the CSV file as:
df = pd.read_csv('iris.csv')
You should not include header=None, because your CSV file already contains the column names, i.e. the headers, in its first row.
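To see the difference, here is a small sketch that reads the same text both ways. The two-row CSV is made up for illustration, and the sepal_length and sepal_width column names are an assumption; only petal_length, petal_width and species appear in the question.

```python
import io

import pandas as pd

# Hypothetical stand-in for the first lines of iris.csv.
csv_text = (
    "sepal_length,sepal_width,petal_length,petal_width,species\n"
    "5.1,3.5,1.4,0.2,setosa\n"
    "4.9,3.0,1.4,0.2,setosa\n"
)

# header=None: pandas numbers the columns and treats the header as data.
no_header = pd.read_csv(io.StringIO(csv_text), header=None)

# Default: the first row supplies the column names.
with_header = pd.read_csv(io.StringIO(csv_text))

print(list(no_header.columns))    # integer labels
print(no_header.iloc[0, 2])       # the header text, stored as a value
print(list(with_header.columns))  # real column names
```

With header=None every column also ends up with object dtype, because the header strings are mixed in with the numbers.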
So, now what you can do is something like this:
X = df.iloc[:, [2, 3]] # Will give you columns 2 and 3 i.e 'petal_length' and 'petal_width'
y = df.iloc[:, 4] # Label column i.e 'species'
or if you want to use the column names then:
X = df[['petal_length', 'petal_width']]
y = df['species']
Also, if you want to convert the labels from strings to numbers, use sklearn's LabelEncoder:
from sklearn import preprocessing
le = preprocessing.LabelEncoder()
y = le.fit_transform(y)
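As a quick illustration of what the encoder produces, here is a sketch on a short hand-written label list (the list is an assumption, not the real species column). LabelEncoder assigns integer codes in sorted order of the distinct labels, and inverse_transform maps the codes back.

```python
from sklearn.preprocessing import LabelEncoder

# Hypothetical labels standing in for the 'species' column.
labels = ["setosa", "versicolor", "virginica", "setosa"]

le = LabelEncoder()
encoded = le.fit_transform(labels)       # integer code per label
decoded = le.inverse_transform(encoded)  # back to the original strings

print(list(le.classes_))
print(list(encoded))
print(list(decoded))
```

Keeping the fitted le around is what lets you turn model predictions back into species names later.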
The book you're referring to describes the Scala / Java API. In PySpark, use [] instead:
df["count"]
The book combines the Scala and PySpark APIs.
In the Scala / Java API, df.col("column_name") or df.apply("column_name") returns the Column.
In PySpark, use either of the following to get the column from the DataFrame:
df.colName
df["colName"]