
feat(core): lock graph creation when running in another process #29408

Open · wants to merge 3 commits into base: master
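For orientation before the file-by-file diff: the sketch below condenses the new non-daemon flow added to packages/nx/src/project-graph/project-graph.ts. It is a simplified illustration that reuses the helpers introduced in this diff (FileLock, readSourceMapsCache, readCachedGraphAndHydrateFileMap, buildProjectGraphAndSourceMapsWithoutDaemon) and omits the spinner, exit handler, and error handling; the diff further down is the authoritative code.

import { join } from 'path';
import { FileLock } from '../native';
import { workspaceDataDirectory } from '../utils/cache-directory';

// Only one process builds the project graph at a time. A process that finds the
// lock file already present waits for the builder to finish, then reads the
// caches the builder wrote instead of recomputing the graph itself.
async function createGraphWithLock() {
  const lock = new FileLock(join(workspaceDataDirectory, 'project-graph.lock'));

  if (lock.locked) {
    await lock.wait();
    const sourceMaps = readSourceMapsCache();
    if (!sourceMaps) {
      throw new Error(
        'The project graph was computed in another process, but the source maps are missing.'
      );
    }
    return { projectGraph: await readCachedGraphAndHydrateFileMap(), sourceMaps };
  }

  lock.lock();
  try {
    // On success this writes the project-graph, file-map, and source-maps caches
    // (when caching is enabled), which the waiting processes read above.
    return await buildProjectGraphAndSourceMapsWithoutDaemon();
  } finally {
    lock.unlock();
  }
}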
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions packages/nx/Cargo.toml
@@ -74,3 +74,5 @@ assert_fs = "1.0.10"
# This is only used for unit tests
swc_ecma_dep_graph = "0.109.1"
tempfile = "3.13.0"
# We only explicitly use tokio for async tests
tokio = "1.38.0"
@@ -316,7 +316,11 @@ async function processFilesAndCreateAndSerializeProjectGraph(
serializedSourceMaps: null,
};
} else {
writeCache(g.projectFileMapCache, g.projectGraph);
writeCache(
g.projectFileMapCache,
g.projectGraph,
projectConfigurationsResult.sourceMaps
);
return g;
}
} catch (err) {
8 changes: 8 additions & 0 deletions packages/nx/src/native/index.d.ts
@@ -13,6 +13,14 @@ export declare class ChildProcess {
onOutput(callback: (message: string) => void): void
}

export declare class FileLock {
locked: boolean
constructor(lockFilePath: string)
lock(): void
unlock(): void
wait(): Promise<void>
}

export declare class HashPlanner {
constructor(nxJson: NxJson, projectGraph: ExternalObject<ProjectGraph>)
getPlans(taskIds: Array<string>, taskGraph: TaskGraph): Record<string, string[]>
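For reference, a minimal usage sketch of the FileLock binding declared above. The locked flag reflects whether the lock file already existed when the instance was constructed, and wait() resolves once the file disappears. The wrapper function below is illustrative, not part of the diff.

import { FileLock } from '../native'; // path as used inside the nx package in this diff

// Run `work` while holding the lock; if another process already holds it,
// just wait for that process to finish and return.
async function runExclusively(lockFilePath: string, work: () => Promise<void>): Promise<void> {
  const lock = new FileLock(lockFilePath);
  if (lock.locked) {
    await lock.wait();
    return;
  }
  lock.lock();
  try {
    await work();
  } finally {
    // Removes the lock file; the native Drop implementation also cleans it up
    // if the instance is dropped while still locked.
    lock.unlock();
  }
}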
1 change: 1 addition & 0 deletions packages/nx/src/native/native-bindings.js
@@ -362,6 +362,7 @@ if (!nativeBinding) {
}

module.exports.ChildProcess = nativeBinding.ChildProcess
module.exports.FileLock = nativeBinding.FileLock
module.exports.HashPlanner = nativeBinding.HashPlanner
module.exports.ImportResult = nativeBinding.ImportResult
module.exports.NxCache = nativeBinding.NxCache
128 changes: 128 additions & 0 deletions packages/nx/src/native/utils/file_lock.rs
@@ -0,0 +1,128 @@
use std::fs::{self, File};
use std::io;
use std::path::Path;
use std::time::Duration;

#[napi]
#[derive(Clone)]
pub struct FileLock {
#[napi]
pub locked: bool,

lock_file_path: String,
}

#[napi]
impl FileLock {
#[napi(constructor)]
pub fn new(lock_file_path: String) -> Self {
let locked = Path::new(&lock_file_path).exists();
Self {
locked,
lock_file_path,
}
}

#[napi]
pub fn lock(&mut self) -> anyhow::Result<()> {
if self.locked {
anyhow::bail!("File {} is already locked", self.lock_file_path)
}

let _ = File::create(&self.lock_file_path)?;
self.locked = true;
Ok(())
}

#[napi]
pub fn unlock(&mut self) -> anyhow::Result<()> {
if !self.locked {
anyhow::bail!("File {} is not locked", self.lock_file_path)
}
fs::remove_file(&self.lock_file_path).or_else(|err| {
if err.kind() == io::ErrorKind::NotFound {
Ok(())
} else {
Err(err)
}
})?;
self.locked = false;
Ok(())
}

#[napi]
pub async fn wait(&self) -> Result<(), napi::Error> {
if !self.locked {
return Ok(());
}

loop {
if !self.locked || !Path::new(&self.lock_file_path).exists() {
break Ok(());
}
std::thread::sleep(Duration::from_millis(2));
}
}
}

// Ensure the lock file is removed when the FileLock is dropped
impl Drop for FileLock {
fn drop(&mut self) {
if self.locked {
let _ = self.unlock();
}
}
}

#[cfg(test)]
mod test {
use super::*;

use assert_fs::prelude::*;
use assert_fs::TempDir;

#[test]
fn test_new_lock() {
let tmp_dir = TempDir::new().unwrap();
let lock_file = tmp_dir.child("test_lock_file");
let lock_file_path = lock_file.path().to_path_buf();
let lock_file_path_str = lock_file_path.into_os_string().into_string().unwrap();
let mut file_lock = FileLock::new(lock_file_path_str);
assert_eq!(file_lock.locked, false);
let _ = file_lock.lock();
assert_eq!(file_lock.locked, true);
assert!(lock_file.exists());
let _ = file_lock.unlock();
assert_eq!(lock_file.exists(), false);
}

#[tokio::test]
async fn test_wait() {
let tmp_dir = TempDir::new().unwrap();
let lock_file = tmp_dir.child("test_lock_file");
let lock_file_path = lock_file.path().to_path_buf();
let lock_file_path_str = lock_file_path.into_os_string().into_string().unwrap();
let mut file_lock = FileLock::new(lock_file_path_str);
file_lock.lock().unwrap();
let file_lock_clone = file_lock.clone();
// Release the lock from a separate thread after a short delay so that `wait`
// has something to block on and can observe the lock file being removed.
let unlock_handle = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(10));
file_lock.unlock().unwrap();
});
file_lock_clone.wait().await.unwrap();
unlock_handle.join().unwrap();
assert_eq!(lock_file.exists(), false);
}

#[test]
fn test_drop() {
let tmp_dir = TempDir::new().unwrap();
let lock_file = tmp_dir.child("test_lock_file");
let lock_file_path = lock_file.path().to_path_buf();
let lock_file_path_str = lock_file_path.into_os_string().into_string().unwrap();
{
let mut file_lock = FileLock::new(lock_file_path_str.clone());
let _ = file_lock.lock();
}
assert_eq!(lock_file.exists(), false);
}
}
1 change: 1 addition & 0 deletions packages/nx/src/native/utils/mod.rs
@@ -11,5 +11,6 @@ pub use normalize_trait::Normalize;
#[cfg_attr(target_arch = "wasm32", path = "atomics/wasm.rs")]
pub mod atomics;
pub mod ci;
pub mod file_lock;

pub use atomics::*;
35 changes: 34 additions & 1 deletion packages/nx/src/project-graph/nx-deps-cache.ts
@@ -18,6 +18,7 @@ import {
} from '../utils/fileutils';
import { PackageJson } from '../utils/package-json';
import { nxVersion } from '../utils/versions';
import { ConfigurationSourceMaps } from './utils/project-configuration-utils';

export interface FileMapCache {
version: string;
@@ -34,6 +35,8 @@ export const nxProjectGraph = join(
);
export const nxFileMap = join(workspaceDataDirectory, 'file-map.json');

export const nxSourceMaps = join(workspaceDataDirectory, 'source-maps.json');

export function ensureCacheDirectory(): void {
try {
if (!existsSync(workspaceDataDirectory)) {
@@ -102,6 +105,31 @@ export function readProjectGraphCache(): null | ProjectGraph {
return data ?? null;
}

export function readSourceMapsCache(): null | ConfigurationSourceMaps {
performance.mark('read source-maps:start');
ensureCacheDirectory();

let data = null;
try {
if (fileExists(nxSourceMaps)) {
data = readJsonFile(nxSourceMaps);
}
} catch (error) {
console.log(
`Error reading '${nxSourceMaps}'. Continue the process without the cache.`
);
console.log(error);
}

performance.mark('read source-maps:end');
performance.measure(
'read cache',
'read source-maps:start',
'read source-maps:end'
);
return data ?? null;
}

export function createProjectFileMapCache(
nxJson: NxJsonConfiguration<'*' | string[]>,
packageJsonDeps: Record<string, string>,
@@ -123,7 +151,8 @@ export function createProjectFileMapCache(

export function writeCache(
cache: FileMapCache,
projectGraph: ProjectGraph
projectGraph: ProjectGraph,
sourceMaps: ConfigurationSourceMaps
): void {
performance.mark('write cache:start');
let retry = 1;
@@ -137,13 +166,17 @@
const unique = (Math.random().toString(16) + '0000000').slice(2, 10);
const tmpProjectGraphPath = `${nxProjectGraph}~${unique}`;
const tmpFileMapPath = `${nxFileMap}~${unique}`;
const tmpSourceMapPath = `${nxSourceMaps}~${unique}`;

try {
writeJsonFile(tmpProjectGraphPath, projectGraph);
renameSync(tmpProjectGraphPath, nxProjectGraph);

writeJsonFile(tmpFileMapPath, cache);
renameSync(tmpFileMapPath, nxFileMap);

writeJsonFile(tmpSourceMapPath, sourceMaps);
renameSync(tmpSourceMapPath, nxSourceMaps);
done = true;
} catch (err: any) {
if (err instanceof Error) {
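The new source-maps cache entry follows the same write-to-a-temp-file-then-rename pattern already used for the project-graph and file-map caches, so a concurrent reader never sees a partially written JSON file. Distilled into a standalone helper for clarity (illustrative only; the diff keeps the three writes inline):

import { renameSync } from 'fs';
import { writeJsonFile } from '../utils/fileutils';

// Write the payload next to the final path under a unique temporary name, then
// rename it into place. On the same filesystem the rename swaps the file in a
// single step, so readers observe either the old cache or the new one.
function writeCacheFile(finalPath: string, data: unknown): void {
  const unique = (Math.random().toString(16) + '0000000').slice(2, 10);
  const tmpPath = `${finalPath}~${unique}`;
  writeJsonFile(tmpPath, data);
  renameSync(tmpPath, finalPath);
}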
76 changes: 64 additions & 12 deletions packages/nx/src/project-graph/project-graph.ts
@@ -25,6 +25,7 @@ import {
import {
readFileMapCache,
readProjectGraphCache,
readSourceMapsCache,
writeCache,
} from './nx-deps-cache';
import { ConfigurationResult } from './utils/project-configuration-utils';
@@ -34,6 +35,10 @@ import {
} from './utils/retrieve-workspace-files';
import { getPlugins } from './plugins/get-plugins';
import { logger } from '../utils/logger';
import { FileLock } from '../native';
import { join } from 'path';
import { workspaceDataDirectory } from '../utils/cache-directory';
import { DelayedSpinner } from '../utils/delayed-spinner';

/**
* Synchronously reads the latest cached copy of the workspace's ProjectGraph.
@@ -167,7 +172,7 @@ export async function buildProjectGraphAndSourceMapsWithoutDaemon() {
throw new ProjectGraphError(errors, projectGraph, sourceMaps);
} else {
if (cacheEnabled) {
writeCache(projectFileMapCache, projectGraph);
writeCache(projectFileMapCache, projectGraph, sourceMaps);
}
return { projectGraph, sourceMaps };
}
@@ -206,6 +211,20 @@ export function handleProjectGraphError(opts: { exitOnError: boolean }, e) {
}
}

async function readCachedGraphAndHydrateFileMap() {
const graph = readCachedProjectGraph();
const projectRootMap = Object.fromEntries(
Object.entries(graph.nodes).map(([project, { data }]) => [
data.root,
project,
])
);
const { allWorkspaceFiles, fileMap, rustReferences } =
await retrieveWorkspaceFiles(workspaceRoot, projectRootMap);
hydrateFileMap(fileMap, allWorkspaceFiles, rustReferences);
return graph;
}

/**
* Computes and returns a ProjectGraph.
*
@@ -235,18 +254,8 @@ export async function createProjectGraphAsync(
): Promise<ProjectGraph> {
if (process.env.NX_FORCE_REUSE_CACHED_GRAPH === 'true') {
try {
const graph = readCachedProjectGraph();
const projectRootMap = Object.fromEntries(
Object.entries(graph.nodes).map(([project, { data }]) => [
data.root,
project,
])
);
const { allWorkspaceFiles, fileMap, rustReferences } =
await retrieveWorkspaceFiles(workspaceRoot, projectRootMap);
hydrateFileMap(fileMap, allWorkspaceFiles, rustReferences);
return graph;
// If no cached graph is found, we will fall through to the normal flow
return readCachedGraphAndHydrateFileMap();
} catch (e) {
logger.verbose('Unable to use cached project graph', e);
}
@@ -267,6 +276,47 @@ export async function createProjectGraphAndSourceMapsAsync(
performance.mark('create-project-graph-async:start');

if (!daemonClient.enabled()) {
const lock = new FileLock(
join(workspaceDataDirectory, 'project-graph.lock')
);

function cleanupFileLock() {
try {
lock.unlock();
} catch {}
}

process.on('exit', cleanupFileLock);
[Collaborator review comment] We might not need this

if (lock.locked) {
logger.verbose(
'Waiting for graph construction in another process to complete'
);
const spinner = new DelayedSpinner(
'Waiting for graph construction in another process to complete'
);
await lock.wait();
spinner.cleanup();

// Note: This will currently throw if any of the caches are missing...
// It would be nice if one of the processes that was waiting for the lock
// could pick up the slack and build the graph if it's missing, but
// we wouldn't want either of the below to happen:
// - All of the waiting processes to build the graph
// - Even one of the processes building the graph on a legitimate error

const sourceMaps = readSourceMapsCache();
if (!sourceMaps) {
throw new Error(
'The project graph was computed in another process, but the source maps are missing.'
);
}
return {
projectGraph: await readCachedGraphAndHydrateFileMap(),
[Collaborator review comment] This should throw an error if the graph is in a bad state
sourceMaps,
};
}
lock.lock();
try {
const res = await buildProjectGraphAndSourceMapsWithoutDaemon();
performance.measure(
@@ -293,6 +343,8 @@ export async function createProjectGraphAndSourceMapsAsync(
return res;
} catch (e) {
handleProjectGraphError(opts, e);
} finally {
lock.unlock();
}
} else {
try {