You can try it this way:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('so')\
.getOrCreate()
sc = spark.sparkContext
map = {'a': 3, 'b': 44}
data = sc.parallelize([(k, v) for k, v in map.items()]).toDF(['A', 'B'])
data.show()
# +---+---+
# | A| B|
# +---+---+
# | a| 3|
# | b| 44|
# +---+---+
The notebook is using version 0.17.1 of pandas, but the autovizwidget depends on a later version of pandas that has the 'api' module. I've been told that this will be resolved in a subsequent release of HDInsight.
ssh into the cluster and run the following:
sudo -HE /usr/bin/anaconda/bin/conda install pandas
Had the same issue. I used:
pip install pandas --upgrade --user
via the terminal available in the Jupyter notebook.
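Either way, you can confirm from the notebook itself that the upgraded pandas is the one being picked up. This quick check is my addition, not part of either answer above:
import pandas as pd
print(pd.__version__)       # should be newer than 0.17.1
print(hasattr(pd, 'api'))   # the autovizwidget expects the pandas.api module to exist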
You don't have pyspark installed in a place available to the python installation you're using. To confirm this, on your command line terminal, with your virtualenv activated, enter your REPL (python) and type import pyspark:
$ python
Python 3.5.0 (default, Dec 3 2015, 09:58:14)
[GCC 4.2.1 Compatible Apple LLVM 7.0.0 (clang-700.1.76)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyspark
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named 'pyspark'
If you see the No module named 'pyspark' ImportError, you need to install that library. Quit the REPL and type:
pip install pyspark
Then re-enter the REPL to confirm it works:
$ python
Python 3.5.0 (default, Dec 3 2015, 09:58:14)
[GCC 4.2.1 Compatible Apple LLVM 7.0.0 (clang-700.1.76)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyspark
>>>
As a note, it is critical that your virtual environment is activated. When in the directory of your virtual environment:
$ source bin/activate
These instructions are for a unix-based machine, and will vary for Windows.
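Once the import succeeds, a quick smoke test from the same interpreter confirms that a local session actually starts. This is a minimal sketch of my own; the app name and master setting are arbitrary:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[1]') \
    .appName('smoke-test') \
    .getOrCreate()
print(spark.version)
spark.stop()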
Just use:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
If you don't have the findspark module, install it with:
python -m pip install findspark
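As a side note, findspark.init() locates Spark via the SPARK_HOME environment variable; if that isn't set, you can pass the installation path explicitly. The path below is only an example, adjust it to wherever your Spark distribution lives:
import findspark
findspark.init('/opt/spark')   # example path, replace with your own Spark home

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').appName('findspark-test').getOrCreate()
print(spark.version)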
SparkSession is not a replacement for a SparkContext but an equivalent of the SQLContext. Just use it the same way you used to use SQLContext:
spark.createDataFrame(...)
and if you ever have to access the SparkContext, use the sparkContext attribute:
spark.sparkContext
so if you need an SQLContext for backwards compatibility, you can use:
SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
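Putting that together, a minimal sketch (the column names and data here are made up for illustration):
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.appName('example').getOrCreate()

df = spark.createDataFrame([('a', 1), ('b', 2)], ['key', 'value'])   # what sqlContext.createDataFrame used to do
rdd = spark.sparkContext.parallelize(range(10))                      # SparkContext via the attribute
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)   # only for legacy code paths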
Whenever you try to create a DataFrame from a backward-compatible object like an RDD, or from a DataFrame created by the Spark session, you need to make your SQLContext aware of your session and context.
For example, if I create an RDD:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

ss = SparkSession.builder.appName("vivek").master('local').config("k1", "vi").getOrCreate()
rdd = ss.sparkContext.parallelize([('Alex', 21), ('Bob', 44)])
But if we wish to create a DataFrame from this RDD, we need to create an SQLContext that knows about the session:
sq = SQLContext(sparkContext=ss.sparkContext, sparkSession=ss)
Only then can we use that SQLContext with the RDD/DataFrame created above:
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])
df = sq.createDataFrame(rdd, schema)
df.collect()
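Note that if you already have the SparkSession, you can also skip the explicit SQLContext and build the DataFrame from the session directly (this should behave the same on Spark 2.x+, using the same schema as above):
df = ss.createDataFrame(rdd, schema)
df.collect()
# [Row(name='Alex', age=21), Row(name='Bob', age=44)]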
I had the same error and followed the stack trace.
In my case, I was building an Egg file and then passing it to spark via the --py-files option.
Concerning the error, I think it boils down to the fact that when you call F.udf(str2num, t.IntegerType()) a UserDefinedFunction instance is created before Spark is running, so it has an empty reference to some SparkContext, call it sc. When you run the UDF, sc._pickled_broadcast_vars is referenced and this throws the AttributeError in your output.
My workaround is to avoid creating the UDF until Spark is running (and hence there is an active SparkContext). In your case, you could just change your definition to:
def letConvNum(df):    # df is a PySpark DataFrame
    # Get a list of columns that I want to transform, using the metadata Pandas DataFrame
    chng_cols = metadta[(metadta.comments == 'letter conversion to num')].col_name.tolist()
    str2numUDF = F.udf(str2num, t.IntegerType())    # create UDF on demand
    for curcol in chng_cols:
        df = df.withColumn(curcol, str2numUDF(df[curcol]))
    return df
Note: I haven't actually tested the code above, but the change in my own code was similar and everything worked fine.
Also, for the interested reader, see the Spark code for UserDefinedFunction
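For illustration, here is a small sketch of the on-demand pattern in use once a session exists. str2num here is just a placeholder standing in for the question's conversion function, and the data is made up:
import pyspark.sql.functions as F
import pyspark.sql.types as t
from pyspark.sql import SparkSession

def str2num(text):   # placeholder for the original conversion logic
    return ord(text[0]) if text else 0

spark = SparkSession.builder.appName('udf-demo').getOrCreate()
df = spark.createDataFrame([('a',), ('b',)], ['letter'])

str2numUDF = F.udf(str2num, t.IntegerType())   # created only after the session is up
df.withColumn('letter_num', str2numUDF(df['letter'])).show()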
I think a cleaner solution would be to use the udf decorator to define your udf function:
import pyspark.sql.functions as F
@F.udf
def str2numUDF(text):
    if text is None or text == '' or text == 'NULL' or text == 'null':
        return 0
    elif len(text) == 1:
        return ord(text)
    else:
        newnum = ''
        for lettr in text:
            newnum = newnum + str(ord(lettr))
        return int(newnum)
With this solution, the udf does not reference any other function so it won't throw any errors at you.
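With the decorator applied, the function can be used directly as a column expression. Note that a bare @F.udf defaults to a string return type; the sample data below is made up and assumes an existing SparkSession named spark:
df = spark.createDataFrame([('ab',), ('c',)], ['txt'])
df.select(str2numUDF(df['txt']).alias('txt_num')).show()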
For some older versions of Spark, the decorator doesn't support typed udfs, so you might have to define a custom decorator as follows:
import pyspark.sql.functions as F
import pyspark.sql.types as t
# Custom udf decorator which accepts a return type
def udf_typed(returntype=t.StringType()):
    def _typed_udf_wrapper(func):
        return F.udf(func, returntype)
    return _typed_udf_wrapper

@udf_typed(t.IntegerType())
def my_udf(x):
    return int(x)
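On recent Spark releases (2.1 and later, as far as I know) the built-in decorator accepts a return type directly, so the custom wrapper is only needed on older versions. A short usage sketch, again assuming an existing SparkSession named spark:
import pyspark.sql.functions as F
import pyspark.sql.types as t

@F.udf(returnType=t.IntegerType())
def my_udf(x):
    return int(x)

df = spark.createDataFrame([('1',), ('2',)], ['x'])
df.select(my_udf(df['x']).alias('x_int')).show()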