```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._ // needed for .as[String]

// Load the table containing SQL queries
val sqlQueriesDF = spark.table("sql_queries_table")

// Load the table containing alias and actual column mappings
// (inline() expands the temp array<struct> into its aliasCol/actCol fields)
val mappingDF = spark.table("mapping_table").selectExpr("inline(temp)")

// Load the table containing static columns
val staticColumnsDF = spark.table("static_columns_table")
val staticValues = staticColumnsDF.collect().map { row =>
  row.getString(0) -> row.getString(1)
}.toMap

// Function to extract and modify the SQL query dynamically
def modifyQueryForJson(sqlQuery: String,
                       staticValues: Map[String, String],
                       aliasToActualMap: Map[String, String]): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Case-insensitive regex to identify the WITH clause
  val withClauseRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
  val withClause = withClauseRegex.findFirstIn(sqlQuery).getOrElse("")

  // Extract the SELECT clause case-insensitively
  val selectPattern = "(?i)SELECT\\s+[\\w\\W]+?\\s+FROM".r
  val matcher = selectPattern.findFirstMatchIn(sqlQuery)
  val originalSelectClause = matcher.map(_.group(0).stripSuffix("FROM").trim).getOrElse("")
  val columns = originalSelectClause.split(",").map(_.trim)

  // Identify the unique_id and hrs_date columns, wherever they appear in the query
  val uniqueIdColumn = columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  val hrsDateColumn = columns.find(_.toLowerCase.contains("hrs_date")).getOrElse("")

  // Filter out the unique_id and hrs_date columns from the columns list
  val jsonColumns = columns.filterNot(c =>
    c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))
  println(s"Columns for JSON conversion: ${jsonColumns.mkString(", ")}")

  // Create the JSON conversion part of the query
  val jsonStruct = jsonColumns.map { col =>
    // ".last" keeps unaliased columns intact instead of throwing on a missing "as"
    val aliasName = col.split("(?i)\\s+as\\s+").last.trim.toLowerCase.replaceAll(".*\\.", "")
    val actName = aliasToActualMap.getOrElse(aliasName, aliasName)
    s"struct('$aliasName' as aliasName, '$actName' as actName, $col as value)"
  }.mkString(", ")
  val jsonExpression = s"array($jsonStruct) AS json_columns"

  // Rebuild the SELECT clause with static values
  val staticValuesExpression = staticValues.map { case (col, value) =>
    s"'$value' AS $col"
  }.mkString(", ")
  val modifiedSelectClause =
    s"$jsonExpression, $uniqueIdColumn AS unique_id, $hrsDateColumn AS hrs_date, $staticValuesExpression"

  // Replace the original SELECT clause with the modified one using a case-insensitive pattern
  val selectPatternIgnoreCase = "(?i)(SELECT\\s+[\\w\\W]+?\\s+FROM)".r
  val modifiedSqlQuery =
    selectPatternIgnoreCase.replaceFirstIn(sqlQuery, s"SELECT $modifiedSelectClause FROM")

  // Prepend the WITH clause back to the modified query if it was present
  val finalQuery = if (withClause.nonEmpty) withClause + modifiedSqlQuery else modifiedSqlQuery

  // Modify the INSERT INTO part to store results in the `prt_src_dt` table
  val insertIntoRegex = "(?i)INSERT\\s+INTO\\s+\\S+".r
  val finalInsertQuery = insertIntoRegex.replaceFirstIn(finalQuery, "INSERT INTO prt_src_dt")

  println(s"Modified SQL Query to store results in prt_src_dt with static values and hrs_date:\n$finalInsertQuery\n")
  finalInsertQuery
}

// Load the alias-to-actual column mappings
val aliasToActualMap = mappingDF.collect().map { row =>
  val alias = row.getAs[String]("aliasCol").toLowerCase.replaceAll(".*\\.", "") // Remove table aliases
  val act = row.getAs[String]("actCol")
  (alias, act)
}.toMap

// Execute each SQL query dynamically with JSON conversion, static values, hrs_date,
// and store the output in `prt_src_dt`
sqlQueriesDF.select("sql_query").as[String].collect().foreach { sqlQuery =>
  try {
    // Modify the query to convert columns to JSON, include static values and hrs_date
    val modifiedSqlQuery = modifyQueryForJson(sqlQuery, staticValues, aliasToActualMap)
    // Execute the modified SQL query
    spark.sql(modifiedSqlQuery)
    println("Query executed successfully and results stored in prt_src_dt with static values and hrs_date.")
  } catch {
    case e: Exception =>
      println(s"Failed to execute query: $sqlQuery")
      e.printStackTrace()
  }
}

// Stop the Spark session
spark.stop()
```
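As a quick illustration, suppose `static_columns_table` held two hypothetical rows, `("region", "EMEA")` and `("load_type", "FULL")` (these names and values are assumptions, not from the actual table); the `staticValuesExpression` built inside `modifyQueryForJson` would then be the literal-select fragment appended to the rewritten query:

```scala
// Hypothetical sample of static_columns_table contents (column name -> value)
val sampleStaticValues = Map("region" -> "EMEA", "load_type" -> "FULL")

// Same expression-building logic as in modifyQueryForJson
val sampleExpression = sampleStaticValues.map { case (c, v) => s"'$v' AS $c" }.mkString(", ")
// sampleExpression == "'EMEA' AS region, 'FULL' AS load_type"
```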
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.util.matching.Regex

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._ // needed for .toDS

// Example SQL query coming from a table
val sqlQuery =
  """INSERT INTO some_table
    |WITH temp1 AS (
    |  SELECT t.abc as col1, t.col2, t.time as col3, t.keycolval AS unique_id, t2.hrs_date
    |  FROM table1 t
    |  JOIN table2 t2 ON t.id = t2.id
    |),
    |temp2 AS (
    |  SELECT col1, col2, col3, test, gdfgsdf
    |  FROM temp1
    |)
    |SELECT trim(temp2.col1) as col1, temp2.col2, temp2.col3, temp2.test, t3.hrs_date
    |FROM temp2
    |JOIN table3 t3 ON temp2.id = t3.id""".stripMargin

// Example JSON mapping from another table
val mappingJson =
  """{
    |  "set": "some_value",
    |  "temp": [
    |    {"aliasCol": "col1", "actCol": "account name"},
    |    {"aliasCol": "col2", "actCol": "bio name"},
    |    {"aliasCol": "col3", "actCol": "date of birth"}
    |  ]
    |}""".stripMargin

// Parse the JSON mapping
val mappingDF = spark.read.json(Seq(mappingJson).toDS)
val aliasMapping = mappingDF.select(explode(col("temp"))).select("col.*")

// Create a Map for alias-to-actual name mapping
val aliasToActualMap = aliasMapping.collect().map { row =>
  val alias = row.getAs[String]("aliasCol").toLowerCase.replaceAll(".*\\.", "") // Handle table aliases and case
  val act = row.getAs[String]("actCol")
  (alias, act)
}.toMap

// Function to modify the SQL query
def modifyQueryForJson(sqlQuery: String, aliasToActualMap: Map[String, String]): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Case-insensitive regex for the SELECT clause that accounts for INSERT and FROM
  val selectClauseRegex: Regex =
    "(?i)INSERT\\s+(INTO|OVERWRITE)\\s+.*?\\s+SELECT\\s+(DISTINCT\\s+)?(.*?)\\s+FROM".r

  // Extract the final SELECT clause in the query
  val selectClause = selectClauseRegex.findFirstMatchIn(sqlQuery).map(_.group(3)).getOrElse("")

  // Extract columns from the SELECT clause
  val columns = selectClause.split(",").map(_.trim)

  // Identify the unique_id and hrs_date columns
  val uniqueIdColumn = columns.find(_.toLowerCase.contains("keycolval")).map(_.trim.split("\\s+").head).getOrElse("")
  val hrsDateColumn = columns.find(_.toLowerCase.contains("hrs_date")).map(_.trim.split("\\s+").head).getOrElse("")
  println(s"Identified unique_id column: $uniqueIdColumn")
  println(s"Identified hrs_date column: $hrsDateColumn")

  // Filter out the unique_id and hrs_date columns from the columns list for JSON
  val jsonColumns = columns.filterNot(c =>
    c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))

  // Create the JSON conversion part of the query
  val jsonStruct = jsonColumns.map { col =>
    val aliasName = col.split("\\s+as\\s+").last.trim.replaceAll(".*\\.", "").toLowerCase
    val actName = aliasToActualMap.getOrElse(aliasName, aliasName)
    s"struct('$aliasName' as aliasName, '$actName' as actName, $col as value)"
  }.mkString(", ")
  val jsonExpression = s"array($jsonStruct) AS json_columns"

  // Replace the original SELECT clause with the modified one
  val modifiedSelectClause = s"$jsonExpression, $uniqueIdColumn AS unique_id, $hrsDateColumn AS hrs_date"

  // Modify the SQL query with the new SELECT clause
  val modifiedSqlQuery = selectClauseRegex.replaceFirstIn(sqlQuery, s"SELECT $modifiedSelectClause FROM")

  // Replace the INSERT INTO or INSERT OVERWRITE TABLE part to store results in the `prt_src_dt` table
  // (TABLE is optional so that plain "INSERT INTO some_table" is also matched)
  val insertIntoRegex = "(?i)INSERT\\s+(INTO|OVERWRITE)\\s+(TABLE\\s+)?\\S+".r
  val finalInsertQuery = insertIntoRegex.replaceFirstIn(modifiedSqlQuery, "INSERT INTO prt_src_dt")

  println(s"Modified SQL Query:\n$finalInsertQuery\n")
  finalInsertQuery
}

// Modify the query and print the output
val modifiedSqlQuery = modifyQueryForJson(sqlQuery, aliasToActualMap)

// Execute the modified SQL query
spark.sql(modifiedSqlQuery)

// Stop the Spark session
spark.stop()
```
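For the `mappingJson` above, the alias lookup works out to a simple three-entry map; any column alias without an entry (for example `test`) falls back to itself through `getOrElse`:

```scala
// Lookup built from the example mapping JSON above
val expectedAliasMap = Map(
  "col1" -> "account name",
  "col2" -> "bio name",
  "col3" -> "date of birth"
)
// An unmapped alias such as "test" simply maps to "test" via getOrElse.
```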
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.util.regex.Pattern

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._ // needed for .as[String]

// Load the table containing SQL queries
val sqlQueriesDF = spark.table("sql_queries_table")

// Load the table containing alias and actual column mappings
// (inline() expands the temp array<struct> into its aliasCol/actCol fields)
val mappingDF = spark.table("mapping_table").selectExpr("inline(temp)")

// Load the table containing static columns
val staticColumnsDF = spark.table("static_columns_table")
val staticValues = staticColumnsDF.collect().map { row =>
  row.getString(0) -> row.getString(1)
}.toMap

// Function to divide the SQL query into three parts and modify the query
def modifyQueryForJson(sqlQuery: String,
                       staticValues: Map[String, String],
                       aliasToActualMap: Map[String, String]): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Case-insensitive regex to identify the INSERT INTO or INSERT OVERWRITE statement
  val insertIntoRegex = "(?i)(INSERT\\s+(INTO|OVERWRITE\\s+TABLE)\\s+\\S+)"
  val insertPattern = Pattern.compile(insertIntoRegex)
  val insertMatcher = insertPattern.matcher(sqlQuery)

  // If an INSERT statement is found, split the SQL query into three parts
  if (insertMatcher.find()) {
    val insertStatement = insertMatcher.group(1)
    val beforeInsert = sqlQuery.substring(0, insertMatcher.start()).trim
    val afterInsert = sqlQuery.substring(insertMatcher.end()).trim
    println(s"INSERT Statement: $insertStatement")
    println(s"Before INSERT: $beforeInsert")
    println(s"After INSERT: $afterInsert")

    // Case-insensitive regex to identify the WITH clause in the afterInsert part
    val withClauseRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
    val withClauses = withClauseRegex.findAllIn(afterInsert).mkString("\n")

    // Remove the WITH clause from the afterInsert part to simplify parsing the SELECT part
    val sqlWithoutWithClause = afterInsert.replace(withClauses, "")

    // Extract the SELECT clause case-insensitively
    val selectPattern = "(?i)SELECT\\s+[\\w\\W]+?\\s+FROM".r
    val matcher = selectPattern.findFirstMatchIn(sqlWithoutWithClause)
    val originalSelectClause = matcher.map(_.group(0).stripSuffix("FROM").trim).getOrElse("")
    val columns = originalSelectClause.split(",").map(_.trim)

    // Identify the unique_id and hrs_date columns, wherever they appear in the query
    val uniqueIdColumn = columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
    val hrsDateColumn = columns.find(_.toLowerCase.contains("hrs_date")).getOrElse("")

    // Filter out the unique_id and hrs_date columns from the columns list
    val jsonColumns = columns.filterNot(c =>
      c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))
    println(s"Columns for JSON conversion: ${jsonColumns.mkString(", ")}")

    // Create the JSON conversion part of the query
    val jsonStruct = jsonColumns.map { col =>
      // ".last" keeps unaliased columns intact instead of throwing on a missing "as"
      val aliasName = col.split("(?i)\\s+as\\s+").last.trim.toLowerCase.replaceAll(".*\\.", "")
      val actName = aliasToActualMap.getOrElse(aliasName, aliasName)
      s"struct('$aliasName' as aliasName, '$actName' as actName, $col as value)"
    }.mkString(", ")
    val jsonExpression = s"array($jsonStruct) AS json_columns"

    // Rebuild the SELECT clause with static values
    val staticValuesExpression = staticValues.map { case (col, value) =>
      s"'$value' AS $col"
    }.mkString(", ")
    val modifiedSelectClause =
      s"$jsonExpression, $uniqueIdColumn AS unique_id, $hrsDateColumn AS hrs_date, $staticValuesExpression"

    // Replace the original SELECT clause with the modified one using a case-insensitive pattern
    val selectPatternIgnoreCase = "(?i)(SELECT\\s+[\\w\\W]+?\\s+FROM)".r
    val modifiedSqlWithoutWithClause =
      selectPatternIgnoreCase.replaceFirstIn(sqlWithoutWithClause, s"SELECT $modifiedSelectClause FROM")

    // Prepend the WITH clause back to the modified query if it was present
    val finalQuery =
      if (withClauses.nonEmpty) withClauses + modifiedSqlWithoutWithClause
      else modifiedSqlWithoutWithClause

    // Rewrite the INSERT target so results are stored in the `prt_src_dt` table
    val finalInsertQuery = "INSERT INTO prt_src_dt " + finalQuery
    println(s"Modified SQL Query to store results in prt_src_dt with static values and hrs_date:\n$finalInsertQuery\n")
    finalInsertQuery
  } else {
    // If no INSERT statement is found, return the original query
    sqlQuery
  }
}

// Load the alias-to-actual column mappings
val aliasToActualMap = mappingDF.collect().map { row =>
  val alias = row.getAs[String]("aliasCol").toLowerCase.replaceAll(".*\\.", "") // Remove table aliases
  val act = row.getAs[String]("actCol")
  (alias, act)
}.toMap

// Execute each SQL query dynamically with JSON conversion, static values, hrs_date,
// and store the output in `prt_src_dt`
sqlQueriesDF.select("sql_query").as[String].collect().foreach { sqlQuery =>
  try {
    // Modify the query to convert columns to JSON, include static values and hrs_date
    val modifiedSqlQuery = modifyQueryForJson(sqlQuery, staticValues, aliasToActualMap)
    // Execute the modified SQL query
    spark.sql(modifiedSqlQuery)
    println("Query executed successfully and results stored in prt_src_dt with static values and hrs_date.")
  } catch {
    case e: Exception =>
      println(s"Failed to execute query: $sqlQuery")
      e.printStackTrace()
  }
}

// Stop the Spark session
spark.stop()
```
- Process One File Only: The script is modified to process only the first matching `.txt` file by using a `file_processed` flag and a `break` statement to exit the loop after the first match.
- Skip if No File Found: If no matching file is found, the script will print a message and skip the process.
```bash
#!/bin/bash

# Define the directory to search for files
search_dir="/path/to/your/directory"

# Define the other shell script to run
other_script="/path/to/your/other_script.sh"

# Flag to indicate if a file was processed
file_processed=false

# Loop through all .txt files in the directory
for file in "$search_dir"/*.txt; do
    # Check if the file matches the general pattern
    if [[ "$file" =~ ^.*/([A-Z]+)_TIMER_RUN_([0-9]+)\.txt$ ]]; then
        # Extract the necessary parts from the filename
        prefix="${BASH_REMATCH[1]}"
        number="${BASH_REMATCH[2]}"

        # Run the other shell script with the extracted parameters
        "$other_script" "$prefix" "$number"

        # Create the signal file
        signal_file="${file%.txt}.complete"
        touch "$signal_file"

        echo "Processed file: $file"
        echo "Created signal file: $signal_file"

        # Set the flag to true
        file_processed=true

        # Break the loop after processing the first matching file
        break
    fi
done

# Check if no file was processed
if [ "$file_processed" = false ]; then
    echo "No matching .txt file found. Skipping process."
fi
```
```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import java.security.KeyStore
import javax.crypto.SecretKey
import java.io.ByteArrayInputStream
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

// Load the tables containing SQL queries, column mappings, and static columns
val sqlQueriesDF = spark.table("sql_queries_table")
val mappingDF = spark.table("mapping_table").selectExpr("inline(temp)") // expands into aliasCol/actCol columns
val staticColumnsDF = spark.table("static_columns_table")
val staticValues = staticColumnsDF.collect().map(row => row.getString(0) -> row.getString(1)).toMap

// Function to load Oracle connection details from a JCEKS file in HDFS
def loadOracleConnectionDetailsFromHDFS(hdfsFilePath: String,
                                        alias: String,
                                        password: Array[Char]): (String, String, String, String) = {
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val hdfsPath = new Path(hdfsFilePath)
  val inputStream = fs.open(hdfsPath)
  val byteArrayOutputStream = new java.io.ByteArrayOutputStream()
  org.apache.commons.io.IOUtils.copy(inputStream, byteArrayOutputStream)
  inputStream.close()

  val keyStore = KeyStore.getInstance("JCEKS")
  val byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray)
  keyStore.load(byteArrayInputStream, password)
  byteArrayInputStream.close()

  val secretKey = keyStore.getKey(alias, password).asInstanceOf[SecretKey]
  val connectionDetails = new String(secretKey.getEncoded).split(",")
  (connectionDetails(0), connectionDetails(1), connectionDetails(2), connectionDetails(3))
}

// Function to modify the query
def modifyQueryForJson(sqlQuery: String,
                       staticValues: Map[String, String],
                       aliasToActualMap: Map[String, String]): String = {
  val insertRegex = "(?i)(INSERT\\s+(INTO|OVERWRITE\\s+TABLE)\\s+\\S+)".r
  val withRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
  val selectRegex = "(?i)(SELECT\\s+[\\w\\W]+?\\s+FROM)".r

  val insertMatch = insertRegex.findFirstMatchIn(sqlQuery).get
  val insertStatement = insertMatch.group(1)
  val beforeInsert = sqlQuery.substring(0, insertMatch.start()).trim
  val afterInsert = sqlQuery.substring(insertMatch.end()).trim

  val withClauses = withRegex.findAllIn(afterInsert).mkString("\n")
  val sqlWithoutWithClause = afterInsert.replace(withClauses, "")

  val originalSelectClause =
    selectRegex.findFirstMatchIn(sqlWithoutWithClause).map(_.group(0).stripSuffix("FROM").trim).getOrElse("")
  val columns = originalSelectClause.split(",").map(_.trim)

  val uniqueIdColumn = columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  val hrsDateColumn = columns.find(_.toLowerCase.contains("hrs_date")).getOrElse("")
  val jsonColumns = columns.filterNot(c =>
    c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))

  val jsonStruct = jsonColumns.map { col =>
    // ".last" keeps unaliased columns intact instead of throwing on a missing "as"
    val aliasName = col.split("(?i)\\s+as\\s+").last.trim.toLowerCase.replaceAll(".*\\.", "")
    val actName = aliasToActualMap.getOrElse(aliasName, aliasName)
    s"struct('$aliasName' as aliasName, '$actName' as actName, $col as value)"
  }.mkString(", ")
  val jsonExpression = s"array($jsonStruct) AS json_columns"

  val staticValuesExpression = staticValues.map { case (col, value) => s"'$value' AS $col" }.mkString(", ")
  val modifiedSelectClause =
    s"$jsonExpression, $uniqueIdColumn AS unique_id, $hrsDateColumn AS hrs_date, $staticValuesExpression"

  val modifiedSqlWithoutWithClause =
    selectRegex.replaceFirstIn(sqlWithoutWithClause, s"SELECT $modifiedSelectClause FROM")
  val finalQuery =
    if (withClauses.nonEmpty) withClauses + modifiedSqlWithoutWithClause else modifiedSqlWithoutWithClause
  val combinedQuery = s"$beforeInsert $insertStatement $finalQuery"
  combinedQuery
}

// Function to modify the DataFrame schema to match the Oracle table schema
def modifyDataFrameSchema(df: DataFrame): DataFrame = {
  df.withColumn("json_data", col("json_columns").cast("string"))
    .select("json_data", "static_col1", "static_col2", "unique_id", "hrs_date")
}

// Function to write a DataFrame to an Oracle table
def writeToOracle(df: DataFrame, url: String, user: String, password: String, tableName: String): Unit = {
  df.write
    .format("jdbc")
    .option("url", url)
    .option("dbtable", tableName)
    .option("user", user)
    .option("password", password)
    .option("oracle.jdbc.mapDateToTimestamp", "false") // Optional: to handle DATE and TIMESTAMP types
    .mode(SaveMode.Append)
    .save()
}

// Load the alias-to-actual column mappings
val aliasToActualMap = mappingDF.collect().map { row =>
  val alias = row.getAs[String]("aliasCol").toLowerCase.replaceAll(".*\\.", "")
  val act = row.getAs[String]("actCol")
  (alias, act)
}.toMap

// Load Oracle connection details from the JCEKS file in HDFS
val hdfsJceksFilePath = "/path/to/your/keystore.jceks"
val alias = "oracle_credentials"
val keystorePassword = "your_keystore_password".toCharArray()
val (oracleUrl, oracleUser, oraclePassword, oracleTable) =
  loadOracleConnectionDetailsFromHDFS(hdfsJceksFilePath, alias, keystorePassword)

// Check the json_ind value and process each query accordingly
sqlQueriesDF.collect().foreach { row =>
  val jsonIndValue = row.getAs[String]("json_ind")
  val sqlQuery = row.getAs[String]("sql_query")

  if (jsonIndValue == "Y") {
    // Modify and execute queries with JSON conversion
    try {
      val modifiedSqlQuery = modifyQueryForJson(sqlQuery, staticValues, aliasToActualMap)
      val finalDF = spark.sql(modifiedSqlQuery)
      val modifiedDF = modifyDataFrameSchema(finalDF)
      writeToOracle(modifiedDF, oracleUrl, oracleUser, oraclePassword, oracleTable)
      println("Query executed and data stored in Oracle table successfully.")
    } catch {
      case e: Exception =>
        println(s"Failed to execute query: $sqlQuery")
        e.printStackTrace()
    }
  } else {
    // Execute queries directly without JSON conversion
    try {
      val resultDF = spark.sql(sqlQuery)
      writeToOracle(resultDF, oracleUrl, oracleUser, oraclePassword, oracleTable)
      println("Query executed and data stored in Oracle table successfully without JSON conversion.")
    } catch {
      case e: Exception =>
        println(s"Failed to execute query: $sqlQuery")
        e.printStackTrace()
    }
  }
}

// Stop the Spark session
spark.stop()
```