diff --git a/.github/Dockerfile b/.github/Dockerfile
deleted file mode 100644
index 75e9f40..0000000
--- a/.github/Dockerfile
+++ /dev/null
@@ -1,29 +0,0 @@
-#Built for testing, not designed for application use.
-
-FROM ubuntu:20.04
-#="open-mpi/ompi" for github.com/open-mpi/ompi
-ARG OPENMPI_REPO="open-mpi/ompi"
-#="tags" or ="heads", for tag or branch name
-ARG OPENMPI_VERS_PREFIX="tags"
-#="v5.0.0rc10" or ="v5.0.x", ie tag name or branch name.
-ARG OPENMPI_VERS="v5.0.0rc10"
-run echo Using https://github.com/${OPENMPI_REPO}/git/refs/${OPENMPI_VERS_PREFIX}/${OPENMPI_VERS}
-
-RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y build-essential python3 m4 autoconf automake libtool flex git zlib1g-dev
-
-#Add files listing latest commit for this branch/tag, which invalidates the clone
-#when a change has been pushed.
-ADD https://api.github.com/repos/${OPENMPI_REPO}/git/refs/${OPENMPI_VERS_PREFIX}/${OPENMPI_VERS} commit_info
-RUN git clone --recursive --branch ${OPENMPI_VERS} --depth 1 https://github.com/${OPENMPI_REPO}.git ompi_src && \
-    mkdir ompi_build ompi_install && cd ompi_src && export AUTOMAKE_JOBS=8 && ./autogen.pl && cd ../ompi_build && ../ompi_src/configure --prefix=/ompi_install --disable-man-pages --with-ft=ulfm && make install -j8 && cd ..
-
-
-#New build stage, tosses out src/build trees from openmpi
-FROM ubuntu:20.04
-RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y build-essential cmake ssh zlib1g-dev
-COPY . ./fenix_src
-COPY --from=0 ompi_install/ /ompi_install/
-ENV PATH="$PATH:/ompi_install/bin"
-RUN mkdir fenix_build fenix_install && cd fenix_build && cmake ../fenix_src -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_COMPILER=/ompi_install/bin/mpicc \
-    -DFENIX_EXAMPLES=ON -DFENIX_TESTS=ON -DCMAKE_INSTALL_PREFIX=../fenix_install -DMPIEXEC_PREFLAGS="--allow-run-as-root;--map-by;:OVERSUBSCRIBE" && make install -j8
-CMD ["sh", "-c", "cd fenix_build && ctest --verbose --timeout 60"]
diff --git a/.github/docker-compose.yml b/.github/docker-compose.yml
index b29e083..c8c9024 100644
--- a/.github/docker-compose.yml
+++ b/.github/docker-compose.yml
@@ -1,81 +1,49 @@
-version: "3.9"
-
-x-fenix: &fenix
-  build: &fenix-build
-    context: ./
-    dockerfile: .github/Dockerfile
-    args:
-      OPENMPI_REPO: open-mpi/ompi
-      OPENMPI_VERS_PREFIX: tags
-      OPENMPI_VERS: v5.0.0rc10
-    #Caches should be manually scoped, or they'll conflict.
-    x-bake:
-      cache-from:
-        - type=gha,scope=default
-      cache-to:
-        - type=gha,scope=default,mode=max
-
 services:
-  #fenix_ompi_5rc10:
-  #  <<: *fenix
-  #  image: "fenix:ompi_5rc10"
-  #  build:
-  #    <<: *fenix-build
-  #    x-bake:
-  #      cache-from:
-  #        - type=gha,scope=ompi_5rc10
-  #      cache-to:
-  #        - type=gha,scope=ompi_5rc10,mode=max
-
-  fenix_ompi_5:
-    <<: *fenix
-    image: "fenix:ompi_5"
+  bootstrap:
+    image: "bootstrap"
     build:
-      <<: *fenix-build
+      dockerfile_inline: |
+        FROM spack/ubuntu-jammy:0.22.2
+        VOLUME /configs
+        ARG OMPI_VERSION
+        ENV OMPI_VERSION=$${OMPI_VERSION}
+        CMD cp /configs/spack.yaml . && \
+            spack -e . add openmpi@$${OMPI_VERSION} && \
+            spack -e . containerize >/configs/spack.Dockerfile
       args:
-        - OPENMPI_VERS_PREFIX=heads
-        - OPENMPI_VERS=v5.0.x
-      x-bake:
-        cache-from:
-          - type=gha,scope=ompi_5
-        cache-to:
-          - type=gha,scope=ompi_5,mode=max
-
-  fenix_ompi_main:
-    <<: *fenix
-    image: "fenix:ompi_main"
+        OMPI_VERSION: main
+      no_cache: true
+    pull_policy: build
+    volumes:
+      - .github/:/configs
+
+  env:
+    image: "ghcr.io/sandialabs/fenix/env:main"
     build:
-      <<: *fenix-build
-      args:
-        - OPENMPI_VERS_PREFIX=heads
-        - OPENMPI_VERS=main
-      x-bake:
-        cache-from:
-          - type=gha,scope=ompi_main
-        cache-to:
-          - type=gha,scope=ompi_main,mode=max
-
-  fenix_icldisco_latest:
-    <<: *fenix
-    image: "fenix:icldisco_latest"
+      # Generated by running the bootstrap image
+      dockerfile: .github/spack.Dockerfile
+
+  fenix:
+    image: "fenix"
     build:
-      <<: *fenix-build
+      dockerfile_inline: |
+        ARG OMPI_VERSION main
+        FROM ghcr.io/sandialabs/fenix/env:$${OMPI_VERSION}
+        COPY . /fenix
+        RUN . /opt/spack-environment/activate.sh && \
+            mkdir -p /fenix/build && \
+            cd /fenix/build && \
+            cmake /fenix \
+                -DCMAKE_BUILD_TYPE=Release \
+                -DCMAKE_C_COMPILER=mpicc \
+                -DFENIX_EXAMPLES=ON \
+                -DFENIX_TESTS=ON \
+                -DMPIEXEC_PREFLAGS="--allow-run-as-root;--map-by;:oversubscribe" && \
+            make -j
+
+        WORKDIR /fenix/build
+        ENTRYPOINT ["/entrypoint.sh"]
+        CMD ["ctest", "--output-on-failure", "--timeout", "60"]
       args:
-        - OPENMPI_REPO=icldisco/ompi
-        - OPENMPI_VERS_PREFIX=heads
-        - OPENMPI_VERS=ulfm/latest
-      x-bake:
-        cache-from:
-          - type=gha,scope=icldisco_latest
-        cache-to:
-          - type=gha,scope=icldisco_latest,mode=max
-
-  #fenix_icldisco_experimental:
-  #  <<: *fenix
-  #  image: fenix/icldisco
-  #  build:
-  #    <<: *fenix-build
-  #    args:
-  #      - OPENMPI_REPO=icldisco/ompi
-  #      - OPENMPI_VERS_PREFIX=heads
-  #      - OPENMPI_VERS=ulfm/experimental
+        OMPI_VERSION: main
+      pull_policy: build
diff --git a/.github/spack.yaml b/.github/spack.yaml
new file mode 100644
index 0000000..c5d3611
--- /dev/null
+++ b/.github/spack.yaml
@@ -0,0 +1,31 @@
+spack:
+  packages:
+    openmpi:
+      variants: +internal-hwloc +internal-libevent +internal-pmix
+  concretizer:
+    unify: true
+    reuse: true
+
+  container:
+    format: docker
+    strip: false
+    images:
+      os: ubuntu:22.04
+      spack: 0.22.2
+    os_packages:
+      build:
+        - build-essential
+        - autotools-dev
+        - pkg-config
+        - python3
+        - m4
+        - autoconf
+        - automake
+        - flex
+        - git
+        - zlib1g-dev
+        - libperl-dev
+        - numactl
+      final:
+        - build-essential
+        - cmake
diff --git a/.github/workflows/build-env/action.yml b/.github/workflows/build-env/action.yml
new file mode 100644
index 0000000..28e16d6
--- /dev/null
+++ b/.github/workflows/build-env/action.yml
@@ -0,0 +1,91 @@
+name: Build Environment Image
+description: Build the Open MPI environment image for Fenix
+
+inputs:
+  ompi_version:
+    description: "Open MPI version to build"
+    type: string
+    required: true
+  token:
+    description: "GitHub token for logging into GHCR"
+    type: string
+    required: true
+  max_age:
+    description: "Maximum image age before rebuild, in days"
+    type: number
+    required: false
+    default: 14
+
+runs:
+  using: "composite"
+  steps:
+    - name: Check for valid image
+      shell: bash
+      run: |
+        set +e
+        IMG=ghcr.io/sandialabs/fenix/env:${{ inputs.ompi_version }}
+        echo "IMG=$IMG" >> $GITHUB_ENV
+
+        docker image rm -f $IMG 2>/dev/null
+        docker pull $IMG >/dev/null 2>&1
+        IMG_CREATED=$(docker inspect --type=image --format '{{.Created}}' $IMG 2>/dev/null)
+        if [ -z "$IMG_CREATED" ]; then
+          echo "Did not find image $IMG"
+          echo "found=false" >> $GITHUB_ENV
+          exit 0
+        fi
+
+        IMG_AGE=$(( ($(date +%s) - $(date -d "$IMG_CREATED" +%s)) / (60*60*24) ))
+        echo "Found image $IMG created $IMG_AGE days ago"
+        if [ "$IMG_AGE" -lt ${{ inputs.max_age }} ]; then
+          echo "Image is valid, skipping build"
+          echo "found=true" >> $GITHUB_ENV
+        else
+          echo "Image is too old, rebuilding"
+          echo "found=false" >> $GITHUB_ENV
+        fi
+
+    #Remaining actions only run if we didn't find a valid image.
+    - name: Checkout repository
+      if: env.found != 'true'
+      uses: actions/checkout@v3
+
+    - name: Set up Docker Buildx
+      if: env.found != 'true'
+      uses: docker/setup-buildx-action@v2
+
+    - name: Log in to GHCR container registry
+      if: env.found != 'true'
+      uses: docker/login-action@v3
+      with:
+        registry: ghcr.io
+        username: ${{ github.actor }}
+        password: ${{ inputs.token }}
+
+    - name: Bake the bootstrap docker image
+      if: env.found != 'true'
+      uses: docker/bake-action@v5
+      with:
+        files: .github/docker-compose.yml
+        targets: bootstrap
+        workdir: .
+        set: |
+          *.output=type=docker,name=bootstrap
+          *.args.OMPI_VERSION=${{ inputs.ompi_version }}
+
+    - name: Bootstrap the environment Dockerfile
+      if: env.found != 'true'
+      shell: bash
+      run: docker run -v ${GITHUB_WORKSPACE}/.github:/configs bootstrap
+
+    - name: Build the environment
+      if: env.found != 'true'
+      uses: docker/bake-action@v5
+      with:
+        files: .github/docker-compose.yml
+        targets: env
+        workdir: .
+        pull: true
+        set: |
+          env.tags=ghcr.io/sandialabs/fenix/env:${{ inputs.ompi_version }}
+          env.output=type=registry,name=ghcr.io/sandialabs/fenix/env:${{ inputs.ompi_version }}
diff --git a/.github/workflows/ci_checks.yaml b/.github/workflows/ci_checks.yaml
index ebeeef8..c6adc31 100644
--- a/.github/workflows/ci_checks.yaml
+++ b/.github/workflows/ci_checks.yaml
@@ -1,31 +1,40 @@
 name: Build & Test
 
 on:
-  push:
-  pull_request_target:
-    types:
-      - opened
-      - synchronized
-      - edited
+  pull_request:
 
 jobs:
   test:
     runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        ompi_version:
+          - main
+          - 5.0.3
+
     steps:
-      - uses: actions/checkout@v3
-      - uses: docker/setup-buildx-action@v2
-      - name: Build
-        uses: docker/bake-action@master
+      - name: Checkout
+        uses: actions/checkout@v3
+
+      - name: Build the environment image
+        uses: ./.github/workflows/build-env
+        with:
+          ompi_version: ${{ matrix.ompi_version }}
+          token: ${{ secrets.GITHUB_TOKEN }}
+          max_age: 14 #days
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v2
+
+      - name: Build Fenix
+        uses: docker/bake-action@v5
         with:
-          files: |
-            .github/docker-compose.yml
-          load: true
-      - name: Test open-mpi v5.0.x
-        if: success() || failure()
-        run: docker run fenix:ompi_5
-      - name: Test open-mpi main
-        if: success() || failure()
-        run: docker run fenix:ompi_main
-      - name: Test icldisco latest
-        if: success() || failure()
-        run: docker run fenix:icldisco_latest
+          files: .github/docker-compose.yml
+          targets: fenix
+          set: |
+            *.output=type=docker,name=fenix
+            *.args.OMPI_VERSION=${{ matrix.ompi_version }}
+
+      - name: Test Fenix
+        run: docker run fenix
diff --git a/include/fenix.h b/include/fenix.h
index 1a283bf..77d573b 100644
--- a/include/fenix.h
+++ b/include/fenix.h
@@ -69,7 +69,7 @@ extern "C" {
 #define FENIX_SUCCESS 0
 #define FENIX_ERROR_UNINITIALIZED -9
 #define FENIX_ERROR_NOCATEGORY -10
-#define FENIX_ERROR_CALLBACK_NOT_REGISTERD -11
+#define FENIX_ERROR_CALLBACK_NOT_REGISTERED -11
 #define FENIX_ERROR_GROUP_CREATE -12
 #define FENIX_ERROR_MEMBER_CREATE -13
 #define FENIX_ERROR_COMMIT_BARRIER -133
@@ -105,7 +105,8 @@ extern "C" {
 #define FENIX_DATA_SUBSET_CREATED 2
 
 #define FENIX_ERRHANDLER_LOC 1
-#define FENIX_DATA_COMMIT_BARRIER_LOC 2
+#define FENIX_FINALIZE_LOC 2
+#define FENIX_DATA_COMMIT_BARRIER_LOC 4
 
 #define FENIX_DATA_POLICY_IN_MEMORY_RAID 13
 
@@ -142,6 +143,8 @@ int Fenix_Initialized(int *);
 int Fenix_Callback_register(void (*recover)(MPI_Comm, int, void *), void *callback_data);
 
+int Fenix_Callback_pop();
+
 int Fenix_get_number_of_ranks_with_role(int, int *);
 
 int Fenix_get_role(MPI_Comm comm, int rank, int *role);
@@ -228,6 +231,8 @@ int Fenix_Process_fail_list(int** fail_list);
 
 int Fenix_check_cancelled(MPI_Request *request, MPI_Status *status);
 
+int Fenix_Process_detect_failures(int do_recovery);
+
 #if defined(c_plusplus) || defined(__cplusplus)
 }
 #endif
diff --git a/include/fenix_ext.h b/include/fenix_ext.h
index fd4b1a6..ef4dcc4 100644
--- a/include/fenix_ext.h
+++ b/include/fenix_ext.h
@@ -96,6 +96,9 @@ typedef struct {
   //Manage state of the comms. Necessary when failures happen rapidly, mussing up state
   int new_world_exists, user_world_exists;
 
+  int dummy_recv_buffer;
+  MPI_Request check_failures_req;
+
   MPI_Op agree_op; // This is reserved for the global agreement call for Fenix data recovery API
 
diff --git a/include/fenix_process_recovery.h b/include/fenix_process_recovery.h
index bb9d63a..9b85e04 100644
--- a/include/fenix_process_recovery.h
+++ b/include/fenix_process_recovery.h
@@ -100,6 +100,8 @@ int __fenix_repair_ranks();
 
 int __fenix_callback_register(void (*recover)(MPI_Comm, int, void *), void *);
 
+int __fenix_callback_pop();
+
 void __fenix_callback_push(fenix_callback_list_t **, fenix_callback_func *);
 
 void __fenix_callback_invoke_all(int error);
@@ -116,6 +118,8 @@ void __fenix_set_rank_role(int FenixRankRole);
 
 void __fenix_postinit(int *);
 
+int __fenix_detect_failures(int do_recovery);
+
 void __fenix_finalize();
 
 void __fenix_finalize_spare();
diff --git a/src/fenix.c b/src/fenix.c
index 6be875f..70c55d5 100644
--- a/src/fenix.c
+++ b/src/fenix.c
@@ -67,6 +67,10 @@ int Fenix_Callback_register(void (*recover)(MPI_Comm, int, void *), void *callba
   return __fenix_callback_register(recover, callback_data);
 }
 
+int Fenix_Callback_pop() {
+  return __fenix_callback_pop();
+}
+
 int Fenix_Initialized(int *flag) {
   *flag = (fenix.fenix_init_flag) ? 1 : 0;
   return FENIX_SUCCESS;
@@ -205,3 +209,7 @@ int Fenix_check_cancelled(MPI_Request *request, MPI_Status *status){
   //Request was (potentially) cancelled if ret is MPI_ERR_PROC_FAILED
   return ret == MPI_ERR_PROC_FAILED || ret == MPI_ERR_REVOKED;
 }
+
+int Fenix_Process_detect_failures(int do_recovery){
+  return __fenix_detect_failures(do_recovery);
+}
diff --git a/src/fenix_callbacks.c b/src/fenix_callbacks.c
index 885058d..8779402 100644
--- a/src/fenix_callbacks.c
+++ b/src/fenix_callbacks.c
@@ -80,6 +80,19 @@ int __fenix_callback_register(void (*recover)(MPI_Comm, int, void *), void *call
   return error_code;
 }
 
+int __fenix_callback_pop(){
+  if(!fenix.fenix_init_flag) return FENIX_ERROR_UNINITIALIZED;
+  if(fenix.callback_list == NULL) return FENIX_ERROR_CALLBACK_NOT_REGISTERED;
+
+  fenix_callback_list_t* old_head = fenix.callback_list;
+  fenix.callback_list = old_head->next;
+
+  free(old_head->callback);
+  free(old_head);
+
+  return FENIX_SUCCESS;
+}
+
 void __fenix_callback_invoke_all(int error)
 {
   fenix_callback_list_t *current = fenix.callback_list;
diff --git a/src/fenix_data_policy_in_memory_raid.c b/src/fenix_data_policy_in_memory_raid.c
index ca4b007..8eaa362 100644
--- a/src/fenix_data_policy_in_memory_raid.c
+++ b/src/fenix_data_policy_in_memory_raid.c
@@ -56,6 +56,7 @@
 #include
 #include "fenix.h"
+#include "fenix_ext.h"
 #include "fenix_opt.h"
 #include "fenix_data_subset.h"
 #include "fenix_data_recovery.h"
@@ -123,8 +124,25 @@ typedef struct __fenix_imr_group{
    int entries_count;
    fenix_imr_mentry_t* entries;
    int num_snapshots;
+   int* timestamps;
 } fenix_imr_group_t;
 
+typedef struct __fenix_imr_undo_log{
+   int groupid, memberid;
+} fenix_imr_undo_log_t;
+
+void __imr_sync_timestamps(fenix_imr_group_t* group);
+
+void __imr_undo_restore(MPI_Comm comm, int err, void* data){
+   fenix_imr_undo_log_t* undo_log = (fenix_imr_undo_log_t*)data;
+
+   Fenix_Data_member_delete(undo_log->groupid, undo_log->memberid);
+
+   free(data);
+   Fenix_Callback_pop(); //Should be this callback itself.
+}
+
+
 void __fenix_policy_in_memory_raid_get_group(fenix_group_t** group, MPI_Comm comm, int timestart, int depth, void* policy_value, int* flag){
    *group = (fenix_group_t *)malloc(sizeof(fenix_imr_group_t));
@@ -257,8 +275,11 @@ void __fenix_policy_in_memory_raid_get_group(fenix_group_t** group, MPI_Comm com
    new_group->entries = (fenix_imr_mentry_t*) malloc(sizeof(fenix_imr_mentry_t) * __FENIX_IMR_DEFAULT_MENTRY_NUM);
    new_group->num_snapshots = 0;
-
-
+   new_group->timestamps = (int*)malloc(sizeof(int)*depth);
+
+   new_group->base.comm = comm;
+   new_group->base.current_rank = my_rank;
+   __imr_sync_timestamps(new_group);
 
    *flag = FENIX_SUCCESS;
 }
@@ -370,12 +391,7 @@ int __imr_member_create(fenix_group_t* g, fenix_member_entry_t* mentry){
       //Initialize to smallest # blocks allowed.
      __fenix_data_subset_init(1, new_imr_mentry->data_regions + i);
       new_imr_mentry->data_regions[i].specifier = __FENIX_SUBSET_EMPTY;
-
-      //-1 is not a valid timestamp, use as an indicator that the data isn't valid.
-      new_imr_mentry->timestamp[i] = -1;
    }
-   //The first commit's timestamp is the group's timestart.
-   new_imr_mentry->timestamp[0] = group->base.timestart;
 
    group->entries_count++;
@@ -398,7 +414,7 @@ void __imr_member_free(fenix_imr_mentry_t* mentry, int depth){
 }
 
 int __imr_member_delete(fenix_group_t* g, int member_id){
-   int retval = -1;
+   int retval = FENIX_SUCCESS;
    fenix_imr_group_t* group = (fenix_imr_group_t*)g;
    //Find the member first
    fenix_imr_mentry_t *mentry;
@@ -460,13 +476,14 @@ int __imr_member_store(fenix_group_t* g, int member_id,
    void* recv_buf = malloc(serialized_size * member_data->datatype_size);
 
    MPI_Sendrecv(serialized, serialized_size * member_data->datatype_size, MPI_BYTE,
-         group->partners[1], group->base.groupid ^ STORE_PAYLOAD_TAG, recv_buf,
-         serialized_size * member_data->datatype_size, MPI_BYTE, group->partners[0],
-         group->base.groupid ^ STORE_PAYLOAD_TAG, group->base.comm, NULL);
+         group->partners[1], group->base.groupid ^ STORE_PAYLOAD_TAG,
+         recv_buf, serialized_size * member_data->datatype_size, MPI_BYTE,
+         group->partners[0], group->base.groupid ^ STORE_PAYLOAD_TAG,
+         group->base.comm, NULL);
 
    //Expand the serialized data out and store into the partner's portion of this data entry.
    __fenix_data_subset_deserialize(&subset_specifier, recv_buf,
-         mentry->data[mentry->current_head] + member_data->datatype_size*member_data->current_count,
+         ((uint8_t*)mentry->data[mentry->current_head]) + member_data->datatype_size*member_data->current_count,
          member_data->current_count, member_data->datatype_size);
 
    free(recv_buf);
@@ -520,7 +537,7 @@ int __imr_member_store(fenix_group_t* g, int member_id,
          offset = 0;
       }
 
-      MPI_Reduce((void*)((char*)data_buf) + offset, parity_buf, parity_size + (i < remainder ? 1 : 0), MPI_BYTE,
+      MPI_Reduce((char*)data_buf + offset, parity_buf, parity_size + (i < remainder ? 1 : 0), MPI_BYTE,
            MPI_BXOR, i, group->set_comm);
       if(i != my_set_rank){
          offset += parity_size + (i < remainder ? 1 : 0);
@@ -575,13 +592,18 @@ int __imr_commit(fenix_group_t* g){
    fenix_imr_group_t *group = (fenix_imr_group_t*)g;
 
+   if(group->num_snapshots == group->base.depth+1){
+      //Full of timestamps, remove the oldest and proceed as normal.
+      memcpy(group->timestamps, group->timestamps+1, group->base.depth);
+      group->num_snapshots--;
+   }
+   group->timestamps[group->num_snapshots++] = group->base.timestamp;
+
+   //For each entry id (eid)
    for(int eid = 0; eid < group->entries_count; eid++){
       fenix_imr_mentry_t *mentry = &group->entries[eid];
-      //Two cases for each member entry:
-      //    (1) depth has been reached, shift out the oldest commit
-      //    (2) depth has not been reached, just commit and start filling a new location.
 
       if(mentry->current_head == group->base.depth + 1){
          //The entry is full, one snapshot should be shifted out.
@@ -598,24 +620,11 @@ int __imr_commit(fenix_group_t* g){
          mentry->data[group->base.depth + 1] = first_data;
          mentry->data_regions[group->base.depth + 1].specifier = __FENIX_SUBSET_EMPTY;
-         mentry->timestamp[group->base.depth + 1] = mentry->timestamp[group->base.depth] + 1;
-
-      } else {
-         //The entry is not full, just shift the current head.
-         mentry->current_head++;
-
-         //Everything is initialized to correct values, we just need to provide
-         //the correct timestamp for the next snapshot.
-         mentry->timestamp[mentry->current_head] = mentry->timestamp[mentry->current_head-1] + 1;
-
-         if(eid == 0){
-            //Only do this once
-            group->num_snapshots++;
-         }
+         mentry->current_head--;
       }
 
-   }
-   group->base.timestamp = group->entries[0].timestamp[group->entries[0].current_head - 1];
+      mentry->timestamp[mentry->current_head++] = group->base.timestamp;
+   }
 
    return to_return;
 }
@@ -698,6 +707,9 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
    int retval = -1;
    fenix_imr_group_t* group = (fenix_imr_group_t*)g;
+   //One-time fix after a reinit.
+   if(group->base.timestamp == -1 && group->num_snapshots > 0)
+      group->base.timestamp = group->timestamps[group->num_snapshots-1];
 
    fenix_imr_mentry_t* mentry;
    //find_mentry returns the error status. We found the member (and corresponding data) if there are no errors.
@@ -711,6 +723,8 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
 
    int recovery_locally_possible;
 
+   fenix_imr_undo_log_t* undo_data; //Used for undoing partial restores interrupted by failures.
+
   if(group->raid_mode == 1){
      int my_data_found, partner_data_found;
@@ -722,6 +736,7 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
      MPI_Sendrecv(&found_member, 1, MPI_INT, group->partners[1], PARTNER_STATUS_TAG,
            &partner_data_found, 1, MPI_INT, group->partners[0], PARTNER_STATUS_TAG,
            group->base.comm, NULL);
+
      if(found_member && partner_data_found && my_data_found){
         //I have my data, and the person who's data I am backing up has theirs. We're good to go.
@@ -738,17 +753,6 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
         if(!partner_data_found)
            __fenix_data_member_send_metadata(group->base.groupid, member_id, group->partners[0]);
 
-        //Now my partner will need all of the entries. First they'll need to know how many snapshots
-        //to expect.
-        if(!partner_data_found)
-           MPI_Send((void*) &(group->num_snapshots), 1, MPI_INT, group->partners[0],
-                 RECOVER_MEMBER_ENTRY_TAG^group->base.groupid, group->base.comm);
-
-        //They also need the timestamps for each snapshot, as well as the value for the next.
-        if(!partner_data_found)
-           MPI_Send((void*)mentry->timestamp, group->num_snapshots+1, MPI_INT, group->partners[0],
-                 RECOVER_MEMBER_ENTRY_TAG^group->base.groupid, group->base.comm);
-
        for(int snapshot = 0; snapshot < group->num_snapshots; snapshot++){
           //send data region info next
           if(!partner_data_found)
@@ -788,21 +792,22 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
        __fenix_member_create(group->base.groupid, packet.memberid, NULL, packet.current_count, packet.datatype_size);
 
+       //Mark the member for deletion if another failure interrupts recovering fully.
+       undo_data = (fenix_imr_undo_log_t*)malloc(sizeof(fenix_imr_undo_log_t));
+       undo_data->groupid = group->base.groupid;
+       undo_data->memberid = member_id;
+       Fenix_Callback_register(__imr_undo_restore, (void*)undo_data);
+
        __imr_find_mentry(group, member_id, &mentry);
        int member_data_index = __fenix_search_memberid(group->base.member, member_id);
        member_data = group->base.member->member_entry[member_data_index];
-
-       MPI_Recv((void*)&(group->num_snapshots), 1, MPI_INT, group->partners[1],
-             RECOVER_MEMBER_ENTRY_TAG^group->base.groupid, group->base.comm, NULL);
-
+
        mentry->current_head = group->num_snapshots;
 
-       //We also need to explicitly ask for all timestamps, since user may have deleted some and caused mischief.
-       MPI_Recv((void*)(mentry->timestamp), group->num_snapshots + 1, MPI_INT, group->partners[1],
-             RECOVER_MEMBER_ENTRY_TAG^group->base.groupid, group->base.comm, NULL);
-
       //now recover data.
       for(int snapshot = 0; snapshot < group->num_snapshots; snapshot++){
+          mentry->timestamp[snapshot] = group->timestamps[snapshot];
+
          __fenix_data_subset_free(mentry->data_regions+snapshot);
          __fenix_data_subset_recv(mentry->data_regions+snapshot, group->partners[1],
               __IMR_RECOVER_DATA_REGION_TAG ^ group->base.groupid, group->base.comm);
@@ -828,11 +833,16 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
            free(recv_buf);
         }
      }
+
+     //Member restored fully, so we don't need to mark it for undoing on failure.
+     Fenix_Callback_pop();
+     free(undo_data);
   }
 
   recovery_locally_possible = found_member || (my_data_found && partner_data_found);
-
+  if(recovery_locally_possible) retval = FENIX_SUCCESS;
+
   } else if (group->raid_mode == 5){
      int* set_results = malloc(sizeof(int) * group->set_size);
      MPI_Allgather((void*)&found_member, 1, MPI_INT, (void*)set_results, 1, MPI_INT,
@@ -890,6 +900,13 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
        __fenix_member_create(group->base.groupid, packet.memberid, NULL, packet.current_count, packet.datatype_size);
 
+       //Mark the member for deletion if another failure interrupts recovering fully.
+       undo_data = (fenix_imr_undo_log_t*)malloc(sizeof(fenix_imr_undo_log_t));
+       undo_data->groupid = group->base.groupid;
+       undo_data->memberid = member_id;
+       Fenix_Callback_register(__imr_undo_restore, (void*)undo_data);
+
+
        __imr_find_mentry(group, member_id, &mentry);
        int member_data_index = __fenix_search_memberid(group->base.member, member_id);
        member_data = group->base.member->member_entry[member_data_index];
@@ -956,6 +973,12 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
         }
      }
 
+     if(!found_member){
+        //Member restored fully, so we don't need to mark it for undoing on failure.
+        Fenix_Callback_pop();
+        free(undo_data);
+     }
+
   }
 
   retval = FENIX_SUCCESS;
@@ -1032,7 +1055,7 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
    }
 
    //Dont forget to clear the commit buffer
-   mentry->data_regions[mentry->current_head].specifier = __FENIX_SUBSET_EMPTY;
+   if(recovery_locally_possible) mentry->data_regions[mentry->current_head].specifier = __FENIX_SUBSET_EMPTY;
 
    return retval;
 
@@ -1128,11 +1151,78 @@ int __imr_reinit(fenix_group_t* g, int* flag){
       MPI_Comm_create_group(g->comm, set_group, 0, &(group->set_comm));
    }
 
+   __imr_sync_timestamps(group);
+
    *flag = FENIX_SUCCESS;
 
    return FENIX_SUCCESS;
 }
 
+void __imr_sync_timestamps(fenix_imr_group_t* group){
+   int n_snapshots = group->num_snapshots;
+
+   if(group->raid_mode == 1){
+      int partner_snapshots;
+      MPI_Sendrecv(&n_snapshots, 1, MPI_INT, group->partners[0], 34560,
+                   &partner_snapshots, 1, MPI_INT, group->partners[1], 34560,
+                   group->base.comm, MPI_STATUS_IGNORE);
+      n_snapshots = n_snapshots > partner_snapshots ? n_snapshots : partner_snapshots;
+
+      MPI_Sendrecv(&n_snapshots, 1, MPI_INT, group->partners[1], 34561,
+                   &partner_snapshots, 1, MPI_INT, group->partners[0], 34561,
+                   group->base.comm, MPI_STATUS_IGNORE);
+      n_snapshots = n_snapshots > partner_snapshots ? n_snapshots : partner_snapshots;
+   } else {
+      MPI_Allreduce(MPI_IN_PLACE, &n_snapshots, 1, MPI_INT, MPI_MAX, group->set_comm);
+   }
+
+   bool need_reset = group->num_snapshots != n_snapshots;
+   for(int i = group->num_snapshots; i < n_snapshots; i++) group->timestamps[i] = -1;
+
+   if(group->raid_mode == 1){
+      int* p0_stamps = (int*)malloc(sizeof(int)*n_snapshots);
+      int* p1_stamps = (int*)malloc(sizeof(int)*n_snapshots);
+
+      MPI_Sendrecv(group->timestamps, n_snapshots, MPI_INT, group->partners[1], 34562,
+                   p0_stamps, n_snapshots, MPI_INT, group->partners[0], 34562,
+                   group->base.comm, MPI_STATUS_IGNORE);
+      MPI_Sendrecv(group->timestamps, n_snapshots, MPI_INT, group->partners[0], 34563,
+                   p1_stamps, n_snapshots, MPI_INT, group->partners[1], 34563,
+                   group->base.comm, MPI_STATUS_IGNORE);
+
+      for(int i = 0; i < n_snapshots; i++){
+         int old_stamp = group->timestamps[i];
+         group->timestamps[i] = group->timestamps[i] > p0_stamps[i] ? group->timestamps[i] : p0_stamps[i];
+         group->timestamps[i] = group->timestamps[i] > p1_stamps[i] ? group->timestamps[i] : p1_stamps[i];
+
+         need_reset |= group->timestamps[i] != old_stamp;
+      }
+
+      free(p0_stamps);
+      free(p1_stamps);
+   } else {
+      MPI_Allreduce(MPI_IN_PLACE, group->timestamps, n_snapshots, MPI_INT, MPI_MAX, group->set_comm);
+   }
+
+   group->num_snapshots = n_snapshots;
+   if(n_snapshots > 0) group->base.timestamp = group->timestamps[n_snapshots-1];
+   else group->base.timestamp = -1;
+
+   //Now fix members
+   if(need_reset && group->entries_count > 0) {
+      if(fenix.options.verbose == 1){
+         verbose_print("Outdated timestamps on rank %d. All members will require full recovery.\n",
+                       group->base.current_rank);
+      }
+      //For now, just delete all members and assume partner(s) can
+      //help me rebuild fully consistent state
+      for(int i = group->entries_count-1; i >= 0; i--){
+         int memberid = group->entries[i].memberid;
+         Fenix_Data_member_delete(group->base.groupid, memberid);
+      }
+   }
+}
+
 int __imr_get_redundant_policy(fenix_group_t* group, int* policy_name, void* policy_value, int* flag){
    int retval = FENIX_SUCCESS;
diff --git a/src/fenix_data_recovery.c b/src/fenix_data_recovery.c
index 7542cce..314df0b 100644
--- a/src/fenix_data_recovery.c
+++ b/src/fenix_data_recovery.c
@@ -551,11 +551,11 @@ int __fenix_data_commit(int groupid, int *timestamp) {
   } else {
     fenix_group_t *group = (fenix.data_recovery->group[group_index]);
 
-    group->vtbl.commit(group);
-
-    if (group->timestamp +1 -1) group->timestamp++;
+    if (group->timestamp != -1) group->timestamp++;
     else group->timestamp = group->timestart;
+    group->vtbl.commit(group);
+
     if (timestamp != NULL) {
       *timestamp = group->timestamp;
     }
@@ -601,6 +601,8 @@ int __fenix_data_commit_barrier(int groupid, int *timestamp) {
     fenix.ignore_errs = old_failure_handling;
 
     if(can_commit == 1){
+      if (group->timestamp != -1) group->timestamp++;
+      else group->timestamp = group->timestart;
       retval = group->vtbl.commit(group);
     }
diff --git a/src/fenix_process_recovery.c b/src/fenix_process_recovery.c
index b845fa6..02f7801 100644
--- a/src/fenix_process_recovery.c
+++ b/src/fenix_process_recovery.c
@@ -220,12 +220,14 @@ int __fenix_preinit(int *role, MPI_Comm comm, MPI_Comm *new_comm, int *argc, cha
                        __fenix_get_current_rank(*fenix.world), fenix.role);
         }
         __fenix_finalize_spare();
-      } else {
+      } else if(ret == MPI_ERR_REVOKED){
         fenix.repair_result = __fenix_repair_ranks();
         if (fenix.options.verbose == 0) {
           verbose_print("spare rank exiting from MPI_Recv - repair ranks; rank: %d, role: %d\n",
                        __fenix_get_current_rank(*fenix.world), fenix.role);
         }
+      } else {
+        MPIX_Comm_ack_failed(*fenix.world, __fenix_get_world_size(*fenix.world), &a);
       }
       fenix.role = FENIX_ROLE_RECOVERED_RANK;
     }
@@ -684,6 +686,11 @@ void __fenix_postinit(int *error)
   //                  fenix.role);
   //}
 
+  if(fenix.new_world_exists){
+    //Set up dummy irecv to use for checking for failures.
+    MPI_Irecv(&fenix.dummy_recv_buffer, 1, MPI_INT, MPI_ANY_SOURCE,
+              34095347, fenix.new_world, &fenix.check_failures_req);
+  }
   if (fenix.repair_result != 0) {
     *error = fenix.repair_result;
   }
@@ -705,41 +712,70 @@ void __fenix_postinit(int *error)
   }
 }
 
+int __fenix_detect_failures(int do_recovery){
+  if(!fenix.new_world_exists) return FENIX_ERROR_UNINITIALIZED;
+
+  int old_ignore_errs = fenix.ignore_errs;
+  fenix.ignore_errs = !do_recovery;
+
+  int req_completed;
+  int ret = MPI_Test(&fenix.check_failures_req, &req_completed, MPI_STATUS_IGNORE);
+
+  if(req_completed) ret = FENIX_ERROR_INTERN;
+
+  fenix.ignore_errs = old_ignore_errs;
+  return ret;
+}
+
 void __fenix_finalize()
 {
-  // Any MPI communication call needs to be protected in case they
-  // fail. In that case, we need to recursively call fenix_finalize.
-  // By setting fenix.finalized to 1 we are skipping the longjump
-  // after recovery.
-  fenix.finalized = 1;
-
-  int ret = MPI_Barrier( fenix.new_world );
-  if (ret != MPI_SUCCESS) {
-    __fenix_finalize();
-    return;
+  int location = FENIX_FINALIZE_LOC;
+  MPIX_Comm_agree(*fenix.user_world, &location);
+  if(location != FENIX_FINALIZE_LOC){
+    //Some ranks are in error recovery, so trigger error handling.
+    MPIX_Comm_revoke(*fenix.user_world);
+    MPI_Barrier(*fenix.user_world);
+
+    //In case no-jump enabled after recovery
+    return __fenix_finalize();
   }
 
-  if (__fenix_get_current_rank(*fenix.world) == 0) {
-    int spare_rank;
-    MPI_Comm_size(*fenix.world, &spare_rank);
-    spare_rank--;
-    int a;
-    int i;
-    for (i = 0; i < fenix.spare_ranks; i++) {
-      int ret = MPI_Send(&a, 1, MPI_INT, spare_rank, 1, *fenix.world);
-      if (ret != MPI_SUCCESS) {
-        __fenix_finalize();
-        return;
+  int first_spare_rank = __fenix_get_world_size(*fenix.user_world);
+  int last_spare_rank = __fenix_get_world_size(*fenix.world) - 1;
+
+  //If we've reached here, we will finalized regardless of further errors.
+  fenix.ignore_errs = 1;
+  while(!fenix.finalized){
+    int user_rank = __fenix_get_current_rank(*fenix.user_world);
+
+    if (user_rank == 0) {
+      for (int i = first_spare_rank; i <= last_spare_rank; i++) {
+        //We don't care if a spare failed, ignore return value
+        int unused;
+        MPI_Send(&unused, 1, MPI_INT, i, 1, *fenix.world);
       }
-      spare_rank--;
     }
-  }
 
-  ret = MPI_Barrier(*fenix.world);
-  if (ret != MPI_SUCCESS) {
-    __fenix_finalize();
-    return;
+    //We need to confirm that rank 0 didn't fail, since it could have
+    //failed before notifying some spares to leave.
+    int need_retry = user_rank == 0 ? 0 : 1;
+    MPIX_Comm_agree(*fenix.user_world, &need_retry);
+    if(need_retry == 1){
+      //Rank 0 didn't contribute, so we need to retry.
+      MPIX_Comm_shrink(*fenix.user_world, fenix.user_world);
+      continue;
+    } else {
+      //If rank 0 did contribute, we know sends made it, and regardless
+      //of any other failures we finalize.
+      fenix.finalized = 1;
+    }
   }
+
+  //Now we do one last agree w/ the spares to let them know they can actually
+  //finalize
+  int unused;
+  MPIX_Comm_agree(*fenix.world, &unused);
+
   MPI_Op_free( &fenix.agree_op );
 
   MPI_Comm_set_errhandler( *fenix.world, MPI_ERRORS_ARE_FATAL );
@@ -763,8 +799,27 @@ void __fenix_finalize()
 void __fenix_finalize_spare()
 {
   fenix.fenix_init_flag = 0;
-  int ret = PMPI_Barrier(*fenix.world);
-  if (ret != MPI_SUCCESS) { debug_print("MPI_Barrier: %d\n", ret); }
+
+  int unused;
+  MPI_Request agree_req, recv_req = MPI_REQUEST_NULL;
+
+  MPIX_Comm_iagree(*fenix.world, &unused, &agree_req);
+  while(true){
+    int completed = 0;
+    MPI_Test(&agree_req, &completed, MPI_STATUS_IGNORE);
+    if(completed) break;
+
+    int ret = MPI_Test(&recv_req, &completed, MPI_STATUS_IGNORE);
+    if(completed){
+      //We may get duplicate messages informing us to exit
+      MPI_Irecv(&unused, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, *fenix.world, &recv_req);
+    }
+    if(ret != MPI_SUCCESS){
+      MPIX_Comm_ack_failed(*fenix.world, __fenix_get_world_size(*fenix.world), &unused);
+    }
+  }
+
+  if(recv_req != MPI_REQUEST_NULL) MPI_Cancel(&recv_req);
 
   MPI_Op_free(&fenix.agree_op);
   MPI_Comm_set_errhandler(*fenix.world, MPI_ERRORS_ARE_FATAL);
diff --git a/test/failed_spares/fenix_failed_spares.c b/test/failed_spares/fenix_failed_spares.c
index 02d18a4..bea1dd7 100644
--- a/test/failed_spares/fenix_failed_spares.c
+++ b/test/failed_spares/fenix_failed_spares.c
@@ -66,7 +66,7 @@ const int kKillID = 1;
 
 void* exitThread(void* should_exit){
     sleep(1);
-    if( ((int)should_exit) == 1){
+    if( ((intptr_t)should_exit) == 1){
         pid_t pid = getpid();
         kill(pid, SIGTERM);
     }
@@ -92,7 +92,7 @@ int main(int argc, char **argv) {
     MPI_Comm_size(world_comm, &old_world_size);
     MPI_Comm_rank(world_comm, &old_rank);
 
-    int should_cancel = 0;
+    intptr_t should_cancel = 0;
     for(int i = 2; i < argc; i++){
        if(atoi(argv[i]) == old_rank) should_cancel = 1;
    }