[WIP] Reduce HDFS NameNode RPC on vectorized Parquet reader #50765


Open · wants to merge 3 commits into base: master

Conversation

@pan3793 (Member) commented Apr 30, 2025

What changes were proposed in this pull request?

On a busy Hadoop cluster, GetFileInfo and GetBlockLocations contribute the most RPC traffic to the HDFS NameNode. After investigating the Spark Parquet vectorized reader, I think 3 of the 4 RPCs it issues per file can be eliminated.

[screenshot: Xnip2025-04-30_16-23-27]

Currently, the Parquet vectorized reader issues 4 NameNode RPCs when reading each file (or split):

  1. Read the footer - one GetFileInfo and one GetBlockLocations
  2. Read the data (row groups) - one GetFileInfo and one GetBlockLocations

The key ideas of this PR are:

  1. The driver already knows the FileStatus of each Parquet file from the planning phase, so we can ship the FileStatus from the driver to the executors via PartitionedFile. The task then no longer needs to ask the NameNode again, which saves two GetFileInfo RPCs.
  2. Reuse the SeekableInputStream opened for the footer when reading the row groups, which saves one GetBlockLocations RPC (see the sketch after this list).
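
A minimal sketch of how the two ideas could fit together on the executor side. The `status` parameter stands in for the hypothetical FileStatus plumbing through PartitionedFile (the actual field name and wiring are up to the final patch); `HadoopInputFile.fromStatus`, `ParquetFileReader.open`, `getFooter`, and `readNextRowGroup` are existing parquet-hadoop APIs:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    class FooterAndRowGroupSketch {
      // `status` stands in for the FileStatus computed on the driver during
      // planning and shipped with the split (hypothetical PartitionedFile field).
      static void read(FileStatus status, Configuration conf) throws IOException {
        // (1) fromStatus reuses the known FileStatus, skipping the
        //     getFileStatus/GetFileInfo RPC that fromPath would issue.
        HadoopInputFile inputFile = HadoopInputFile.fromStatus(status, conf);
        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
        // (2) One reader serves both the footer and the row groups, so its
        //     SeekableInputStream is opened once: one GetBlockLocations RPC
        //     instead of two.
        try (ParquetFileReader reader = ParquetFileReader.open(inputFile, options)) {
          ParquetMetadata footer = reader.getFooter();         // no extra open
          PageReadStore rowGroup = reader.readNextRowGroup();  // same stream
          // ... hand rowGroup to the vectorized reader
        }
      }
    }

The exact plumbing (where the FileStatus lives on PartitionedFile, and how the footer reader hands its stream to the row-group reader) is what the Parquet-side changes mentioned below need to enable.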

TODO: the PR currently requires some changes on the Parquet side first.

Why are the changes needed?

Reduce unnecessary NameNode RPCs to improve performance and stability on large Hadoop clusters.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manually tested on a Hadoop cluster. The test runs TPC-H Q4 against sf3000 Parquet tables.

HDFS NameNode metrics (master vs. this PR)

[screenshots: Xnip2025-04-30_16-43-13, Xnip2025-04-30_16-43-31]

HDFS NameNode audit logs (in the audit log, cmd=open corresponds to a GetBlockLocations RPC and cmd=getfileinfo to a GetFileInfo RPC):

Taking the file part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet as an example: at 283071739 bytes (~270 MiB), with Spark's default 128 MiB maximum split size it should be read as 3 splits (ceil(270 / 128) = 3).

$ hadoop fs -ls /warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet
-rwxr-xr-x   3 hadoop supergroup  283071739 2025-04-04 01:37 /warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet

Before

$ cat hdfs-audit.log | grep part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet | grep application_1743671377509_0064
2025-04-30 16:34:23,533 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.85	cmd=open	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0064_JId_1_SId_1_0_TId_3389_0
2025-04-30 16:34:23,546 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.85	cmd=getfileinfo	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0064_JId_1_SId_1_0_TId_3389_0
2025-04-30 16:34:23,547 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.85	cmd=open	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0064_JId_1_SId_1_0_TId_3389_0
2025-04-30 16:34:23,560 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.86	cmd=open	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0064_JId_1_SId_1_0_TId_3392_0
2025-04-30 16:34:23,585 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.86	cmd=getfileinfo	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0064_JId_1_SId_1_0_TId_3392_0
2025-04-30 16:34:23,586 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.86	cmd=open	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0064_JId_1_SId_1_0_TId_3392_0
2025-04-30 16:35:01,762 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.85	cmd=open	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0064_JId_1_SId_1_0_TId_5328_0
2025-04-30 16:35:01,769 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.85	cmd=getfileinfo	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0064_JId_1_SId_1_0_TId_5328_0
2025-04-30 16:35:01,770 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.85	cmd=open	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0064_JId_1_SId_1_0_TId_5328_0

After

$ cat hdfs-audit.log | grep part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet | grep application_1743671377509_0065
2025-04-30 16:39:29,684 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.86	cmd=open	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0065_JId_0_SId_0_0_TId_3387_0
2025-04-30 16:39:29,702 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.85	cmd=open	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0065_JId_0_SId_0_0_TId_3388_0
2025-04-30 16:40:08,547 INFO FSNamesystem.audit: allowed=true	ugi=hadoop (auth:SIMPLE)	ip=/10.45.133.86	cmd=open	src=/warehouse/tpch_3t_hive_parquet.db/lineitem/part-01027-419c80f3-8921-4ed3-b31a-0fe72b9c6732-c000.zstd.parquet	dst=null	perm=null	proto=rpc	callerContext=SPARK_TASK_application_1743671377509_0065_JId_0_SId_0_0_TId_5342_0

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Apr 30, 2025
ParquetFileReader fileReader;
if (fileFooter.isDefined()) {
  fileReader = new ParquetFileReader(configuration, file, fileFooter.get());

@pan3793 (Member Author) commented:

This constructor internally calls HadoopInputFile.fromPath(file, configuration), which issues an unnecessary GetFileInfo RPC:

  public static HadoopInputFile fromPath(Path path, Configuration conf) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    return new HadoopInputFile(fs, fs.getFileStatus(path), conf);
  }
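
For reference, parquet-hadoop already exposes a factory that accepts an existing FileStatus, so no extra NameNode lookup happens when the status is known ahead of time. A one-line usage sketch, where `knownStatus` is whatever FileStatus the caller already holds:

    // avoids the fs.getFileStatus(path) call inside fromPath
    HadoopInputFile inputFile = HadoopInputFile.fromStatus(knownStatus, configuration);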
