
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

// Needed for the .as[String] Dataset conversion below
import spark.implicits._

// Load the table containing SQL queries
val sqlQueriesDF = spark.table("sql_queries_table")

// Load the table containing alias and actual column mappings
val mappingDF = spark.table("mapping_table").selectExpr("inline(temp) as aliasCol, actCol")

// Load the table containing static columns
val staticColumnsDF = spark.table("static_columns_table")
val staticValues = staticColumnsDF.collect().map { row =>
  row.getString(0) -> row.getString(1)
}.toMap
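
// Assumed layout of static_columns_table (an assumption, not confirmed by the source): the first
// column holds the target column name and the second the literal value, e.g. a row ("src_sys", "CRM")
// becomes 'CRM' AS src_sys in the rebuilt SELECT below.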

// Function to extract and modify the SQL query dynamically
def modifyQueryForJson(sqlQuery: String, staticValues: Map[String, String], aliasToActualMap: Map[String, String]): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Case-insensitive regex to identify the WITH clause
  val withClauseRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
  val withClause = withClauseRegex.findFirstIn(sqlQuery).getOrElse("")

  // Work on the query with the WITH clause removed, so the outer SELECT is matched rather than one inside a CTE
  val sqlWithoutWith = if (withClause.nonEmpty) sqlQuery.replace(withClause, "") else sqlQuery

  // Extract the outer SELECT clause case-insensitively
  val selectPattern = "(?i)SELECT\\s+[\\w\\W]+?\\s+FROM".r
  val matcher = selectPattern.findFirstMatchIn(sqlWithoutWith)

  // Drop the trailing FROM keyword; it may be in any case, so stripSuffix("FROM") would be unreliable
  val originalSelectClause = matcher.map(_.group(0).dropRight(4).trim).getOrElse("")

  val columns = originalSelectClause.split(",").map(_.trim)

  // Identify the unique_id and hrs_date columns (full expressions, possibly aliased), wherever they appear
  val uniqueIdColumn = columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  val hrsDateColumn = columns.find(_.toLowerCase.contains("hrs_date")).getOrElse("")

  // Expression part only (alias stripped), so the rebuilt SELECT does not end up with a double alias
  val uniqueIdExpr = uniqueIdColumn.split("(?i)\\s+as\\s+").head.trim
  val hrsDateExpr = hrsDateColumn.split("(?i)\\s+as\\s+").head.trim

  // Filter out the unique_id and hrs_date columns from the columns list
  val jsonColumns = columns.filterNot(c => c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))

  println(s"Columns for JSON conversion: ${jsonColumns.mkString(", ")}")

  // Create the JSON conversion part of the query (alias stripped from the value expression)
  val jsonStruct = jsonColumns.map { col =>
    val parts = col.split("(?i)\\s+as\\s+")
    val aliasName = parts.last.trim.toLowerCase.replaceAll(".*\\.", "")
    val valueExpr = parts.head.trim
    val actName = aliasToActualMap.getOrElse(aliasName, aliasName)
    s"struct('$aliasName' as aliasName, '$actName' as actName, $valueExpr as value)"
  }.mkString(", ")

  val jsonExpression = s"array($jsonStruct) AS json_columns"

  // Rebuild the SELECT clause with static values
  val staticValuesExpression = staticValues.map { case (col, value) => s"'$value' AS $col" }.mkString(", ")
  val modifiedSelectClause = s"$jsonExpression, $uniqueIdColumn AS unique_id, $hrsDateColumn AS hrs_date, $staticValuesExpression"

  // Replace the original SELECT clause with the modified one using a case-insensitive pattern
  val selectPatternIgnoreCase = "(?i)(SELECT\\s+[\\w\\W]+?\\s+FROM)".r
  val modifiedSqlQuery = selectPatternIgnoreCase.replaceFirstIn(sqlWithoutWith, s"SELECT $modifiedSelectClause FROM")

  // Prepend the WITH clause back to the modified query if it was present
  val finalQuery = if (withClause.nonEmpty) {
    withClause + modifiedSqlQuery
  } else {
    modifiedSqlQuery
  }

  // Modify the INSERT INTO part to store in `prt_src_dt` table
  val insertIntoRegex = "(?i)INSERT\\s+INTO\\s+\\S+".r
  val finalInsertQuery = insertIntoRegex.replaceFirstIn(finalQuery, "INSERT INTO prt_src_dt")

  println(s"Modified SQL Query to store results in prt_src_dt with static values and hrs_date:\n$finalInsertQuery\n")
  finalInsertQuery
}
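
// Minimal sketch (hypothetical table names, mapping and static values) of what the function above
// produces for a simple one-line query; spacing is approximate:
//
//   modifyQueryForJson(
//     "INSERT INTO tgt SELECT t.abc as col1, t.keycolval AS unique_id, t.hrs_date FROM src t",
//     Map("src_sys" -> "CRM"),
//     Map("col1" -> "account name"))
//
//   => INSERT INTO prt_src_dt SELECT array(struct('col1' as aliasName, 'account name' as actName, t.abc as value)) AS json_columns,
//        t.keycolval AS unique_id, t.hrs_date AS hrs_date, 'CRM' AS src_sys FROM src t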

// Load the alias-to-actual column mappings
val aliasToActualMap = mappingDF.collect().map { row =>
  val alias = row.getAs[String]("aliasCol").toLowerCase.replaceAll(".*\\.", "") // Remove table aliases
  val act = row.getAs[String]("actCol")
  (alias, act)
}.toMap

// Execute each SQL query dynamically with JSON conversion, static values, hrs_date, and store the output in `prt_src_dt`
sqlQueriesDF.select("sql_query").as[String].collect().foreach { sqlQuery =>
  try {
    // Modify the query to convert columns to JSON, include static values and hrs_date, and store output in `prt_src_dt`
    val modifiedSqlQuery = modifyQueryForJson(sqlQuery, staticValues, aliasToActualMap)

    // Execute the modified SQL query
    spark.sql(modifiedSqlQuery)
    println("Query executed successfully and results stored in prt_src_dt with static values and hrs_date.")
  } catch {
    case e: Exception =>
      println(s"Failed to execute query: $sqlQuery")
      e.printStackTrace()
  }
}

// Stop the Spark session
spark.stop()

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.util.matching.Regex

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

// Needed for the .toDS conversion below
import spark.implicits._

// Example SQL query coming from a table
val sqlQuery = """INSERT INTO some_table
WITH temp1 AS (
  SELECT t.abc as col1, t.col2, t.time as col3, t.keycolval AS unique_id, t2.hrs_date
  FROM table1 t
  JOIN table2 t2 ON t.id = t2.id
),
temp2 AS (
  SELECT col1, col2, col3, test, gdfgsdf
  FROM temp1
)
SELECT trim(temp2.col1) as col1, temp2.col2, temp2.col3, temp2.test, t3.hrs_date
FROM temp2
JOIN table3 t3 ON temp2.id = t3.id"""

// Example JSON mapping from another table
val mappingJson = """{
  "set": "some_value",
  "temp": [
    {"aliasCol": "col1", "actCol": "account name"},
    {"aliasCol": "col2", "actCol": "bio name"},
    {"aliasCol": "col3", "actCol": "date of birth"}
  ]
}"""

// Parse JSON mapping
val mappingDF = spark.read.json(Seq(mappingJson).toDS)
val aliasMapping = mappingDF.select(explode(col("temp"))).select("col.*")

// Create a Map for alias to actual name mapping
val aliasToActualMap = aliasMapping.collect().map { row =>
  val alias = row.getAs[String]("aliasCol").toLowerCase.replaceAll(".*\\.", "") // Handle table aliases and case
  val act = row.getAs[String]("actCol")
  (alias, act)
}.toMap
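
// For the sample mappingJson above this yields:
//   Map("col1" -> "account name", "col2" -> "bio name", "col3" -> "date of birth")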

// Function to modify the SQL query
def modifyQueryForJson(sqlQuery: String, aliasToActualMap: Map[String, String]): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Case-insensitive, DOTALL regex: anchor on the INSERT, skip greedily to the last SELECT,
  // and capture its column list (group 3) up to the following FROM
  val selectClauseRegex: Regex = "(?is)INSERT\\s+(INTO|OVERWRITE)\\s+.*\\s+SELECT\\s+(DISTINCT\\s+)?(.*?)\\s+FROM".r

  // Extract the final SELECT clause in the query
  val selectClause = selectClauseRegex.findFirstMatchIn(sqlQuery).map(_.group(3)).getOrElse("")

  // Extract columns from the SELECT clause
  val columns = selectClause.split(",").map(_.trim)

  // Identify the unique_id and hrs_date columns (full expressions, possibly aliased)
  val uniqueIdColumn = columns.find(_.toLowerCase.contains("keycolval")).getOrElse("")
  val hrsDateColumn = columns.find(_.toLowerCase.contains("hrs_date")).getOrElse("")

  // Expression part only (alias stripped), used when re-aliasing below
  val uniqueIdExpr = uniqueIdColumn.split("(?i)\\s+as\\s+").head.trim
  val hrsDateExpr = hrsDateColumn.split("(?i)\\s+as\\s+").head.trim

  println(s"Identified unique_id column: $uniqueIdColumn")
  println(s"Identified hrs_date column: $hrsDateColumn")

  // Filter out the unique_id and hrs_date columns from the columns list for JSON
  val jsonColumns = columns.filterNot(c => c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))

  // Create JSON conversion part of the query (alias stripped from the value expression)
  val jsonStruct = jsonColumns.map { col =>
    val parts = col.split("(?i)\\s+as\\s+")
    val aliasName = parts.last.trim.replaceAll(".*\\.", "").toLowerCase
    val valueExpr = parts.head.trim
    val actName = aliasToActualMap.getOrElse(aliasName, aliasName)
    s"struct('$aliasName' as aliasName, '$actName' as actName, $valueExpr as value)"
  }.mkString(", ")
  val jsonExpression = s"array($jsonStruct) AS json_columns"

  // Build the new SELECT list; skip unique_id / hrs_date entries that were not found in this query
  val modifiedSelectClause = Seq(
    Some(jsonExpression),
    if (uniqueIdExpr.nonEmpty) Some(s"$uniqueIdExpr AS unique_id") else None,
    if (hrsDateExpr.nonEmpty) Some(s"$hrsDateExpr AS hrs_date") else None
  ).flatten.mkString(", ")

  // Splice the modified column list into the span of the original final SELECT list (regex group 3),
  // keeping the INSERT target and the WITH clauses intact
  val modifiedSqlQuery = selectClauseRegex.findFirstMatchIn(sqlQuery).map { m =>
    sqlQuery.substring(0, m.start(3)) + modifiedSelectClause + sqlQuery.substring(m.end(3))
  }.getOrElse(sqlQuery)

  // Retarget the INSERT (INTO or OVERWRITE, with or without TABLE) to the `prt_src_dt` table
  val insertIntoRegex = "(?i)INSERT\\s+(INTO|OVERWRITE)(\\s+TABLE)?\\s+\\S+".r
  val finalInsertQuery = insertIntoRegex.replaceFirstIn(modifiedSqlQuery, "INSERT INTO prt_src_dt")

  println(s"Modified SQL Query:\n$finalInsertQuery\n")
  finalInsertQuery
}

// Modify the query and print the output
val modifiedSqlQuery = modifyQueryForJson(sqlQuery, aliasToActualMap)

// Execute the modified SQL query
spark.sql(modifiedSqlQuery)

// Stop the Spark session
spark.stop()

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.util.regex.Pattern

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

// Needed for the .as[String] Dataset conversion below
import spark.implicits._

// Load the table containing SQL queries
val sqlQueriesDF = spark.table("sql_queries_table")

// Load the table containing alias and actual column mappings
// inline() explodes the temp struct array into its fields (assumed to be named aliasCol and actCol, as used below)
val mappingDF = spark.table("mapping_table").selectExpr("inline(temp)")

// Load the table containing static columns
val staticColumnsDF = spark.table("static_columns_table")
val staticValues = staticColumnsDF.collect().map { row =>
  row.getString(0) -> row.getString(1)
}.toMap

// Function to divide the SQL query into three parts and modify the query
def modifyQueryForJson(sqlQuery: String, staticValues: Map[String, String], aliasToActualMap: Map[String, String]): String = {
  println(s"Original SQL Query:\n$sqlQuery\n")

  // Case-insensitive regex to identify the INSERT INTO or INSERT OVERWRITE statement
  val insertIntoRegex = "(?i)(INSERT\\s+(INTO|OVERWRITE\\s+TABLE)\\s+\\S+)"
  val insertPattern = Pattern.compile(insertIntoRegex)
  val insertMatcher = insertPattern.matcher(sqlQuery)

  // If an INSERT statement is found, split the SQL query into three parts
  if (insertMatcher.find()) {
    val insertStatement = insertMatcher.group(1)
    val beforeInsert = sqlQuery.substring(0, insertMatcher.start()).trim
    val afterInsert = sqlQuery.substring(insertMatcher.end()).trim

    println(s"INSERT Statement: $insertStatement")
    println(s"Before INSERT: $beforeInsert")
    println(s"After INSERT: $afterInsert")

    // Case-insensitive regex to identify WITH clause(s) in the afterInsert part
    val withClauseRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
    val withClauses = withClauseRegex.findAllIn(afterInsert).toList

    // Remove the WITH clause(s) from the afterInsert part to simplify parsing the SELECT part
    val sqlWithoutWithClause = withClauses.foldLeft(afterInsert)((q, w) => q.replace(w, ""))

    // Extract the outer SELECT clause case-insensitively
    val selectPattern = "(?i)SELECT\\s+[\\w\\W]+?\\s+FROM".r
    val matcher = selectPattern.findFirstMatchIn(sqlWithoutWithClause)

    // Drop the trailing FROM keyword; it may be in any case, so stripSuffix("FROM") would be unreliable
    val originalSelectClause = matcher.map(_.group(0).dropRight(4).trim).getOrElse("")

    val columns = originalSelectClause.split(",").map(_.trim)

    // Identify the unique_id and hrs_date columns (full expressions, possibly aliased), wherever they appear
    val uniqueIdColumn = columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
    val hrsDateColumn = columns.find(_.toLowerCase.contains("hrs_date")).getOrElse("")

    // Expression part only (alias stripped), so the rebuilt SELECT does not end up with a double alias
    val uniqueIdExpr = uniqueIdColumn.split("(?i)\\s+as\\s+").head.trim
    val hrsDateExpr = hrsDateColumn.split("(?i)\\s+as\\s+").head.trim

    // Filter out the unique_id and hrs_date columns from the columns list
    val jsonColumns = columns.filterNot(c => c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))

    println(s"Columns for JSON conversion: ${jsonColumns.mkString(", ")}")

    // Create the JSON conversion part of the query (alias stripped from the value expression)
    val jsonStruct = jsonColumns.map { col =>
      val parts = col.split("(?i)\\s+as\\s+")
      val aliasName = parts.last.trim.toLowerCase.replaceAll(".*\\.", "")
      val valueExpr = parts.head.trim
      val actName = aliasToActualMap.getOrElse(aliasName, aliasName)
      s"struct('$aliasName' as aliasName, '$actName' as actName, $valueExpr as value)"
    }.mkString(", ")

    val jsonExpression = s"array($jsonStruct) AS json_columns"

    // Rebuild the SELECT clause with static values
    val staticValuesExpression = staticValues.map { case (col, value) => s"'$value' AS $col" }.toSeq.mkString(", ")
    val modifiedSelectClause = s"$jsonExpression, $uniqueIdExpr AS unique_id, $hrsDateExpr AS hrs_date, $staticValuesExpression"

    // Replace the original SELECT clause with the modified one using a case-insensitive pattern
    val selectPatternIgnoreCase = "(?i)(SELECT\\s+[\\w\\W]+?\\s+FROM)".r
    val modifiedSqlWithoutWithClause = selectPatternIgnoreCase.replaceFirstIn(sqlWithoutWithClause, s"SELECT $modifiedSelectClause FROM")

    // Prepend the WITH clause(s) back to the modified query if present
    val finalQuery = if (withClauses.nonEmpty) {
      withClauses.mkString("\n") + modifiedSqlWithoutWithClause
    } else {
      modifiedSqlWithoutWithClause
    }

    // Retarget the INSERT to the `prt_src_dt` table (anything before the original INSERT is dropped, as before)
    val finalInsertQuery = "INSERT INTO prt_src_dt " + finalQuery

    println(s"Modified SQL Query to store results in prt_src_dt with static values and hrs_date:\n$finalInsertQuery\n")
    finalInsertQuery
  } else {
    // If no INSERT statement is found, return the original query
    sqlQuery
  }
}

// Load the alias-to-actual column mappings
val aliasToActualMap = mappingDF.collect().map { row =>
  val alias = row.getAs[String]("aliasCol").toLowerCase.replaceAll(".*\\.", "") // Remove table aliases
  val act = row.getAs[String]("actCol")
  (alias, act)
}.toMap

// Execute each SQL query dynamically with JSON conversion, static values, hrs_date, and store the output in `prt_src_dt`
sqlQueriesDF.select("sql_query").as[String].collect().foreach { sqlQuery =>
  try {
    // Modify the query to convert columns to JSON, include static values and hrs_date, and store output in `prt_src_dt`
    val modifiedSqlQuery = modifyQueryForJson(sqlQuery, staticValues, aliasToActualMap)

    // Execute the modified SQL query
    spark.sql(modifiedSqlQuery)
    println("Query executed successfully and results stored in prt_src_dt with static values and hrs_date.")
  } catch {
    case e: Exception =>
      println(s"Failed to execute query: $sqlQuery")
      e.printStackTrace()
  }
}

// Stop the Spark session
spark.stop()

  1. Process one file only: the script processes only the first matching .txt file, using a file_processed flag and a break statement to exit the loop after the first match (see the script below).
  2. Skip if no file is found: if no matching file is found, the script prints a message and skips the process.
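
For example, a file named ABC_TIMER_RUN_42.txt would invoke the other script as other_script.sh ABC 42 and leave ABC_TIMER_RUN_42.complete behind as the signal file.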

#!/bin/bash

# Define the directory to search for files
search_dir="/path/to/your/directory"

# Define the other shell script to run
other_script="/path/to/your/other_script.sh"

# Flag to indicate if a file was processed
file_processed=false

# Loop through all .txt files in the directory
for file in "$search_dir"/*.txt; do
  # Check if the file matches the general pattern
  if [[ "$file" =~ ^.*/([A-Z]+)_TIMER_RUN_([0-9]+)\.txt$ ]]; then
    # Extract the necessary parts from the filename
    prefix="${BASH_REMATCH[1]}"
    number="${BASH_REMATCH[2]}"
    
    # Run the other shell script with the extracted parameters
    "$other_script" "$prefix" "$number"
    
    # Create the signal file
    signal_file="${file%.txt}.complete"
    touch "$signal_file"
    
    echo "Processed file: $file"
    echo "Created signal file: $signal_file"
    
    # Set the flag to true
    file_processed=true
    
    # Break the loop after processing the first matching file
    break
  fi
done

# Check if no file was processed
if [ "$file_processed" = false ]; then
  echo "No matching .txt file found. Skipping process."
fi



import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SaveMode}
import java.security.KeyStore
import javax.crypto.SecretKey
import java.io.ByteArrayInputStream
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

// Initialize Spark session
val spark = SparkSession.builder()
  .appName("Dynamic SQL Execution with JSON Conversion")
  .enableHiveSupport()
  .getOrCreate()

// Load the table containing SQL queries
val sqlQueriesDF = spark.table("sql_queries_table")
// inline() explodes the temp struct array into its fields (assumed to be named aliasCol and actCol, as used below)
val mappingDF = spark.table("mapping_table").selectExpr("inline(temp)")
val staticColumnsDF = spark.table("static_columns_table")
val staticValues = staticColumnsDF.collect().map(row => row.getString(0) -> row.getString(1)).toMap

// Function to load Oracle connection details from JCEKS file in HDFS
def loadOracleConnectionDetailsFromHDFS(hdfsFilePath: String, alias: String, password: Array[Char]): (String, String, String, String) = {
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val hdfsPath = new Path(hdfsFilePath)
  val inputStream = fs.open(hdfsPath)
  val byteArrayOutputStream = new java.io.ByteArrayOutputStream()
  org.apache.commons.io.IOUtils.copy(inputStream, byteArrayOutputStream)
  inputStream.close()

  val keyStore = KeyStore.getInstance("JCEKS")
  val byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray)
  keyStore.load(byteArrayInputStream, password)
  byteArrayInputStream.close()

  val secretKey = keyStore.getKey(alias, password).asInstanceOf[SecretKey]
  val connectionDetails = new String(secretKey.getEncoded).split(",")

  (connectionDetails(0), connectionDetails(1), connectionDetails(2), connectionDetails(3))
}
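
// Assumption about the keystore entry: it holds a single comma-separated value in the order
// "<jdbc url>,<user>,<password>,<table>", matching the split above. One possible way to create such
// an entry (a sketch; the keystore password must match the one passed to this function) is the
// Hadoop credential CLI:
//   hadoop credential create oracle_credentials \
//     -value "jdbc:oracle:thin:@//host:1521/service,APP_USER,APP_PASSWORD,TARGET_TABLE" \
//     -provider jceks://hdfs/path/to/your/keystore.jceks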

// Function to modify query
def modifyQueryForJson(sqlQuery: String, staticValues: Map[String, String], aliasToActualMap: Map[String, String]): String = {
  val insertRegex = "(?i)(INSERT\\s+(INTO|OVERWRITE\\s+TABLE)\\s+\\S+)".r
  val withRegex = "(?i)WITH\\s+[\\w\\W]+?\\)\\s*".r
  val selectRegex = "(?i)(SELECT\\s+[\\w\\W]+?\\s+FROM)".r

  val insertMatch = insertRegex.findFirstMatchIn(sqlQuery).get
  val insertStatement = insertMatch.group(1)
  val beforeInsert = sqlQuery.substring(0, insertMatch.start()).trim
  val afterInsert = sqlQuery.substring(insertMatch.end()).trim

  // Collect and strip any WITH clause(s) so the outer SELECT can be located
  val withClauses = withRegex.findAllIn(afterInsert).toList
  val sqlWithoutWithClause = withClauses.foldLeft(afterInsert)((q, w) => q.replace(w, ""))
  // Drop the trailing FROM keyword (matched case-insensitively, so stripSuffix would be unreliable)
  val originalSelectClause = selectRegex.findFirstMatchIn(sqlWithoutWithClause).map(_.group(0).dropRight(4).trim).getOrElse("")
  val columns = originalSelectClause.split(",").map(_.trim)

  val uniqueIdColumn = columns.find(_.toLowerCase.contains(".keycolval")).getOrElse("")
  val hrsDateColumn = columns.find(_.toLowerCase.contains("hrs_date")).getOrElse("")
  // Alias-stripped expression parts, used when re-aliasing below
  val uniqueIdExpr = uniqueIdColumn.split("(?i)\\s+as\\s+").head.trim
  val hrsDateExpr = hrsDateColumn.split("(?i)\\s+as\\s+").head.trim
  val jsonColumns = columns.filterNot(c => c.equalsIgnoreCase(uniqueIdColumn) || c.equalsIgnoreCase(hrsDateColumn))

  val jsonStruct = jsonColumns.map { col =>
    val parts = col.split("(?i)\\s+as\\s+")
    val aliasName = parts.last.trim.toLowerCase.replaceAll(".*\\.", "")
    val valueExpr = parts.head.trim
    val actName = aliasToActualMap.getOrElse(aliasName, aliasName)
    s"struct('$aliasName' as aliasName, '$actName' as actName, $valueExpr as value)"
  }.mkString(", ")

  val jsonExpression = s"array($jsonStruct) AS json_columns"
  val staticValuesExpression = staticValues.map { case (col, value) => s"'$value' AS $col" }.mkString(", ")
  val modifiedSelectClause = s"$jsonExpression, $uniqueIdExpr AS unique_id, $hrsDateExpr AS hrs_date, $staticValuesExpression"
  val modifiedSqlWithoutWithClause = selectRegex.replaceFirstIn(sqlWithoutWithClause, s"SELECT $modifiedSelectClause FROM")

  val finalQuery = if (withClauses.nonEmpty) withClauses.mkString("\n") + modifiedSqlWithoutWithClause else modifiedSqlWithoutWithClause

  // Keep anything that preceded the INSERT, but drop the INSERT target itself: the result set is
  // returned to Spark here and written to Oracle via JDBC below, not inserted into a Hive table
  val combinedQuery = s"$beforeInsert $finalQuery".trim

  combinedQuery
}

// Function to modify the DataFrame schema to match the Oracle table layout
def modifyDataFrameSchema(df: DataFrame): DataFrame = {
  // to_json renders the array<struct> as a JSON string (a plain cast would not produce valid JSON)
  df.withColumn("json_data", to_json(col("json_columns")))
    .select("json_data", "static_col1", "static_col2", "unique_id", "hrs_date")
}
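
// Note: "static_col1" and "static_col2" above are placeholders; they must match the column names
// actually produced by staticValuesExpression (i.e. the keys of static_columns_table). Hypothetical usage:
//   val oracleReadyDF = modifyDataFrameSchema(spark.sql(modifiedSqlQuery))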

// Function to write DataFrame to Oracle table
def writeToOracle(df: DataFrame, url: String, user: String, password: String, tableName: String): Unit = {
  df.write
    .format("jdbc")
    .option("url", url)
    .option("dbtable", tableName)
    .option("user", user)
    .option("password", password)
    .option("oracle.jdbc.mapDateToTimestamp", "false") // Optional: To handle DATE and TIMESTAMP types
    .mode(SaveMode.Append)
    .save()
}
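
// The Oracle JDBC driver jar (e.g. ojdbc8.jar) must be on the Spark classpath (for example via
// --jars); if needed, the driver class can also be pinned explicitly:
//   .option("driver", "oracle.jdbc.OracleDriver")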

// Load alias-to-actual column mappings
val aliasToActualMap = mappingDF.collect().map { row =>
  val alias = row.getAs[String]("aliasCol").toLowerCase.replaceAll(".*\\.", "")
  val act = row.getAs[String]("actCol")
  (alias, act)
}.toMap

// Load Oracle connection details from JCEKS file in HDFS
val hdfsJceksFilePath = "/path/to/your/keystore.jceks"
val alias = "oracle_credentials"
val keystorePassword = "your_keystore_password".toCharArray()
val (oracleUrl, oracleUser, oraclePassword, oracleTable) = loadOracleConnectionDetailsFromHDFS(hdfsJceksFilePath, alias, keystorePassword)

// Check json_ind value and process accordingly
sqlQueriesDF.collect().foreach { row =>
  val jsonIndValue = row.getAs[String]("json_ind")
  val sqlQuery = row.getAs[String]("sql_query")

  if (jsonIndValue == "Y") {
    // Modify and execute queries with JSON conversion
    try {
      val modifiedSqlQuery = modifyQueryForJson(sqlQuery, staticValues, aliasToActualMap)
      val finalDF = spark.sql(modifiedSqlQuery)
      val modifiedDF = modifyDataFrameSchema(finalDF)
      writeToOracle(modifiedDF, oracleUrl, oracleUser, oraclePassword, oracleTable)
      println("Query executed and data stored in Oracle table successfully.")
    } catch {
      case e: Exception => println(s"Failed to execute query: $sqlQuery"); e.printStackTrace()
    }
  } else {
    // Execute queries directly without JSON conversion
    // (note: if sqlQuery is itself a full INSERT statement, spark.sql returns an empty result set,
    // so there would be nothing to write to Oracle)
    try {
      val resultDF = spark.sql(sqlQuery)
      writeToOracle(resultDF, oracleUrl, oracleUser, oraclePassword, oracleTable)
      println("Query executed and data stored in Oracle table successfully without JSON conversion.")
    } catch {
      case e: Exception => println(s"Failed to execute query: $sqlQuery"); e.printStackTrace()
    }
  }
}

// Stop Spark session
spark.stop()