Important classes of Spark SQL and DataFrames:
- pyspark.sql.SQLContext Main entry point for DataFrame and SQL functionality.
- pyspark.sql.DataFrame A distributed collection of data grouped into named columns.
- pyspark.sql.Column A column expression in a DataFrame.
- pyspark.sql.Row A row of data in a DataFrame.
- pyspark.sql.HiveContext Main entry point for accessing data stored in Apache Hive.
- pyspark.sql.GroupedData Aggregation methods, returned by DataFrame.groupBy().
- pyspark.sql.DataFrameNaFunctions Methods for handling missing data (null values).
- pyspark.sql.DataFrameStatFunctions Methods for statistics functionality.
- pyspark.sql.functions List of built-in functions available for DataFrame.
- pyspark.sql.types List of data types available.
- pyspark.sql.Window For working with window functions.
Main entry point for Spark SQL functionality.
A SQLContext can be used create DataFrame, register DataFrame as tables, execute SQL over tables, cache tables, and read parquet files.
Parameters: |
|
---|
Note
Deprecated in 1.3, use createDataFrame() instead.
Caches the specified table in-memory.
New in version 1.0.
Removes all cached tables from the in-memory cache.
New in version 1.3.
Creates a DataFrame from an RDD of tuple/list, list or pandas.DataFrame.
When schema is a list of column names, the type of each column will be inferred from data.
When schema is None, it will try to infer the schema (column names and types) from data, which should be an RDD of Row, or namedtuple, or dict.
If schema inference is needed, samplingRatio is used to determined the ratio of rows used for schema inference. The first row will be used if samplingRatio is None.
Parameters: |
|
---|---|
Returns: |
>>> l = [('Alice', 1)]
>>> sqlContext.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> sqlContext.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]
>>> d = [{'name': 'Alice', 'age': 1}]
>>> sqlContext.createDataFrame(d).collect()
[Row(age=1, name=u'Alice')]
>>> rdd = sc.parallelize(l)
>>> sqlContext.createDataFrame(rdd).collect()
[Row(_1=u'Alice', _2=1)]
>>> df = sqlContext.createDataFrame(rdd, ['name', 'age'])
>>> df.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> person = rdd.map(lambda r: Person(*r))
>>> df2 = sqlContext.createDataFrame(person)
>>> df2.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("name", StringType(), True),
... StructField("age", IntegerType(), True)])
>>> df3 = sqlContext.createDataFrame(rdd, schema)
>>> df3.collect()
[Row(name=u'Alice', age=1)]
>>> sqlContext.createDataFrame(df.toPandas()).collect()
[Row(name=u'Alice', age=1)]
>>> sqlContext.createDataFrame(pandas.DataFrame([[1, 2]]).collect())
[Row(0=1, 1=2)]
New in version 1.3.
Creates an external table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
The data source is specified by the source and a set of options. If source is not specified, the default data source configured by spark.sql.sources.default will be used.
Optionally, a schema can be provided as the schema of the returned DataFrame and created external table.
Returns: | DataFrame |
---|
New in version 1.3.
Returns the value of Spark SQL configuration property for the given key.
If the key is not set, returns defaultValue.
New in version 1.3.
Note
Deprecated in 1.3, use createDataFrame() instead.
Loads a text file storing one JSON object per line as a DataFrame.
Note
Deprecated in 1.4, use DataFrameReader.json() instead.
>>> sqlContext.jsonFile('python/test_support/sql/people.json').dtypes
[('age', 'bigint'), ('name', 'string')]
Loads an RDD storing one JSON object per string as a DataFrame.
If the schema is provided, applies the given schema to this JSON dataset. Otherwise, it samples the dataset with ratio samplingRatio to determine the schema.
>>> df1 = sqlContext.jsonRDD(json)
>>> df1.first()
Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
>>> df2 = sqlContext.jsonRDD(json, df1.schema)
>>> df2.first()
Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("field2", StringType()),
... StructField("field3",
... StructType([StructField("field5", ArrayType(IntegerType()))]))
... ])
>>> df3 = sqlContext.jsonRDD(json, schema)
>>> df3.first()
Row(field2=u'row1', field3=Row(field5=None))
New in version 1.0.
Returns the dataset in a data source as a DataFrame.
Note
Deprecated in 1.4, use DataFrameReader.load() instead.
Loads a Parquet file, returning the result as a DataFrame.
Note
Deprecated in 1.4, use DataFrameReader.parquet() instead.
>>> sqlContext.parquetFile('python/test_support/sql/parquet_partitioned').dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
Create a DataFrame with single LongType column named id, containing elements in a range from start to end (exclusive) with step value step.
Parameters: |
|
---|---|
Returns: |
>>> sqlContext.range(1, 7, 2).collect()
[Row(id=1), Row(id=3), Row(id=5)]
>>> sqlContext.range(3).collect()
[Row(id=0), Row(id=1), Row(id=2)]
New in version 1.4.
Returns a DataFrameReader that can be used to read data in as a DataFrame.
Returns: | DataFrameReader |
---|
New in version 1.4.
Registers the given DataFrame as a temporary table in the catalog.
Temporary tables exist only during the lifetime of this instance of SQLContext.
>>> sqlContext.registerDataFrameAsTable(df, "table1")
New in version 1.3.
Registers a lambda function as a UDF so it can be used in SQL statements.
In addition to a name and the function itself, the return type can be optionally specified. When the return type is not given it default to a string and conversion will automatically be done. For any other return type, the produced object must match the specified type.
Parameters: |
|
---|
>>> sqlContext.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlContext.sql("SELECT stringLengthString('test')").collect()
[Row(c0=u'4')]
>>> from pyspark.sql.types import IntegerType
>>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(c0=4)]
>>> from pyspark.sql.types import IntegerType
>>> sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(c0=4)]
New in version 1.2.
Sets the given Spark SQL configuration property.
New in version 1.3.
Returns a DataFrame representing the result of the given query.
Returns: | DataFrame |
---|
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
New in version 1.0.
Returns the specified table as a DataFrame.
Returns: | DataFrame |
---|
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True
New in version 1.0.
Returns a list of names of tables in the database dbName.
Parameters: | dbName – string, name of the database to use. Default to the current database. |
---|---|
Returns: | list of table names, in string |
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlContext.tableNames()
True
>>> "table1" in sqlContext.tableNames("db")
True
New in version 1.3.
Returns a DataFrame containing names of tables in the given database.
If dbName is not specified, the current database will be used.
The returned DataFrame has two columns: tableName and isTemporary (a column with BooleanType indicating if a table is a temporary one or not).
Parameters: | dbName – string, name of the database to use. |
---|---|
Returns: | DataFrame |
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.tables()
>>> df2.filter("tableName = 'table1'").first()
Row(tableName=u'table1', isTemporary=True)
New in version 1.3.
Returns a UDFRegistration for UDF registration.
Returns: | UDFRegistration |
---|
New in version 1.3.1.
Removes the specified table from the in-memory cache.
New in version 1.0.
A variant of Spark SQL that integrates with data stored in Hive.
Configuration for Hive is read from hive-site.xml on the classpath. It supports running both SQL and HiveQL commands.
Parameters: |
|
---|
Invalidate and refresh all the cached the metadata of the given table. For performance reasons, Spark SQL or the external data source library it uses might cache certain metadata about a table, such as the location of blocks. When those change outside of Spark SQL, users should call this function to invalidate the cache.
A distributed collection of data grouped into named columns.
A DataFrame is equivalent to a relational table in Spark SQL, and can be created using various functions in SQLContext:
people = sqlContext.read.parquet("...")
Once created, it can be manipulated using the various domain-specific-language (DSL) functions defined in: DataFrame, Column.
To select a column from the data frame, use the apply method:
ageCol = people.age
A more concrete example:
# To create DataFrame using SQLContext
people = sqlContext.read.parquet("...")
department = sqlContext.read.parquet("...")
people.filter(people.age > 30).join(department, people.deptId == department.id)) .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
Note
Experimental
New in version 1.3.
Aggregate on the entire DataFrame without groups (shorthand for df.groupBy.agg()).
>>> df.agg({"age": "max"}).collect()
[Row(MAX(age)=5)]
>>> from pyspark.sql import functions as F
>>> df.agg(F.min(df.age)).collect()
[Row(MIN(age)=2)]
New in version 1.3.
Returns a new DataFrame with an alias set.
>>> from pyspark.sql.functions import *
>>> df_as1 = df.alias("df_as1")
>>> df_as2 = df.alias("df_as2")
>>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
>>> joined_df.select(col("df_as1.name"), col("df_as2.name"), col("df_as2.age")).collect()
[Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)]
New in version 1.3.
Persists with the default storage level (MEMORY_ONLY_SER).
New in version 1.3.
Returns a new DataFrame that has exactly numPartitions partitions.
Similar to coalesce defined on an RDD, this operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.
>>> df.coalesce(1).rdd.getNumPartitions()
1
New in version 1.4.
Returns all the records as a list of Row.
>>> df.collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
New in version 1.3.
Returns all column names as a list.
>>> df.columns
['age', 'name']
New in version 1.3.
Calculates the correlation of two columns of a DataFrame as a double value. Currently only supports the Pearson Correlation Coefficient. DataFrame.corr() and DataFrameStatFunctions.corr() are aliases of each other.
Parameters: |
|
---|
New in version 1.4.
Calculate the sample covariance for the given columns, specified by their names, as a double value. DataFrame.cov() and DataFrameStatFunctions.cov() are aliases.
Parameters: |
|
---|
New in version 1.4.
Computes a pair-wise frequency table of the given columns. Also known as a contingency table. The number of distinct values for each column should be less than 1e4. At most 1e6 non-zero pair frequencies will be returned. The first column of each row will be the distinct values of col1 and the column names will be the distinct values of col2. The name of the first column will be $col1_$col2. Pairs that have no occurrences will have null as their counts. DataFrame.crosstab() and DataFrameStatFunctions.crosstab() are aliases.
Parameters: |
|
---|
New in version 1.4.
Create a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregation on them.
>>> df.cube('name', df.age).count().show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null| 2| 1|
|Alice|null| 1|
| Bob| 5| 1|
| Bob|null| 1|
| null| 5| 1|
| null|null| 2|
|Alice| 2| 1|
+-----+----+-----+
New in version 1.4.
Computes statistics for numeric columns.
This include count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical columns.
Note
This function is meant for exploratory data analysis, as we make no guarantee about the backward compatibility of the schema of the resulting DataFrame.
>>> df.describe().show()
+-------+---+
|summary|age|
+-------+---+
| count| 2|
| mean|3.5|
| stddev|1.5|
| min| 2|
| max| 5|
+-------+---+
>>> df.describe(['age', 'name']).show()
+-------+---+-----+
|summary|age| name|
+-------+---+-----+
| count| 2| 2|
| mean|3.5| null|
| stddev|1.5| null|
| min| 2|Alice|
| max| 5| Bob|
+-------+---+-----+
New in version 1.3.1.
Returns a new DataFrame containing the distinct rows in this DataFrame.
>>> df.distinct().count()
2
New in version 1.3.
Returns a new DataFrame that drops the specified column.
Parameters: | col – a string name of the column to drop, or a Column to drop. |
---|
>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]
>>> df.drop(df.age).collect()
[Row(name=u'Alice'), Row(name=u'Bob')]
>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
[Row(age=5, height=85, name=u'Bob')]
>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
[Row(age=5, name=u'Bob', height=85)]
New in version 1.4.
Return a new DataFrame with duplicate rows removed, optionally only considering certain columns.
>>> from pyspark.sql import Row
>>> df = sc.parallelize([ Row(name='Alice', age=5, height=80), Row(name='Alice', age=5, height=80), Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
| 10| 80|Alice|
+---+------+-----+
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
+---+------+-----+
New in version 1.4.
Return a new DataFrame with duplicate rows removed, optionally only considering certain columns.
>>> from pyspark.sql import Row
>>> df = sc.parallelize([ Row(name='Alice', age=5, height=80), Row(name='Alice', age=5, height=80), Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
| 10| 80|Alice|
+---+------+-----+
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
+---+------+-----+
New in version 1.4.
Returns a new DataFrame omitting rows with null values. DataFrame.dropna() and DataFrameNaFunctions.drop() are aliases of each other.
Parameters: |
|
---|
>>> df4.na.drop().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
+---+------+-----+
New in version 1.3.1.
Returns all column names and their data types as a list.
>>> df.dtypes
[('age', 'int'), ('name', 'string')]
New in version 1.3.
Prints the (logical and physical) plans to the console for debugging purpose.
Parameters: | extended – boolean, default False. If False, prints only the physical plan. |
---|
>>> df.explain()
PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:...
>>> df.explain(True)
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
== RDD ==
New in version 1.3.
Replace null values, alias for na.fill(). DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other.
Parameters: |
|
---|
>>> df4.na.fill(50).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
| 5| 50| Bob|
| 50| 50| Tom|
| 50| 50| null|
+---+------+-----+
>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
+---+------+-------+
|age|height| name|
+---+------+-------+
| 10| 80| Alice|
| 5| null| Bob|
| 50| null| Tom|
| 50| null|unknown|
+---+------+-------+
New in version 1.3.1.
Filters rows using the given condition.
where() is an alias for filter().
Parameters: | condition – a Column of types.BooleanType or a string of SQL expression. |
---|
>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]
>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]
>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]
>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]
New in version 1.3.
Returns the first row as a Row.
>>> df.first()
Row(age=2, name=u'Alice')
New in version 1.3.
Returns a new RDD by first applying the f function to each Row, and then flattening the results.
This is a shorthand for df.rdd.flatMap().
>>> df.flatMap(lambda p: p.name).collect()
[u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
New in version 1.3.
Applies the f function to all Row of this DataFrame.
This is a shorthand for df.rdd.foreach().
>>> def f(person):
... print(person.name)
>>> df.foreach(f)
New in version 1.3.
Applies the f function to each partition of this DataFrame.
This a shorthand for df.rdd.foreachPartition().
>>> def f(people):
... for person in people:
... print(person.name)
>>> df.foreachPartition(f)
New in version 1.3.
Finding frequent items for columns, possibly with false positives. Using the frequent element count algorithm described in “http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou”. DataFrame.freqItems() and DataFrameStatFunctions.freqItems() are aliases.
Note
This function is meant for exploratory data analysis, as we make no guarantee about the backward compatibility of the schema of the resulting DataFrame.
Parameters: |
|
---|
New in version 1.4.
Groups the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
groupby() is an alias for groupBy().
Parameters: | cols – list of columns to group by. Each element should be a column name (string) or an expression (Column). |
---|
>>> df.groupBy().avg().collect()
[Row(AVG(age)=3.5)]
>>> df.groupBy('name').agg({'age': 'mean'}).collect()
[Row(name=u'Alice', AVG(age)=2.0), Row(name=u'Bob', AVG(age)=5.0)]
>>> df.groupBy(df.name).avg().collect()
[Row(name=u'Alice', AVG(age)=2.0), Row(name=u'Bob', AVG(age)=5.0)]
>>> df.groupBy(['name', df.age]).count().collect()
[Row(name=u'Bob', age=5, count=1), Row(name=u'Alice', age=2, count=1)]
New in version 1.3.
Groups the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
groupby() is an alias for groupBy().
Parameters: | cols – list of columns to group by. Each element should be a column name (string) or an expression (Column). |
---|
>>> df.groupBy().avg().collect()
[Row(AVG(age)=3.5)]
>>> df.groupBy('name').agg({'age': 'mean'}).collect()
[Row(name=u'Alice', AVG(age)=2.0), Row(name=u'Bob', AVG(age)=5.0)]
>>> df.groupBy(df.name).avg().collect()
[Row(name=u'Alice', AVG(age)=2.0), Row(name=u'Bob', AVG(age)=5.0)]
>>> df.groupBy(['name', df.age]).count().collect()
[Row(name=u'Bob', age=5, count=1), Row(name=u'Alice', age=2, count=1)]
New in version 1.3.
Returns the first n rows.
Parameters: | n – int, default 1. Number of rows to return. |
---|---|
Returns: | If n is greater than 1, return a list of Row. If n is 1, return a single Row. |
>>> df.head()
Row(age=2, name=u'Alice')
>>> df.head(1)
[Row(age=2, name=u'Alice')]
New in version 1.3.
Inserts the contents of this DataFrame into the specified table.
Note
Deprecated in 1.4, use DataFrameWriter.insertInto() instead.
Return a new DataFrame containing rows only in both this frame and another frame.
This is equivalent to INTERSECT in SQL.
New in version 1.3.
Returns True if the collect() and take() methods can be run locally (without any Spark executors).
New in version 1.3.
Joins with another DataFrame, using the given join expression.
The following performs a full outer join between df1 and df2.
Parameters: |
|
---|
>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)]
>>> df.join(df2, 'name').select(df.name, df2.height).collect()
[Row(name=u'Bob', height=85)]
New in version 1.3.
Limits the result count to the number specified.
>>> df.limit(1).collect()
[Row(age=2, name=u'Alice')]
>>> df.limit(0).collect()
[]
New in version 1.3.
Returns a new RDD by applying a the f function to each Row.
This is a shorthand for df.rdd.map().
>>> df.map(lambda p: p.name).collect()
[u'Alice', u'Bob']
New in version 1.3.
Returns a new RDD by applying the f function to each partition.
This is a shorthand for df.rdd.mapPartitions().
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(iterator): yield 1
>>> rdd.mapPartitions(f).sum()
4
New in version 1.3.
Returns a DataFrameNaFunctions for handling missing values.
New in version 1.3.1.
Returns a new DataFrame sorted by the specified column(s).
Parameters: |
|
---|
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.sort("age", ascending=False).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
New in version 1.3.
Sets the storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY_SER).
New in version 1.3.
Prints out the schema in the tree format.
>>> df.printSchema()
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
New in version 1.3.
Randomly splits this DataFrame with the provided weights.
Parameters: |
|
---|
>>> splits = df4.randomSplit([1.0, 2.0], 24)
>>> splits[0].count()
1
>>> splits[1].count()
3
New in version 1.4.
Returns the content as an pyspark.RDD of Row.
New in version 1.3.
Note
Deprecated in 1.4, use registerTempTable() instead.
Registers this RDD as a temporary table using the given name.
The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame.
>>> df.registerTempTable("people")
>>> df2 = sqlContext.sql("select * from people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
New in version 1.3.
Returns a new DataFrame that has exactly numPartitions partitions.
>>> df.repartition(10).rdd.getNumPartitions()
10
New in version 1.3.
Returns a new DataFrame replacing a value with another value. DataFrame.replace() and DataFrameNaFunctions.replace() are aliases of each other.
Parameters: |
|
---|
>>> df4.na.replace(10, 20).show()
+----+------+-----+
| age|height| name|
+----+------+-----+
| 20| 80|Alice|
| 5| null| Bob|
|null| null| Tom|
|null| null| null|
+----+------+-----+
>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+------+----+
| age|height|name|
+----+------+----+
| 10| 80| A|
| 5| null| B|
|null| null| Tom|
|null| null|null|
+----+------+----+
New in version 1.4.
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them.
>>> df.rollup('name', df.age).count().show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
|Alice|null| 1|
| Bob| 5| 1|
| Bob|null| 1|
| null|null| 2|
|Alice| 2| 1|
+-----+----+-----+
New in version 1.4.
Returns a sampled subset of this DataFrame.
>>> df.sample(False, 0.5, 42).count()
1
New in version 1.3.
Saves the contents of the DataFrame to a data source.
Note
Deprecated in 1.4, use DataFrameWriter.save() instead.
New in version 1.3.
Saves the contents as a Parquet file, preserving the schema.
Note
Deprecated in 1.4, use DataFrameWriter.parquet() instead.
Saves the contents of this DataFrame to a data source as a table.
Note
Deprecated in 1.4, use DataFrameWriter.saveAsTable() instead.
Returns the schema of this DataFrame as a types.StructType.
>>> df.schema
StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true)))
New in version 1.3.
Projects a set of expressions and returns a new DataFrame.
Parameters: | cols – list of column names (string) or expressions (Column). If one of the column names is ‘*’, that column is expanded to include all columns in the current DataFrame. |
---|
>>> df.select('*').collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.select('name', 'age').collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
>>> df.select(df.name, (df.age + 10).alias('age')).collect()
[Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
New in version 1.3.
Projects a set of SQL expressions and returns a new DataFrame.
This is a variant of select() that accepts SQL expressions.
>>> df.selectExpr("age * 2", "abs(age)").collect()
[Row((age * 2)=4, Abs(age)=2), Row((age * 2)=10, Abs(age)=5)]
New in version 1.3.
Prints the first n rows to the console.
>>> df
DataFrame[age: int, name: string]
>>> df.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
+---+-----+
New in version 1.3.
Returns a new DataFrame sorted by the specified column(s).
Parameters: |
|
---|
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.sort("age", ascending=False).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
New in version 1.3.
Returns a DataFrameStatFunctions for statistic functions.
New in version 1.4.
Return a new DataFrame containing rows in this frame but not in another frame.
This is equivalent to EXCEPT in SQL.
New in version 1.3.
Returns the first num rows as a list of Row.
>>> df.take(2)
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
New in version 1.3.
Converts a DataFrame into a RDD of string.
Each row is turned into a JSON document as one element in the returned RDD.
>>> df.toJSON().first()
u'{"age":2,"name":"Alice"}'
New in version 1.3.
Returns the contents of this DataFrame as Pandas pandas.DataFrame.
This is only available if Pandas is installed and available.
>>> df.toPandas()
age name
0 2 Alice
1 5 Bob
New in version 1.3.
Return a new DataFrame containing union of rows in this frame and another frame.
This is equivalent to UNION ALL in SQL.
New in version 1.3.
Marks the DataFrame as non-persistent, and remove all blocks for it from memory and disk.
New in version 1.3.
Filters rows using the given condition.
where() is an alias for filter().
Parameters: | condition – a Column of types.BooleanType or a string of SQL expression. |
---|
>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]
>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]
>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]
>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]
New in version 1.3.
Returns a new DataFrame by adding a column.
Parameters: |
|
---|
>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
New in version 1.3.
Returns a new DataFrame by renaming an existing column.
Parameters: |
|
---|
>>> df.withColumnRenamed('age', 'age2').collect()
[Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
New in version 1.3.
Interface for saving the content of the DataFrame out into external storage.
Returns: | DataFrameWriter |
---|
New in version 1.4.
A set of methods for aggregations on a DataFrame, created by DataFrame.groupBy().
Note
Experimental
New in version 1.3.
Compute aggregates and returns the result as a DataFrame.
The available aggregate functions are avg, max, min, sum, count.
If exprs is a single dict mapping from string to string, then the key is the column to perform aggregation on, and the value is the aggregate function.
Alternatively, exprs can also be a list of aggregate Column expressions.
Parameters: | exprs – a dict mapping from column name (string) to aggregate functions (string), or a list of Column. |
---|
>>> gdf = df.groupBy(df.name)
>>> gdf.agg({"*": "count"}).collect()
[Row(name=u'Alice', COUNT(1)=1), Row(name=u'Bob', COUNT(1)=1)]
>>> from pyspark.sql import functions as F
>>> gdf.agg(F.min(df.age)).collect()
[Row(name=u'Alice', MIN(age)=2), Row(name=u'Bob', MIN(age)=5)]
New in version 1.3.
Computes average values for each numeric columns for each group.
Parameters: | cols – list of column names (string). Non-numeric columns are ignored. |
---|
>>> df.groupBy().avg('age').collect()
[Row(AVG(age)=3.5)]
>>> df3.groupBy().avg('age', 'height').collect()
[Row(AVG(age)=3.5, AVG(height)=82.5)]
New in version 1.3.
Counts the number of records for each group.
>>> df.groupBy(df.age).count().collect()
[Row(age=2, count=1), Row(age=5, count=1)]
New in version 1.3.
Computes the max value for each numeric columns for each group.
>>> df.groupBy().max('age').collect()
[Row(MAX(age)=5)]
>>> df3.groupBy().max('age', 'height').collect()
[Row(MAX(age)=5, MAX(height)=85)]
New in version 1.3.
Computes average values for each numeric columns for each group.
Parameters: | cols – list of column names (string). Non-numeric columns are ignored. |
---|
>>> df.groupBy().mean('age').collect()
[Row(AVG(age)=3.5)]
>>> df3.groupBy().mean('age', 'height').collect()
[Row(AVG(age)=3.5, AVG(height)=82.5)]
New in version 1.3.
Computes the min value for each numeric column for each group.
Parameters: | cols – list of column names (string). Non-numeric columns are ignored. |
---|
>>> df.groupBy().min('age').collect()
[Row(MIN(age)=2)]
>>> df3.groupBy().min('age', 'height').collect()
[Row(MIN(age)=2, MIN(height)=80)]
New in version 1.3.
Compute the sum for each numeric columns for each group.
Parameters: | cols – list of column names (string). Non-numeric columns are ignored. |
---|
>>> df.groupBy().sum('age').collect()
[Row(SUM(age)=7)]
>>> df3.groupBy().sum('age', 'height').collect()
[Row(SUM(age)=7, SUM(height)=165)]
New in version 1.3.
A column in a DataFrame.
Column instances can be created by:
# 1. Select a column out of a DataFrame
df.colName
df["colName"]
# 2. Create from an expression
df.colName + 1
1 / df.colName
Note
Experimental
New in version 1.3.
Returns this column aliased with a new name or names (in the case of expressions that return more than one column, such as explode).
>>> df.select(df.age.alias("age2")).collect()
[Row(age2=2), Row(age2=5)]
New in version 1.3.
Returns a sort expression based on the ascending order of the given column name.
Convert the column into type dataType.
>>> df.select(df.age.cast("string").alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
>>> df.select(df.age.cast(StringType()).alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
New in version 1.3.
A boolean expression that is evaluated to true if the value of this expression is between the given columns.
>>> df.select(df.name, df.age.between(2, 4)).show()
+-----+--------------------------+
| name|((age >= 2) && (age <= 4))|
+-----+--------------------------+
|Alice| true|
| Bob| false|
+-----+--------------------------+
New in version 1.3.
binary operator
binary operator
binary operator
Convert the column into type dataType.
>>> df.select(df.age.cast("string").alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
>>> df.select(df.age.cast(StringType()).alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
New in version 1.3.
Returns a sort expression based on the descending order of the given column name.
binary operator
An expression that gets a field by name in a StructField.
>>> from pyspark.sql import Row
>>> df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
>>> df.select(df.r.getField("b")).show()
+----+
|r[b]|
+----+
| b|
+----+
>>> df.select(df.r.a).show()
+----+
|r[a]|
+----+
| 1|
+----+
New in version 1.3.
An expression that gets an item at position ordinal out of a list, or gets an item by key out of a dict.
>>> df = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"])
>>> df.select(df.l.getItem(0), df.d.getItem("key")).show()
+----+------+
|l[0]|d[key]|
+----+------+
| 1| value|
+----+------+
>>> df.select(df.l[0], df.d["key"]).show()
+----+------+
|l[0]|d[key]|
+----+------+
| 1| value|
+----+------+
New in version 1.3.
A boolean expression that is evaluated to true if the value of this expression is contained by the evaluated values of the arguments.
>>> df[df.name.inSet("Bob", "Mike")].collect()
[Row(age=5, name=u'Bob')]
>>> df[df.age.inSet([1, 2, 3])].collect()
[Row(age=2, name=u'Alice')]
New in version 1.3.
True if the current expression is not null.
True if the current expression is null.
binary operator
Evaluates a list of conditions and returns one of multiple possible result expressions. If Column.otherwise() is not invoked, None is returned for unmatched conditions.
See pyspark.sql.functions.when() for example usage.
Parameters: | value – a literal value, or a Column expression. |
---|
>>> from pyspark.sql import functions as F
>>> df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
+-----+---------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0|
+-----+---------------------------------+
|Alice| 0|
| Bob| 1|
+-----+---------------------------------+
New in version 1.4.
Define a windowing column.
Parameters: | window – a WindowSpec |
---|---|
Returns: | a Column |
>>> from pyspark.sql import Window
>>> window = Window.partitionBy("name").orderBy("age").rowsBetween(-1, 1)
>>> from pyspark.sql.functions import rank, min
>>> # df.select(rank().over(window), min('age').over(window))
Note
Window functions is only supported with HiveContext in 1.4
New in version 1.4.
binary operator
binary operator
Return a Column which is a substring of the column.
Parameters: |
|
---|
>>> df.select(df.name.substr(1, 3).alias("col")).collect()
[Row(col=u'Ali'), Row(col=u'Bob')]
New in version 1.3.
Evaluates a list of conditions and returns one of multiple possible result expressions. If Column.otherwise() is not invoked, None is returned for unmatched conditions.
See pyspark.sql.functions.when() for example usage.
Parameters: |
---|
>>> from pyspark.sql import functions as F
>>> df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+--------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0|
+-----+--------------------------------------------------------+
|Alice| -1|
| Bob| 1|
+-----+--------------------------------------------------------+
New in version 1.4.
A row in DataFrame. The fields in it can be accessed like attributes.
Row can be used to create a row object by using named arguments, the fields will be sorted by names.
>>> row = Row(name="Alice", age=11)
>>> row
Row(age=11, name='Alice')
>>> row.name, row.age
('Alice', 11)
Row also can be used to create another Row like class, then it could be used to create Row objects, such as
>>> Person = Row("name", "age")
>>> Person
<Row(name, age)>
>>> Person("Alice", 11)
Row(name='Alice', age=11)
Return as an dict
Functionality for working with missing data in DataFrame.
New in version 1.4.
Returns a new DataFrame omitting rows with null values. DataFrame.dropna() and DataFrameNaFunctions.drop() are aliases of each other.
Parameters: |
|
---|
>>> df4.na.drop().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
+---+------+-----+
New in version 1.3.1.
Replace null values, alias for na.fill(). DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other.
Parameters: |
|
---|
>>> df4.na.fill(50).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
| 5| 50| Bob|
| 50| 50| Tom|
| 50| 50| null|
+---+------+-----+
>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
+---+------+-------+
|age|height| name|
+---+------+-------+
| 10| 80| Alice|
| 5| null| Bob|
| 50| null| Tom|
| 50| null|unknown|
+---+------+-------+
New in version 1.3.1.
Returns a new DataFrame replacing a value with another value. DataFrame.replace() and DataFrameNaFunctions.replace() are aliases of each other.
Parameters: |
|
---|
>>> df4.na.replace(10, 20).show()
+----+------+-----+
| age|height| name|
+----+------+-----+
| 20| 80|Alice|
| 5| null| Bob|
|null| null| Tom|
|null| null| null|
+----+------+-----+
>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+------+----+
| age|height|name|
+----+------+----+
| 10| 80| A|
| 5| null| B|
|null| null| Tom|
|null| null|null|
+----+------+----+
New in version 1.4.
Functionality for statistic functions with DataFrame.
New in version 1.4.
Calculates the correlation of two columns of a DataFrame as a double value. Currently only supports the Pearson Correlation Coefficient. DataFrame.corr() and DataFrameStatFunctions.corr() are aliases of each other.
Parameters: |
|
---|
New in version 1.4.
Calculate the sample covariance for the given columns, specified by their names, as a double value. DataFrame.cov() and DataFrameStatFunctions.cov() are aliases.
Parameters: |
|
---|
New in version 1.4.
Computes a pair-wise frequency table of the given columns. Also known as a contingency table. The number of distinct values for each column should be less than 1e4. At most 1e6 non-zero pair frequencies will be returned. The first column of each row will be the distinct values of col1 and the column names will be the distinct values of col2. The name of the first column will be $col1_$col2. Pairs that have no occurrences will have null as their counts. DataFrame.crosstab() and DataFrameStatFunctions.crosstab() are aliases.
Parameters: |
|
---|
New in version 1.4.
Finding frequent items for columns, possibly with false positives. Using the frequent element count algorithm described in “http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou”. DataFrame.freqItems() and DataFrameStatFunctions.freqItems() are aliases.
Note
This function is meant for exploratory data analysis, as we make no guarantee about the backward compatibility of the schema of the resulting DataFrame.
Parameters: |
|
---|
New in version 1.4.
Utility functions for defining window in DataFrames.
For example:
>>> # PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
>>> window = Window.partitionBy("country").orderBy("date").rowsBetween(-sys.maxsize, 0)
>>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
>>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
Note
Experimental
New in version 1.4.
Creates a WindowSpec with the partitioning defined.
New in version 1.4.
Creates a WindowSpec with the partitioning defined.
New in version 1.4.
A window specification that defines the partitioning, ordering, and frame boundaries.
Use the static methods in Window to create a WindowSpec.
Note
Experimental
New in version 1.4.
Defines the ordering columns in a WindowSpec.
Parameters: | cols – names of columns or expressions |
---|
New in version 1.4.
Defines the partitioning columns in a WindowSpec.
Parameters: | cols – names of columns or expressions |
---|
New in version 1.4.
Defines the frame boundaries, from start (inclusive) to end (inclusive).
Both start and end are relative from the current row. For example, “0” means “current row”, while “-1” means one off before the current row, and “5” means the five off after the current row.
Parameters: |
|
---|
New in version 1.4.
Defines the frame boundaries, from start (inclusive) to end (inclusive).
Both start and end are relative positions from the current row. For example, “0” means “current row”, while “-1” means the row before the current row, and “5” means the fifth row after the current row.
Parameters: |
|
---|
New in version 1.4.
Interface used to load a DataFrame from external storage systems (e.g. file systems, key-value stores, etc). Use SQLContext.read() to access this.
::Note: Experimental
New in version 1.4.
Specifies the input data source format.
Parameters: | source – string, name of the data source, e.g. ‘json’, ‘parquet’. |
---|
>>> df = sqlContext.read.format('json').load('python/test_support/sql/people.json')
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]
New in version 1.4.
Construct a DataFrame representing the database table accessible via JDBC URL url named table and connection properties.
The column parameter could be used to partition the table, then it will be retrieved in parallel based on the parameters passed to this function.
The predicates parameter gives a list expressions suitable for inclusion in WHERE clauses; each one defines one partition of the DataFrame.
::Note: Don’t create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.
Parameters: |
|
---|---|
Returns: | a DataFrame |
New in version 1.4.
Loads a JSON file (one object per line) and returns the result as a :class`DataFrame`.
If the schema parameter is not specified, this function goes through the input once to determine the input schema.
Parameters: |
|
---|
>>> df = sqlContext.read.json('python/test_support/sql/people.json')
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]
New in version 1.4.
Loads data from a data source and returns it as a :class`DataFrame`.
Parameters: |
|
---|
>>> df = sqlContext.read.load('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
New in version 1.4.
Adds an input option for the underlying data source.
New in version 1.5.
Adds input options for the underlying data source.
New in version 1.4.
Loads a Parquet file, returning the result as a DataFrame.
>>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
New in version 1.4.
Specifies the input schema.
Some data sources (e.g. JSON) can infer the input schema automatically from data. By specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading.
Parameters: | schema – a StructType object |
---|
New in version 1.4.
Returns the specified table as a DataFrame.
Parameters: | tableName – string, name of the table. |
---|
>>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.registerTempTable('tmpTable')
>>> sqlContext.read.table('tmpTable').dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
New in version 1.4.
Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems, key-value stores, etc). Use DataFrame.write() to access this.
::Note: Experimental
New in version 1.4.
Specifies the underlying output data source.
Parameters: | source – string, name of the data source, e.g. ‘json’, ‘parquet’. |
---|
>>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
New in version 1.4.
Inserts the content of the DataFrame to the specified table.
It requires that the schema of the class:DataFrame is the same as the schema of the table.
Optionally overwriting any existing data.
New in version 1.4.
Saves the content of the DataFrame to a external database table via JDBC.
Note
Don’t create too many partitions in parallel on a large cluster; otherwise Spark might crash your external database systems.
Parameters: |
|
---|
New in version 1.4.
Saves the content of the DataFrame in JSON format at the specified path.
Parameters: |
|
---|
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
New in version 1.4.
Specifies the behavior when data or table already exists.
Options include:
>>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
New in version 1.4.
Adds an output option for the underlying data source.
New in version 1.5.
Adds output options for the underlying data source.
New in version 1.4.
Saves the content of the DataFrame in Parquet format at the specified path.
Parameters: |
|
---|
>>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
New in version 1.4.
Partitions the output by the given columns on the file system.
If specified, the output is laid out on the file system similar to Hive’s partitioning scheme.
Parameters: | cols – name of columns |
---|
>>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
New in version 1.4.
Saves the contents of the DataFrame to a data source.
The data source is specified by the format and a set of options. If format is not specified, the default data source configured by spark.sql.sources.default will be used.
Parameters: |
|
---|
>>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
New in version 1.4.
Saves the content of the DataFrame as the specified table.
In the case the table already exists, behavior of this function depends on the save mode, specified by the mode function (default to throwing an exception). When mode is Overwrite, the schema of the [[DataFrame]] does not need to be the same as that of the existing table.
Parameters: |
|
---|
New in version 1.4.
Null type.
The data type representing None, used for the types that cannot be inferred.
Decimal (decimal.Decimal) data type.
Long data type, i.e. a signed 64-bit integer.
If the values are beyond the range of [-9223372036854775808, 9223372036854775807], please use DecimalType.
Array data type.
Parameters: |
|
---|
Map data type.
Parameters: |
---|
Keys in a map data type are not allowed to be null (None).
A field in StructType.
Parameters: |
|
---|
A collections of builtin functions
Computes the absolute value.
New in version 1.3.
Computes the cosine inverse of the given value; the returned angle is in the range0.0 through pi.
New in version 1.4.
Returns a new Column for approximate distinct count of col.
>>> df.agg(approxCountDistinct(df.age).alias('c')).collect()
[Row(c=2)]
New in version 1.3.
Creates a new array column.
Parameters: | cols – list of column names (string) or list of Column expressions that have the same data type. |
---|
>>> df.select(array('age', 'age').alias("arr")).collect()
[Row(arr=[2, 2]), Row(arr=[5, 5])]
>>> df.select(array([df.age, df.age]).alias("arr")).collect()
[Row(arr=[2, 2]), Row(arr=[5, 5])]
New in version 1.4.
Returns a sort expression based on the ascending order of the given column name.
New in version 1.3.
Computes the sine inverse of the given value; the returned angle is in the range-pi/2 through pi/2.
New in version 1.4.
Computes the tangent inverse of the given value.
New in version 1.4.
Returns the angle theta from the conversion of rectangular coordinates (x, y) topolar coordinates (r, theta).
New in version 1.4.
Aggregate function: returns the average of the values in a group.
New in version 1.3.
Computes bitwise not.
New in version 1.4.
Computes the cube-root of the given value.
New in version 1.4.
Computes the ceiling of the given value.
New in version 1.4.
Returns the first column that is not null.
>>> cDf = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
>>> cDf.show()
+----+----+
| a| b|
+----+----+
|null|null|
| 1|null|
|null| 2|
+----+----+
>>> cDf.select(coalesce(cDf["a"], cDf["b"])).show()
+-------------+
|Coalesce(a,b)|
+-------------+
| null|
| 1|
| 2|
+-------------+
>>> cDf.select('*', coalesce(cDf["a"], lit(0.0))).show()
+----+----+---------------+
| a| b|Coalesce(a,0.0)|
+----+----+---------------+
|null|null| 0.0|
| 1|null| 1.0|
|null| 2| 0.0|
+----+----+---------------+
New in version 1.4.
Returns a Column based on the given column name.
New in version 1.3.
Returns a Column based on the given column name.
New in version 1.3.
Computes the cosine of the given value.
New in version 1.4.
Computes the hyperbolic cosine of the given value.
New in version 1.4.
Aggregate function: returns the number of items in a group.
New in version 1.3.
Returns a new Column for distinct count of col or cols.
>>> df.agg(countDistinct(df.age, df.name).alias('c')).collect()
[Row(c=2)]
>>> df.agg(countDistinct("age", "name").alias('c')).collect()
[Row(c=2)]
New in version 1.3.
Window function: returns the cumulative distribution of values within a window partition, i.e. the fraction of rows that are below the current row.
This is equivalent to the CUME_DIST function in SQL.
New in version 1.4.
Window function: returns the rank of rows within a window partition, without any gaps.
The difference between rank and denseRank is that denseRank leaves no gaps in ranking sequence when there are ties. That is, if you were ranking a competition using denseRank and had three people tie for second place, you would say that all three were in second place and that the next person came in third.
This is equivalent to the DENSE_RANK function in SQL.
New in version 1.4.
Returns a sort expression based on the descending order of the given column name.
New in version 1.3.
Computes the exponential of the given value.
New in version 1.4.
Returns a new row for each element in the given array or map.
>>> from pyspark.sql import Row
>>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
>>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
>>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+---+-----+
|key|value|
+---+-----+
| a| b|
+---+-----+
New in version 1.4.
Computes the exponential of the given value minus one.
New in version 1.4.
Aggregate function: returns the first value in a group.
New in version 1.3.
Computes the floor of the given value.
New in version 1.4.
Computes sqrt(a^2^ + b^2^) without intermediate overflow or underflow.
New in version 1.4.
Window function: returns the value that is offset rows before the current row, and defaultValue if there is less than offset rows before the current row. For example, an offset of one will return the previous row at any given point in the window partition.
This is equivalent to the LAG function in SQL.
Parameters: |
|
---|
New in version 1.4.
Aggregate function: returns the last value in a group.
New in version 1.3.
Window function: returns the value that is offset rows after the current row, and defaultValue if there is less than offset rows after the current row. For example, an offset of one will return the next row at any given point in the window partition.
This is equivalent to the LEAD function in SQL.
Parameters: |
|
---|
New in version 1.4.
Creates a Column of literal value.
New in version 1.3.
Computes the natural logarithm of the given value.
New in version 1.4.
Computes the logarithm of the given value in Base 10.
New in version 1.4.
Computes the natural logarithm of the given value plus one.
New in version 1.4.
Converts a string expression to upper case.
New in version 1.3.
Aggregate function: returns the maximum value of the expression in a group.
New in version 1.3.
Aggregate function: returns the average of the values in a group.
New in version 1.3.
Aggregate function: returns the minimum value of the expression in a group.
New in version 1.3.
A column that generates monotonically increasing 64-bit integers.
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.
As an example, consider a DataFrame with two partitions, each with 3 records. This expression would return the following IDs: 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
>>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
>>> df0.select(monotonicallyIncreasingId().alias('id')).collect()
[Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]
New in version 1.4.
Window function: returns a group id from 1 to n (inclusive) in a round-robin fashion in a window partition. Fow example, if n is 3, the first row will get 1, the second row will get 2, the third row will get 3, and the fourth row will get 1...
This is equivalent to the NTILE function in SQL.
Parameters: | n – an integer |
---|
New in version 1.4.
Window function: returns the relative rank (i.e. percentile) of rows within a window partition.
This is equivalent to the PERCENT_RANK function in SQL.
New in version 1.4.
Returns the value of the first argument raised to the power of the second argument.
New in version 1.4.
Generates a random column with i.i.d. samples from U[0.0, 1.0].
New in version 1.4.
Generates a column with i.i.d. samples from the standard normal distribution.
New in version 1.4.
Window function: returns the rank of rows within a window partition.
The difference between rank and denseRank is that denseRank leaves no gaps in ranking sequence when there are ties. That is, if you were ranking a competition using denseRank and had three people tie for second place, you would say that all three were in second place and that the next person came in third.
This is equivalent to the RANK function in SQL.
New in version 1.4.
Returns the double value that is closest in value to the argument and is equal to a mathematical integer.
New in version 1.4.
Window function: returns a sequential number starting at 1 within a window partition.
This is equivalent to the ROW_NUMBER function in SQL.
New in version 1.4.
Computes the signum of the given value.
New in version 1.4.
Computes the sine of the given value.
New in version 1.4.
Computes the hyperbolic sine of the given value.
New in version 1.4.
A column for partition ID of the Spark task.
Note that this is indeterministic because it depends on data partitioning and task scheduling.
>>> df.repartition(1).select(sparkPartitionId().alias("pid")).collect()
[Row(pid=0), Row(pid=0)]
New in version 1.4.
Computes the square root of the specified float value.
New in version 1.3.
Creates a new struct column.
Parameters: | cols – list of column names (string) or list of Column expressions that are named or aliased. |
---|
>>> df.select(struct('age', 'name').alias("struct")).collect()
[Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
>>> df.select(struct([df.age, df.name]).alias("struct")).collect()
[Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
New in version 1.4.
Aggregate function: returns the sum of all values in the expression.
New in version 1.3.
Aggregate function: returns the sum of distinct values in the expression.
New in version 1.3.
Computes the tangent of the given value.
New in version 1.4.
Computes the hyperbolic tangent of the given value.
New in version 1.4.
Converts an angle measured in radians to an approximately equivalent angle measured in degrees.
New in version 1.4.
Converts an angle measured in degrees to an approximately equivalent angle measured in radians.
New in version 1.4.
Creates a Column expression representing a user defined function (UDF).
>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> df.select(slen(df.name).alias('slen')).collect()
[Row(slen=5), Row(slen=3)]
New in version 1.3.
Converts a string expression to upper case.
New in version 1.3.
Evaluates a list of conditions and returns one of multiple possible result expressions. If Column.otherwise() is not invoked, None is returned for unmatched conditions.
Parameters: |
|
---|
>>> df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
[Row(age=3), Row(age=4)]
>>> df.select(when(df.age == 2, df.age + 1).alias("age")).collect()
[Row(age=3), Row(age=None)]
New in version 1.4.