SQL to JSON

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Convert Columns to JSON with Actual Column Names")
  .enableHiveSupport()
  .getOrCreate()

// Needed for the .as[(String, String)] conversion below
import spark.implicits._

// Load the table that contains the JSON mappings
val aliasMappingTableDF = spark.table("alias_mapping_table")

// Explode the 'temp' array within the 'mapping_json' struct
val aliasMappingDF = aliasMappingTableDF
  .withColumn("mapping", explode(col("mapping_json.temp"))) // Explode the array
  .select(
    col("mapping.aliasName").as("aliasCol"),  // Access aliasName within the exploded struct
    col("mapping.actName").as("actCol")       // Access actName within the exploded struct
  )

// Convert the alias mapping to a Map
val aliasToActualMap = aliasMappingDF
  .as[(String, String)]
  .collect()
  .toMap

// Load the table that contains the data
val originalDF = spark.table("original_table")

// Convert the columns to a JSON structure with actual column names
val jsonColumns = aliasToActualMap.map { case (aliasCol, actCol) =>
  col(aliasCol).as(actCol)
}.toSeq

// Create the final DataFrame in a single select: calling withColumn after a
// narrowing select would fail, because the source columns are dropped by then
val finalDF = originalDF
  .select(
    to_json(struct(jsonColumns: _*)).as("json_columns"),
    col("keycolval").as("unique_id"), // Adjust to your key column (was tbl.keycolval in the source query)
    col("hrs_date"),                  // Adjust according to your query logic
    lit("static_value1").as("static_col1"), // Static columns
    lit("static_value2").as("static_col2")
  )

// Show the transformed data
finalDF.show(false)

// Optionally, save the transformed data back to a table
finalDF.write.mode("overwrite").saveAsTable("prt_src_dt_transformed")

// Stop the Spark session
spark.stop()

  1. Accessing Fields:
    • Once the temp array is exploded, each struct's aliasName and actName fields are exposed as aliasCol and actCol respectively.
  2. JSON Conversion:
    • The alias-to-actual mapping is then used to select the data columns under their actual names and serialize them with to_json(struct(...)); a concrete sketch follows below.
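For concreteness, here is a minimal, self-contained sketch of the mapping shape this assumes. The schema (a mapping_json struct holding a temp array of {aliasName, actName} structs) comes from the code above; the sample values are made up:

// Hypothetical mapping row matching the assumed schema of alias_mapping_table
val sampleMapping = spark.sql(
  """SELECT named_struct(
    |  'temp', array(
    |    named_struct('aliasName', 'cust_nm', 'actName', 'customer_name'),
    |    named_struct('aliasName', 'ord_amt', 'actName', 'order_amount')
    |  )
    |) AS mapping_json""".stripMargin)

sampleMapping
  .withColumn("mapping", explode(col("mapping_json.temp")))
  .select(col("mapping.aliasName").as("aliasCol"), col("mapping.actName").as("actCol"))
  .show(false)
// Yields two rows, (cust_nm, customer_name) and (ord_amt, order_amount), so
// aliasToActualMap becomes Map("cust_nm" -> "customer_name", "ord_amt" -> "order_amount")
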
FINAL UPDATED VERSION (alias mapping, static values, and hrs_date)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

// Needed for the .as[...] Dataset conversions below
import spark.implicits._

// Load the table containing SQL queries
val sqlQueriesDF = spark.table("sql_queries_table")

// Load the alias mapping table
val aliasMappingTableDF = spark.table("alias_mapping_table")

// Explode the 'temp' array within the 'mapping_json' struct
val aliasMappingDF = aliasMappingTableDF
  .withColumn("mapping", explode(col("mapping_json.temp"))) // Explode the array
  .select(
    col("mapping.aliasName").as("aliasCol"),  // Access aliasName within the exploded struct
    col("mapping.actName").as("actCol")       // Access actName within the exploded struct
  )

// Convert the alias mapping to a Map
val aliasToActualMap = aliasMappingDF
  .as[(String, String)]
  .collect()
  .toMap

// Filter the queries based on a condition (e.g., query_id between 1 and 10)
val filteredQueriesDF = sqlQueriesDF.filter(col("query_id") >= 1 && col("query_id") <= 10)

// Collect the SQL queries to run them
val sqlQueriesToRun = filteredQueriesDF.select("sql_query").as[String].collect()

// Function to extract and modify the SQL query dynamically
def modifyQueryForJson(sqlQuery: String, aliasToActualMap: Map[String, String], staticValues: Map[String, String]): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Heuristic: case-insensitively match a WITH clause up to the first closing
  // parenthesis (assumes a single, simple CTE; nested parentheses are not handled)
  val withClauseRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
  val withClause = withClauseRegex.findFirstIn(sqlQuery).getOrElse("")
  
  // Heuristic: take the text between the last SELECT and the following FROM as the projection list
  val selectClause = sqlQuery.split("(?i)SELECT").last.split("(?i)FROM")(0).trim
  val columns = selectClause.split(",").map(_.trim)
  
  // Identify the unique_id column from the WITH clause or the SELECT clause
  val uniqueIdColumn = if (withClause.nonEmpty) {
    withClause.split(",").find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  } else {
    columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  }
  
  // Identify the hrs_date column from the WITH clause or the SELECT clause
  val hrsDateColumn = if (withClause.nonEmpty) {
    withClause.split(",").find(_.toLowerCase.contains("hrs_date")).getOrElse("")
  } else {
    columns.find(_.toLowerCase.contains("hrs_date")).getOrElse("")
  }

  println(s"Identified unique_id column: $uniqueIdColumn")
  println(s"Identified hrs_date column: $hrsDateColumn")

  // Filter out the unique_id and hrs_date columns from the columns list
  val jsonColumns = columns.filterNot(c => c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))

  // Replace alias column names with actual column names
  val actualColumns = jsonColumns.map { colName =>
    val actualColName = aliasToActualMap.getOrElse(colName, colName)
    s"$colName AS `$actualColName`"
  }

  println(s"Columns for JSON conversion with actual names: ${actualColumns.mkString(", ")}")

  // Create the JSON conversion part of the query
  val jsonStruct = actualColumns.mkString(", ")
  val jsonExpression = s"to_json(struct($jsonStruct)) AS json_columns"

  // Rebuild the SELECT clause with static values
  val staticValuesExpression = staticValues.map { case (colName, value) => s"'$value' AS $colName" }.mkString(", ")
  val modifiedSelectClause = s"$jsonExpression, $uniqueIdColumn AS unique_id, $hrsDateColumn AS hrs_date, $staticValuesExpression"

  // Replace the original SELECT clause with the modified one
  val modifiedSqlQuery = sqlQuery.replace(selectClause, modifiedSelectClause)

  // String.replace rewrites the SELECT list within the full query string, so any WITH clause is preserved as-is
  val finalQuery = modifiedSqlQuery

  // Modify the INSERT INTO part to store in `prt_src_dt` table
  val insertIntoRegex = "(?i)INSERT\\s+INTO\\s+\\S+".r
  val finalInsertQuery = insertIntoRegex.replaceFirstIn(finalQuery, "INSERT INTO prt_src_dt")

  println(s"Modified SQL Query to store results in prt_src_dt with static values and hrs_date:\n$finalInsertQuery\n")
  finalInsertQuery
}

// Define the static values to include in the table
val staticValues = Map(
  "static_col1" -> "static_value1",
  "static_col2" -> "static_value2"
)

// Execute each SQL query dynamically with JSON conversion, static values, hrs_date, and store the output in `prt_src_dt`
sqlQueriesToRun.foreach { sqlQuery =>
  try {
    // Modify the query to convert columns to JSON, include static values and hrs_date, and store output in `prt_src_dt`
    val modifiedSqlQuery = modifyQueryForJson(sqlQuery, aliasToActualMap, staticValues)

    // Execute the modified SQL query
    spark.sql(modifiedSqlQuery)
    println("Query executed successfully and results stored in prt_src_dt with static values and hrs_date.")
  } catch {
    case e: Exception =>
      println(s"Failed to execute query: $sqlQuery")
      e.printStackTrace()
  }
}

// Stop the Spark session
spark.stop()
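
As a sanity check, here is a hypothetical input run through modifyQueryForJson above. The table and column names are made up, and note that the alias-map keys must match the column expressions exactly as written in the SELECT text (e.g. "t.cust_nm", not "cust_nm") for the renaming to take effect:

// Hypothetical input query (made-up names)
val sampleQuery =
  """INSERT INTO some_target
    |SELECT tbl.keycolval, t.hrs_date, t.cust_nm, t.ord_amt
    |FROM source_tbl t""".stripMargin

val sampleMap = Map("t.cust_nm" -> "customer_name", "t.ord_amt" -> "order_amount")
println(modifyQueryForJson(sampleQuery, sampleMap, staticValues))
// Prints roughly (wrapped here for readability):
// INSERT INTO prt_src_dt
// SELECT to_json(struct(t.cust_nm AS `customer_name`, t.ord_amt AS `order_amount`)) AS json_columns,
//   tbl.keycolval AS unique_id, t.hrs_date AS hrs_date,
//   'static_value1' AS static_col1, 'static_value2' AS static_col2
// FROM source_tbl t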

EARLIER VERSION (unique_id only; no alias mapping, static values, or hrs_date)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

// Needed for the .as[String] conversion below
import spark.implicits._

// Load the table containing SQL queries
val sqlQueriesDF = spark.table("sql_queries_table")

// Filter the queries based on a condition (e.g., query_id between 1 and 10)
val filteredQueriesDF = sqlQueriesDF.filter(col("query_id") >= 1 && col("query_id") <= 10)

// Collect the SQL queries to run them
val sqlQueriesToRun = filteredQueriesDF.select("sql_query").as[String].collect()

// Function to extract and modify the SQL query dynamically
def modifyQueryForJson(sqlQuery: String): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Heuristic: case-insensitively match a WITH clause up to the first closing
  // parenthesis (assumes a single, simple CTE; nested parentheses are not handled)
  val withClauseRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
  val withClause = withClauseRegex.findFirstIn(sqlQuery).getOrElse("")
  
  // Heuristic: take the text between the last SELECT and the following FROM as the projection list
  val selectClause = sqlQuery.split("(?i)SELECT").last.split("(?i)FROM")(0).trim
  val columns = selectClause.split(",").map(_.trim)
  
  // Identify the unique_id column from the WITH clause or the SELECT clause
  val uniqueIdColumn = if (withClause.nonEmpty) {
    withClause.split(",").find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  } else {
    columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  }
  
  println(s"Identified unique_id column: $uniqueIdColumn")

  // Filter out the unique_id column from the columns list
  val jsonColumns = columns.filterNot(_.equalsIgnoreCase(uniqueIdColumn))

  println(s"Columns for JSON conversion: ${jsonColumns.mkString(", ")}")

  // Create the JSON conversion part of the query
  val jsonStruct = jsonColumns.mkString(", ")
  val jsonExpression = s"to_json(struct($jsonStruct)) AS json_columns"

  // Rebuild the SELECT clause
  val modifiedSelectClause = s"$jsonExpression, $uniqueIdColumn AS unique_id"

  // Replace the original SELECT clause with the modified one
  val modifiedSqlQuery = sqlQuery.replace(selectClause, modifiedSelectClause)

  // String.replace rewrites the SELECT list within the full query string, so any WITH clause is preserved as-is
  val finalQuery = modifiedSqlQuery

  // Modify the INSERT INTO part to store in `prt_src_dt` table
  val insertIntoRegex = "(?i)INSERT\\s+INTO\\s+\\S+".r
  val finalInsertQuery = insertIntoRegex.replaceFirstIn(finalQuery, "INSERT INTO prt_src_dt")

  println(s"Modified SQL Query to store results in prt_src_dt:\n$finalInsertQuery\n")
  finalInsertQuery
}

// Execute each SQL query dynamically with JSON conversion and store the output in `prt_src_dt`
sqlQueriesToRun.foreach { sqlQuery =>
  try {
    // Modify the query to convert columns to JSON and store output in `prt_src_dt`
    val modifiedSqlQuery = modifyQueryForJson(sqlQuery)

    // Execute the modified SQL query
    spark.sql(modifiedSqlQuery)
    println("Query executed successfully and results stored in prt_src_dt.")
  } catch {
    case e: Exception =>
      println(s"Failed to execute query: $sqlQuery")
      e.printStackTrace()
  }
}

// Stop the Spark session
spark.stop()




EARLIER VERSION (static values and hrs_date; no alias mapping)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

// Needed for the .as[String] conversion below
import spark.implicits._

// Load the table containing SQL queries
val sqlQueriesDF = spark.table("sql_queries_table")

// Filter the queries based on a condition (e.g., query_id between 1 and 10)
val filteredQueriesDF = sqlQueriesDF.filter(col("query_id") >= 1 && col("query_id") <= 10)

// Collect the SQL queries to run them
val sqlQueriesToRun = filteredQueriesDF.select("sql_query").as[String].collect()

// Function to extract and modify the SQL query dynamically
def modifyQueryForJson(sqlQuery: String, staticValues: Map[String, String]): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Heuristic: case-insensitively match a WITH clause up to the first closing
  // parenthesis (assumes a single, simple CTE; nested parentheses are not handled)
  val withClauseRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
  val withClause = withClauseRegex.findFirstIn(sqlQuery).getOrElse("")
  
  // Heuristic: take the text between the last SELECT and the following FROM as the projection list
  val selectClause = sqlQuery.split("(?i)SELECT").last.split("(?i)FROM")(0).trim
  val columns = selectClause.split(",").map(_.trim)
  
  // Identify the unique_id column from the WITH clause or the SELECT clause
  val uniqueIdColumn = if (withClause.nonEmpty) {
    withClause.split(",").find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  } else {
    columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  }
  
  // Identify the hrs_date column from the WITH clause or the SELECT clause
  val hrsDateColumn = if (withClause.nonEmpty) {
    withClause.split(",").find(_.toLowerCase.contains("hrs_date")).getOrElse("")
  } else {
    columns.find(_.toLowerCase.contains("hrs_date")).getOrElse("")
  }

  println(s"Identified unique_id column: $uniqueIdColumn")
  println(s"Identified hrs_date column: $hrsDateColumn")

  // Filter out the unique_id and hrs_date columns from the columns list
  val jsonColumns = columns.filterNot(c => c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))

  println(s"Columns for JSON conversion: ${jsonColumns.mkString(", ")}")

  // Create the JSON conversion part of the query
  val jsonStruct = jsonColumns.mkString(", ")
  val jsonExpression = s"to_json(struct($jsonStruct)) AS json_columns"

  // Rebuild the SELECT clause with static values
  val staticValuesExpression = staticValues.map { case (colName, value) => s"'$value' AS $colName" }.mkString(", ")
  val modifiedSelectClause = s"$jsonExpression, $uniqueIdColumn AS unique_id, $hrsDateColumn AS hrs_date, $staticValuesExpression"

  // Replace the original SELECT clause with the modified one
  val modifiedSqlQuery = sqlQuery.replace(selectClause, modifiedSelectClause)

  // String.replace rewrites the SELECT list within the full query string, so any WITH clause is preserved as-is
  val finalQuery = modifiedSqlQuery

  // Modify the INSERT INTO part to store in `prt_src_dt` table
  val insertIntoRegex = "(?i)INSERT\\s+INTO\\s+\\S+".r
  val finalInsertQuery = insertIntoRegex.replaceFirstIn(finalQuery, "INSERT INTO prt_src_dt")

  println(s"Modified SQL Query to store results in prt_src_dt with static values and hrs_date:\n$finalInsertQuery\n")
  finalInsertQuery
}

// Define the static values to include in the table
val staticValues = Map(
  "static_col1" -> "static_value1",
  "static_col2" -> "static_value2"
)

// Execute each SQL query dynamically with JSON conversion, static values, hrs_date, and store the output in `prt_src_dt`
sqlQueriesToRun.foreach { sqlQuery =>
  try {
    // Modify the query to convert columns to JSON, include static values and hrs_date, and store output in `prt_src_dt`
    val modifiedSqlQuery = modifyQueryForJson(sqlQuery, staticValues)

    // Execute the modified SQL query
    spark.sql(modifiedSqlQuery)
    println("Query executed successfully and results stored in prt_src_dt with static values and hrs_date.")
  } catch {
    case e: Exception =>
      println(s"Failed to execute query: $sqlQuery")
      e.printStackTrace()
  }
}

// Stop the Spark session
spark.stop()
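
Since the rewrite is regex-based, it is worth syntax-checking the generated SQL before executing it. Below is a minimal sketch using Spark's Catalyst parser; parsePlan only validates syntax, it does not verify that tables or columns exist:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import scala.util.{Failure, Success, Try}

// Returns true if the rewritten query at least parses; logs the parse error otherwise
def isParsable(sql: String): Boolean =
  Try(CatalystSqlParser.parsePlan(sql)) match {
    case Success(_) => true
    case Failure(e) =>
      println(s"Rewritten query failed to parse: ${e.getMessage}")
      false
  }

// Example use inside the foreach loops above:
// if (isParsable(modifiedSqlQuery)) spark.sql(modifiedSqlQuery)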