You need an instance of the DeltaTable class, but you're passing a DataFrame instead. Create one using DeltaTable.forPath (pointing to a specific path) or DeltaTable.forName (for a named table), like this:
DEV_Delta = DeltaTable.forPath(spark, 'some path')
DEV_Delta.alias("t").merge(df_from_pbl.alias("s"), condition_dev) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
If you have the data as a DataFrame only, you need to write it out first.
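A minimal sketch of that first write (the path and the target_df name are assumptions, not from the original answer):
# target_df is a hypothetical DataFrame holding the current target data;
# write it out once in Delta format, then load it back as a DeltaTable.
target_df.write.format("delta").mode("overwrite").save("/tmp/dev_delta")
DEV_Delta = DeltaTable.forPath(spark, "/tmp/dev_delta")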
See documentation for more details.
Answer from Alex Ott on Stack Overflow
"sklearn.datasets" is a scikit-learn package that contains the method load_iris().
By default, load_iris() returns an object that holds data, target, and other members. To get the actual values you have to read the data and target contents yourself.
'iris.csv', on the other hand, holds the features and the target together.
FYI: if you set return_X_y to True in load_iris(), you will get the features and target directly:
from sklearn import datasets
data, target = datasets.load_iris(return_X_y=True)
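As a quick sanity check on the returned arrays (the iris dataset has 150 samples and 4 features):
print(data.shape, target.shape)   # (150, 4) (150,)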
The Iris Dataset from Sklearn is in Sklearn's Bunch format:
print(type(iris))
print(iris.keys())
output:
<class 'sklearn.utils.Bunch'>
dict_keys(['data', 'target', 'target_names', 'DESCR', 'feature_names', 'filename'])
So, that's why you can access it as:
x=iris.data
y=iris.target
But when you read the CSV file as a DataFrame, as you mentioned:
iris = pd.read_csv('iris.csv',header=None).iloc[:,2:4]
iris.head()
output is:
              2            3
0  petal_length  petal_width
1           1.4          0.2
2           1.4          0.2
3           1.3          0.2
4           1.5          0.2
Here the column names are '2' and '3', and the real header row ('petal_length', 'petal_width') has been read in as a data row.
First of all, you should read the CSV file as:
df = pd.read_csv('iris.csv')
You should not pass header=None, because your CSV file includes the column names, i.e. the headers.
So, now what you can do is something like this:
X = df.iloc[:, [2, 3]]  # columns 2 and 3, i.e. 'petal_length' and 'petal_width'
y = df.iloc[:, 4]       # label column, i.e. 'species'
or if you want to use the column names then:
X = df[['petal_length', 'petal_width']]
y = df['species']
Also, if you want to convert the labels from strings to numerical format, use sklearn's LabelEncoder:
from sklearn import preprocessing
le = preprocessing.LabelEncoder()
y = le.fit_transform(y)
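As a usage note, the fitted encoder can also map the numeric labels back to the original species names:
y_names = le.inverse_transform(y)   # recover the original string labels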
SparkSession is not a replacement for a SparkContext but an equivalent of the SQLContext. Just use it the same way you used to use SQLContext:
spark.createDataFrame(...)
and if you ever have to access the SparkContext, use the sparkContext attribute:
spark.sparkContext
so if you need SQLContext for backwards compatibility you can:
SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
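Putting those fragments together, a minimal self-contained sketch (the app name and sample data are arbitrary):
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.appName("example").getOrCreate()
df = spark.createDataFrame([("Alice", 1)], ["name", "value"])   # SQLContext-style usage
sc = spark.sparkContext                                         # the underlying SparkContext
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)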
Whenever you try to create a DataFrame from a backward-compatible object such as an RDD, you need to make your SQLContext aware of your session and context.
For example, if I create an RDD:
ss = SparkSession.builder.appName("vivek").master('local').config("k1", "vi").getOrCreate()
rdd = ss.sparkContext.parallelize([('Alex', 21), ('Bob', 44)])
But if we wish to create a DataFrame from this RDD, we first need to run
sq = SQLContext(sparkContext=ss.sparkContext, sparkSession=ss)
Only then can we use this SQLContext with an RDD (or with a DataFrame created via pandas).
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])
df = sq.createDataFrame(rdd, schema)
df.collect()   # [Row(name='Alex', age=21), Row(name='Bob', age=44)]
You could use reflection to infer the schema from an RDD of Row objects, e.g.,
from pyspark.sql import Row
# mdf is assumed to be an RDD whose elements are indexable, e.g. tuples or lists
mdfRows = mdf.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(mdfRows)
Does that achieve the desired result?
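For context, a self-contained sketch with made-up sample records (the values and the sc/sqlContext names are assumptions, not from the original question):
from pyspark.sql import Row

# hypothetical (dbn, boro, bus) tuples standing in for the real data
mdf = sc.parallelize([
    ("01M015", "MANHATTAN", "B39"),
    ("02M394", "MANHATTAN", "M14A"),
])
mdfRows = mdf.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(mdfRows)
dfOut.show()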
I had the same issue and was able to track it down to a single entry that had a value of length 0 (i.e. empty). The _inferSchema routine runs on each row of the dataframe and determines the types. By default it assumes the empty value is a Double while the other is a String, and these two types cannot be merged by _merge_type. The issue has been filed as https://issues.apache.org/jira/browse/SPARK-18178, but the best way around it is probably to supply a schema to the createDataFrame command.
The code below reproduces the problem in PySpark 2.0:
import pandas as pd
from io import StringIO

# the second data row has an empty 'Scan Options' value, which triggers the bug
test_df = pd.read_csv(StringIO(',Scan Options\n15,SAT2\n16,\n'))
sqlContext.createDataFrame(test_df).registerTempTable('Test')
o_qry = sqlContext.sql("SELECT * FROM Test LIMIT 1")
o_qry.first()
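And a sketch of the suggested workaround, supplying an explicit schema so type inference never runs (the column names below follow what pandas assigns to that CSV; treat them as an assumption):
from pyspark.sql.types import StructType, StructField, LongType, StringType

# replace NaN with None so the empty cell becomes a SQL null
test_df = test_df.where(pd.notnull(test_df), None)
schema = StructType([
    StructField("Unnamed: 0", LongType(), True),    # pandas' name for the blank header
    StructField("Scan Options", StringType(), True),
])
sqlContext.createDataFrame(test_df, schema=schema).registerTempTable('Test')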