Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve init processing #172

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.eclipse.collections.api.factory.Lists;
Expand Down Expand Up @@ -326,30 +327,42 @@ public CompileResult getCompileResult(SectionState sectionState)
{
DocumentState documentState = sectionState.getDocumentState();
GlobalState globalState = documentState.getGlobalState();
return globalState.getProperty(COMPILE_RESULT, () -> tryCompile(globalState, documentState, sectionState));
// when looking for compile results, there might be another thread working on it already
// if current thread is the one that sets the completable future, then do the actual compilation and set it on the completable future
// if current thread does not set the completable future, then just join and wait for result from another thread
CompletableFuture<CompileResult> maybeCompileResultFuture = new CompletableFuture<>();
CompletableFuture<CompileResult> compileResultFuture = globalState.getProperty(COMPILE_RESULT, () -> maybeCompileResultFuture);

if (compileResultFuture == maybeCompileResultFuture)
{
compileResultFuture.complete(this.tryCompile(globalState, documentState, sectionState));
}

return compileResultFuture.join();
}

protected CompileResult tryCompile(GlobalState globalState, DocumentState documentState, SectionState sectionState)
{
long started = System.currentTimeMillis();
globalState.logInfo("Starting compilation");
PureModelContextData pureModelContextData = null;
try
{
pureModelContextData = buildPureModelContextData(globalState);
PureModel pureModel = Compiler.compile(pureModelContextData, DeploymentMode.PROD, "");
globalState.logInfo("Compilation completed successfully");
globalState.logInfo("Compilation completed successfully in " + (System.currentTimeMillis() - started) + "ms");
return new CompileResult(pureModel, pureModelContextData);
}
catch (EngineException e)
{
SourceInformation sourceInfo = e.getSourceInformation();
if (isValidSourceInfo(sourceInfo))
{
globalState.logInfo("Compilation completed with error " + "(" + sourceInfo.sourceId + " " + SourceInformationUtil.toLocation(sourceInfo) + "): " + e.getMessage());
globalState.logInfo("Compilation completed in " + (System.currentTimeMillis() - started) + "ms with error " + "(" + sourceInfo.sourceId + " " + SourceInformationUtil.toLocation(sourceInfo) + "): " + e.getMessage());
}
else
{
globalState.logInfo("Compilation completed with error: " + e.getMessage());
globalState.logInfo("Compilation completed in " + (System.currentTimeMillis() - started) + "ms with error: " + e.getMessage());
globalState.logWarning("Invalid source information for compilation error");
LOGGER.warn("Invalid source information in exception during compilation requested for section {} of {}: {}", sectionState.getSectionNumber(), documentState.getDocumentId(), (sourceInfo == null) ? null : sourceInfo.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -300,6 +303,28 @@ public void forEachDocumentState(Consumer<? super DocumentState> consumer)
this.docStates.forEachValue(consumer::accept);
}

@Override
public CompletableFuture<Void> forEachDocumentStateParallel(Consumer<? super DocumentState> consumer)
{
this.docStates.forEachValue(consumer::accept);
return CompletableFuture.completedFuture(null);
}

@Override
public <RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentState(Function<? super DocumentState, List<RESULT>> func)
{
List<RESULT> results = this.docStates.stream().map(func).flatMap(List::stream).collect(Collectors.toList());
return CompletableFuture.completedFuture(results);
}

@Override
public <RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentSectionState(BiFunction<? super DocumentState, ? super SectionState, List<RESULT>> func)
{
List<RESULT> results = new ArrayList<>();
this.docStates.stream().forEach(x -> x.forEachSectionState(s -> results.addAll(func.apply(x, s))));
return CompletableFuture.completedFuture(results);
}

@Override
public Collection<LegendLSPGrammarExtension> getAvailableGrammarExtensions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package org.finos.legend.engine.ide.lsp.extension.state;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.finos.legend.engine.ide.lsp.extension.LegendLSPFeature;
import org.finos.legend.engine.ide.lsp.extension.LegendLSPGrammarExtension;
Expand All @@ -41,6 +45,26 @@ public interface GlobalState extends State
*/
void forEachDocumentState(Consumer<? super DocumentState> consumer);

/**
* Apply the given consumer to each document state. No particular order is guaranteed.
* Implementation could do this in parallel
*
* @param consumer document state consumer, needs to be threadsafe
* @return future to track when this completes
*/
CompletableFuture<Void> forEachDocumentStateParallel(Consumer<? super DocumentState> consumer);

/**
* Apply the given function to each document state, collecting its result in a list. No particular order is guaranteed.
* Implementation could do this in parallel.
*
* @param func function to apply to each document state, needs to be threadsafe
* @return future to with the collected results
*/
<RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentState(Function<? super DocumentState, List<RESULT>> func);

<RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentSectionState(BiFunction<? super DocumentState, ? super SectionState, List<RESULT>> func);

/**
* List of available grammar extensions. This is useful for extensions that need to dispatch to other extensions
* for further processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,14 @@ public String getProjectVersion()
@Override
public void initialized(InitializedParams params)
{
long start = System.currentTimeMillis();
checkReady();
this.classpathFactory.initialize(this);
CompletableFuture<Void> initializeExtensions = this.initializeExtensions();
CompletableFuture<Void> engineServerUrl = this.initializeEngineServerUrl();

CompletableFuture.allOf(initializeExtensions, engineServerUrl)
.thenRun(() -> this.logInfoToClient("Extension finished post-initialization"));
.thenRun(() -> this.logInfoToClient("Extension finished post-initialization in " + (System.currentTimeMillis() - start) + "ms"));

}

Expand Down Expand Up @@ -292,10 +293,16 @@ private CompletableFuture<Void> initializeExtensions()

return this.classpathFactory.create(Collections.unmodifiableSet(this.rootFolders))
.thenAccept(this.extensionGuard::initialize)
.thenRun(this.extensionGuard.wrapOnClasspath(this::reprocessDocuments))
.thenCompose(_x -> this.reprocessDocuments())
.thenRun(this.legendLanguageService::loadVirtualFileSystemContent)
// trigger compilation
.thenRun(this.extensionGuard.wrapOnClasspath(() -> this.globalState.forEachDocumentState(this.textDocumentService::getLegendDiagnostics)))
.thenCompose(_x -> this.globalState.forEachDocumentStateParallel(x ->
{
long diagnosticStarted = System.currentTimeMillis();
this.textDocumentService.getLegendDiagnostics(x);
LOGGER.info("Diagnostics computed for {} took {}ms", x.getDocumentId(), System.currentTimeMillis() - diagnosticStarted);

}))
.thenRun(() ->
{
LanguageClient languageClient = this.getLanguageClient();
Expand All @@ -314,10 +321,14 @@ private CompletableFuture<Void> initializeExtensions()
});
}

private void reprocessDocuments()
private CompletableFuture<Void> reprocessDocuments()
{
this.globalState.forEachDocumentState(x -> ((LegendServerGlobalState.LegendServerDocumentState) x).recreateSectionStates());
this.globalState.clearProperties();
return this.globalState.forEachDocumentStateParallel(x ->
{
long startTime = System.currentTimeMillis();
((LegendServerGlobalState.LegendServerDocumentState) x).recreateSectionStates();
LOGGER.info("Reprocessing {} took {}ms", x.getDocumentId(), System.currentTimeMillis() - startTime);
}).thenRun(this.globalState::clearProperties);
}

@Override
Expand Down Expand Up @@ -484,10 +495,10 @@ public <T> CompletableFuture<T> supplyPossiblyAsync(Supplier<T> supplier)
return this.supplyPossiblyAsync_internal(this.extensionGuard.wrapOnClasspath(supplier));
}

void runPossiblyAsync(Runnable runnable)
CompletableFuture<?> runPossiblyAsync(Runnable runnable)
{
checkReady();
this.runPossiblyAsync_internal(this.extensionGuard.wrapOnClasspath(runnable));
return this.runPossiblyAsync_internal(this.extensionGuard.wrapOnClasspath(runnable));
}

private CompletableFuture<?> runPossiblyAsync_internal(Runnable work)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,17 @@ public CompletableFuture<String> replClasspath()
@Override
public CompletableFuture<List<LegendTest>> testCases()
{
return this.server.supplyPossiblyAsync(() ->
{
List<LegendTest> commands = new ArrayList<>();

this.server.getGlobalState().forEachDocumentState(docState ->
{
docState.forEachSectionState(sectionState ->
return this.server.getGlobalState().collectFromEachDocumentSectionState((docState, sectionState) ->
{
List<LegendTest> commands = new ArrayList<>();
LegendLSPGrammarExtension extension = sectionState.getExtension();
if (extension != null)
{
commands.addAll(extension.testCases(sectionState));
}
});
});

return commands;
});
return commands;
}
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.finos.legend.engine.ide.lsp.extension.LegendLSPFeature;
import org.finos.legend.engine.ide.lsp.extension.LegendLSPGrammarExtension;
Expand Down Expand Up @@ -64,6 +68,37 @@ public void forEachDocumentState(Consumer<? super DocumentState> consumer)
this.docs.values().forEach(consumer);
}

@Override
public CompletableFuture<Void> forEachDocumentStateParallel(Consumer<? super DocumentState> consumer)
{
return CompletableFuture.allOf(
this.docs.values()
.stream()
.map(x -> this.server.runPossiblyAsync(() -> consumer.accept(x)))
.toArray(CompletableFuture[]::new)
);
}

@Override
public <RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentState(Function<? super DocumentState, List<RESULT>> func)
{
return this.docs.values()
.stream()
.map(x -> this.server.supplyPossiblyAsync(() -> func.apply(x)))
.reduce((x, y) -> x.thenCombine(y, (r, l) -> Stream.concat(r.stream(), l.stream()).collect(Collectors.toList())))
.orElseGet(() -> CompletableFuture.completedFuture(List.of()));
}

@Override
public <RESULT> CompletableFuture<List<RESULT>> collectFromEachDocumentSectionState(BiFunction<? super DocumentState, ? super SectionState, List<RESULT>> func)
{
return this.docs.values()
.stream()
.flatMap(x -> x.collectFromEachSectionState(func))
.reduce((x, y) -> x.thenCombine(y, (r, l) -> Stream.concat(r.stream(), l.stream()).collect(Collectors.toList())))
.orElseGet(() -> CompletableFuture.completedFuture(List.of()));
}

@Override
public void logInfo(String message)
{
Expand Down Expand Up @@ -247,6 +282,16 @@ public void forEachSectionState(Consumer<? super SectionState> consumer)
}
}

private <RESULT> Stream<CompletableFuture<List<RESULT>>> collectFromEachSectionState(BiFunction<? super DocumentState, ? super SectionState, List<RESULT>> func)
{
List<LegendServerSectionState> currentSectionsStates = this.sectionStates;
if (currentSectionsStates != null)
{
return currentSectionsStates.stream().map(x -> this.globalState.server.supplyPossiblyAsync(() -> func.apply(this, x)));
}
return Stream.empty();
}

Integer getVersion()
{
return version;
Expand Down
Loading
Loading