SQL to JSON
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Convert Columns to JSON with Actual Column Names")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._ // needed for .as[(String, String)] and toDF/toDS below

// Load the table that contains the JSON mappings
val aliasMappingTableDF = spark.table("alias_mapping_table")

// Explode the 'temp' array within the 'mapping_json' struct
val aliasMappingDF = aliasMappingTableDF
  .withColumn("mapping", explode(col("mapping_json.temp"))) // Explode the array
  .select(
    col("mapping.aliasName").as("aliasCol"), // Access aliasName within the exploded struct
    col("mapping.actName").as("actCol")      // Access actName within the exploded struct
  )

// Convert the alias mapping to a Map
val aliasToActualMap = aliasMappingDF
  .as[(String, String)]
  .collect()
  .toMap

// Load the table that contains the data
val originalDF = spark.table("original_table")

// Convert the columns to a JSON structure with actual column names
val jsonColumns = aliasToActualMap.map { case (aliasCol, actCol) =>
  col(aliasCol).as(actCol)
}.toSeq

// Create the final DataFrame with the JSON column.
// The key and date columns are selected alongside the JSON string, because once
// to_json(...) is selected on its own the original columns are no longer available.
val finalDF = originalDF
  .select(
    to_json(struct(jsonColumns: _*)).as("json_columns"),
    col("keycolval").as("unique_id"), // Adjust according to your query logic
    col("hrs_date")                   // Adjust according to your query logic
  )
  .withColumn("static_col1", lit("static_value1")) // Static columns
  .withColumn("static_col2", lit("static_value2"))

// Show the transformed data
finalDF.show(false)

// Optionally, save the transformed data back to a table
finalDF.write.mode("overwrite").saveAsTable("prt_src_dt_transformed")

// Stop the Spark session
spark.stop()
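The explode above assumes each alias_mapping_table row carries a mapping_json struct whose temp field is an array of {aliasName, actName} structs. As a minimal sketch with invented values (none of these names come from the real table), a hand-built row of that shape behaves like this:

// Illustrative only: one hand-built JSON record shaped like the assumed
// alias_mapping_table, parsed and exploded exactly as in the snippet above.
// Reuses the Spark session and imports (including spark.implicits._) defined there.
val sampleMappingJson =
  """{"mapping_json":{"temp":[{"aliasName":"a_cust_id","actName":"customer_id"},{"aliasName":"a_order_amt","actName":"order_amount"}]}}"""
val sampleMappingDF = spark.read.json(Seq(sampleMappingJson).toDS)

sampleMappingDF
  .withColumn("mapping", explode(col("mapping_json.temp")))
  .select(col("mapping.aliasName").as("aliasCol"), col("mapping.actName").as("actCol"))
  .show(false)
// Roughly:
// +-----------+------------+
// |aliasCol   |actCol      |
// +-----------+------------+
// |a_cust_id  |customer_id |
// |a_order_amt|order_amount|
// +-----------+------------+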
- Accessing Fields:
  - Once exploded, the aliasName and actName fields are accessed as aliasCol and actCol, respectively.
- JSON Conversion:
  - The alias-to-actual column mapping is then used to convert the selected columns to JSON using the actual column names (see the short sketch after this list).
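A quick, self-contained sketch of that conversion, again using the invented a_cust_id/a_order_amt mapping and reusing the session and imports from the first snippet:

// Hypothetical two-row input with aliased column names; the map renames them
// inside the struct so the JSON keys carry the actual column names.
val sampleAliasMap = Map("a_cust_id" -> "customer_id", "a_order_amt" -> "order_amount")
val sampleDataDF = Seq(("C001", 42.5), ("C002", 9.99)).toDF("a_cust_id", "a_order_amt")

val sampleJsonCols = sampleAliasMap.map { case (aliasCol, actCol) => col(aliasCol).as(actCol) }.toSeq
sampleDataDF.select(to_json(struct(sampleJsonCols: _*)).as("json_columns")).show(false)
// Roughly:
// +------------------------------------------+
// |json_columns                              |
// +------------------------------------------+
// |{"customer_id":"C001","order_amount":42.5}|
// |{"customer_id":"C002","order_amount":9.99}|
// +------------------------------------------+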
FINAL UPDATED
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._ // needed for .as[String] and .as[(String, String)]

// Load the table containing SQL queries
val sqlQueriesDF = spark.table("sql_queries_table")

// Load the alias mapping table
val aliasMappingTableDF = spark.table("alias_mapping_table")

// Explode the 'temp' array within the 'mapping_json' struct
val aliasMappingDF = aliasMappingTableDF
  .withColumn("mapping", explode(col("mapping_json.temp"))) // Explode the array
  .select(
    col("mapping.aliasName").as("aliasCol"), // Access aliasName within the exploded struct
    col("mapping.actName").as("actCol")      // Access actName within the exploded struct
  )

// Convert the alias mapping to a Map
val aliasToActualMap = aliasMappingDF
  .as[(String, String)]
  .collect()
  .toMap

// Filter the queries based on a condition (e.g., query_id between 1 and 10)
val filteredQueriesDF = sqlQueriesDF.filter(col("query_id") >= 1 && col("query_id") <= 10)

// Collect the SQL queries to run them
val sqlQueriesToRun = filteredQueriesDF.select("sql_query").as[String].collect()

// Function to extract and modify the SQL query dynamically
def modifyQueryForJson(sqlQuery: String,
                       aliasToActualMap: Map[String, String],
                       staticValues: Map[String, String]): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Case-insensitive regex to identify the WITH clause
  val withClauseRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
  val withClause = withClauseRegex.findFirstIn(sqlQuery).getOrElse("")

  // Case-insensitive split to extract the last SELECT clause
  val selectClause = sqlQuery.split("(?i)SELECT").last.split("(?i)FROM")(0).trim
  val columns = selectClause.split(",").map(_.trim)

  // Identify the unique_id column from the WITH clause or the SELECT clause
  val uniqueIdColumn = if (withClause.nonEmpty) {
    withClause.split(",").find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  } else {
    columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  }

  // Identify the hrs_date column from the WITH clause or the SELECT clause
  val hrsDateColumn = if (withClause.nonEmpty) {
    withClause.split(",").find(_.toLowerCase.contains("hrs_date")).getOrElse("")
  } else {
    columns.find(_.toLowerCase.contains("hrs_date")).getOrElse("")
  }

  println(s"Identified unique_id column: $uniqueIdColumn")
  println(s"Identified hrs_date column: $hrsDateColumn")

  // Filter out the unique_id and hrs_date columns from the columns list
  val jsonColumns = columns.filterNot(c =>
    c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))

  // Replace alias column names with actual column names
  val actualColumns = jsonColumns.map { colName =>
    val actualColName = aliasToActualMap.getOrElse(colName, colName)
    s"$colName AS `$actualColName`"
  }
  println(s"Columns for JSON conversion with actual names: ${actualColumns.mkString(", ")}")

  // Create the JSON conversion part of the query
  val jsonStruct = actualColumns.mkString(", ")
  val jsonExpression = s"to_json(struct($jsonStruct)) AS json_columns"

  // Rebuild the SELECT clause with static values
  val staticValuesExpression = staticValues.map { case (name, value) => s"'$value' AS $name" }.mkString(", ")
  val modifiedSelectClause =
    s"$jsonExpression, $uniqueIdColumn AS unique_id, $hrsDateColumn AS hrs_date, $staticValuesExpression"

  // Replace the original SELECT clause with the modified one; the WITH clause
  // (if any) is preserved because the replacement is done in place
  val finalQuery = sqlQuery.replace(selectClause, modifiedSelectClause)

  // Redirect the INSERT INTO target to the `prt_src_dt` table
  val insertIntoRegex = "(?i)INSERT\\s+INTO\\s+\\S+".r
  val finalInsertQuery = insertIntoRegex.replaceFirstIn(finalQuery, "INSERT INTO prt_src_dt")

  println(s"Modified SQL Query to store results in prt_src_dt with static values and hrs_date:\n$finalInsertQuery\n")
  finalInsertQuery
}

// Define the static values to include in the table
val staticValues = Map(
  "static_col1" -> "static_value1",
  "static_col2" -> "static_value2"
)

// Execute each SQL query dynamically with JSON conversion, static values, hrs_date,
// and store the output in `prt_src_dt`
sqlQueriesToRun.foreach { sqlQuery =>
  try {
    // Modify the query to convert columns to JSON, include static values and hrs_date,
    // and store output in `prt_src_dt`
    val modifiedSqlQuery = modifyQueryForJson(sqlQuery, aliasToActualMap, staticValues)

    // Execute the modified SQL query
    spark.sql(modifiedSqlQuery)
    println("Query executed successfully and results stored in prt_src_dt with static values and hrs_date.")
  } catch {
    case e: Exception =>
      println(s"Failed to execute query: $sqlQuery")
      e.printStackTrace()
  }
}

// Stop the Spark session
spark.stop()
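To make the string rewriting concrete, here is a hypothetical query run through modifyQueryForJson (every table, alias, and column name below is invented; real rows in sql_queries_table will differ). The function only manipulates strings, so this sketch needs nothing beyond the definition above:

// Invented example query of the shape the function expects: an INSERT INTO with a
// trailing SELECT that contains a .keycolval key column and an hrs_date column.
val demoQuery =
  """INSERT INTO some_stage_table
    |SELECT tbl.keycolval, tbl.hrs_date, tbl.a_cust_id, tbl.a_order_amt
    |FROM source_tbl tbl""".stripMargin

val demoRewritten = modifyQueryForJson(
  demoQuery,
  Map("tbl.a_cust_id" -> "customer_id", "tbl.a_order_amt" -> "order_amount"),
  Map("static_col1" -> "static_value1")
)
// demoRewritten is roughly:
// INSERT INTO prt_src_dt
// SELECT to_json(struct(tbl.a_cust_id AS `customer_id`, tbl.a_order_amt AS `order_amount`)) AS json_columns,
//        tbl.keycolval AS unique_id, tbl.hrs_date AS hrs_date, 'static_value1' AS static_col1
// FROM source_tbl tbl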
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._ // needed for .as[String]

// Load the table containing SQL queries
val sqlQueriesDF = spark.table("sql_queries_table")

// Filter the queries based on a condition (e.g., query_id between 1 and 10)
val filteredQueriesDF = sqlQueriesDF.filter(col("query_id") >= 1 && col("query_id") <= 10)

// Collect the SQL queries to run them
val sqlQueriesToRun = filteredQueriesDF.select("sql_query").as[String].collect()

// Function to extract and modify the SQL query dynamically
def modifyQueryForJson(sqlQuery: String): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Case-insensitive regex to identify the WITH clause
  val withClauseRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
  val withClause = withClauseRegex.findFirstIn(sqlQuery).getOrElse("")

  // Case-insensitive split to extract the last SELECT clause
  val selectClause = sqlQuery.split("(?i)SELECT").last.split("(?i)FROM")(0).trim
  val columns = selectClause.split(",").map(_.trim)

  // Identify the unique_id column from the WITH clause or the SELECT clause
  val uniqueIdColumn = if (withClause.nonEmpty) {
    withClause.split(",").find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  } else {
    columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  }
  println(s"Identified unique_id column: $uniqueIdColumn")

  // Filter out the unique_id column from the columns list
  val jsonColumns = columns.filterNot(_.equalsIgnoreCase(uniqueIdColumn))
  println(s"Columns for JSON conversion: ${jsonColumns.mkString(", ")}")

  // Create the JSON conversion part of the query
  val jsonStruct = jsonColumns.mkString(", ")
  val jsonExpression = s"to_json(struct($jsonStruct)) AS json_columns"

  // Rebuild the SELECT clause
  val modifiedSelectClause = s"$jsonExpression, $uniqueIdColumn AS unique_id"

  // Replace the original SELECT clause with the modified one; the WITH clause
  // (if any) is preserved because the replacement is done in place
  val finalQuery = sqlQuery.replace(selectClause, modifiedSelectClause)

  // Redirect the INSERT INTO target to the `prt_src_dt` table
  val insertIntoRegex = "(?i)INSERT\\s+INTO\\s+\\S+".r
  val finalInsertQuery = insertIntoRegex.replaceFirstIn(finalQuery, "INSERT INTO prt_src_dt")

  println(s"Modified SQL Query to store results in prt_src_dt:\n$finalInsertQuery\n")
  finalInsertQuery
}

// Execute each SQL query dynamically with JSON conversion and store the output in `prt_src_dt`
sqlQueriesToRun.foreach { sqlQuery =>
  try {
    // Modify the query to convert columns to JSON and store output in `prt_src_dt`
    val modifiedSqlQuery = modifyQueryForJson(sqlQuery)

    // Execute the modified SQL query
    spark.sql(modifiedSqlQuery)
    println("Query executed successfully and results stored in prt_src_dt.")
  } catch {
    case e: Exception =>
      println(s"Failed to execute query: $sqlQuery")
      e.printStackTrace()
  }
}

// Stop the Spark session
spark.stop()
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._ // needed for .as[String]

// Load the table containing SQL queries
val sqlQueriesDF = spark.table("sql_queries_table")

// Filter the queries based on a condition (e.g., query_id between 1 and 10)
val filteredQueriesDF = sqlQueriesDF.filter(col("query_id") >= 1 && col("query_id") <= 10)

// Collect the SQL queries to run them
val sqlQueriesToRun = filteredQueriesDF.select("sql_query").as[String].collect()

// Function to extract and modify the SQL query dynamically
def modifyQueryForJson(sqlQuery: String, staticValues: Map[String, String]): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Case-insensitive regex to identify the WITH clause
  val withClauseRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
  val withClause = withClauseRegex.findFirstIn(sqlQuery).getOrElse("")

  // Case-insensitive split to extract the last SELECT clause
  val selectClause = sqlQuery.split("(?i)SELECT").last.split("(?i)FROM")(0).trim
  val columns = selectClause.split(",").map(_.trim)

  // Identify the unique_id column from the WITH clause or the SELECT clause
  val uniqueIdColumn = if (withClause.nonEmpty) {
    withClause.split(",").find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  } else {
    columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  }

  // Identify the hrs_date column from the WITH clause or the SELECT clause
  val hrsDateColumn = if (withClause.nonEmpty) {
    withClause.split(",").find(_.toLowerCase.contains("hrs_date")).getOrElse("")
  } else {
    columns.find(_.toLowerCase.contains("hrs_date")).getOrElse("")
  }

  println(s"Identified unique_id column: $uniqueIdColumn")
  println(s"Identified hrs_date column: $hrsDateColumn")

  // Filter out the unique_id and hrs_date columns from the columns list
  val jsonColumns = columns.filterNot(c =>
    c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))
  println(s"Columns for JSON conversion: ${jsonColumns.mkString(", ")}")

  // Create the JSON conversion part of the query
  val jsonStruct = jsonColumns.mkString(", ")
  val jsonExpression = s"to_json(struct($jsonStruct)) AS json_columns"

  // Rebuild the SELECT clause with static values
  val staticValuesExpression = staticValues.map { case (name, value) => s"'$value' AS $name" }.mkString(", ")
  val modifiedSelectClause =
    s"$jsonExpression, $uniqueIdColumn AS unique_id, $hrsDateColumn AS hrs_date, $staticValuesExpression"

  // Replace the original SELECT clause with the modified one; the WITH clause
  // (if any) is preserved because the replacement is done in place
  val finalQuery = sqlQuery.replace(selectClause, modifiedSelectClause)

  // Redirect the INSERT INTO target to the `prt_src_dt` table
  val insertIntoRegex = "(?i)INSERT\\s+INTO\\s+\\S+".r
  val finalInsertQuery = insertIntoRegex.replaceFirstIn(finalQuery, "INSERT INTO prt_src_dt")

  println(s"Modified SQL Query to store results in prt_src_dt with static values and hrs_date:\n$finalInsertQuery\n")
  finalInsertQuery
}

// Define the static values to include in the table
val staticValues = Map(
  "static_col1" -> "static_value1",
  "static_col2" -> "static_value2"
)

// Execute each SQL query dynamically with JSON conversion, static values, hrs_date,
// and store the output in `prt_src_dt`
sqlQueriesToRun.foreach { sqlQuery =>
  try {
    // Modify the query to convert columns to JSON, include static values and hrs_date,
    // and store output in `prt_src_dt`
    val modifiedSqlQuery = modifyQueryForJson(sqlQuery, staticValues)

    // Execute the modified SQL query
    spark.sql(modifiedSqlQuery)
    println("Query executed successfully and results stored in prt_src_dt with static values and hrs_date.")
  } catch {
    case e: Exception =>
      println(s"Failed to execute query: $sqlQuery")
      e.printStackTrace()
  }
}

// Stop the Spark session
spark.stop()