View Load Audit pipeline — contains (1) build_view_load_map.sh, (2) the DDL for db_admin.view_load_audit_map, and (3) count_check.sh, the daily count-check process.

#!/usr/bin/env bash
###############################################################################
# build_view_load_map.sh (Optimized for Daily Run)
#
# Builds db_admin.view_load_audit_map:
#   - Avoids reloading existing entries (by id, view, table, table_type)
#   - Uses cached view -> table and table -> partition_column mappings
###############################################################################
set -euo pipefail

JDBC="jdbc:hive2://hiveserver:10000"
HUSER="hive"            # change as needed
# NOTE(review): hardcoded password — prefer reading from an env var or a
# permission-protected credential file so it does not leak via VCS/ps.
HPASS="hive_pwd"
RUN_DT="${1:-$(date '+%Y-%m-%d')}"   # run date: first arg, default = today
WORKDIR="/tmp/view_map.$$"           # per-run scratch dir keyed by PID
mkdir -p "$WORKDIR"

# printf, not `echo "\n..."`: plain echo does not expand \n escapes and would
# print the two literal characters backslash-n.
printf '\n▶︎ Starting build_view_load_map.sh  (run_dt=%s)\n' "$RUN_DT"

#########################
## 1. Collect view data
#########################

# 1A. Direct views: every row of db.tbl_info becomes a "direct" entry, with
# the remaining five columns padded out as NA.
beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" \
  --showHeader=false --outputFormat=tsv2 --silent=true -e \
  "SELECT id, tbl_nm FROM db.tbl_info;" \
| awk -F'\t' '{printf "%s\t%s\tdirect\tNA\tNA\tNA\tNA\n", $1, $2}' \
> "$WORKDIR/list_direct.tsv"

# 1B. Enquery views: parse each stored SQL to recover the first source table
# (FROM/JOIN db.tbl) and the load target (INSERT INTO ...).
beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" \
  --showHeader=false --outputFormat=tsv2 --silent=true -e \
  "SELECT id, template, query, sys_dt FROM db.query_store;" \
| while IFS=$'\t' read -r ID TEMPLATE QUERY SYS_DT; do
    # `|| true`: with pipefail a query without a FROM/JOIN match makes grep
    # exit 1, which would kill the whole reader loop under set -e.
    SRC_TABLE=$(printf '%s' "$QUERY" |
      grep -iEo '(from|join)[[:space:]]+[`]?[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+[`]?' |
      head -n1 | awk '{print $NF}' | tr -d '`' || true)

    VIEW_NAME=$(printf '%s' "$QUERY" |
      grep -iEo '(insert[[:space:]]+into)[[:space:]]+[`]?[a-zA-Z0-9_.]+[`]?' |
      head -n1 | awk '{print $NF}' | tr -d '`' || true)

    # printf, not `echo -e`: echo -e would also expand any backslash
    # sequences embedded in the raw query text.
    printf '%s\t%s\tenquery\t%s\t%s\t%s\t%s\n' \
      "$ID" "$VIEW_NAME" "$TEMPLATE" "$QUERY" "$SYS_DT" "$SRC_TABLE"
done > "$WORKDIR/list_parsed.tsv"

# 1C. Merge direct + parsed lists into one working set.
cat "$WORKDIR/list_direct.tsv" "$WORKDIR/list_parsed.tsv" > "$WORKDIR/all_views.tsv"

#################################################
## 1D. Remove already-loaded entries (dedup step)
#################################################
echo "▶︎ Checking already existing entries in db_admin.view_load_audit_map"

beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
  --outputFormat=tsv2 --showHeader=false -e \
  "SELECT DISTINCT id, view_name, table_name, table_type FROM db_admin.view_load_audit_map;" \
> "$WORKDIR/existing_loaded.tsv"

# Dedup key is id|view|type. The previous key also compared table_name, but
# new rows have not had their table resolved yet (always the literal NA)
# while loaded rows carry real table names — so nothing ever matched and
# every row was reloaded daily. An awk exact-key lookup also avoids the
# grep -F substring false positives and the non-zero exit `grep -v` produces
# when every line is filtered out (which would abort under set -e).
# FILENAME==ARGV[1] (not NR==FNR) stays correct when the first file is empty.
awk -F'\t' 'FILENAME==ARGV[1] { seen[$1 "|" $2 "|" $4] = 1; next }
            !(($1 "|" $2 "|" $3) in seen)' \
  "$WORKDIR/existing_loaded.tsv" "$WORKDIR/all_views.tsv" \
  > "$WORKDIR/all_views_final.tsv"

echo "   • Found $(wc -l < "$WORKDIR/existing_loaded.tsv") previously loaded rows"
echo "   • Proceeding with $(wc -l < "$WORKDIR/all_views_final.tsv") new rows only"

############################
## 2. Cache view -> table
############################
echo "▶︎ Caching view -> table"
cut -f2 "$WORKDIR/all_views_final.tsv" | sort -u > "$WORKDIR/unique_views.txt"
printf 'view_name\ttable_name\n' > "$WORKDIR/view_to_table.tsv"

while read -r VIEW; do
  [[ -z "$VIEW" ]] && continue
  # </dev/null: beeline reads stdin, and would otherwise swallow the rest of
  # unique_views.txt and end the loop after the first iteration.
  # `|| true`: a missing/broken view must not abort the run under pipefail.
  DEF=$(beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
        --showHeader=false --outputFormat=tsv2 \
        -e "SHOW CREATE TABLE $VIEW;" </dev/null 2>/dev/null | tr '\n' ' ' || true)
  TABLE=$(printf '%s' "$DEF" |
    grep -iEo '(from|join)[[:space:]]+[`]?[a-zA-Z0-9_.]+[`]?' |
    head -n1 | awk '{print $NF}' | tr -d '`' || true)
  printf '%s\t%s\n' "$VIEW" "$TABLE"
done < "$WORKDIR/unique_views.txt" >> "$WORKDIR/view_to_table.tsv"

############################
## 3. Cache table -> partition
############################
echo "▶︎ Caching table -> partition column"
cut -f2 "$WORKDIR/view_to_table.tsv" | sort -u > "$WORKDIR/unique_tables.txt"
printf 'table_name\tpartition_column\n' > "$WORKDIR/table_to_partcol.tsv"

while read -r TABLE; do
  [[ -z "$TABLE" ]] && continue
  # </dev/null: beeline reads stdin and would otherwise consume the rest of
  # the table list, ending the loop after one iteration.
  # `|| true`: an unpartitioned/missing table must not abort under pipefail.
  PART=$(beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
      --showHeader=false --outputFormat=tsv2 \
      -e "SHOW PARTITIONS $TABLE;" </dev/null 2>/dev/null | head -n1 || true)
  # First partition spec looks like "col=value"; keep the column name only.
  PCOL=""
  [[ -n "$PART" ]] && PCOL="${PART%%=*}"
  printf '%s\t%s\n' "$TABLE" "$PCOL"
done < "$WORKDIR/unique_tables.txt" >> "$WORKDIR/table_to_partcol.tsv"

##################################
## 4. Load caches into memory
##################################
# Read via process substitution, NOT a pipeline: `tail | while read` runs the
# loop body in a subshell, so the arrays were empty afterwards. The original
# then re-read the files with a second pair of loops, one of which ingested
# the header row ("view_name" -> "table_name") as a bogus map entry. This
# single consolidated pass skips the header and fills the arrays in the
# current shell.
declare -A VIEW2TABLE
declare -A TABLE2PART

while IFS=$'\t' read -r V T; do
  VIEW2TABLE["$V"]="$T"
done < <(tail -n +2 "$WORKDIR/view_to_table.tsv")

while IFS=$'\t' read -r T P; do
  TABLE2PART["$T"]="$P"
done < <(tail -n +2 "$WORKDIR/table_to_partcol.tsv")

##################################
## 5. Final Output
##################################
echo "▶︎ Writing final output"
# Header row for the final 10-column map file.
printf 'id\tview_name\ttable_name\tpartition_column\ttable_type\ttemplate\traw_query\tsource_table\tregistered_ts\trun_dt\n' \
  > "$WORKDIR/view_map.txt"

# Resolve each surviving row against the in-memory caches; unknown views or
# tables fall back to NA.
while IFS=$'\t' read -r id view vtype template query sys_dt src_tbl; do
  tbl="${VIEW2TABLE[$view]:-NA}"
  pcol="${TABLE2PART[$tbl]:-NA}"
  printf '%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n' \
         "$id" "$view" "$tbl" "$pcol" "$vtype" \
         "$template" "$query" "$src_tbl" "$(date '+%F %T')" "$RUN_DT" \
         >> "$WORKDIR/view_map.txt"
done < "$WORKDIR/all_views_final.tsv"

############################
## 6. Append New Rows Only
############################
echo "▶︎ Inserting new rows into db_admin.view_load_audit_map"

# Loop var renamed ROW_DT: with `done < <(...)` this loop runs in the current
# shell, so reading into RUN_DT clobbered the script-level variable.
while IFS=$'\t' read -r ID VIEW TABLE PART TYPE TEMPLATE QUERY SRC_TBL REG_TS ROW_DT; do
  # Escape embedded single quotes for Hive (' -> \') and cap the length.
  # The old `sed "s/'/\\'/g"` collapsed to s/'/'/g after shell processing —
  # it escaped nothing, so any quote in the query broke the INSERT.
  CLEAN_QUERY=$(printf '%s' "$QUERY" | sed "s/'/\\\\'/g" | cut -c1-4000)
  # NOTE(review): values are still interpolated into SQL text; acceptable for
  # trusted metadata but not safe for arbitrary/untrusted query content.
  # </dev/null keeps beeline from consuming the loop's input stream.
  beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true -e "
    INSERT INTO db_admin.view_load_audit_map
    VALUES ('$ID', '$VIEW', '$TABLE', '$PART', '$TYPE',
            '$TEMPLATE', '$CLEAN_QUERY', '$SRC_TBL', '$REG_TS', '$ROW_DT');" </dev/null
done < <(tail -n +2 "$WORKDIR/view_map.txt")

# printf: plain echo would print a literal \n.
printf '\n✅ Done. Inserted %d new rows.\n' "$(($(wc -l < "$WORKDIR/view_map.txt") - 1))"
# rm -r "$WORKDIR"

 

 

 

 

 

-- One-time DDL for the audit map. Column list matches the 10 values inserted
-- by build_view_load_map.sh; the earlier 7-column definition was missing
-- template, raw_query and source_table, so every 10-value INSERT failed.
DROP TABLE IF EXISTS db_admin.view_load_audit_map;
CREATE TABLE db_admin.view_load_audit_map (
  id STRING,
  view_name STRING,
  table_name STRING,
  partition_column STRING,
  table_type STRING,
  template STRING,
  raw_query STRING,
  source_table STRING,
  registered_ts TIMESTAMP,
  run_dt DATE
);

 

 

 


#!/usr/bin/env bash
###############################################################################
# count_check.sh (Optimized Count Checker for Audit Tables)
#
# - Uses parallel processing for table counts
# - Sends an email summary at the end of day showing pending tables
###############################################################################
set -euo pipefail

JDBC="jdbc:hive2://hiveserver:10000"
HUSER="hive"
# NOTE(review): hardcoded password — prefer an env var or protected file.
HPASS="hive_pwd"
RUN_DT="${1:-$(date '+%Y-%m-%d')}"   # run date: first arg, default = today
WORKDIR="/tmp/view_count_check.$$"
mkdir -p "$WORKDIR"
EMAIL_TO="[email protected]"   # <-- Update your email recipient
MAX_PARALLEL=10                     # Number of parallel count jobs

# printf, not `echo "\n..."`: plain echo does not expand \n escapes.
printf '\n▶︎ Starting count check for run_dt=%s\n' "$RUN_DT"

############################################
# 1. Get unique tables from audit map
############################################
# Pull every distinct (table, partition column) pair registered in the map.
beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
  --showHeader=false --outputFormat=tsv2 -e \
  "SELECT DISTINCT table_name, partition_column FROM db_admin.view_load_audit_map;" \
  > "$WORKDIR/unique_tables.tsv"

############################################
# 2. Run count per unique table in parallel
############################################
printf 'table_name\tcount\n' > "$WORKDIR/table_counts.tsv"

echo "▶︎ Running counts in parallel ($MAX_PARALLEL threads)"
# The workers run in child bash processes, so the connection settings must be
# exported — previously JDBC/HUSER/HPASS/RUN_DT were unset in the children.
# xargs without -I splits each input line on whitespace (tabs), so the table
# and partition arrive as $1/$2; nothing from the data file is spliced into
# the -c script text, which avoids shell-injection via table names.
# `|| true` on grep: an empty table list must not abort under pipefail.
export JDBC HUSER HPASS RUN_DT
{ grep -v '^table_name' "$WORKDIR/unique_tables.tsv" || true; } \
| xargs -L1 -P "$MAX_PARALLEL" bash -c '
    TBL=$1
    PART=${2:-}
    SQL="SELECT COUNT(*) FROM $TBL"
    # Hive accepts double-quoted string literals; this replaces the broken
    # triple-single-quote nesting that emitted the date with no quotes at all.
    if [[ -n "$PART" && "$PART" != "NA" ]]; then
      SQL="SELECT COUNT(*) FROM $TBL WHERE $PART = \"$RUN_DT\""
    fi
    CNT=$(beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
          --outputFormat=tsv2 --showHeader=false -e "$SQL" </dev/null 2>/dev/null \
          | tail -n1 || true)
    printf "%s\t%s\n" "$TBL" "${CNT:-0}"
' _ >> "$WORKDIR/table_counts.tsv"

############################################
# 3. Map counts back to each ID/view
############################################
beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
  --outputFormat=tsv2 --showHeader=false -e \
  "SELECT id, view_name, table_name, table_type FROM db_admin.view_load_audit_map;" \
  > "$WORKDIR/all_mappings.tsv"

# join needs both inputs sorted on the join field with the SAME separator.
# Plain `sort -k3` uses whitespace field boundaries over the rest of the
# line, which can disagree with `join -t'\t'` ordering; sort with an explicit
# tab delimiter and a bounded key. tail -n +2 drops the counts header so the
# literal "table_name" row can never join.
# NOTE(review): join emits the key first, so $1 below is table_name, not id —
# confirm this field order matches db_admin.view_load_audit_result's columns.
join -t $'\t' -1 3 -2 1 \
  <(sort -t $'\t' -k3,3 "$WORKDIR/all_mappings.tsv") \
  <(tail -n +2 "$WORKDIR/table_counts.tsv" | sort -t $'\t' -k1,1) \
| awk -F'\t' -v dt="$RUN_DT" '{
    printf "INSERT INTO db_admin.view_load_audit_result \
            VALUES (%s, \"%s\", \"%s\", \"%s\", %d, \"OK\", \"%s\");\n",
            $1, $2, $3, $4, $5, dt
}' > "$WORKDIR/audit_inserts.sql"

############################################
# 4. Insert result to audit table
############################################
beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true -f "$WORKDIR/audit_inserts.sql"

############################################
# 5. Final check & email notification
############################################
echo "▶︎ Checking pending entries"
beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
  --outputFormat=tsv2 --showHeader=false -e \
  "SELECT COUNT(*) FROM db_admin.view_load_audit_map a
   LEFT JOIN db_admin.view_load_audit_result b
   ON a.id = b.id AND a.table_name = b.table_name AND b.run_dt = '$RUN_DT'
   WHERE b.id IS NULL;" > "$WORKDIR/pending_count.txt"

# Strip whitespace/CRs from beeline's output so the mail subject stays clean;
# default to 0 if the query produced nothing.
PENDING=$(tr -d '[:space:]' < "$WORKDIR/pending_count.txt")
PENDING=${PENDING:-0}
mail -s "[View Load Audit] $PENDING table(s) pending as of $RUN_DT" "$EMAIL_TO" <<< "There are $PENDING table(s) with no count result on $RUN_DT."

# printf, not `echo "\n..."`: plain echo would print a literal \n.
printf '\n✅ Count check completed. Inserted %d rows. Pending: %s\n' \
  "$(wc -l < "$WORKDIR/audit_inserts.sql")" "$PENDING"
# rm -r "$WORKDIR"
# NOTE(review): removed leftover cache-loading code that appears to have been
# pasted here from build_view_load_map.sh (section 4). It ran AFTER this
# script's final status message, referenced $WORKDIR/view_to_table.tsv and
# $WORKDIR/table_to_partcol.tsv — files count_check.sh never creates — and so
# `tail` failed on a missing file, making every run exit non-zero under
# `set -euo pipefail` despite the work having completed.