Count Check Process
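The process has two parts: build_view_load_map.sh (below) builds db_admin.view_load_audit_map, resolving each view to its backing table and partition column while skipping rows loaded on a previous run; count_check.sh then reads that map, counts every table for the run date in parallel, records the results in db_admin.view_load_audit_result, and emails a summary of any tables still pending.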
#!/usr/bin/env bash
###############################################################################
# build_view_load_map.sh (Optimized for Daily Run)
#
# Builds db_admin.view_load_audit_map:
#  - Avoids reloading existing entries (by id, view, table, table_type)
#  - Uses cached view -> table and table -> partition_column mappings
###############################################################################
set -euo pipefail

JDBC="jdbc:hive2://hiveserver:10000"
HUSER="hive"        # change as needed
HPASS="hive_pwd"
RUN_DT="${1:-$(date '+%Y-%m-%d')}"
WORKDIR="/tmp/view_map.$$"
mkdir -p "$WORKDIR"

echo -e "\n▶︎ Starting build_view_load_map.sh (run_dt=$RUN_DT)"

#########################
## 1. Collect view data
#########################

# 1A. Direct views
beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" \
  --showHeader=false --outputFormat=tsv2 --silent=true -e \
  "SELECT id, tbl_nm FROM db.tbl_info;" \
  | awk -F'\t' '{print $1"\t"$2"\tdirect\tNA\tNA\tNA\tNA"}' \
  > "$WORKDIR/list_direct.tsv"

# 1B. Enquery views parsed out of stored SQL
# "|| true" keeps set -e / pipefail from killing the loop when a query has no match.
beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" \
  --showHeader=false --outputFormat=tsv2 --silent=true -e \
  "SELECT id, template, query, sys_dt FROM db.query_store;" \
  | while IFS=$'\t' read -r ID TEMPLATE QUERY SYS_DT; do
      SRC_TABLE=$(echo "$QUERY" \
        | grep -iEo '(from|join)[[:space:]]+[`]?[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+[`]?' \
        | head -n1 | awk '{print $NF}' | tr -d '`' || true)
      VIEW_NAME=$(echo "$QUERY" \
        | grep -iEo '(insert[[:space:]]+into)[[:space:]]+[`]?[a-zA-Z0-9_.]+[`]?' \
        | head -n1 | awk '{print $NF}' | tr -d '`' || true)
      echo -e "$ID\t$VIEW_NAME\tenquery\t$TEMPLATE\t$QUERY\t$SYS_DT\t$SRC_TABLE"
    done > "$WORKDIR/list_parsed.tsv"

# 1C. Merge
cat "$WORKDIR/list_direct.tsv" "$WORKDIR/list_parsed.tsv" > "$WORKDIR/all_views.tsv"

#################################################
## 1D. Remove already-loaded entries (dedup step)
#################################################
echo "▶︎ Checking already existing entries in db_admin.view_load_audit_map"

beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
  --outputFormat=tsv2 --showHeader=false -e \
  "SELECT DISTINCT id, view_name, table_name, table_type FROM db_admin.view_load_audit_map;" \
  > "$WORKDIR/existing_loaded.tsv"

awk -F'\t' '{print $1"|"$2"|"$3"|"$4}' "$WORKDIR/existing_loaded.tsv" > "$WORKDIR/existing_keys.txt"
awk -F'\t' '{print $1"|"$2"|NA|"$3"\t"$0}' "$WORKDIR/all_views.tsv" > "$WORKDIR/all_views_with_key.tsv"

# grep exits 1 when every row is already loaded; don't let pipefail abort the run.
grep -F -v -f "$WORKDIR/existing_keys.txt" "$WORKDIR/all_views_with_key.tsv" \
  | cut -f2- > "$WORKDIR/all_views_final.tsv" || true

echo " • Found $(wc -l < "$WORKDIR/existing_keys.txt") previously loaded rows"
echo " • Proceeding with $(wc -l < "$WORKDIR/all_views_final.tsv") new rows only"

############################
## 2. Cache view -> table
############################
echo "▶︎ Caching view -> table"

cut -f2 "$WORKDIR/all_views_final.tsv" | sort -u > "$WORKDIR/unique_views.txt"

echo -e "view_name\ttable_name" > "$WORKDIR/view_to_table.tsv"
while read -r VIEW; do
  [[ -z "$VIEW" ]] && continue
  DEF=$(beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
        --showHeader=false --outputFormat=tsv2 \
        -e "SHOW CREATE TABLE $VIEW;" 2>/dev/null | tr '\n' ' ' || true)
  TABLE=$(echo "$DEF" \
    | grep -iEo '(from|join)[[:space:]]+[`]?[a-zA-Z0-9_.]+[`]?' \
    | head -n1 | awk '{print $NF}' | tr -d '`' || true)
  echo -e "$VIEW\t$TABLE"
done < "$WORKDIR/unique_views.txt" >> "$WORKDIR/view_to_table.tsv"

############################
## 3. Cache table -> partition
############################
echo "▶︎ Caching table -> partition column"

# Skip the header line of view_to_table.tsv before collecting table names.
tail -n +2 "$WORKDIR/view_to_table.tsv" | cut -f2 | sort -u > "$WORKDIR/unique_tables.txt"

echo -e "table_name\tpartition_column" > "$WORKDIR/table_to_partcol.tsv"
while read -r TABLE; do
  [[ -z "$TABLE" ]] && continue
  # SHOW PARTITIONS errors on non-partitioned tables; treat that as "no partition column".
  PART=$(beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
         --showHeader=false --outputFormat=tsv2 \
         -e "SHOW PARTITIONS $TABLE;" 2>/dev/null | head -n1 || true)
  PCOL=""
  [[ -n "$PART" ]] && PCOL="${PART%%=*}"
  echo -e "$TABLE\t$PCOL"
done < "$WORKDIR/unique_tables.txt" >> "$WORKDIR/table_to_partcol.tsv"

##################################
## 4. Load caches into memory
##################################
declare -A VIEW2TABLE
declare -A TABLE2PART

# Strip headers and read via input redirection (not a pipe) so the
# associative arrays populated inside the loops survive the loops.
tail -n +2 "$WORKDIR/view_to_table.tsv" > "$WORKDIR/view_to_table_nh.tsv"
while IFS=$'\t' read -r V T; do
  VIEW2TABLE["$V"]="$T"
done < "$WORKDIR/view_to_table_nh.tsv"

tail -n +2 "$WORKDIR/table_to_partcol.tsv" > "$WORKDIR/table_to_partcol_nh.tsv"
while IFS=$'\t' read -r T P; do
  TABLE2PART["$T"]="$P"
done < "$WORKDIR/table_to_partcol_nh.tsv"

##################################
## 5. Final Output
##################################
echo "▶︎ Writing final output"

echo -e "id\tview_name\ttable_name\tpartition_column\ttable_type\ttemplate\traw_query\tsource_table\tregistered_ts\trun_dt" \
  > "$WORKDIR/view_map.txt"

while IFS=$'\t' read -r ID VIEW TYPE TEMPLATE QUERY SYS_DT SRC_TBL; do
  TABLE="${VIEW2TABLE[$VIEW]:-NA}"
  PART_COL="${TABLE2PART[$TABLE]:-NA}"
  printf "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n" \
    "$ID" "$VIEW" "$TABLE" "$PART_COL" "$TYPE" \
    "$TEMPLATE" "$QUERY" "$SRC_TBL" "$(date '+%F %T')" "$RUN_DT" \
    >> "$WORKDIR/view_map.txt"
done < "$WORKDIR/all_views_final.tsv"

############################
## 6. Append New Rows Only
############################
echo "▶︎ Inserting new rows into db_admin.view_load_audit_map"

while IFS=$'\t' read -r ID VIEW TABLE PART TYPE TEMPLATE QUERY SRC_TBL REG_TS RUN_DT; do
  # Escape single quotes for HiveQL and cap the stored query at 4000 chars.
  CLEAN_QUERY=$(echo "$QUERY" | sed "s/'/\\\\'/g" | cut -c1-4000)
  beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true -e "
    INSERT INTO db_admin.view_load_audit_map VALUES
    ('$ID', '$VIEW', '$TABLE', '$PART', '$TYPE',
     '$TEMPLATE', '$CLEAN_QUERY', '$SRC_TBL', '$REG_TS', '$RUN_DT');"
done < <(tail -n +2 "$WORKDIR/view_map.txt")

echo -e "\n✅ Done. Inserted $(($(wc -l < "$WORKDIR/view_map.txt") - 1)) new rows."

# rm -r "$WORKDIR"
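Both scripts take the run date as an optional first argument and default to the current date; the JDBC URL and credentials at the top are placeholders to adjust per environment. A typical daily invocation (the date below is just an example):

./build_view_load_map.sh 2024-05-01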
DROP TABLE IF EXISTS db_admin.view_load_audit_map;
CREATE TABLE db_admin.view_load_audit_map (
  -- Must hold the 10 values inserted by build_view_load_map.sh (step 6).
  id               STRING,
  view_name        STRING,
  table_name       STRING,
  partition_column STRING,
  table_type       STRING,
  template         STRING,
  raw_query        STRING,
  source_table     STRING,
  registered_ts    TIMESTAMP,
  run_dt           DATE
);
#!/usr/bin/env bash
###############################################################################
# count_check.sh (Optimized Count Checker for Audit Tables)
#
#  - Uses parallel processing for table counts
#  - Sends an email summary at the end of day showing pending tables
###############################################################################
set -euo pipefail

JDBC="jdbc:hive2://hiveserver:10000"
HUSER="hive"
HPASS="hive_pwd"
RUN_DT="${1:-$(date '+%Y-%m-%d')}"
WORKDIR="/tmp/view_count_check.$$"
mkdir -p "$WORKDIR"

EMAIL_TO="[email protected]"   # <-- Update your email recipient
MAX_PARALLEL=10                 # Number of parallel count jobs

echo -e "\n▶︎ Starting count check for run_dt=$RUN_DT"

############################################
# 1. Get unique tables from audit map
############################################
beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
  --outputFormat=tsv2 --showHeader=false -e \
  "SELECT DISTINCT table_name, partition_column FROM db_admin.view_load_audit_map;" \
  > "$WORKDIR/unique_tables.tsv"

############################################
# 2. Run count per unique table in parallel
############################################
echo -e "table_name\tcount" > "$WORKDIR/table_counts.tsv"
echo "▶︎ Running counts in parallel ($MAX_PARALLEL threads)"

# The workers run in child bash processes, so they need these in the environment.
export JDBC HUSER HPASS RUN_DT WORKDIR

grep -v "^table_name" "$WORKDIR/unique_tables.tsv" | \
  xargs -P "$MAX_PARALLEL" -I{} bash -c '
    LINE=$1
    TBL=$(printf "%s\n" "$LINE" | cut -f1)
    PART=$(printf "%s\n" "$LINE" | cut -f2)
    SQL="SELECT COUNT(*) FROM $TBL"
    # Hive accepts double-quoted string literals, which avoids nesting single quotes here.
    [[ -n "$PART" && "$PART" != "NA" ]] && SQL="SELECT COUNT(*) FROM $TBL WHERE $PART = \"$RUN_DT\""
    CNT=$(beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
          --outputFormat=tsv2 --showHeader=false -e "$SQL" 2>/dev/null | tail -n1)
    CNT=${CNT:-0}
    printf "%s\t%s\n" "$TBL" "$CNT" >> "$WORKDIR/table_counts.tsv"
  ' _ {}

############################################
# 3. Map counts back to each ID/view
############################################
beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
  --outputFormat=tsv2 --showHeader=false -e \
  "SELECT id, view_name, table_name, table_type FROM db_admin.view_load_audit_map;" \
  > "$WORKDIR/all_mappings.tsv"

# join prints the join field first, so its output order is:
#   $1=table_name  $2=id  $3=view_name  $4=table_type  $5=count
join -t $'\t' -1 3 -2 1 \
  <(sort -t $'\t' -k3,3 "$WORKDIR/all_mappings.tsv") \
  <(tail -n +2 "$WORKDIR/table_counts.tsv" | sort -t $'\t' -k1,1) \
  | awk -F'\t' -v dt="$RUN_DT" '{
      printf "INSERT INTO db_admin.view_load_audit_result VALUES (\"%s\", \"%s\", \"%s\", \"%s\", %d, \"OK\", \"%s\");\n",
             $2, $3, $1, $4, $5, dt
    }' > "$WORKDIR/audit_inserts.sql"

############################################
# 4. Insert result to audit table
############################################
beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true -f "$WORKDIR/audit_inserts.sql"

############################################
# 5. Final check & email notification
############################################
echo "▶︎ Checking pending entries"

beeline -u "$JDBC" -n "$HUSER" -p "$HPASS" --silent=true \
  --outputFormat=tsv2 --showHeader=false -e \
  "SELECT COUNT(*)
     FROM db_admin.view_load_audit_map a
     LEFT JOIN db_admin.view_load_audit_result b
       ON a.id = b.id
      AND a.table_name = b.table_name
      AND b.run_dt = '$RUN_DT'
    WHERE b.id IS NULL;" > "$WORKDIR/pending_count.txt"

PENDING=$(cat "$WORKDIR/pending_count.txt")

mail -s "[View Load Audit] $PENDING table(s) pending as of $RUN_DT" "$EMAIL_TO" \
  <<< "There are $PENDING table(s) with no count result on $RUN_DT."

echo -e "\n✅ Count check completed. Inserted $(wc -l < "$WORKDIR/audit_inserts.sql") rows. Pending: $PENDING"

# rm -r "$WORKDIR"
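count_check.sh writes into db_admin.view_load_audit_result, which is not defined above. A minimal sketch of a compatible DDL, with column names and order inferred from the INSERT statements generated in step 3 (the names here are assumptions, not the original schema):

DROP TABLE IF EXISTS db_admin.view_load_audit_result;
CREATE TABLE db_admin.view_load_audit_result (
  -- Sketch only: inferred from the generated INSERT ... VALUES order.
  id         STRING,
  view_name  STRING,
  table_name STRING,
  table_type STRING,
  cnt        BIGINT,
  status     STRING,
  run_dt     DATE
);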