How to perform update and insert in Spark using a DataFrame
In Spark, a plain DataFrame is immutable and has no merge method of its own. Updates and inserts are performed by merging a source DataFrame into a target table whose format supports it. The examples here assume Delta Lake, whose DeltaTable API provides a merge operation that updates or inserts rows in the target based on the values in a source DataFrame.
Here is an example of how you could use the merge operation to update rows in a target table:

import io.delta.tables.DeltaTable
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

val targetDF = Seq((1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)).toDF("id", "name", "age")
val sourceDF = Seq((2, "Bob", 32), (3, "Charlie", 36), (4, "Dave", 40)).toDF("id", "name", "age")

// Store the target as a Delta table; the path is only a placeholder
targetDF.write.format("delta").mode("overwrite").save("/tmp/delta/people")
val target = DeltaTable.forPath(spark, "/tmp/delta/people")

// Update the age of every target row whose id also appears in the source
target.as("t")
  .merge(sourceDF.as("s"), "t.id = s.id")
  .whenMatched
  .updateExpr(Map("age" -> "s.age"))
  .execute()

// The table now contains the updated rows (2, "Bob", 32) and (3, "Charlie", 36); (1, "Alice", 25) is unchanged
target.toDF.show()
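Here is an example of how you could use the merge operation to insert rows into a target table:

import io.delta.tables.DeltaTable // merge comes from Delta Lake, not from the plain DataFrame API
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

val targetDF = Seq((1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)).toDF("id", "name", "age")
val sourceDF = Seq((2, "Bob", 32), (3, "Charlie", 36), (4, "Dave", 40)).toDF("id", "name", "age")

// Store the target as a Delta table; the path is only a placeholder
targetDF.write.format("delta").mode("overwrite").save("/tmp/delta/people")
val target = DeltaTable.forPath(spark, "/tmp/delta/people")

// Insert every source row whose id does not yet exist in the target
target.as("t")
  .merge(sourceDF.as("s"), "t.id = s.id")
  .whenNotMatched
  .insertExpr(Map("id" -> "s.id", "name" -> "s.name", "age" -> "s.age"))
  .execute()

// The table now also contains the new row (4, "Dave", 40); the existing rows are left as they were
target.toDF.show()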
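The update and insert clauses can also be chained in a single merge, which gives a full upsert in one pass. The following is a minimal sketch rather than a definitive recipe. It assumes Delta Lake is on the classpath, builds its own local SparkSession, and writes to a placeholder path (/tmp/delta/people_upsert) chosen only for illustration.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Assumption: a local session with the Delta Lake extensions enabled
val spark = SparkSession.builder()
  .appName("upsert-sketch")
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
import spark.implicits._

val targetDF = Seq((1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)).toDF("id", "name", "age")
val sourceDF = Seq((2, "Bob", 32), (3, "Charlie", 36), (4, "Dave", 40)).toDF("id", "name", "age")

// Hypothetical location of the target table
val path = "/tmp/delta/people_upsert"
targetDF.write.format("delta").mode("overwrite").save(path)
val target = DeltaTable.forPath(spark, path)

// Matched ids get their age updated; unmatched source rows are inserted
target.as("t")
  .merge(sourceDF.as("s"), "t.id = s.id")
  .whenMatched
  .updateExpr(Map("age" -> "s.age"))
  .whenNotMatched
  .insertExpr(Map("id" -> "s.id", "name" -> "s.name", "age" -> "s.age"))
  .execute()

// Read the table back to inspect the result
target.toDF.orderBy("id").show()
```

After the merge the table should hold four rows: Alice unchanged, Bob and Charlie with their new ages, and Dave as a new row.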
Conceptually, a merge matches the rows of two DataFrames or Datasets by joining them on one or more columns, usually a common key, and then acts on the matched and unmatched rows.
When no merge API is available, the same kind of combination can be built from the standard Dataset methods:
.join(right: Dataset[_], joinExprs: Column, joinType: String): This method joins two DataFrames or Datasets on one or more columns. The right parameter is the DataFrame or Dataset that you want to join to the current one. The joinExprs parameter is the join condition, expressed over the column or columns on which you want to join. The joinType parameter is the type of join to perform, such as "inner", "outer", "left", or "right".
.union(other: Dataset[T]): This method appends the rows of one DataFrame or Dataset to the other. Columns are matched by position, and duplicate rows are kept.
.intersect(other: Dataset[T]): This method keeps only the rows that are present in both DataFrames or Datasets.
.except(other: Dataset[T]): This method keeps the rows of the first DataFrame or Dataset that are not present in the second. (PySpark names this method subtract.)
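To make the four methods concrete, here is a small sketch that applies them to the target and source DataFrames used in the examples, and then combines a left anti join with union to emulate an upsert using only the plain DataFrame API. The session setup and the variable names are assumptions for illustration. The anti-join-plus-union pattern is one common approach, not something Spark itself calls a merge, and in Scala the set-difference method is named except (PySpark calls it subtract).

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell the existing `spark` can be used instead
val spark = SparkSession.builder().appName("combine-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val targetDF = Seq((1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)).toDF("id", "name", "age")
val sourceDF = Seq((2, "Bob", 32), (3, "Charlie", 36), (4, "Dave", 40)).toDF("id", "name", "age")

// join: pair up rows that share an id (ids 2 and 3 match)
val joined = targetDF.as("t").join(sourceDF.as("s"), $"t.id" === $"s.id", "inner")

// union: append the source rows to the target rows, keeping duplicates
val appended = targetDF.union(sourceDF)

// intersect: rows that are identical in both; none here because the ages differ
val common = targetDF.intersect(sourceDF)

// except: target rows with no identical row in the source; all three here
val onlyInTarget = targetDF.except(sourceDF)

// Emulated upsert: keep target rows whose id is absent from the source,
// then add every source row (updated and new alike)
val upserted = targetDF.join(sourceDF, Seq("id"), "left_anti").union(sourceDF)
upserted.orderBy("id").show()
```

Unlike a table-level merge, this produces a new DataFrame and leaves targetDF untouched, so the result still has to be written out if it should replace the stored data.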
It’s important to note that merges and joins can be costly in terms of memory and computation, because they generally shuffle data across the cluster, so it’s recommended to use them with caution on large datasets.