Handling Spark SQL exceptions in Scala

try {
  // Spark SQL code that might throw an exception
  spark.sql("SELECT * FROM invalid_table")
} catch {
  // ParseException extends AnalysisException, so it must be matched first
  case e: org.apache.spark.sql.catalyst.parser.ParseException => println("Parse exception: " + e.getMessage)
  case e: org.apache.spark.sql.AnalysisException => println("Analysis exception: " + e.getMessage)
  case e: java.lang.IllegalArgumentException => println("Illegal argument exception: " + e.getMessage)
  case e: java.lang.UnsupportedOperationException => println("Unsupported operation exception: " + e.getMessage)
  case e: org.apache.spark.SparkException => println("Spark exception: " + e.getMessage)
  case e: Exception => println("General exception: " + e.getMessage)
}
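
An alternative, sketched here, wraps the call in scala.util.Try and pattern matches on the result instead of catching at the call site; note that Try captures only non-fatal throwables:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.AnalysisException

// Sketch: Try materializes success or failure as a value
Try(spark.sql("SELECT * FROM invalid_table")) match {
  case Success(df)                   => df.show()
  case Failure(e: AnalysisException) => println("Analysis exception: " + e.getMessage)
  case Failure(e)                    => println("General exception: " + e.getMessage)
}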

To single out a specific failure that surfaces as a generic SparkException, such as an aborted job, inspect the message inside the case:

case e: org.apache.spark.SparkException =>
  if (e.getMessage.contains("Job aborted")) {
    println("Job aborted exception: " + e.getMessage)
  } else {
    println("Spark exception: " + e.getMessage)
  }
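
The same check reads more idiomatically as a pattern guard; the guarded case must come before the unguarded SparkException case, or it can never match:

case e: org.apache.spark.SparkException if e.getMessage.contains("Job aborted") =>
  println("Job aborted exception: " + e.getMessage)
case e: org.apache.spark.SparkException =>
  println("Spark exception: " + e.getMessage)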

The pattern scales better as a reusable helper that logs the failure and returns a one-row DataFrame carrying the error message, so callers always receive a DataFrame:

import org.apache.spark.sql.{SparkSession, DataFrame, AnalysisException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.log4j.Logger

val logger = Logger.getLogger(getClass.getName)

val spark = SparkSession.builder()
  .appName("SafeQueryRunner")
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._

def runQuery(query: String): DataFrame = {
  try {
    spark.sql(query)
  } catch {
    // ParseException extends AnalysisException, so it must be matched first
    case pe: ParseException =>
      logger.warn(s"ParseException occurred: ${pe.getMessage}", pe)
      Seq(s"ParseException: ${pe.getMessage}").toDF("error_message")

    case ae: AnalysisException =>
      logger.warn(s"AnalysisException occurred: ${ae.getMessage}", ae)
      Seq(s"AnalysisException: ${ae.getMessage}").toDF("error_message")

    case e: Exception =>
      logger.warn(s"Unexpected exception occurred: ${e.getMessage}", e)
      Seq(s"General Exception: ${e.getMessage}").toDF("error_message")
  }
}
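
A quick check, using a hypothetical table name assumed not to exist, shows the fallback in action:

// missing_table is a placeholder assumed not to exist, so runQuery
// returns the one-row error DataFrame instead of throwing
val result = runQuery("SELECT * FROM missing_table")
result.show(truncate = false)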

If an explicit schema is preferred over toDF, the same one-row error DataFrame can be built with createDataFrame:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Assumes this runs inside a catch block where ae is the caught AnalysisException
val error = "AnalysisException: " + ae.getMessage

val schema = StructType(Seq(StructField("error_message", StringType, nullable = true)))
val data = Seq(Row(error))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

A fuller version of runQuery covers more failure modes and truncates long messages before logging. Two ordering details matter: ParseException extends AnalysisException, and a guarded SparkException case must precede the unguarded one, or it can never match:

def runQuery(query: String): DataFrame = {
  try {
    spark.sql(query)
  } catch {
    // ParseException extends AnalysisException, so it must be matched first
    case pe: ParseException =>
      val msg = s"ParseException: ${pe.getMessage.take(100)} ..."
      logger.warn(msg)
      createErrorDF(msg)

    case ae: AnalysisException =>
      val msg = s"AnalysisException: ${ae.getMessage.take(100)} ..."
      logger.warn(msg)
      createErrorDF(msg)

    // The guarded case must precede the general SparkException case
    case tk: org.apache.spark.SparkException if tk.getMessage.contains("TaskKilledException") =>
      val msg = s"TaskKilledException: ${tk.getMessage.take(100)} ..."
      logger.warn(msg)
      createErrorDF(msg)

    case se: org.apache.spark.SparkException =>
      val msg = s"SparkException: ${se.getMessage.take(100)} ..."
      logger.warn(msg)
      createErrorDF(msg)

    case e: Exception =>
      val msg = s"Exception: ${e.getMessage.take(100)} ..."
      logger.warn(msg)
      createErrorDF(msg)

    // OutOfMemoryError is an Error, not an Exception, so it needs its own case;
    // catching it is a last resort, since the JVM may already be unstable,
    // and its message can be null
    case oom: java.lang.OutOfMemoryError =>
      val msg = s"OutOfMemoryError: ${Option(oom.getMessage).getOrElse("").take(100)} ..."
      logger.warn(msg)
      createErrorDF(msg)
  }
}

// Builds the one-row error DataFrame with an explicit schema
def createErrorDF(msg: String): DataFrame = {
  val schema = StructType(Seq(StructField("error_message", StringType, nullable = true)))
  val data = Seq(Row(msg))
  spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
}
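
As a final sketch, assuming the definitions above are compiled together (for example via :paste in spark-shell, so runQuery can see createErrorDF), a driver loop can run a batch of queries and keep going when individual statements fail:

// Hypothetical batch; the failing statements are placeholders
val queries = Seq(
  "SELECT 1 AS ok",
  "SELECT * FROM missing_table", // AnalysisException: table not found
  "SELEC 1"                      // ParseException: misspelled keyword
)

queries.foreach { q =>
  val df = runQuery(q)
  // Error DataFrames from runQuery carry the single error_message column
  if (df.columns.sameElements(Array("error_message"))) {
    logger.warn(s"Query failed: $q")
  } else {
    df.show(5, truncate = false)
  }
}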