1. Increase Shuffle Partitions
The default of 200 shuffle partitions is far too low for a cluster this size. Increase spark.sql.shuffle.partitions so every shuffle can use the cluster's full parallelism; a common rule of thumb is 2-3x the total number of executor cores.
spark.conf.set("spark.sql.shuffle.partitions", 1500) // Adjust as necessary
2. Increase Executor Memory and Cores
With ample memory per node, allocate more memory and cores to each executor. Note that these are static settings: they must be supplied when the application launches (via spark-submit --conf or the SparkSession builder), not with spark.conf.set on a running session.
--conf spark.executor.memory=200g
--conf spark.executor.cores=8            # adjust based on your node's CPU capacity
--conf spark.executor.instances=90       # adjust to use the available nodes effectively
3. Increase Driver Memory
Ensure the driver has enough memory for task scheduling and for any results collected back to it. Driver memory cannot be changed after the driver JVM has started, so pass it at submission time:
--conf spark.driver.memory=200g          # or equivalently, the --driver-memory 200g flag
4. Enable Dynamic Allocation
Allow Spark to scale the number of executors up and down with the workload instead of holding a fixed fleet. These are also static settings; a fixed spark.executor.instances (step 2) then only serves as the initial executor count.
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.minExecutors=50
--conf spark.dynamicAllocation.maxExecutors=750   # lets the job grow to the whole cluster when needed
Dynamic allocation also needs shuffle data to survive executor removal, as covered below.
5. Optimize Broadcast Joins
With this much executor memory, raise the broadcast threshold so that larger dimension tables are shipped to every executor instead of being shuffled.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 209715200) // 200MB or adjust as necessary
6. Use Adaptive Query Execution (AQE)
Ensure AQE is enabled so Spark can re-optimize query plans using runtime statistics (it is on by default from Spark 3.2.0 onward).
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
7. Use Off-Heap Memory
Move part of Spark's execution and storage memory off the JVM heap to reduce garbage-collection pressure on very large executors. These are static settings, and the off-heap size adds to each executor's total memory footprint:
--conf spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=50g
8. Optimize Shuffle Operations
Tune the shuffle path itself. The external shuffle service lets executors be removed without losing their shuffle files, but it must actually be deployed on each worker (on YARN, for example, as a NodeManager auxiliary service); setting the flag alone does not start it. The compression settings below are already Spark's defaults and are listed for completeness. All four are static:
--conf spark.shuffle.service.enabled=true
--conf spark.shuffle.service.port=7337    # 7337 is the default port
--conf spark.shuffle.compress=true        # default
--conf spark.shuffle.spill.compress=true  # default
9. Enable In-Memory Columnar Storage
These settings apply to data cached with cache()/persist() or CACHE TABLE: Spark stores it in a compressed, batched columnar format in memory.
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 100000)
10. Enable Dynamic Partition Pruning
Improve joins against partitioned tables by pruning fact-table partitions at runtime based on filters on the dimension side. This is enabled by default in Spark 3.0+, so the setting below mostly matters if it was previously turned off.
spark.conf.set("spark.sql.dynamicPartitionPruning.enabled", true)
Example Configuration Code:
Here’s how these settings fit together in a Spark application. The static settings must go on the SparkSession builder (or spark-submit) before the session exists; the spark.sql.* settings can also be changed later at runtime with spark.conf.set:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("TunedApp") // hypothetical application name
  // Static settings: fixed once the application starts
  .config("spark.executor.memory", "200g")
  .config("spark.executor.cores", "8")
  .config("spark.executor.instances", "90")
  // spark.driver.memory is omitted here: the driver JVM is already
  // running at this point, so set it via spark-submit instead
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "50")
  .config("spark.dynamicAllocation.maxExecutors", "750")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "50g")
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.shuffle.service.port", "7337")
  .config("spark.shuffle.compress", "true")
  .config("spark.shuffle.spill.compress", "true")
  .getOrCreate()

// Runtime SQL settings: can be set or changed after startup
spark.conf.set("spark.sql.shuffle.partitions", 1500)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 209715200L) // 200 MB
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", true)
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 100000)
spark.conf.set("spark.sql.dynamicPartitionPruning.enabled", true)
Alternatively, you can pass all of these settings at submission time using the --conf flag of the spark-submit command. Here’s an example, assuming a YARN cluster and a hypothetical application jar named your_app.jar:
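# placeholders: --master yarn and your_app.jar; adapt to your cluster manager and application
spark-submit \
  --master yarn \
  --driver-memory 200g \
  --conf spark.executor.memory=200g \
  --conf spark.executor.cores=8 \
  --conf spark.executor.instances=90 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=50 \
  --conf spark.dynamicAllocation.maxExecutors=750 \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=50g \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.shuffle.compress=true \
  --conf spark.shuffle.spill.compress=true \
  --conf spark.sql.shuffle.partitions=1500 \
  --conf spark.sql.autoBroadcastJoinThreshold=209715200 \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.skewJoin.enabled=true \
  --conf spark.sql.inMemoryColumnarStorage.compressed=true \
  --conf spark.sql.inMemoryColumnarStorage.batchSize=100000 \
  --conf spark.sql.dynamicPartitionPruning.enabled=true \
  your_app.jar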