diff --git a/env/flink/env_flink.cc b/env/flink/env_flink.cc index 3c7579c34..f0eb58039 100644 --- a/env/flink/env_flink.cc +++ b/env/flink/env_flink.cc @@ -878,6 +878,69 @@ IOStatus FlinkFileSystem::UnlockFile(FileLock* /*lock*/, return IOStatus::OK(); } +IOStatus FlinkFileSystem::LinkFile(const std::string& src, + const std::string& target, + const IOOptions& options, + IODebugContext* dbg) { + IOStatus status = FileExists(src, options, dbg); + if (!status.ok()) { + return status.IsNotFound() + ? IOStatus::PathNotFound( + std::string( + "Could not find src path when linkFile, path: ") + .append(ConstructPath(src))) + : status; + } + + JNIEnv* jniEnv = getJNIEnv(); + + std::string srcFilePath = ConstructPath(src); + // Construct src Path Instance + jobject srcPathInstance; + status = class_cache_->ConstructPathInstance(srcFilePath, &srcPathInstance); + if (!status.ok()) { + return status; + } + + std::string targetFilePath = ConstructPath(target); + // Construct target Path Instance + jobject targetPathInstance; + status = + class_cache_->ConstructPathInstance(targetFilePath, &targetPathInstance); + if (!status.ok()) { + jniEnv->DeleteLocalRef(srcPathInstance); + return status; + } + + JavaClassCache::JavaMethodContext linkMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_LINK_FILE); + jint linked = + jniEnv->CallIntMethod(file_system_instance_, linkMethod.javaMethod, + srcPathInstance, targetPathInstance); + jniEnv->DeleteLocalRef(srcPathInstance); + jniEnv->DeleteLocalRef(targetPathInstance); + + status = CurrentStatus([srcFilePath, targetFilePath]() { + return std::string("Exception when LinkFile, src: ") + .append(srcFilePath) + .append(", target: ") + .append(targetFilePath); + }); + if (!status.ok()) { + return status; + } + + if (linked == -1) { + return IOStatus::NotSupported(); + } else if (linked > 0) { + return IOStatus::IOError(std::string("Exception when LinkFile, src: ") + .append(srcFilePath) + .append(", target: ") + .append(targetFilePath)); + } + return IOStatus::OK(); +} + Status FlinkFileSystem::Create(const std::shared_ptr& base, const std::string& uri, std::unique_ptr* result, diff --git a/env/flink/env_flink.h b/env/flink/env_flink.h index 2ed2f8859..a1d3c4fdd 100644 --- a/env/flink/env_flink.h +++ b/env/flink/env_flink.h @@ -98,6 +98,10 @@ class FlinkFileSystem : public FileSystemWrapper { const IOOptions& /*options*/, bool* /*is_dir*/, IODebugContext* /*dbg*/) override; + IOStatus LinkFile(const std::string& /*src*/, const std::string& /*target*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override; + private: const std::string base_path_; JavaClassCache* class_cache_; diff --git a/env/flink/jni_helper.cc b/env/flink/jni_helper.cc index 0f87ac385..652780aec 100644 --- a/env/flink/jni_helper.cc +++ b/env/flink/jni_helper.cc @@ -151,6 +151,14 @@ IOStatus JavaClassCache::Init() { .signature = "(Lorg/apache/flink/core/fs/Path;Lorg/apache/flink/core/fs/Path;)Z"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LINK_FILE] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LINK_FILE] + .methodName = "link"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LINK_FILE] + .signature = + "(Lorg/apache/flink/core/fs/Path;Lorg/apache/flink/core/fs/Path;)I"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_OPEN] .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_OPEN].methodName = diff --git a/env/flink/jni_helper.h b/env/flink/jni_helper.h index b98cd0eff..19bc89371 100644 --- a/env/flink/jni_helper.h +++ b/env/flink/jni_helper.h @@ -66,6 +66,7 @@ class JavaClassCache { JM_FLINK_FILE_STATUS_GET_LEN, JM_FLINK_FILE_STATUS_GET_MODIFICATION_TIME, JM_FLINK_FILE_STATUS_IS_DIR, + JM_FLINK_FILE_SYSTEM_LINK_FILE, NUM_CACHED_METHODS } CachedJavaMethod; diff --git a/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java index afb32d754..a46ef1443 100644 --- a/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java +++ b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java @@ -119,6 +119,11 @@ public boolean rename(Path src, Path dst) throws IOException { return flinkFS.rename(src, dst); } + public int link(Path src, Path dst) throws IOException { + // let forstdb copy the file + return -1; + } + @Override public boolean isDistributedFS() { return flinkFS.isDistributedFS();