diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 64f5081cc3..8a06facccd 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -32,7 +32,7 @@ env: jobs: cargo-fmt: - runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || 'ubuntu-22.04') }} + runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-22.04"') }} steps: - name: Checkout @@ -91,6 +91,11 @@ jobs: with: repo-token: ${{ secrets.GITHUB_TOKEN }} + # Needed for hwloc + - name: Install automake (macOS) + run: brew install automake + if: runner.os == 'macOS' + # Workaround to resolve link error with C:\msys64\mingw64\bin\libclang.dll - name: Remove msys64 run: Remove-Item -LiteralPath "C:\msys64\" -Force -Recurse @@ -113,7 +118,7 @@ jobs: args: --locked --all-targets --features "runtime-benchmarks" -- -D warnings cargo-docs: - runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || 'ubuntu-22.04') }} + runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-22.04"') }} steps: - name: Checkout uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8 # @v3.1.0 @@ -170,6 +175,11 @@ jobs: with: repo-token: ${{ secrets.GITHUB_TOKEN }} + # Needed for hwloc + - name: Install automake (macOS) + run: brew install automake + if: runner.os == 'macOS' + # Workaround to resolve link error with C:\msys64\mingw64\bin\libclang.dll - name: Remove msys64 run: Remove-Item -LiteralPath "C:\msys64\" -Force -Recurse diff --git a/.github/workflows/snapshot-build.yml b/.github/workflows/snapshot-build.yml index 3a3bfff9a2..eb33cf5515 100644 --- a/.github/workflows/snapshot-build.yml +++ b/.github/workflows/snapshot-build.yml @@ -18,7 +18,7 @@ env: jobs: container-linux: - runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", 
"ubuntu-20.04-x86-64"]' || 'ubuntu-22.04') }} + runs-on: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-22.04"') }} permissions: contents: write packages: write @@ -84,35 +84,35 @@ jobs: strategy: matrix: build: - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || 'ubuntu-20.04') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-20.04"') }} target: x86_64-unknown-linux-gnu suffix: ubuntu-x86_64-skylake-${{ github.ref_name }} rustflags: "-C target-cpu=skylake" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || 'ubuntu-20.04') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-20.04"') }} target: x86_64-unknown-linux-gnu suffix: ubuntu-x86_64-v2-${{ github.ref_name }} rustflags: "-C target-cpu=x86-64-v2 -C target-feature=+aes" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || 'ubuntu-20.04') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "ubuntu-20.04-x86-64"]' || '"ubuntu-20.04"') }} target: aarch64-unknown-linux-gnu suffix: ubuntu-aarch64-${{ github.ref_name }} # TODO: AES flag is such that we have decent performance on ARMv8, remove once `aes` crate bumps MSRV to # at least 1.61: https://github.com/RustCrypto/block-ciphers/issues/373 rustflags: "-C linker=aarch64-linux-gnu-gcc --cfg aes_armv8" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "macos-12-arm64"]' || 'macos-12') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "macos-12-arm64"]' || '"macos-12"') }} target: aarch64-apple-darwin suffix: macos-aarch64-${{ github.ref_name }} # TODO: AES flag is such that we have decent performance on ARMv8, remove once 
`aes` crate bumps MSRV to # at least 1.61: https://github.com/RustCrypto/block-ciphers/issues/373 rustflags: "--cfg aes_armv8" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "macos-12-arm64"]' || 'macos-12') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "macos-12-arm64"]' || '"macos-12"') }} target: x86_64-apple-darwin suffix: macos-x86_64-${{ github.ref_name }} rustflags: "" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "windows-server-2022-x86-64"]' || 'windows-2022') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "windows-server-2022-x86-64"]' || '"windows-2022"') }} target: x86_64-pc-windows-msvc suffix: windows-x86_64-skylake-${{ github.ref_name }} rustflags: "-C target-cpu=skylake" - - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "windows-server-2022-x86-64"]' || 'windows-2022') }} + - os: ${{ fromJson(github.repository_owner == 'subspace' && '["self-hosted", "windows-server-2022-x86-64"]' || '"windows-2022"') }} target: x86_64-pc-windows-msvc suffix: windows-x86_64-v2-${{ github.ref_name }} rustflags: "-C target-cpu=x86-64-v2 -C target-feature=+aes" @@ -148,6 +148,11 @@ jobs: with: repo-token: ${{ secrets.GITHUB_TOKEN }} + # Needed for hwloc + - name: Install automake (macOS) + run: brew install automake + if: runner.os == 'macOS' + # Workaround to resolve link error with C:\msys64\mingw64\bin\libclang.dll - name: Remove msys64 run: Remove-Item -LiteralPath "C:\msys64\" -Force -Recurse @@ -156,14 +161,49 @@ jobs: continue-on-error: true - name: AArch64 cross-compile packages - run: sudo apt-get update && sudo apt-get install -y --no-install-recommends g++-aarch64-linux-gnu gcc-aarch64-linux-gnu libc6-dev-arm64-cross + run: | + FLAVOR="$(lsb_release -sc)" + + sudo tee /etc/apt/sources.list.d/arm64.list <> $GITHUB_ENV if: matrix.build.target == 'aarch64-unknown-linux-gnu' - - name: Build farmer + 
- name: Build farmer (Linux and Windows) uses: actions-rs/cargo@ae10961054e4aa8b4aa7dffede299aaf087aa33b # @v1.0.1 with: command: build args: --locked -Z build-std --target ${{ matrix.build.target }} --profile production --bin subspace-farmer + if: runner.os != 'macOS' + + # We build macOS without `numa` feature, primarily because of https://github.com/HadrienG2/hwlocality/issues/31 + - name: Build farmer (macOS) + uses: actions-rs/cargo@ae10961054e4aa8b4aa7dffede299aaf087aa33b # @v1.0.1 + with: + command: build + args: --locked -Z build-std --target ${{ matrix.build.target }} --profile production --bin subspace-farmer --no-default-features + if: runner.os == 'macOS' - name: Build node uses: actions-rs/cargo@ae10961054e4aa8b4aa7dffede299aaf087aa33b # @v1.0.1 diff --git a/Cargo.lock b/Cargo.lock index 4421fc5bcc..41b02a582e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1044,6 +1044,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "autotools" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aef8da1805e028a172334c3b680f93e71126f2327622faef2ec3d893c0a4ad77" +dependencies = [ + "cc", +] + [[package]] name = "backoff" version = "0.4.0" @@ -1662,6 +1671,15 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961" +[[package]] +name = "cmake" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +dependencies = [ + "cc", +] + [[package]] name = "codespan-reporting" version = "0.11.1" @@ -2119,6 +2137,12 @@ dependencies = [ "cipher 0.4.4", ] +[[package]] +name = "cty" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b365fabc795046672053e29c954733ec3b05e4be654ab130fe8f1f94d7051f35" + [[package]] name = "curve25519-dalek" version = "2.1.3" @@ -3773,7 +3797,7 @@ source = "git+https://github.com/subspace/frontier?rev=37ee45323120b21adc1d69ae7 dependencies = [ "evm", "frame-support", - "num_enum", + "num_enum 0.6.1", "parity-scale-codec", "scale-info", "serde", @@ -4762,6 +4786,35 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hwlocality" +version = "1.0.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6020affad7f95b46f12607a8714aa70bd02c8df3b3abf9ef5c8cd2f7ae57a033" +dependencies = [ + "arrayvec 0.7.4", + "bitflags 2.4.0", + "derive_more", + "errno", + "hwlocality-sys", + "libc", + "num_enum 0.7.1", + "thiserror", +] + +[[package]] +name = "hwlocality-sys" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "381b203a23b41c29be64454e9cee8d16360606d7e871f5d22532796b6095f164" +dependencies = [ + "autotools", + "cmake", + "libc", + "pkg-config", + "windows-sys 0.52.0", +] + [[package]] name = "hyper" version = "0.14.27" @@ -5358,6 +5411,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3979b5c37ece694f1f5e51e7ecc871fdb0f517ed04ee45f88d15d6d553cb9664" dependencies = [ "cc", + "cty", "libc", ] @@ -7103,7 +7157,16 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a015b430d3c108a207fd776d2e2196aaf8b1cf8cf93253e3a097ff3085076a1" dependencies = [ - "num_enum_derive", + "num_enum_derive 0.6.1", +] + +[[package]] +name = "num_enum" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683751d591e6d81200c39fb0d1032608b77724f34114db54f571ff1317b337c0" +dependencies = [ + "num_enum_derive 0.7.1", ] [[package]] @@ -7118,6 +7181,17 @@ dependencies = [ 
"syn 2.0.39", ] +[[package]] +name = "num_enum_derive" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c11e44798ad209ccdd91fc192f0526a369a01234f7373e1b141c96d7cee4f0e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "object" version = "0.30.4" @@ -11704,9 +11778,13 @@ dependencies = [ "fs4 0.7.0", "futures", "hex", + "hwlocality", "jsonrpsee", + "libc", + "libmimalloc-sys", "lru 0.11.1", "mimalloc", + "num_cpus", "parity-scale-codec", "parking_lot 0.12.1", "prometheus-client 0.22.0", @@ -11733,6 +11811,7 @@ dependencies = [ "tracing", "tracing-subscriber 0.3.17", "ulid", + "windows-sys 0.52.0", "zeroize", ] @@ -13895,6 +13974,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-targets" version = "0.42.2" @@ -13925,6 +14013,21 @@ dependencies = [ "windows_x86_64_msvc 0.48.5", ] +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -13937,6 +14040,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" @@ -13949,6 +14058,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.42.2" @@ -13961,6 +14076,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.42.2" @@ -13973,6 +14094,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" @@ -13985,6 +14112,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -13997,6 +14130,12 @@ version = 
"0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" @@ -14009,6 +14148,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.15" diff --git a/Dockerfile-bootstrap-node b/Dockerfile-bootstrap-node index b5c9e62f6b..85e5109e7f 100644 --- a/Dockerfile-bootstrap-node +++ b/Dockerfile-bootstrap-node @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION diff --git a/Dockerfile-bootstrap-node.aarch64 b/Dockerfile-bootstrap-node.aarch64 index e338397b70..6b0902da88 100644 --- a/Dockerfile-bootstrap-node.aarch64 +++ b/Dockerfile-bootstrap-node.aarch64 @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION @@ -39,6 +41,7 @@ COPY test /code/test # Up until this line all Rust images in this repo should be the same to share the same layers ENV RUSTFLAGS="${RUSTFLAGS} -C linker=aarch64-linux-gnu-gcc" +ENV PKG_CONFIG_ALLOW_CROSS=true # Dependencies necessary for successful cross-compilation RUN \ diff --git a/Dockerfile-farmer 
b/Dockerfile-farmer index 186be74565..dd32f45ab0 100644 --- a/Dockerfile-farmer +++ b/Dockerfile-farmer @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION diff --git a/Dockerfile-farmer.aarch64 b/Dockerfile-farmer.aarch64 index 42a0dc0e33..11046baa2c 100644 --- a/Dockerfile-farmer.aarch64 +++ b/Dockerfile-farmer.aarch64 @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION @@ -39,6 +41,7 @@ COPY test /code/test # Up until this line all Rust images in this repo should be the same to share the same layers ENV RUSTFLAGS="${RUSTFLAGS} -C linker=aarch64-linux-gnu-gcc" +ENV PKG_CONFIG_ALLOW_CROSS=true # Dependencies necessary for successful cross-compilation RUN \ diff --git a/Dockerfile-node b/Dockerfile-node index dd50db59ae..a3775ac1fa 100644 --- a/Dockerfile-node +++ b/Dockerfile-node @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION diff --git a/Dockerfile-node.aarch64 b/Dockerfile-node.aarch64 index 55693d3816..0dd70f8822 100644 --- a/Dockerfile-node.aarch64 +++ b/Dockerfile-node.aarch64 @@ -19,7 +19,9 @@ RUN \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION @@ -39,6 +41,7 @@ COPY test /code/test # Up until this line all Rust images in this repo should be the same to share the same layers ENV RUSTFLAGS="${RUSTFLAGS} -C linker=aarch64-linux-gnu-gcc" +ENV PKG_CONFIG_ALLOW_CROSS=true # Dependencies necessary for successful cross-compilation RUN \ diff --git 
a/Dockerfile-runtime b/Dockerfile-runtime index 6be061e62c..3bd02e65fa 100644 --- a/Dockerfile-runtime +++ b/Dockerfile-runtime @@ -14,11 +14,14 @@ RUN \ apt-get update && \ DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ ca-certificates \ + protobuf-compiler \ curl \ git \ llvm \ clang \ - cmake \ + automake \ + libtool \ + pkg-config \ make && \ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain $RUSTC_VERSION diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index e5a646f2c6..c650aacb34 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -26,7 +26,7 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Table, TableGenerator}; use thiserror::Error; -use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{AcquireError, Semaphore}; use tokio::task::yield_now; use tracing::{debug, trace, warn}; @@ -230,6 +230,11 @@ where table_generator, } = options; + let _downloading_permit = match downloading_semaphore { + Some(downloading_semaphore) => Some(downloading_semaphore.acquire_owned().await?), + None => None, + }; + let download_sector_fut = download_sector(DownloadSectorOptions { public_key, sector_index, @@ -238,9 +243,13 @@ where farmer_protocol_info, kzg, pieces_in_sector, - downloading_semaphore, }); + let _encoding_permit = match encoding_semaphore { + Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await?), + None => None, + }; + encode_sector( download_sector_fut.await?, EncodeSectorOptions:: { @@ -249,7 +258,6 @@ where pieces_in_sector, sector_output, sector_metadata_output, - encoding_semaphore, table_generator, }, ) @@ -262,7 +270,6 @@ pub struct DownloadedSector { piece_indices: Vec, raw_sector: RawSector, farmer_protocol_info: FarmerProtocolInfo, - 
downloading_permit: Option, } /// Options for sector downloading @@ -281,9 +288,6 @@ pub struct DownloadSectorOptions<'a, PG> { pub kzg: &'a Kzg, /// How many pieces should sector contain pub pieces_in_sector: u16, - /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory - /// usage of the plotting process, permit will be held until the end of the plotting process - pub downloading_semaphore: Option>, } /// Download sector for plotting. @@ -304,14 +308,8 @@ where farmer_protocol_info, kzg, pieces_in_sector, - downloading_semaphore, } = options; - let downloading_permit = match downloading_semaphore { - Some(downloading_semaphore) => Some(downloading_semaphore.acquire_owned().await?), - None => None, - }; - let sector_id = SectorId::new(public_key.hash(), sector_index); let piece_indices = (PieceOffset::ZERO..) @@ -374,7 +372,6 @@ where piece_indices, raw_sector: raw_sector.into_inner(), farmer_protocol_info, - downloading_permit, }) } @@ -399,9 +396,6 @@ where /// Where plotted sector metadata should be written, vector must either be empty (in which case /// it'll be resized to correct size automatically) or correctly sized from the beginning pub sector_metadata_output: &'a mut Vec, - /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically - /// allow one permit at a time for efficient CPU utilization - pub encoding_semaphore: Option<&'a Semaphore>, /// Proof of space table generator pub table_generator: &'a mut PosTable::Generator, } @@ -418,7 +412,6 @@ where piece_indices, mut raw_sector, farmer_protocol_info, - downloading_permit: _downloading_permit, } = downloaded_sector; let EncodeSectorOptions { sector_index, @@ -426,7 +419,6 @@ where pieces_in_sector, sector_output, sector_metadata_output, - encoding_semaphore, table_generator, } = encoding_options; @@ -452,11 +444,6 @@ where }); } - let _encoding_permit = match encoding_semaphore { - Some(encoding_semaphore) => 
Some(encoding_semaphore.acquire().await?), - None => None, - }; - let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); let mut chunks_scratch = Vec::with_capacity(Record::NUM_S_BUCKETS); diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index d9e1131ef2..ef7317bfe3 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -28,9 +28,12 @@ fdlimit = "0.3.0" fs4 = "0.7.0" futures = "0.3.29" hex = { version = "0.4.3", features = ["serde"] } +hwlocality = { version = "1.0.0-alpha.1", features = ["vendored"], optional = true } jsonrpsee = { version = "0.16.3", features = ["client"] } lru = "0.11.0" mimalloc = "0.1.39" +libmimalloc-sys = { version = "0.1.35", features = ["extended"] } +num_cpus = "1.16.0" parity-scale-codec = "3.6.5" parking_lot = "0.12.1" prometheus-client = "0.22.0" @@ -58,3 +61,13 @@ tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } ulid = { version = "1.0.0", features = ["serde"] } zeroize = "1.6.0" + +[target.'cfg(not(windows))'.dependencies] +libc = "0.2.146" + +[target.'cfg(windows)'.dependencies] +windows-sys = "0.52.0" + +[features] +default = ["numa"] +numa = ["dep:hwlocality"] diff --git a/crates/subspace-farmer/README.md b/crates/subspace-farmer/README.md index ff25e36f10..be6e8c2f53 100644 --- a/crates/subspace-farmer/README.md +++ b/crates/subspace-farmer/README.md @@ -2,41 +2,50 @@ Reference implementation of Subspace Farmer for Subspace Network Blockchain. -## Overview -**Notes:** The code is un-audited and not production ready, use it at your own risk. +## Running -This repo is an implementation of a Farmer for [Subspace Network Blockchain](https://subspace.network). +It is recommended to follow general farming instructions that explain how to run both farmer and node together. 
-Subspace is a proof-of-storage blockchain that resolves the farmer's dilemma, to learn more read our [white paper](https://drive.google.com/file/d/1v847u_XeVf0SBz7Y7LEMXi72QfqirstL/view). +## Build from source -## Some Notes on Plotting +Rust toolchain is expected to be installed for anything in this repository to compile, but there are some extra dependencies for farmer specifically. -### Time to Plot +`automake`,`libtool` and `pkg-config` on Linux/macOS or CMake on Windows for `hwlocality-sys` (if `numa` features is enabled, it is by default), also LLVM/Clang is necessary. -Plotting time is roughly linear with respect to number of cores and clock speed of the host system. On average, it takes ~ 1 minute to create a 1GB plot or 18 hours to to create a 1TB plot, though these numbers will depend on the system used. This is largely independent of the storage media used (i.e. HDD, SATA SSD, NVME SSD) as it is largely a CPU-bound task. +### Ubuntu -### Storage Overhead +```bash +sudo apt-get install automake libtool pkg-config llvm clang +``` -In addition to the plot a small Binary Search Tree (BST) is also stored on disk using RocksDB, which has roughly 1% of the storage size. -Due to current implementation two of such databases might be stored at once, though this will improve in the future. -There are also some supplementary database mappings. +### macOS -So creating a 1GB plot should actually consume about 1.03 GB of storage. -Plot size parameter specified in farming command accounts for this overhead, so you don't need to worry about implementation details. +1. Install via Homebrew: -## Running +```bash +brew install automake libtool llvm@15 clang +``` -It is recommended to follow general farming instructions that explain how to run both farmer and node together. +2. 
Add `llvm` to your `~/.zshrc` or `~/.bashrc`: -## Build from source +```bash +export PATH="/opt/homebrew/opt/llvm@15/bin:$PATH" +``` -Rust toolchain is expected to be installed for anything in this repository to compile, but there are some extra dependencies for farmer specifically. +3. Activate the changes: -Prost library from libp2p dependency needs CMake, also LLVM/Clang is necessary: ```bash -sudo apt-get install llvm clang cmake +source ~/.zshrc ``` +4. Verify that `llvm` is installed: + +```bash +llvm-config --version +``` + +### Build + Then build the farmer using Cargo: ``` cargo build --profile production --bin subspace-farmer @@ -83,28 +92,3 @@ target/production/subspace-farmer wipe /path/to/farm ``` This would wipe plots in the OS-specific users local data directory. - -## Architecture - -The farmer typically runs two processes in parallel: plotting and farming. - -### Plotting - -Think of it as the following pipeline: - -1. [Farmer receives new blocks from the blockchain](src/archiving.rs) -2. [Archives each of them](src/archiving.rs) -3. [Encodes each archived piece by applying the time-asymmetric SLOTH permutation as `encode(genesis_piece, farmer_public_key_hash, plot_index)`](src/single_plot_farm) -4. [Each encoding is written to the disk](src/single_plot_farm.rs) -3. [A commitment, or tag, to each encoding is created as `hmac(encoding, salt)` and stored within a binary search tree (BST)](src/single_plot_farm). - -This process currently takes ~ 36 hours per TiB on a quad-core machine, but for 1 GiB plotting should take between a few seconds and a few minutes. - -### [Farming](src/farming.rs) - -1. Connect to a client and subscribe to `slot_notifications` via JSON-RPC. -2. Given a global challenge as `hash(randomness || slot_index)` and `SOLUTION_RANGE`. -3. Derive local challenge as `hash(global_challenge || farmer_public_key_hash)`. -4. Query the BST for the nearest tag to the local challenge. -5. 
If it within `SOLUTION_RANGE` return a `SOLUTION` else return `None` -6. All the above can and will happen in parallel to plotting process, so it is possible to participate right away diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index a7a9e5753d..af1fea4333 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -28,7 +28,10 @@ use subspace_farmer::utils::farmer_piece_getter::FarmerPieceGetter; use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces; use subspace_farmer::utils::ss58::parse_ss58_reward_address; -use subspace_farmer::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop}; +use subspace_farmer::utils::{ + all_cpu_cores, create_tokio_thread_pool_manager_for_pinned_nodes, + run_future_in_dedicated_thread, thread_pool_core_indices, AsyncJoinOnDrop, +}; use subspace_farmer::{Identity, NodeClient, NodeRpcClient}; use subspace_farmer_components::plotting::PlottedSector; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; @@ -43,23 +46,12 @@ use zeroize::Zeroizing; const RECORDS_ROOTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1_000_000).expect("Not zero; qed"); -fn available_parallelism() -> usize { - match std::thread::available_parallelism() { - Ok(parallelism) => parallelism.get(), - Err(error) => { - warn!( - %error, - "Unable to identify available parallelism, you might want to configure thread pool sizes with CLI \ - options manually" - ); - - 0 - } - } -} - fn should_farm_during_initial_plotting() -> bool { - available_parallelism() > 8 + let total_cpu_cores = all_cpu_cores() + .iter() + .flat_map(|set| set.cpu_cores()) + .count(); + total_cpu_cores > 8 } /// Arguments for farmer @@ -116,42 +108,46 @@ pub(crate) struct FarmingArgs { #[arg(long, alias 
= "metrics-endpoint")] metrics_endpoints: Vec, /// Defines how many sectors farmer will download concurrently, allows to limit memory usage of - /// the plotting process, increasing beyond 2 makes practical sense due to limited networking - /// concurrency and will likely result in slower plotting overall - #[arg(long, default_value = "2")] - sector_downloading_concurrency: NonZeroUsize, - /// Defines how many sectors farmer will encode concurrently, should generally never be set to - /// more than 1 because it will most likely result in slower plotting overall - #[arg(long, default_value = "1")] - sector_encoding_concurrency: NonZeroUsize, - /// Allows to enable farming during initial plotting. Not used by default because plotting is so - /// intense on CPU and memory that farming will likely not work properly, yet it will - /// significantly impact plotting speed, delaying the time when farming can actually work - /// properly. + /// the plotting process, defaults to `--sector-downloading-concurrency` + 1 to download future + /// sector ahead of time + #[arg(long)] + sector_downloading_concurrency: Option, + /// Defines how many sectors farmer will encode concurrently, defaults to 1 on UMA system and + /// number of NUMA nodes on NUMA system. It is further restricted by + /// `--sector-downloading-concurrency` and setting this option higher than + /// `--sector-downloading-concurrency` will have no effect. + #[arg(long)] + sector_encoding_concurrency: Option, + /// Allows to enable farming during initial plotting. Not used by default on machines with 8 or + /// less logical cores because plotting is so intense on CPU and memory that farming will likely + /// not work properly, yet it will significantly impact plotting speed, delaying the time when + /// farming can actually start properly. 
#[arg(long, default_value_t = should_farm_during_initial_plotting(), action = clap::ArgAction::Set)] farm_during_initial_plotting: bool, /// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some - /// compute-intensive operations during proving), defaults to number of CPU cores available in - /// the system - #[arg(long, default_value_t = available_parallelism())] - farming_thread_pool_size: usize, - /// Size of PER FARM thread pool used for plotting, defaults to number of CPU cores available - /// in the system. + /// compute-intensive operations during proving), defaults to number of logical CPUs + /// available on UMA system and number of logical CPUs in first NUMA node on NUMA system + #[arg(long)] + farming_thread_pool_size: Option, + /// Size of one thread pool used for plotting, defaults to number of logical CPUs available + /// on UMA system and number of logical CPUs available in NUMA node on NUMA system. + /// + /// Number of thread pools is defined by `--sector-encoding-concurrency` option, different + /// thread pools might have different number of threads if NUMA nodes do not have the same size. + /// + /// Threads will be pinned to corresponding CPU cores at creation. + #[arg(long)] + plotting_thread_pool_size: Option, + /// Size of one thread pool used for replotting, typically smaller pool than for plotting + /// to not affect farming as much, defaults to half of the number of logical CPUs available on + /// UMA system and number of logical CPUs available in NUMA node on NUMA system. /// - /// NOTE: The fact that this parameter is per farm doesn't mean farmer will plot multiple - /// sectors concurrently, see `--sector-downloading-concurrency` and - /// `--sector-encoding-concurrency` options. 
- #[arg(long, default_value_t = available_parallelism())] - plotting_thread_pool_size: usize, - /// Size of PER FARM thread pool used for replotting, typically smaller pool than for plotting - /// to not affect farming as much, defaults to half of the number of CPU cores available in the - /// system. + /// Number of thread pools is defined by `--sector-encoding-concurrency` option, different + /// thread pools might have different number of threads if NUMA nodes do not have the same size. /// - /// NOTE: The fact that this parameter is per farm doesn't mean farmer will replot multiple - /// sectors concurrently, see `--sector-downloading-concurrency` and - /// `--sector-encoding-concurrency` options. - #[arg(long, default_value_t = available_parallelism() / 2)] - replotting_thread_pool_size: usize, + /// Threads will be pinned to corresponding CPU cores at creation. + #[arg(long)] + replotting_thread_pool_size: Option, } fn cache_percentage_parser(s: &str) -> anyhow::Result { @@ -437,8 +433,67 @@ where None => farmer_app_info.protocol_info.max_pieces_in_sector, }; - let downloading_semaphore = Arc::new(Semaphore::new(sector_downloading_concurrency.get())); - let encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get())); + let plotting_thread_pool_core_indices = + thread_pool_core_indices(plotting_thread_pool_size, sector_encoding_concurrency); + let replotting_thread_pool_core_indices = { + let mut replotting_thread_pool_core_indices = + thread_pool_core_indices(replotting_thread_pool_size, sector_encoding_concurrency); + if replotting_thread_pool_size.is_none() { + // The default behavior is to use all CPU cores, but for replotting we just want half + replotting_thread_pool_core_indices + .iter_mut() + .for_each(|set| set.truncate(set.cpu_cores().len() / 2)); + } + replotting_thread_pool_core_indices + }; + + let downloading_semaphore = Arc::new(Semaphore::new( + sector_downloading_concurrency + .map(|sector_downloading_concurrency| 
sector_downloading_concurrency.get()) + .unwrap_or(plotting_thread_pool_core_indices.len() + 1), + )); + + let all_cpu_cores = all_cpu_cores(); + let plotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_nodes( + "plotting", + plotting_thread_pool_core_indices, + )?; + let replotting_thread_pool_manager = create_tokio_thread_pool_manager_for_pinned_nodes( + "replotting", + replotting_thread_pool_core_indices, + )?; + let farming_thread_pool_size = farming_thread_pool_size + .map(|farming_thread_pool_size| farming_thread_pool_size.get()) + .unwrap_or_else(|| { + all_cpu_cores + .first() + .expect("Not empty according to function description; qed") + .cpu_cores() + .len() + }); + + if all_cpu_cores.len() > 1 { + info!(numa_nodes = %all_cpu_cores.len(), "NUMA system detected"); + + if all_cpu_cores.len() < disk_farms.len() { + warn!( + numa_nodes = %all_cpu_cores.len(), + farms_count = %disk_farms.len(), + "Too few disk farms, CPU will not be utilized fully during plotting, same number of farms as NUMA \ + nodes or more is recommended" + ); + } + } + + // TODO: Remove code or environment variable once identified whether it helps or not + if std::env::var("NUMA_ALLOCATOR").is_ok() && all_cpu_cores.len() > 1 { + unsafe { + libmimalloc_sys::mi_option_set( + libmimalloc_sys::mi_option_use_numa_nodes, + all_cpu_cores.len() as std::ffi::c_long, + ); + } + } let mut plotting_delay_senders = Vec::with_capacity(disk_farms.len()); @@ -461,11 +516,10 @@ where piece_getter: piece_getter.clone(), cache_percentage, downloading_semaphore: Arc::clone(&downloading_semaphore), - encoding_semaphore: Arc::clone(&encoding_semaphore), farm_during_initial_plotting, farming_thread_pool_size, - plotting_thread_pool_size, - replotting_thread_pool_size, + plotting_thread_pool_manager: plotting_thread_pool_manager.clone(), + replotting_thread_pool_manager: replotting_thread_pool_manager.clone(), plotting_delay: Some(plotting_delay_receiver), }, disk_farm_index, diff --git 
a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index 691ad98c60..126cff7afa 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -40,6 +40,7 @@ pub mod node_client; pub mod piece_cache; pub mod reward_signing; pub mod single_disk_farm; +pub mod thread_pool_manager; pub mod utils; /// Size of the LRU cache for peers. diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 79a5e82bc1..6e73b95279 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -17,6 +17,7 @@ pub use crate::single_disk_farm::plotting::PlottingError; use crate::single_disk_farm::plotting::{ plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, }; +use crate::thread_pool_manager::ThreadPoolManager; use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop}; use crate::KNOWN_PEERS_CACHE_SIZE; use async_lock::RwLock; @@ -281,19 +282,16 @@ pub struct SingleDiskFarmOptions { /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory /// usage of the plotting process, permit will be held until the end of the plotting process pub downloading_semaphore: Arc, - /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically - /// allow one permit at a time for efficient CPU utilization - pub encoding_semaphore: Arc, /// Whether to farm during initial plotting pub farm_during_initial_plotting: bool, /// Thread pool size used for farming (mostly for blocking I/O, but also for some /// compute-intensive operations during proving) pub farming_thread_pool_size: usize, - /// Thread pool size used for plotting - pub plotting_thread_pool_size: usize, - /// Thread pool size used for replotting, typically smaller pool than for plotting to not affect - /// farming as much - pub replotting_thread_pool_size: usize, + /// Thread pool manager used for 
plotting + pub plotting_thread_pool_manager: ThreadPoolManager, + /// Thread pool manager used for replotting, typically smaller pool than for plotting to not + /// affect farming as much + pub replotting_thread_pool_manager: ThreadPoolManager, /// Notification for plotter to start, can be used to delay plotting until some initialization /// has happened externally pub plotting_delay: Option>, @@ -625,10 +623,9 @@ impl SingleDiskFarm { erasure_coding, cache_percentage, downloading_semaphore, - encoding_semaphore, farming_thread_pool_size, - plotting_thread_pool_size, - replotting_thread_pool_size, + plotting_thread_pool_manager, + replotting_thread_pool_manager, plotting_delay, farm_during_initial_plotting, } = options; @@ -913,50 +910,6 @@ impl SingleDiskFarm { move || { let _span_guard = span.enter(); - let plotting_thread_pool = match ThreadPoolBuilder::new() - .thread_name(move |thread_index| { - format!("plotting-{disk_farm_index}.{thread_index}") - }) - .num_threads(plotting_thread_pool_size) - .spawn_handler(tokio_rayon_spawn_handler()) - .build() - .map_err(PlottingError::FailedToCreateThreadPool) - { - Ok(thread_pool) => thread_pool, - Err(error) => { - if let Some(error_sender) = error_sender.lock().take() { - if let Err(error) = error_sender.send(error.into()) { - error!( - %error, - "Plotting failed to send error to background task", - ); - } - } - return; - } - }; - let replotting_thread_pool = match ThreadPoolBuilder::new() - .thread_name(move |thread_index| { - format!("replotting-{disk_farm_index}.{thread_index}") - }) - .num_threads(replotting_thread_pool_size) - .spawn_handler(tokio_rayon_spawn_handler()) - .build() - .map_err(PlottingError::FailedToCreateThreadPool) - { - Ok(thread_pool) => thread_pool, - Err(error) => { - if let Some(error_sender) = error_sender.lock().take() { - if let Err(error) = error_sender.send(error.into()) { - error!( - %error, - "Plotting failed to send error to background task", - ); - } - } - return; - } - }; let 
plotting_options = PlottingOptions { public_key, @@ -975,9 +928,8 @@ impl SingleDiskFarm { modifying_sector_index, sectors_to_plot_receiver, downloading_semaphore, - encoding_semaphore: &encoding_semaphore, - plotting_thread_pool, - replotting_thread_pool, + plotting_thread_pool_manager, + replotting_thread_pool_manager, stop_receiver: &mut stop_receiver.resubscribe(), }; diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 85602d4f7d..7aa4bea139 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -2,6 +2,7 @@ use crate::single_disk_farm::{ BackgroundTaskError, Handlers, PlotMetadataHeader, SectorPlottingDetails, RESERVED_PLOT_METADATA, }; +use crate::thread_pool_manager::ThreadPoolManager; use crate::utils::AsyncJoinOnDrop; use crate::{node_client, NodeClient}; use async_lock::RwLock; @@ -10,7 +11,6 @@ use futures::channel::{mpsc, oneshot}; use futures::{select, FutureExt, SinkExt, StreamExt}; use lru::LruCache; use parity_scale_codec::Encode; -use rayon::{ThreadPool, ThreadPoolBuildError}; use std::collections::HashMap; use std::fs::File; use std::io; @@ -36,7 +36,8 @@ use subspace_farmer_components::sector::SectorMetadataChecksummed; use subspace_proof_of_space::Table; use thiserror::Error; use tokio::runtime::Handle; -use tokio::sync::{broadcast, Semaphore}; +use tokio::sync::{broadcast, OwnedSemaphorePermit, Semaphore}; +use tokio::task::yield_now; use tracing::{debug, info, trace, warn, Instrument}; const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500); @@ -91,9 +92,6 @@ pub enum PlottingError { /// I/O error occurred #[error("I/O error: {0}")] Io(#[from] io::Error), - /// Failed to create thread pool - #[error("Failed to create thread pool: {0}")] - FailedToCreateThreadPool(#[from] ThreadPoolBuildError), /// Background downloading panicked #[error("Background downloading 
panicked")] BackgroundDownloadingPanicked, @@ -118,11 +116,8 @@ pub(super) struct PlottingOptions<'a, NC, PG> { /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory /// usage of the plotting process, permit will be held until the end of the plotting process pub(crate) downloading_semaphore: Arc, - /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically - /// allow one permit at a time for efficient CPU utilization - pub(crate) encoding_semaphore: &'a Semaphore, - pub(super) plotting_thread_pool: ThreadPool, - pub(super) replotting_thread_pool: ThreadPool, + pub(super) plotting_thread_pool_manager: ThreadPoolManager, + pub(super) replotting_thread_pool_manager: ThreadPoolManager, pub(super) stop_receiver: &'a mut broadcast::Receiver<()>, } @@ -155,16 +150,16 @@ where modifying_sector_index, mut sectors_to_plot_receiver, downloading_semaphore, - encoding_semaphore, - plotting_thread_pool, - replotting_thread_pool, + plotting_thread_pool_manager, + replotting_thread_pool_manager, stop_receiver, } = plotting_options; let mut table_generator = PosTable::generator(); - let mut maybe_next_downloaded_sector_fut = - None::>>; + let mut maybe_next_downloaded_sector_fut = None::< + AsyncJoinOnDrop>, + >; while let Some(sector_to_plot) = sectors_to_plot_receiver.next().await { let SectorToPlot { sector_index, @@ -227,12 +222,17 @@ where break farmer_app_info; }; - let downloaded_sector = + let (_downloading_permit, downloaded_sector) = if let Some(downloaded_sector_fut) = maybe_next_downloaded_sector_fut.take() { downloaded_sector_fut .await .map_err(|_error| PlottingError::BackgroundDownloadingPanicked)?? 
} else { + let downloading_permit = Arc::clone(&downloading_semaphore) + .acquire_owned() + .await + .map_err(plotting::PlottingError::from)?; + let downloaded_sector_fut = download_sector(DownloadSectorOptions { public_key: &public_key, sector_index, @@ -243,20 +243,25 @@ where farmer_protocol_info: farmer_app_info.protocol_info, kzg, pieces_in_sector, - downloading_semaphore: Some(Arc::clone(&downloading_semaphore)), }); - downloaded_sector_fut.await? + + (downloading_permit, downloaded_sector_fut.await?) }; // Initiate downloading of pieces for the next segment index if already known if let Some(sector_index) = next_segment_index_hint { let piece_getter = piece_getter.clone(); - let downloading_semaphore = Some(Arc::clone(&downloading_semaphore)); + let downloading_semaphore = Arc::clone(&downloading_semaphore); let kzg = kzg.clone(); maybe_next_downloaded_sector_fut.replace(AsyncJoinOnDrop::new( tokio::spawn( async move { + let downloading_permit = downloading_semaphore + .acquire_owned() + .await + .map_err(plotting::PlottingError::from)?; + let downloaded_sector_fut = download_sector(DownloadSectorOptions { public_key: &public_key, sector_index, @@ -267,9 +272,9 @@ where farmer_protocol_info: farmer_app_info.protocol_info, kzg: &kzg, pieces_in_sector, - downloading_semaphore, }); - downloaded_sector_fut.await + + Ok((downloading_permit, downloaded_sector_fut.await?)) } .in_current_span(), ), @@ -299,7 +304,6 @@ where pieces_in_sector, sector_output: &mut sector, sector_metadata_output: &mut sector_metadata, - encoding_semaphore: Some(encoding_semaphore), table_generator: &mut table_generator, }, )); @@ -320,12 +324,17 @@ where }) }; - let plotting_result = if replotting { - replotting_thread_pool.install(plotting_fn) + let thread_pool = if replotting { + replotting_thread_pool_manager.get_thread_pool() } else { - plotting_thread_pool.install(plotting_fn) + plotting_thread_pool_manager.get_thread_pool() }; + // Give a chance to interrupt plotting if necessary 
+ yield_now().await; + + let plotting_result = thread_pool.install(plotting_fn); + if matches!(plotting_result, Err(PlottingError::FarmIsShuttingDown)) { return Ok(()); } diff --git a/crates/subspace-farmer/src/thread_pool_manager.rs b/crates/subspace-farmer/src/thread_pool_manager.rs new file mode 100644 index 0000000000..1a6d6675d4 --- /dev/null +++ b/crates/subspace-farmer/src/thread_pool_manager.rs @@ -0,0 +1,97 @@ +use parking_lot::{Condvar, Mutex}; +use rayon::{ThreadPool, ThreadPoolBuildError}; +use std::num::NonZeroUsize; +use std::ops::Deref; +use std::sync::Arc; + +#[derive(Debug)] +struct Inner { + thread_pools: Vec, +} + +/// Wrapper around [`ThreadPool`] that on `Drop` will return thread pool back into corresponding +/// [`ThreadPoolManager`]. +#[derive(Debug)] +pub struct ThreadPoolGuard { + inner: Arc<(Mutex, Condvar)>, + thread_pool: Option, +} + +impl Deref for ThreadPoolGuard { + type Target = ThreadPool; + + fn deref(&self) -> &Self::Target { + self.thread_pool + .as_ref() + .expect("Value exists until `Drop`; qed") + } +} + +impl Drop for ThreadPoolGuard { + fn drop(&mut self) { + let (mutex, cvar) = &*self.inner; + let mut inner = mutex.lock(); + inner.thread_pools.push( + self.thread_pool + .take() + .expect("Happens only once in `Drop`; qed"), + ); + cvar.notify_one(); + } +} + +/// Thread pool manager. +/// +/// This abstraction wraps a set of thread pools and allows to use them one at a time. +/// +/// For example on machine with 64 logical cores and 4 NUMA nodes it would be recommended to create +/// 4 thread pools with 16 threads each, which would mean work done within thread pool is tied to +/// that thread pool. +#[derive(Debug, Clone)] +pub struct ThreadPoolManager { + inner: Arc<(Mutex, Condvar)>, +} + +impl ThreadPoolManager { + /// Create new thread pool manager by instantiating `thread_pools` thread pools using + /// `create_thread_pool`. + /// + /// `create_thread_pool` takes one argument `thread_pool_index`. 
+ pub fn new( + create_thread_pool: C, + thread_pools: NonZeroUsize, + ) -> Result + where + C: FnMut(usize) -> Result, + { + let inner = Inner { + thread_pools: (0..thread_pools.get()) + .map(create_thread_pool) + .collect::, _>>()?, + }; + + Ok(Self { + inner: Arc::new((Mutex::new(inner), Condvar::new())), + }) + } + + /// Get one of inner thread pools, will block until one is available if needed + #[must_use] + pub fn get_thread_pool(&self) -> ThreadPoolGuard { + let (mutex, cvar) = &*self.inner; + let mut inner = mutex.lock(); + + let thread_pool = inner.thread_pools.pop().unwrap_or_else(|| { + cvar.wait(&mut inner); + + inner.thread_pools.pop().expect( + "Guaranteed by parking_lot's API to happen when thread pool is inserted; qed", + ) + }); + + ThreadPoolGuard { + inner: Arc::clone(&self.inner), + thread_pool: Some(thread_pool), + } + } +} diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index c396c52c53..c95e41d03f 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -5,18 +5,20 @@ pub mod ss58; #[cfg(test)] mod tests; +use crate::thread_pool_manager::ThreadPoolManager; use futures::channel::oneshot; use futures::channel::oneshot::Canceled; use futures::future::Either; -use rayon::ThreadBuilder; +use rayon::{ThreadBuilder, ThreadPoolBuildError, ThreadPoolBuilder}; use std::future::Future; +use std::num::NonZeroUsize; use std::ops::Deref; use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use std::{io, thread}; use tokio::runtime::Handle; use tokio::task; -use tracing::debug; +use tracing::{debug, warn}; /// Joins async join handle on drop pub struct AsyncJoinOnDrop { @@ -136,11 +138,221 @@ where }) } -/// This function is supposed to be used with [`rayon::ThreadPoolBuilder::spawn_handler()`] to -/// inherit current tokio runtime. 
-pub fn tokio_rayon_spawn_handler() -> impl FnMut(ThreadBuilder) -> io::Result<()> { - let handle = Handle::current(); +/// Abstraction for CPU core set +#[derive(Debug, Clone)] +pub struct CpuCoreSet { + /// CPU cores that belong to this set + cores: Vec, + #[cfg(feature = "numa")] + topology: Option>, +} + +impl CpuCoreSet { + pub fn cpu_cores(&self) -> &[usize] { + &self.cores + } + + /// Will truncate list of CPU cores to this number. + /// + /// If `cores` is zero, call will do nothing since zero number of cores is not allowed. + pub fn truncate(&mut self, cores: usize) { + self.cores.truncate(cores.max(1)); + } + + /// Pin current thread to this NUMA node (not just one CPU core) + pub fn pin_current_thread(&self) { + #[cfg(feature = "numa")] + if let Some(topology) = &self.topology { + use hwlocality::cpu::binding::CpuBindingFlags; + use hwlocality::cpu::cpuset::CpuSet; + use hwlocality::ffi::PositiveInt; + + #[cfg(not(windows))] + let thread_id = unsafe { libc::pthread_self() }; + #[cfg(windows)] + let thread_id = unsafe { windows_sys::Win32::System::Threading::GetCurrentThread() }; + + // load the cpuset for the given core index. + let cpu_cores = CpuSet::from_iter( + self.cores + .iter() + .map(|&core| PositiveInt::try_from(core).expect("Valid CPU core")), + ); + + if let Err(error) = + topology.bind_thread_cpu(thread_id, &cpu_cores, CpuBindingFlags::empty()) + { + warn!(%error, ?cpu_cores, "Failed to pin thread to CPU cores") + } + } + } +} + +/// Get all cpu cores, grouped into sets according to NUMA nodes. +/// +/// Returned vector is guaranteed to have at least one element and have non-zero number of CPU cores +/// in each set. 
+pub fn all_cpu_cores() -> Vec { + #[cfg(feature = "numa")] + match hwlocality::Topology::new().map(std::sync::Arc::new) { + Ok(topology) => { + let cpu_cores = topology + // Iterate over NUMA nodes + .objects_at_depth(hwlocality::object::depth::Depth::NUMANode) + // For each NUMA nodes get CPU set + .filter_map(|node| node.cpuset()) + // For each CPU set extract individual cores + .map(|cpuset| cpuset.iter_set().map(usize::from).collect::>()) + .filter(|cores| !cores.is_empty()) + .map(|cores| CpuCoreSet { + cores, + topology: Some(std::sync::Arc::clone(&topology)), + }) + .collect::>(); + + if !cpu_cores.is_empty() { + return cpu_cores; + } + } + Err(error) => { + warn!(%error, "Failed to get CPU topology"); + } + } + vec![CpuCoreSet { + cores: (0..num_cpus::get()).collect(), + #[cfg(feature = "numa")] + topology: None, + }] +} + +/// Thread indices for each thread pool +pub fn thread_pool_core_indices( + thread_pool_size: Option, + thread_pools: Option, +) -> Vec { + let all_numa_nodes = all_cpu_cores(); + #[cfg(feature = "numa")] + let topology = &all_numa_nodes + .first() + .expect("Not empty according to function description; qed") + .topology; + + if let Some(thread_pools) = thread_pools { + let mut thread_pool_core_indices = Vec::::with_capacity(thread_pools.get()); + + if let Some(thread_pool_size) = thread_pool_size { + // If thread pool size is fixed, loop over all CPU cores as many times as necessary and + // assign contiguous ranges of CPU cores to corresponding thread pools + + let total_cpu_cores = all_numa_nodes + .iter() + .flat_map(|set| set.cpu_cores()) + .count(); + for _ in 0..thread_pools.get() { + let cpu_cores_range = if let Some(last_cpu_index) = thread_pool_core_indices + .last() + .and_then(|thread_indices| thread_indices.cpu_cores().last()) + .copied() + { + last_cpu_index + 1.. + } else { + 0.. 
+ }; + + let cpu_cores = cpu_cores_range + .take(thread_pool_size.get()) + // To loop over all CPU cores multiple times, modulo naively obtained CPU + // cores by the total available number of CPU cores + .map(|core_index| core_index % total_cpu_cores) + .collect(); + + thread_pool_core_indices.push(CpuCoreSet { + cores: cpu_cores, + #[cfg(feature = "numa")] + topology: topology.clone(), + }); + } + } else { + // If thread pool size is not fixed, we iterate over all NUMA nodes as many times as + // necessary + + for thread_pool_index in 0..thread_pools.get() { + thread_pool_core_indices.push(CpuCoreSet { + cores: all_numa_nodes[thread_pool_index % all_numa_nodes.len()] + .cores + .clone(), + #[cfg(feature = "numa")] + topology: topology.clone(), + }); + } + } + thread_pool_core_indices + } else { + // If everything is set to defaults, use physical layout of CPUs + all_numa_nodes + } +} + +/// Creates thread pools for each CPU core set with number of threads corresponding to number of cores in +/// each set and pins threads to all of those CPU cores (all at once, not thread per core). Each thread will +/// also have Tokio context available. +/// +/// The easiest way to obtain CPU core sets is using [`all_cpu_cores`], but use [`thread_pool_core_indices`] in case +/// support for user customizations is desired.
+pub fn create_tokio_thread_pool_manager_for_pinned_nodes( + thread_prefix: &'static str, + mut cpu_core_sets: Vec, +) -> Result { + let total_thread_pools = cpu_core_sets.len(); + ThreadPoolManager::new( + |thread_pool_index| { + let cpu_core_set = cpu_core_sets + .pop() + .expect("Number of thread pools is the same as cpu core sets; qed"); + + ThreadPoolBuilder::new() + .thread_name(move |thread_index| { + format!("{thread_prefix}-{thread_pool_index}.{thread_index}") + }) + .num_threads(cpu_core_set.cpu_cores().len()) + .spawn_handler({ + let handle = Handle::current(); + + rayon_custom_spawn_handler(move |thread| { + let cpu_core_set = cpu_core_set.clone(); + let handle = handle.clone(); + + move || { + cpu_core_set.pin_current_thread(); + drop(cpu_core_set); + + let _guard = handle.enter(); + + task::block_in_place(|| thread.run()) + } + }) + }) + .build() + }, + NonZeroUsize::new(total_thread_pools) + .expect("Thread pool is guaranteed to be non-empty; qed"), + ) +} + +/// This function is supposed to be used with [`rayon::ThreadPoolBuilder::spawn_handler()`] to +/// spawn a handler with custom logic defined by `spawn_handler_builder`. +/// +/// `spawn_handler_builder` is called with a thread builder to create a `spawn_handler` that will +/// in turn spawn rayon's thread with the desired environment.
+pub fn rayon_custom_spawn_handler( + mut spawn_handler_builder: SpawnHandlerBuilder, +) -> impl FnMut(ThreadBuilder) -> io::Result<()> +where + SpawnHandlerBuilder: (FnMut(ThreadBuilder) -> SpawnHandler) + Clone, + SpawnHandler: (FnOnce() -> SpawnHandlerResult) + Send + 'static, + SpawnHandlerResult: Send + 'static, +{ move |thread: ThreadBuilder| { let mut b = thread::Builder::new(); if let Some(name) = thread.name() { @@ -150,12 +362,23 @@ pub fn tokio_rayon_spawn_handler() -> impl FnMut(ThreadBuilder) -> io::Result<() b = b.stack_size(stack_size); } + b.spawn(spawn_handler_builder(thread))?; + Ok(()) + } +} + +/// This function is supposed to be used with [`rayon::ThreadPoolBuilder::spawn_handler()`] to +/// inherit current tokio runtime. +pub fn tokio_rayon_spawn_handler() -> impl FnMut(ThreadBuilder) -> io::Result<()> { + let handle = Handle::current(); + + rayon_custom_spawn_handler(move |thread| { let handle = handle.clone(); - b.spawn(move || { + + move || { let _guard = handle.enter(); - tokio::task::block_in_place(|| thread.run()) - })?; - Ok(()) - } + task::block_in_place(|| thread.run()) + } + }) } diff --git a/crates/subspace-node/README.md b/crates/subspace-node/README.md index af9d1988c5..dc8667bbe7 100644 --- a/crates/subspace-node/README.md +++ b/crates/subspace-node/README.md @@ -14,12 +14,42 @@ It is recommended to follow general farming instructions that explain how to run Rust toolchain is expected to be installed for anything in this repository to compile, but there are some extra dependencies for farmer specifically. -Prost library from libp2p dependency needs CMake, also LLVM/Clang and `make` are necessary: +### Ubuntu + +LLVM/Clang and `make` are necessary: ```bash sudo apt-get install llvm clang cmake make ``` -Then build the farmer using Cargo: +### macOS + +1. Install via Homebrew: + +```bash +brew install llvm@15 clang cmake make +``` + +2. 
Add `llvm` to your `~/.zshrc` or `~/.bashrc`: + +```bash +export PATH="/opt/homebrew/opt/llvm@15/bin:$PATH" +``` + +3. Activate the changes: + +```bash +source ~/.zshrc +``` + +4. Verify that `llvm` is installed: + +```bash +llvm-config --version +``` + +### Build + +Then build the node using Cargo: ``` cargo build --profile production --bin subspace-node target/production/subspace-node --version diff --git a/docs/development.md b/docs/development.md index f55fd3ef33..596dfb54c9 100644 --- a/docs/development.md +++ b/docs/development.md @@ -2,39 +2,7 @@ You'll have to have [Rust toolchain](https://rustup.rs/) installed as well as LLVM, Clang and CMake in addition to usual developer tooling. -Below are some examples of how to install these dependencies on different operating systems. - -### Ubuntu - -```bash -sudo apt-get install llvm clang cmake -``` - -### macOS - -1. Install via Homebrew: - -```bash -brew install llvm@15 clang cmake -``` - -2. Add `llvm` to your `~/.zshrc` or `~/.bashrc`: - -```bash -export PATH="/opt/homebrew/opt/llvm@15/bin:$PATH" -``` - -3. Activate the changes: - -```bash -source ~/.zshrc -``` - -4. Verify that `llvm` is installed: - -```bash -llvm-config --version -``` +Check [crates/subspace-node](../crates/subspace-node/README.md) and [crates/subspace-farmer](../crates/subspace-farmer/README.md) for required dependencies. ## To Farm By Yourself (Offline) @@ -43,7 +11,7 @@ llvm-config --version **Linux/MacOS:** -1. Make them executable: `chmod +x subspace-farmer-x86_64-*-snapshot subspace-node-x86_64-*-snapshot` +1. Make files executable: `chmod +x subspace-farmer-x86_64-*-snapshot subspace-node-x86_64-*-snapshot` 2. Run the node: `./subspace-node-x86_64-*-snapshot --dev --tmp` 3. In macOS, it may prompt that this app is not verified. Click on `cancel` instead of moving it to trash. To allow execution, go to `System Preferences -> Security & Privacy -> General`, and click on `allow`. 
diff --git a/docs/farming.md b/docs/farming.md index 3c65bf98af..6c9c3c6045 100644 --- a/docs/farming.md +++ b/docs/farming.md @@ -349,10 +349,7 @@ If you're running unsupported Linux distribution or CPU architecture, you may tr NOTE: This is primarily targeted at tech-savvy users and not recommended unless you know what you're doing. Please try to find answer to your question online before reaching out to maintainers. -You'll have to have [Rust toolchain](https://rustup.rs/) installed as well as LLVM, Clang and CMake in addition to usual developer tooling (Ubuntu example): -```bash -sudo apt-get install llvm clang cmake -``` +Check [crates/subspace-node](../crates/subspace-node/README.md) and [crates/subspace-farmer](../crates/subspace-farmer/README.md) for required dependencies. Now clone the source and build snapshot `snapshot-2022-apr-29` (replace occurrences with the snapshot you want to build): ```bash