diff --git a/.gitmodules b/.gitmodules index 58663f6113..f4bd1fb12f 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,12 +1,12 @@ [submodule "fateboard"] path = fateboard url = https://github.com/FederatedAI/FATE-Board.git - branch = v1.9.0 + branch = v1.9.1 [submodule "eggroll"] path = eggroll url = https://github.com/WeBankFinTech/eggroll.git - branch = v2.4.5 + branch = v2.4.6 [submodule "fateflow"] path = fateflow url = https://github.com/FederatedAI/FATE-Flow.git - branch = v1.9.0 + branch = v1.9.1 diff --git a/RELEASE.md b/RELEASE.md index 4b49a6226e..a463fede45 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,3 +1,13 @@ +## Release 1.9.1 +### Major Features and Improvements +> Bug-Fix +* Fix cipher compression with large Hessian value for HeteroSecureBoost +* Fix tweedie-loss calculation in HeteroSecureBoost +* Fix Intersection summary when left-joining data with match_id +* Fix event/non_event statistic for WOE computation in HeteroFeatureBinning +* Fix default sid name display for data uploaded with meta + + ## Release 1.9.0 ### Major Features and Improvements > FederatedML diff --git a/deploy/cluster-deploy/doc/fate_on_spark/common/hadoop_spark_deployment_guide.md b/deploy/cluster-deploy/doc/fate_on_spark/common/hadoop_spark_deployment_guide.md index b34561d0e4..621dcac0ad 100644 --- a/deploy/cluster-deploy/doc/fate_on_spark/common/hadoop_spark_deployment_guide.md +++ b/deploy/cluster-deploy/doc/fate_on_spark/common/hadoop_spark_deployment_guide.md @@ -153,30 +153,30 @@ ssh app@192.168.0.3 **Upload below Packages to Servers** -1. wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u192.tar.gz -2. wget https://archive.apache.org/dist/hadoop/common/hadoop-3.2.0/hadoop-3.2.0.tar.gz +1. wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u345.tar.xz +2. wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz 3. wget https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.tgz 4. wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz -5. wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz +5. 
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz **Extract** ```bash -tar xvf hadoop-3.2.0.tar.gz -C /data/projects/common +tar xvf hadoop-3.3.1.tar.gz -C /data/projects/common tar xvf scala-2.12.10.tgz -C /data/projects/common tar xvf spark-3.1.2-bin-hadoop3.2.tgz -C /data/projects/common tar xvf zookeeper-3.4.14.tar.gz -C /data/projects/common -tar xvf jdk-8u192-linux-x64.tar.gz -C /data/projects/common/jdk -mv hadoop-3.2.0 hadoop +tar xJf jdk-8u345.tar.xz -C /data/projects/common/jdk +mv hadoop-3.3.1 hadoop mv scala-2.12.10 scala mv spark-3.1.2-bin-hadoop3.2 spark -mv zookeeper-3.4.14 zookeeper +mv apache-zookeeper-3.6.3-bin zookeeper ``` **Configure /etc/profile** ```bash -export JAVA_HOME=/data/projects/common/jdk/jdk-8u192 +export JAVA_HOME=/data/projects/common/jdk/jdk-8u345 export PATH=$JAVA_HOME/bin:$PATH export HADOOP_HOME=/data/projects/common/hadoop export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin @@ -226,7 +226,7 @@ cd /data/projects/common/hadoop/etc/hadoop **In hadoop-env.sh、yarn-env.sh** -**Add**: export JAVA_HOME=/data/projects/common/jdk/jdk1.8.0_192 +**Add**: export JAVA_HOME=/data/projects/common/jdk/jdk-8u345 **In /data/projects/common/Hadoop/etc/hadoop change `core-site.xml`, `hdfs-site.xml`, `mapred-site.xml`, `yarn-site.xml` configuration; change IP hostname & path depending on actual environment. Please refer below for an example** @@ -631,7 +631,7 @@ spark.yarn.jars hdfs://fate-cluster/tmp/spark/jars/\*.jar **Add to spark-env.sh: ** ``` -export JAVA_HOME=/data/projects/common/jdk/jdk-8u192 +export JAVA_HOME=/data/projects/common/jdk/jdk-8u345 export SCALA_HOME=/data/projects/common/scala export HADOOP_HOME=/data/projects/common/hadoop export HADOOP_CONF_DIR=\$HADOOP_HOME/etc/hadoop diff --git a/deploy/cluster-deploy/doc/fate_on_spark/common/hadoop_spark_deployment_guide.zh.md b/deploy/cluster-deploy/doc/fate_on_spark/common/hadoop_spark_deployment_guide.zh.md index bf49149159..a9b4d73065 100644 --- a/deploy/cluster-deploy/doc/fate_on_spark/common/hadoop_spark_deployment_guide.zh.md +++ b/deploy/cluster-deploy/doc/fate_on_spark/common/hadoop_spark_deployment_guide.zh.md @@ -154,30 +154,30 @@ ssh app@192.168.0.3 **上传以下程序包到服务器上** -1. wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u192.tar.gz -2. wget https://archive.apache.org/dist/hadoop/common/hadoop-3.2.0/hadoop-3.2.0.tar.gz +1. wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u345.tar.xz +2. wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz 3. wget https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.tgz 4. wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz -5. wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz +5. 
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz **解压** ```bash -tar xvf hadoop-3.2.0.tar.gz -C /data/projects/common +tar xvf hadoop-3.3.1.tar.gz -C /data/projects/common tar xvf scala-2.12.10.tgz -C /data/projects/common tar xvf spark-3.1.2-bin-hadoop3.2.tgz -C /data/projects/common tar xvf zookeeper-3.4.14.tar.gz -C /data/projects/common -tar xvf jdk-8u192-linux-x64.tar.gz -C /data/projects/common/jdk -mv hadoop-3.2.0 hadoop +tar xJf jdk-8u345.tar.xz -C /data/projects/common/jdk +mv hadoop-3.3.1 hadoop mv scala-2.12.10 scala mv spark-3.1.2-bin-hadoop3.2 spark -mv zookeeper-3.4.14 zookeeper +mv apache-zookeeper-3.6.3-bin zookeeper ``` **配置/etc/profile** ```bash -export JAVA_HOME=/data/projects/common/jdk/jdk-8u192 +export JAVA_HOME=/data/projects/common/jdk/jdk-8u345 export PATH=$JAVA_HOME/bin:$PATH export HADOOP_HOME=/data/projects/common/hadoop export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin @@ -227,7 +227,7 @@ cd /data/projects/common/hadoop/etc/hadoop **在hadoop-env.sh、yarn-env.sh** -**加入**:export JAVA_HOME=/data/projects/common/jdk/jdk1.8.0_192 +**加入**:export JAVA_HOME=/data/projects/common/jdk/jdk-8u345 **/data/projects/common/Hadoop/etc/hadoop目录下修改core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml配置,需要根据实际情况修改里面的IP主机名、目录等。参考如下** @@ -629,7 +629,7 @@ spark.yarn.jars hdfs://fate-cluster/tmp/spark/jars/\*.jar **在spark-env.sh加入** -export JAVA_HOME=/data/projects/common/jdk/jdk-8u192 +export JAVA_HOME=/data/projects/common/jdk/jdk-8u345 export SCALA_HOME=/data/projects/common/scala diff --git a/deploy/cluster-deploy/doc/fate_on_spark/common/spark_standalone_deployment_guide.md b/deploy/cluster-deploy/doc/fate_on_spark/common/spark_standalone_deployment_guide.md index 1cc59163cb..1e88c1b2cb 100644 --- a/deploy/cluster-deploy/doc/fate_on_spark/common/spark_standalone_deployment_guide.md +++ b/deploy/cluster-deploy/doc/fate_on_spark/common/spark_standalone_deployment_guide.md @@ -6,7 +6,7 @@ ``` 1. jdk-8u192-linux-x64.tar.gz -wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u192.tar.gz +wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u345.tar.xz 2. 
spark-3.1.2-bin-hadoop3.2.tgz wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz ``` @@ -21,13 +21,13 @@ tar xvf spark-3.1.2-bin-hadoop3.2.tgz -C /data/projects/fate/common #If JDK is not deployed in the current environment, execute mkdir -p /data/projects/fate/common/jdk #decompression -tar xzf jdk-8u192-linux-x64.tar.gz -C /data/projects/fate/common/jdk +tar xJf jdk-8u345.tar.xz -C /data/projects/fate/common/jdk ``` **configure /etc/profile** ```bash -export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u192 +export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u345 export PATH=$JAVA_HOME/bin:$PATH export SPARK_HOME=/data/projects/fate/common/spark-3.1.2-bin-hadoop3.2 export PATH=$SPARK_HOME/bin:$PATH @@ -39,7 +39,7 @@ export PATH=$SPARK_HOME/bin:$PATH cd /data/projects/fate/common/spark-3.1.2-bin-hadoop3.2/conf cp spark-env.sh.template spark-env.sh #Add parameters -export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u192 +export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u345 export SPARK_MASTER_IP={Host IP} export SPARK_MASTER_PORT=7077 export SPARK_MASTER_WEBUI_PORT=9080 diff --git a/deploy/cluster-deploy/doc/fate_on_spark/common/spark_standalone_deployment_guide.zh.md b/deploy/cluster-deploy/doc/fate_on_spark/common/spark_standalone_deployment_guide.zh.md index db62d7327a..6a156d9c11 100644 --- a/deploy/cluster-deploy/doc/fate_on_spark/common/spark_standalone_deployment_guide.zh.md +++ b/deploy/cluster-deploy/doc/fate_on_spark/common/spark_standalone_deployment_guide.zh.md @@ -6,7 +6,7 @@ ``` 1. jdk-8u192-linux-x64.tar.gz -wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u192.tar.gz +wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u345.tar.xz 2. 
spark-3.1.2-bin-hadoop3.2.tgz wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz ``` @@ -21,13 +21,13 @@ tar xvf spark-3.1.2-bin-hadoop3.2.tgz -C /data/projects/fate/common #如当前环境没有部署jdk则执行 mkdir -p /data/projects/fate/common/jdk #解压缩 -tar xzf jdk-8u192-linux-x64.tar.gz -C /data/projects/fate/common/jdk +tar xJf jdk-8u345.tar.xz -C /data/projects/fate/common/jdk ``` **配置/etc/profile** ```bash -export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u192 +export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u345 export PATH=$JAVA_HOME/bin:$PATH export SPARK_HOME=/data/projects/fate/common/spark-3.1.2-bin-hadoop3.2 export PATH=$SPARK_HOME/bin:$PATH @@ -39,7 +39,7 @@ export PATH=$SPARK_HOME/bin:$PATH cd /data/projects/fate/common/spark-3.1.2-bin-hadoop3.2/conf cp spark-env.sh.template spark-env.sh #增加参数 -export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u192 +export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u345 export SPARK_MASTER_IP={主机IP} export SPARK_MASTER_PORT=7077 export SPARK_MASTER_WEBUI_PORT=9080 diff --git a/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.md b/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.md index 37a56afcf7..4441693b4b 100644 --- a/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.md +++ b/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.md @@ -177,8 +177,8 @@ Execute on the target server (192.168.0.1 with extranet environment) under the a mkdir -p /data/projects/install cd /data/projects/install wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/Miniconda3-py38_4.12.0-Linux-x86_64.sh -wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u192-linux-x64.tar.gz -wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/mysql-8.0.28.tar.gz +wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u345.tar.xz +wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/mysql-fate-8.0.28.tar.gz wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/openresty-1.17.8.2.tar.gz wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/fate/${version}/release/pip_packages_fate_${version}.tar.gz wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/fate/${version}/release/fate_install_${version}_release.tar.gz @@ -282,7 +282,7 @@ mysql>show databases; mkdir -p /data/projects/fate/common/jdk #Uncompress cd /data/projects/install -tar xzf jdk-8u192-linux-x64.tar.gz -C /data/projects/fate/common/jdk +tar xJf jdk-8u345.tar.xz -C /data/projects/fate/common/jdk ``` ### 5.5 Deploying python @@ -345,7 +345,7 @@ export FATE_DEPLOY_BASE=\$fate_project_base export PYTHONPATH=/data/projects/fate/fateflow/python:/data/projects/fate/fate/python venv=/data/projects/fate/common/python/venv -export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u192 +export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u345 export PATH=\$PATH:\$JAVA_HOME/bin source \${venv}/bin/activate export FATE_LOG_LEVEL=DEBUG diff --git a/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.zh.md b/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.zh.md index 0b59f83773..64f364cbb6 100644 --- a/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.zh.md +++ b/deploy/cluster-deploy/doc/fate_on_spark/fate_on_spark_deployment_guide.zh.md @@ -173,8 +173,8 @@ fi mkdir -p /data/projects/install cd 
/data/projects/install wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/Miniconda3-py38_4.12.0-Linux-x86_64.sh -wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u192-linux-x64.tar.gz -wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/mysql-8.0.28.tar.gz +wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/jdk-8u345.tar.xz +wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/mysql-fate-8.0.28.tar.gz wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/resources/openresty-1.17.8.2.tar.gz wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/fate/${version}/release/pip_packages_fate_${version}.tar.gz wget https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/fate/${version}/release/fate_install_${version}_release.tar.gz @@ -277,7 +277,7 @@ mysql>show databases; mkdir -p /data/projects/fate/common/jdk #解压缩 cd /data/projects/install -tar xzf jdk-8u192-linux-x64.tar.gz -C /data/projects/fate/common/jdk +tar xJf jdk-8u345.tar.xz -C /data/projects/fate/common/jdk ``` ### 5.5 部署python @@ -338,7 +338,7 @@ export FATE_DEPLOY_BASE=\$fate_project_base export PYTHONPATH=/data/projects/fate/fateflow/python:/data/projects/fate/fate/python venv=/data/projects/fate/common/python/venv -export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u192 +export JAVA_HOME=/data/projects/fate/common/jdk/jdk-8u345 export PATH=\$PATH:\$JAVA_HOME/bin source \${venv}/bin/activate export FATE_LOG_LEVEL=DEBUG diff --git a/deploy/upgrade/sql/1.7.1-1.7.2.sql b/deploy/upgrade/sql/1.7.1-1.7.2.sql index b4f22a720d..236c89825f 100644 --- a/deploy/upgrade/sql/1.7.1-1.7.2.sql +++ b/deploy/upgrade/sql/1.7.1-1.7.2.sql @@ -28,11 +28,76 @@ ALTER TABLE t_task DROP INDEX task_f_auto_retries; ALTER TABLE t_task DROP INDEX task_f_worker_id; ALTER TABLE t_task DROP INDEX task_f_party_status; -ALTER TABLE trackingmetric DROP INDEX trackingmetric_f_metric_namespace; -ALTER TABLE trackingmetric DROP INDEX trackingmetric_f_metric_name; -ALTER TABLE trackingmetric DROP INDEX trackingmetric_f_type; +DROP PROCEDURE IF EXISTS alter_trackingmetric; +DELIMITER // + +CREATE PROCEDURE alter_trackingmetric() +BEGIN + DECLARE done BOOL DEFAULT FALSE; + DECLARE date_ CHAR(8); + + DECLARE cur CURSOR FOR SELECT RIGHT(TABLE_NAME, 8) FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_SCHEMA = (SELECT DATABASE()) AND TABLE_NAME LIKE 't\_tracking\_metric\_%'; + DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE; + + OPEN cur; + + loop_: LOOP + FETCH cur INTO date_; + IF done THEN + LEAVE loop_; + END IF; + + SET @sql = CONCAT( + 'ALTER TABLE t_tracking_metric_', date_, + ' DROP INDEX trackingmetric_', date_, '_f_metric_namespace,', + ' DROP INDEX trackingmetric_', date_, '_f_metric_name,', + ' DROP INDEX trackingmetric_', date_, '_f_type;' + ); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END LOOP; + + CLOSE cur; +END // + +DELIMITER ; +CALL alter_trackingmetric(); +DROP PROCEDURE alter_trackingmetric; -ALTER TABLE trackingoutputdatainfo DROP INDEX trackingoutputdatainfo_f_task_version; +DROP PROCEDURE IF EXISTS alter_trackingoutputdatainfo; +DELIMITER // + +CREATE PROCEDURE alter_trackingoutputdatainfo() +BEGIN + DECLARE done BOOL DEFAULT FALSE; + DECLARE date_ CHAR(8); + + DECLARE cur CURSOR FOR SELECT RIGHT(TABLE_NAME, 8) FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_SCHEMA = (SELECT DATABASE()) AND TABLE_NAME LIKE 't\_tracking\_output\_data\_info\_%'; + DECLARE CONTINUE HANDLER FOR NOT FOUND 
SET done = TRUE; + + OPEN cur; + + loop_: LOOP + FETCH cur INTO date_; + IF done THEN + LEAVE loop_; + END IF; + + SET @sql = CONCAT('ALTER TABLE t_tracking_output_data_info_', date_, 'DROP INDEX trackingoutputdatainfo_', date_, '_f_task_version;'); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END LOOP; + + CLOSE cur; +END // + +DELIMITER ; +CALL alter_trackingoutputdatainfo(); +DROP PROCEDURE alter_trackingoutputdatainfo; ALTER TABLE t_machine_learning_model_info DROP INDEX machinelearningmodelinfo_f_role; ALTER TABLE t_machine_learning_model_info DROP INDEX machinelearningmodelinfo_f_party_id; @@ -65,7 +130,7 @@ BEGIN LEAVE loop_; END IF; - SET @sql = CONCAT('ALTER TABLE t_component_summary_', date_, ' DROP INDEX componentsummary_', date_, '_f_task_version'); + SET @sql = CONCAT('ALTER TABLE t_component_summary_', date_, ' DROP INDEX componentsummary_', date_, '_f_task_version;'); PREPARE stmt FROM @sql; EXECUTE stmt; DEALLOCATE PREPARE stmt; diff --git a/deploy/upgrade/sql/1.8.0-1.9.0.sql b/deploy/upgrade/sql/1.8.0-1.9.0.sql index b5a9bfb3d9..abeed0bb6b 100644 --- a/deploy/upgrade/sql/1.8.0-1.9.0.sql +++ b/deploy/upgrade/sql/1.8.0-1.9.0.sql @@ -1,2 +1,161 @@ +ALTER TABLE t_session_record DROP PRIMARY KEY, ADD id INT NOT NULL AUTO_INCREMENT PRIMARY KEY FIRST; + +ALTER TABLE t_task ADD f_run_port INT; +ALTER TABLE t_task ADD f_kill_status BOOL NOT NULL DEFAULT FALSE; +ALTER TABLE t_task ADD f_error_report TEXT; + +DROP PROCEDURE IF EXISTS alter_trackingmetric; +DELIMITER // + +CREATE PROCEDURE alter_trackingmetric() +BEGIN + DECLARE done BOOL DEFAULT FALSE; + DECLARE date_ CHAR(8); + + DECLARE cur CURSOR FOR SELECT RIGHT(TABLE_NAME, 8) FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_SCHEMA = (SELECT DATABASE()) AND TABLE_NAME LIKE 't\_tracking\_metric\_%'; + DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE; + + OPEN cur; + + loop_: LOOP + FETCH cur INTO date_; + IF done THEN + LEAVE loop_; + END IF; + + SET @sql = CONCAT( + 'ALTER TABLE t_tracking_metric_', date_, + ' MODIFY f_component_name VARCHAR(30) NOT NULL,', + ' ADD INDEX trackingmetric_', date_, '_f_component_name (f_component_name),', + ' DROP INDEX trackingmetric_', date_, '_f_task_id,', + ' DROP INDEX trackingmetric_', date_, '_f_task_version,', + ' MODIFY f_role VARCHAR(10) NOT NULL,', + ' DROP INDEX trackingmetric_', date_, '_f_party_id,', + ' MODIFY f_metric_namespace VARCHAR(80) NOT NULL,', + ' ADD INDEX trackingmetric_', date_, '_f_metric_namespace (f_metric_namespace),', + ' MODIFY f_metric_name VARCHAR(80) NOT NULL,', + ' ADD INDEX trackingmetric_', date_, '_f_metric_name (f_metric_name);' + ); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END LOOP; + + CLOSE cur; +END // + +DELIMITER ; +CALL alter_trackingmetric(); +DROP PROCEDURE alter_trackingmetric; + +ALTER TABLE t_machine_learning_model_info DROP f_description; +ALTER TABLE t_machine_learning_model_info DROP f_job_status; ALTER TABLE t_machine_learning_model_info ADD f_archive_sha256 VARCHAR(100); ALTER TABLE t_machine_learning_model_info ADD f_archive_from_ip VARCHAR(100); + +DROP PROCEDURE IF EXISTS alter_componentsummary; +DELIMITER // + +CREATE PROCEDURE alter_componentsummary() +BEGIN + DECLARE done BOOL DEFAULT FALSE; + DECLARE date_ CHAR(8); + + DECLARE cur CURSOR FOR SELECT RIGHT(TABLE_NAME, 8) FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_SCHEMA = (SELECT DATABASE()) AND TABLE_NAME LIKE 't\_component\_summary\_%'; + DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE; + + OPEN cur; + + loop_: LOOP + 
FETCH cur INTO date_; + IF done THEN + LEAVE loop_; + END IF; + + SET @sql = CONCAT('ALTER TABLE t_component_summary_', date_, ' MODIFY f_component_name VARCHAR(50) NOT NULL;'); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END LOOP; + + CLOSE cur; +END // + +DELIMITER ; +CALL alter_componentsummary(); +DROP PROCEDURE alter_componentsummary; + +DROP TABLE t_model_operation_log; + +CREATE TABLE t_server_registry_info ( + id INT NOT NULL AUTO_INCREMENT, + f_create_time BIGINT, + f_create_date DATETIME, + f_update_time BIGINT, + f_update_date DATETIME, + f_server_name VARCHAR(30) NOT NULL, + f_host VARCHAR(30) NOT NULL, + f_port INT NOT NULL, + f_protocol VARCHAR(10) NOT NULL, + + PRIMARY KEY (id), + INDEX serverregistryinfo_f_server_name (f_server_name) +); + +CREATE TABLE t_service_registry_info ( + f_create_time BIGINT, + f_create_date DATETIME, + f_update_time BIGINT, + f_update_date DATETIME, + f_server_name VARCHAR(30) NOT NULL, + f_service_name VARCHAR(30) NOT NULL, + f_url VARCHAR(100) NOT NULL, + f_method VARCHAR(10) NOT NULL, + f_params LONGTEXT, + f_data LONGTEXT, + f_headers LONGTEXT, + + PRIMARY KEY (f_server_name, f_service_name) +); + +CREATE TABLE t_site_key_info ( + f_create_time BIGINT, + f_create_date DATETIME, + f_update_time BIGINT, + f_update_date DATETIME, + f_party_id VARCHAR(10) NOT NULL, + f_key_name VARCHAR(10) NOT NULL, + f_key LONGTEXT NOT NULL, + + PRIMARY KEY (f_party_id, f_key_name) +); + +CREATE TABLE t_pipeline_component_meta ( + id INT NOT NULL AUTO_INCREMENT, + f_create_time BIGINT, + f_create_date DATETIME, + f_update_time BIGINT, + f_update_date DATETIME, + f_model_id VARCHAR(100) NOT NULL, + f_model_version VARCHAR(100) NOT NULL, + f_role VARCHAR(50) NOT NULL, + f_party_id VARCHAR(10) NOT NULL, + f_component_name VARCHAR(100) NOT NULL, + f_component_module_name VARCHAR(100) NOT NULL, + f_model_alias VARCHAR(100) NOT NULL, + f_model_proto_index LONGTEXT, + f_run_parameters LONGTEXT, + f_archive_sha256 VARCHAR(100), + f_archive_from_ip VARCHAR(100), + + PRIMARY KEY (id), + INDEX pipelinecomponentmeta_f_model_id (f_model_id), + INDEX pipelinecomponentmeta_f_model_version (f_model_version), + INDEX pipelinecomponentmeta_f_role (f_role), + INDEX pipelinecomponentmeta_f_party_id (f_party_id), + INDEX pipelinecomponentmeta_f_component_name (f_component_name), + INDEX pipelinecomponentmeta_f_model_alias (f_model_alias), + UNIQUE INDEX (f_model_id, f_model_version, f_role, f_party_id, f_component_name) +); diff --git a/eggroll b/eggroll index 7bf1abba62..8521eba4bb 160000 --- a/eggroll +++ b/eggroll @@ -1 +1 @@ -Subproject commit 7bf1abba6221ca13d8c071827dfa25b2d73362e6 +Subproject commit 8521eba4bbb474541a6054e671219121c3214ce7 diff --git a/fate.env b/fate.env index cce8245434..93e1a97b3d 100755 --- a/fate.env +++ b/fate.env @@ -1,10 +1,10 @@ -FATE=1.9.0 -FATEFlow=1.9.0 -FATEBoard=1.9.0 -EGGROLL=2.4.5 +FATE=1.9.1 +FATEFlow=1.9.1 +FATEBoard=1.9.1 +EGGROLL=2.4.6 CENTOS=7.2 UBUNTU=16.04 PYTHON=3.8 MAVEN=3.6.3 JDK=8 -SPARK=3.1.3 +SPARK=3.3.1 diff --git a/fateboard b/fateboard index c199bc3113..16a67630cf 160000 --- a/fateboard +++ b/fateboard @@ -1 +1 @@ -Subproject commit c199bc311315d472a289fcb81adf2328e8233ef0 +Subproject commit 16a67630cf8a9c0e30b5fb1d7dd822a1f0c94510 diff --git a/fateflow b/fateflow index 3afbc3e5d3..6feffaffbb 160000 --- a/fateflow +++ b/fateflow @@ -1 +1 @@ -Subproject commit 3afbc3e5d335ac96634eadfc493c4c697ecbfc19 +Subproject commit 6feffaffbb8f7057641ff89d42f669d7c08ae59d diff --git 
a/python/fate_arch/common/path_utils.py b/python/fate_arch/common/path_utils.py index 78cfd7cd16..1b0c6dcb14 100644 --- a/python/fate_arch/common/path_utils.py +++ b/python/fate_arch/common/path_utils.py @@ -20,9 +20,11 @@ def get_data_table_count(path): + count = 0 config_path = os.path.join(path, "config.yaml") + if not os.path.exists(config_path): + return count config = file_utils.load_yaml_conf(conf_path=config_path) - count = 0 if config: if config.get("type") != "vision": raise Exception(f"can not support this type {config.get('type')}") diff --git a/python/federatedml/ensemble/basic_algorithms/decision_tree/hetero/hetero_decision_tree_guest.py b/python/federatedml/ensemble/basic_algorithms/decision_tree/hetero/hetero_decision_tree_guest.py index f1de3b84b2..1539c04e4b 100644 --- a/python/federatedml/ensemble/basic_algorithms/decision_tree/hetero/hetero_decision_tree_guest.py +++ b/python/federatedml/ensemble/basic_algorithms/decision_tree/hetero/hetero_decision_tree_guest.py @@ -40,6 +40,7 @@ def __init__(self, tree_param): self.run_cipher_compressing = True self.packer = None self.max_sample_weight = 1 + self.objective = None # code version control self.new_ver = True @@ -115,6 +116,7 @@ def init(self, flowid, runtime_idx, data_bin, bin_split_points, bin_sparse_point task_type, class_num=1, complete_secure=False, + objective=None, goss_subsample=False, cipher_compressing=False, max_sample_weight=1, @@ -135,6 +137,7 @@ def init(self, flowid, runtime_idx, data_bin, bin_split_points, bin_sparse_point self.max_sample_weight = max_sample_weight self.task_type = task_type self.mo_tree = mo_tree + self.objective = objective if self.mo_tree: # when mo mode is activated, need class number self.class_num = class_num else: @@ -359,13 +362,21 @@ def init_packer_and_sync_gh(self, idx=-1): statistics = MultivariateStatisticalSummary(self.grad_and_hess, -1) g_min = statistics.get_min()['g'] g_max = statistics.get_max()['g'] - + if self.objective == 'lse': + h_max = 2 + elif self.objective == 'lae': + h_max = 1 + else: + h_max = statistics.get_max()['h'] + else: + h_max = None self.packer = GHPacker(sample_num=self.grad_and_hess.count(), task_type=self.task_type, max_sample_weight=self.max_sample_weight, encrypter=self.encrypter, g_min=g_min, g_max=g_max, + h_max=h_max, mo_mode=self.mo_tree, # mo packing class_num=self.class_num # no mo packing ) diff --git a/python/federatedml/ensemble/basic_algorithms/decision_tree/tree_core/g_h_optim.py b/python/federatedml/ensemble/basic_algorithms/decision_tree/tree_core/g_h_optim.py index 725581d096..9c9bce3f34 100644 --- a/python/federatedml/ensemble/basic_algorithms/decision_tree/tree_core/g_h_optim.py +++ b/python/federatedml/ensemble/basic_algorithms/decision_tree/tree_core/g_h_optim.py @@ -85,7 +85,7 @@ class GHPacker(object): def __init__(self, sample_num: int, encrypter: PaillierEncrypt, precision=fix_point_precision, max_sample_weight=1.0, task_type=consts.CLASSIFICATION, - g_min=None, g_max=None, class_num=1, mo_mode=False, sync_para=True): + g_min=None, g_max=None, h_max=None, class_num=1, mo_mode=False, sync_para=True): if task_type == consts.CLASSIFICATION: g_max = 1.0 @@ -99,7 +99,8 @@ def __init__(self, sample_num: int, encrypter: PaillierEncrypt, else: g_max = g_max g_min = g_min - h_max = 2.0 + if h_max is None: + h_max = 2 else: raise ValueError('unknown task type {}'.format(task_type)) @@ -118,7 +119,6 @@ def _compute_packing_parameter(self, sample_num: int, precision=2 ** 53): h_sum_max = self.h_max * sample_num h_max_int = int(h_sum_max 
* precision) + 1 - g_offset_max = self.g_offset + self.g_max g_max_int = int(g_offset_max * sample_num * precision) + 1 diff --git a/python/federatedml/ensemble/basic_algorithms/decision_tree/tree_core/loss/regression_loss.py b/python/federatedml/ensemble/basic_algorithms/decision_tree/tree_core/loss/regression_loss.py index cdd74432cc..843f240f58 100644 --- a/python/federatedml/ensemble/basic_algorithms/decision_tree/tree_core/loss/regression_loss.py +++ b/python/federatedml/ensemble/basic_algorithms/decision_tree/tree_core/loss/regression_loss.py @@ -15,6 +15,7 @@ # import numpy as np +import functools from federatedml.feature.instance import Instance from federatedml.util import consts from federatedml.statistic.statics import MultivariateStatisticalSummary @@ -206,11 +207,8 @@ def compute_hess(y, y_pred): class TweedieLoss(object): @staticmethod def initialize(y): - y_inst = y.mapValues(lambda label: Instance(features=np.asarray([label]))) - y_inst.schema = {"header": ["label"]} - statistics = MultivariateStatisticalSummary(y_inst, -1) - mean = statistics.get_mean()["label"] - return y.mapValues(lambda x: np.asarray([mean])), np.asarray([mean]) + # init score = 0, equals to base_score=1.0 in xgb, init_score=log(base_score)=0 + return y.mapValues(lambda x: np.asarray([0])), np.asarray([0]) def __init__(self, rho=None): if rho is None: @@ -220,19 +218,27 @@ def __init__(self, rho=None): @staticmethod def predict(value): - return value + return np.exp(value) + + @staticmethod + def _tweedie_loss(label, pred, rho): + if pred < 1e-10: + pred = 1e-10 + a = label * np.exp((1 - rho) * np.log(pred)) / (1 - rho) + b = np.exp((2 - rho) * np.log(pred)) / (2 - rho) + return (-a + b), 1 def compute_loss(self, y, y_pred): - tweedie_loss = y.join(y_pred, - lambda y, yp: - (-y * np.exp(1 - self.rho) * np.log(max(yp, consts.FLOAT_ZERO)) / (1 - self.rho) + - np.exp(2 - self.rho) * np.log(max(consts.FLOAT_ZERO, yp)) / (2 - self.rho), 1)) + loss_func = functools.partial(self._tweedie_loss, rho=self.rho) + tweedie_loss = y.join(y_pred, loss_func) tweedie_loss_sum, sample_num = tweedie_loss.reduce(lambda tuple1, tuple2: (tuple1[0] + tuple2[0], tuple1[1] + tuple2[1])) return tweedie_loss_sum / sample_num def compute_grad(self, y, y_pred): - return -y * np.exp(1 - self.rho) * y_pred + np.exp(2 - self.rho) * y_pred + if y < 0: + raise ValueError('y < 0, in tweedie loss label must be non-negative, but got {}'.format(y)) + return -y * np.exp((1 - self.rho) * y_pred) + np.exp((2 - self.rho) * y_pred) def compute_hess(self, y, y_pred): - return -y * (1 - self.rho) * np.exp(1 - self.rho) * y_pred + (2 - self.rho) * np.exp(2 - self.rho) * y_pred + return -y * (1 - self.rho) * np.exp((1 - self.rho) * y_pred) + (2 - self.rho) * np.exp((2 - self.rho) * y_pred) diff --git a/python/federatedml/ensemble/boosting/boosting.py b/python/federatedml/ensemble/boosting/boosting.py index 1092be5878..d20f661d2f 100644 --- a/python/federatedml/ensemble/boosting/boosting.py +++ b/python/federatedml/ensemble/boosting/boosting.py @@ -343,10 +343,10 @@ def compute_loss(self, y_hat, y): y_predict = y_hat.mapValues(lambda val: loss_method.predict(val)) loss = loss_method.compute_loss(y, y_predict) elif self.task_type == consts.REGRESSION: - if self.objective_param.objective in ["lse", "lae", "logcosh", "tweedie", "log_cosh", "huber"]: + if self.objective_param.objective in ["lse", "lae", "logcosh", "log_cosh", "huber"]: loss_method = self.loss loss = loss_method.compute_loss(y, y_hat) - else: + elif self.objective_param.objective in 
['tweedie']: loss_method = self.loss y_predict = y_hat.mapValues(lambda val: loss_method.predict(val)) loss = loss_method.compute_loss(y, y_predict) @@ -441,21 +441,22 @@ def score_to_predict_result(self, data_inst, y_hat): given binary/multi-class/regression prediction scores, outputs result in standard format """ predicts = None + loss_method = self.loss if self.task_type == consts.CLASSIFICATION: - loss_method = self.loss if self.num_classes == 2: predicts = y_hat.mapValues(lambda f: float(loss_method.predict(f))) else: predicts = y_hat.mapValues(lambda f: loss_method.predict(f).tolist()) elif self.task_type == consts.REGRESSION: - if self.objective_param.objective in ["lse", "lae", "huber", "log_cosh", "fair", "tweedie"]: + if self.objective_param.objective in ["tweedie"]: + predicts = y_hat.mapValues(lambda f: [float(loss_method.predict(f))]) + elif self.objective_param.objective in ["lse", "lae", "huber", "log_cosh", "fair"]: predicts = y_hat else: raise NotImplementedError("objective {} not supprted yet".format(self.objective_param.objective)) if self.task_type == consts.CLASSIFICATION: - predict_result = self.predict_score_to_output(data_inst, predict_score=predicts, classes=self.classes_, threshold=self.predict_param.threshold) diff --git a/python/federatedml/ensemble/secureboost/hetero_secoreboost/hetero_secureboost_guest.py b/python/federatedml/ensemble/secureboost/hetero_secoreboost/hetero_secureboost_guest.py index 0987e8f778..a89f54497e 100644 --- a/python/federatedml/ensemble/secureboost/hetero_secoreboost/hetero_secureboost_guest.py +++ b/python/federatedml/ensemble/secureboost/hetero_secoreboost/hetero_secureboost_guest.py @@ -238,6 +238,7 @@ def fit_a_learner(self, epoch_idx: int, booster_dim: int): cipher_compress=self.cipher_compressing, g_h=g_h, encrypter=self.encrypter, goss_subsample=self.enable_goss, + objective=self.objective_param.objective, complete_secure=complete_secure, max_sample_weights=self.max_sample_weight, fast_sbt=fast_sbt, tree_type=tree_type, target_host_id=target_host_id, guest_depth=self.guest_depth, host_depth=self.host_depth, diff --git a/python/federatedml/ensemble/secureboost/secureboost_util/tree_model_io.py b/python/federatedml/ensemble/secureboost/secureboost_util/tree_model_io.py index 9aa5cd20ac..14ca7c7ff0 100644 --- a/python/federatedml/ensemble/secureboost/secureboost_util/tree_model_io.py +++ b/python/federatedml/ensemble/secureboost/secureboost_util/tree_model_io.py @@ -13,6 +13,7 @@ def produce_hetero_tree_learner(role, tree_param: DecisionTreeParam, flow_id, da g_h=None, encrypter=None, # guest only goss_subsample=False, complete_secure=False, max_sample_weights=1.0, + objective=None, bin_num=None, # host only fast_sbt=False, tree_type=None, target_host_id=None, # fast sbt only @@ -41,7 +42,8 @@ def produce_hetero_tree_learner(role, tree_param: DecisionTreeParam, flow_id, da cipher_compressing=cipher_compress, max_sample_weight=max_sample_weights, mo_tree=mo_tree, - class_num=class_num + class_num=class_num, + objective=objective ) elif role == consts.HOST: diff --git a/python/federatedml/feature/binning/iv_calculator.py b/python/federatedml/feature/binning/iv_calculator.py index f3a5e395a1..7244c6cee2 100644 --- a/python/federatedml/feature/binning/iv_calculator.py +++ b/python/federatedml/feature/binning/iv_calculator.py @@ -58,7 +58,8 @@ def cal_local_iv(self, data_instances, split_points, bin_indexes.append(bin_cols_map[h]) if label_counts is None: label_counts = data_overview.get_label_count(data_instances) - labels = 
list(label_counts.keys()) + labels = sorted(label_counts.keys()) + labels.reverse() label_counts = [label_counts[k] for k in labels] data_bin_table = BaseBinning.get_data_bin(data_instances, split_points, bin_cols_map) diff --git a/python/federatedml/feature/hetero_feature_binning/hetero_binning_guest.py b/python/federatedml/feature/hetero_feature_binning/hetero_binning_guest.py index a5f2aa2a73..68304a24df 100644 --- a/python/federatedml/feature/hetero_feature_binning/hetero_binning_guest.py +++ b/python/federatedml/feature/hetero_feature_binning/hetero_binning_guest.py @@ -144,6 +144,8 @@ def stat_label(self, data_instances): if self._stage == "fit": self.labels = list(label_counts_dict.keys()) + self.labels.sort() + self.labels.reverse() label_counts = [label_counts_dict.get(k, 0) for k in self.labels] label_table = IvCalculator.convert_label(data_instances, self.labels) diff --git a/python/federatedml/param/data_transform_param.py b/python/federatedml/param/data_transform_param.py index dd302b5997..b70f7b0820 100644 --- a/python/federatedml/param/data_transform_param.py +++ b/python/federatedml/param/data_transform_param.py @@ -175,7 +175,7 @@ def check(self): if not isinstance(self.match_id_index, int) or self.match_id_index < 0: raise ValueError("match_id_index should be non negative integer") - if not isinstance(self.match_id_name, str): + if self.match_id_name is not None and not isinstance(self.match_id_name, str): raise ValueError("match_id_name should be str") return True diff --git a/python/federatedml/statistic/intersect/intersect_model.py b/python/federatedml/statistic/intersect/intersect_model.py index f26390fd3e..4002a0bb77 100644 --- a/python/federatedml/statistic/intersect/intersect_model.py +++ b/python/federatedml/statistic/intersect/intersect_model.py @@ -37,7 +37,7 @@ def __init__(self): super().__init__() self.intersection_obj = None self.proc_obj = None - self.intersect_num = -1 + # self.intersect_num = -1 self.intersect_rate = -1 self.unmatched_num = -1 self.unmatched_rate = -1 @@ -51,6 +51,9 @@ def __init__(self): self.use_match_id_process = False self.role = None self.intersect_method = None + self.match_id_num = None + self.match_id_intersect_num = -1 + self.recovered_num = -1 self.guest_party_id = None self.host_party_id = None @@ -76,8 +79,9 @@ def init_intersect_method(self): raise ValueError("role {} is not support".format(self.role)) def get_model_summary(self): - return {"intersect_num": self.intersect_num, "intersect_rate": self.intersect_rate, - "cardinality_only": self.intersection_obj.cardinality_only} + return {"intersect_num": self.match_id_intersect_num, "intersect_rate": self.intersect_rate, + "cardinality_only": self.intersection_obj.cardinality_only, + "unique_id_num": self.match_id_num} def sync_use_match_id(self): raise NotImplementedError(f"Should not be called here.") @@ -169,12 +173,23 @@ def _generate_nan_instance(): def callback(self): meta_info = {"intersect_method": self.intersect_method, "join_method": self.model_param.join_method} - self.callback_metric(metric_name=self.metric_name, - metric_namespace=self.metric_namespace, - metric_data=[Metric("intersect_count", self.intersect_num), - Metric("intersect_rate", self.intersect_rate), - Metric("unmatched_count", self.unmatched_num), - Metric("unmatched_rate", self.unmatched_rate)]) + if self.use_match_id_process: + self.callback_metric(metric_name=self.metric_name, + metric_namespace=self.metric_namespace, + metric_data=[Metric("intersect_count", self.match_id_intersect_num), + 
Metric("input_match_id_count", self.match_id_num), + Metric("intersect_rate", self.intersect_rate), + Metric("unmatched_count", self.unmatched_num), + Metric("unmatched_rate", self.unmatched_rate), + Metric("intersect_sample_id_count", self.recovered_num)]) + else: + self.callback_metric(metric_name=self.metric_name, + metric_namespace=self.metric_namespace, + metric_data=[Metric("intersect_count", self.match_id_intersect_num), + Metric("input_match_id_count", self.match_id_num), + Metric("intersect_rate", self.intersect_rate), + Metric("unmatched_count", self.unmatched_num), + Metric("unmatched_rate", self.unmatched_rate)]) self.tracker.set_metric_meta(metric_namespace=self.metric_namespace, metric_name=self.metric_name, metric_meta=MetricMeta(name=self.metric_name, @@ -220,11 +235,12 @@ def fit(self, data): if data_overview.check_with_inst_id(data) or self.model_param.with_sample_id: self.proc_obj.use_sample_id() match_data = self.proc_obj.recover(data=data) + self.match_id_num = match_data.count() if self.intersection_obj.run_cache: self.cache_output = self.intersection_obj.generate_cache(match_data) intersect_meta = self.intersection_obj.get_intersect_method_meta() self.callback_cache_meta(intersect_meta) - return data + return if self.intersection_obj.cardinality_only: self.intersection_obj.run_cardinality(match_data) else: @@ -232,15 +248,18 @@ def fit(self, data): if self.model_param.run_preprocess: intersect_data = self.run_preprocess(match_data) self.intersect_ids = self.intersection_obj.run_intersect(intersect_data) + if self.intersect_ids: + self.match_id_intersect_num = self.intersect_ids.count() else: if self.model_param.join_method == consts.LEFT_JOIN: raise ValueError(f"Only data with match_id may apply left_join method. Please check input data format") + self.match_id_num = data.count() if self.intersection_obj.run_cache: self.cache_output = self.intersection_obj.generate_cache(data) intersect_meta = self.intersection_obj.get_intersect_method_meta() # LOGGER.debug(f"callback intersect meta is: {intersect_meta}") self.callback_cache_meta(intersect_meta) - return data + return if self.intersection_obj.cardinality_only: self.intersection_obj.run_cardinality(data) else: @@ -248,13 +267,14 @@ def fit(self, data): if self.model_param.run_preprocess: intersect_data = self.run_preprocess(data) self.intersect_ids = self.intersection_obj.run_intersect(intersect_data) - + if self.intersect_ids: + self.match_id_intersect_num = self.intersect_ids.count() if self.intersection_obj.cardinality_only: if self.intersection_obj.intersect_num is not None: - data_count = data.count() - self.intersect_num = self.intersection_obj.intersect_num - self.intersect_rate = self.intersect_num / data_count - self.unmatched_num = data_count - self.intersect_num + # data_count = data.count() + self.match_id_intersect_num = self.intersection_obj.intersect_num + self.intersect_rate = self.match_id_intersect_num / self.match_id_num + self.unmatched_num = self.match_id_num - self.match_id_intersect_num self.unmatched_rate = 1 - self.intersect_rate self.set_summary(self.get_model_summary()) self.callback() @@ -268,6 +288,8 @@ def fit(self, data): self.intersect_ids = self.proc_obj.expand(self.intersect_ids, match_data=match_data, owner_only=True) + if self.intersect_ids: + self.recovered_num = self.intersect_ids.count() if self.model_param.only_output_key and self.intersect_ids: self.intersect_ids = self.intersect_ids.mapValues(lambda v: Instance(inst_id=v.inst_id)) # self.intersect_ids.schema = 
{"match_id_name": data.schema["match_id_name"], @@ -277,10 +299,8 @@ def fit(self, data): LOGGER.info("Finish intersection") if self.intersect_ids: - data_count = data.count() - self.intersect_num = self.intersect_ids.count() - self.intersect_rate = self.intersect_num / data_count - self.unmatched_num = data_count - self.intersect_num + self.intersect_rate = self.match_id_intersect_num / self.match_id_num + self.unmatched_num = self.match_id_num - self.match_id_intersect_num self.unmatched_rate = 1 - self.intersect_rate self.set_summary(self.get_model_summary()) @@ -355,6 +375,7 @@ def intersect_online_process(self, data_inst, caches): self.sync_use_match_id() intersect_data = data_inst + self.match_id_num = data_inst.count() if self.use_match_id_process: if len(self.host_party_id_list) > 1 and self.model_param.sample_id_generator != consts.GUEST: raise ValueError("While multi-host, sample_id_generator should be guest.") @@ -375,11 +396,13 @@ def intersect_online_process(self, data_inst, caches): proc_obj.use_sample_id() match_data = proc_obj.recover(data=data_inst) intersect_data = match_data + self.match_id_num = match_data.count() if self.role == consts.HOST: cache_id = cache_meta[str(self.guest_party_id)].get("cache_id") self.transfer_variable.cache_id.remote(cache_id, role=consts.GUEST, idx=0) guest_cache_id = self.transfer_variable.cache_id.get(role=consts.GUEST, idx=0) + self.match_id_num = list(cache_data.values())[0].count() if guest_cache_id != cache_id: raise ValueError(f"cache_id check failed. cache_id from host & guest must match.") elif self.role == consts.GUEST: @@ -395,6 +418,7 @@ def intersect_online_process(self, data_inst, caches): raise ValueError(f"Role {self.role} cannot run intersection transform.") self.intersect_ids = self.intersection_obj.run_cache_intersect(intersect_data, cache_data) + self.match_id_intersect_num = self.intersect_ids.count() if self.use_match_id_process: if not self.model_param.sync_intersect_ids: self.intersect_ids = proc_obj.expand(self.intersect_ids, @@ -402,6 +426,8 @@ def intersect_online_process(self, data_inst, caches): owner_only=True) else: self.intersect_ids = proc_obj.expand(self.intersect_ids, match_data=match_data) + if self.intersect_ids: + self.recovered_num = self.intersect_ids.count() if self.intersect_ids and self.model_param.only_output_key: self.intersect_ids = self.intersect_ids.mapValues(lambda v: Instance(inst_id=v.inst_id)) # self.intersect_ids.schema = {"match_id_name": data_inst.schema["match_id_name"], @@ -411,10 +437,8 @@ def intersect_online_process(self, data_inst, caches): LOGGER.info("Finish intersection") if self.intersect_ids: - data_count = data_inst.count() - self.intersect_num = self.intersect_ids.count() - self.intersect_rate = self.intersect_num / data_count - self.unmatched_num = data_count - self.intersect_num + self.intersect_rate = self.match_id_intersect_num / self.match_id_num + self.unmatched_num = self.match_id_num - self.match_id_intersect_num self.unmatched_rate = 1 - self.intersect_rate self.set_summary(self.get_model_summary()) diff --git a/python/federatedml/util/data_format_preprocess.py b/python/federatedml/util/data_format_preprocess.py index 11766a9956..b7f20fac13 100644 --- a/python/federatedml/util/data_format_preprocess.py +++ b/python/federatedml/util/data_format_preprocess.py @@ -202,7 +202,10 @@ def generate_header(data, schema): generated_header["is_display"] = False - generated_header["sid"] = schema.get("sid", DEFAULT_SID_NAME).strip() + sid = schema.get("sid") + if sid is None or 
sid == "": + sid = DEFAULT_SID_NAME + generated_header["sid"] = sid.strip() return generated_header @@ -302,3 +305,51 @@ def clean_header(schema): schema["header"] = "" return schema + + @staticmethod + def recover_schema(schema): + if not schema.get('meta'): + raise ValueError("Meta not in schema, can not recover meta") + + recovery_schema = copy.deepcopy(schema) + meta = schema["meta"] + input_format = meta.get("input_format", "dense") + if input_format == "dense": + """schema has not been processed yet""" + if "original_index_info" not in schema: + return recovery_schema + + header_list = DataFormatPreProcess.reconstruct_header(schema) + del recovery_schema["original_index_info"] + delimiter = schema.get("delimiter", ",") + header = "" if not header_list else delimiter.join(header_list) + recovery_schema["header"] = header + + if "label_name" in recovery_schema: + del recovery_schema["label_name"] + + if meta.get("with_match_id"): + del recovery_schema["match_id_name"] + else: + recovery_schema["header"] = "" + if "label_name" in recovery_schema: + del recovery_schema["label_name"] + + if meta.get("id_range"): + recovery_schema["meta"]["id_range"] = 0 + + if meta.get("with_label"): + del recovery_schema["meta"]["label_name"] + + del recovery_schema["is_display"] + + if meta.get("with_match_id"): + del recovery_schema["match_id_name"] + + if "anonymous_header" in schema: + del recovery_schema["anonymous_header"] + + if "anonymous_label" in schema: + del recovery_schema["anonymous_label"] + + return recovery_schema diff --git a/python/federatedml/util/data_transform.py b/python/federatedml/util/data_transform.py index 727f5ecf86..8b52f16629 100644 --- a/python/federatedml/util/data_transform.py +++ b/python/federatedml/util/data_transform.py @@ -93,9 +93,9 @@ def _update_param(self, schema): if self.with_match_id: match_id_name = schema.get("match_id_name", []) if not self.match_id_name: - if isinstance(match_id_name, list): + if isinstance(match_id_name, list) and len(self.match_id_name) > 1: raise ValueError("Multiple Match ID exist, please specified the one to use") - self.match_id_name = match_id_name + self.match_id_name = match_id_name[0] if isinstance(match_id_name, list) else match_id_name self.match_id_index = schema["original_index_info"]["match_id_index"][0] else: try: @@ -703,8 +703,13 @@ def _update_param(self, schema): if self.with_match_id: match_id_name = schema.get("match_id_name") if isinstance(match_id_name, list): + if not isinstance(self.match_id_index, int) or self.match_id_index >= len(match_id_name): + raise ValueError(f"match id index should between 0 and {len(match_id_name) - 1}, " + f"but {self.match_id_index} is given") self.match_id_name = match_id_name[self.match_id_index] else: + if self.match_id_index != 0: + raise ValueError("Only one match_id exist, match_id_index should be 0") self.match_id_name = match_id_name schema["match_id_name"] = self.match_id_name diff --git a/python/requirements.txt b/python/requirements.txt index 6d6766e9ab..9039b50c44 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -9,7 +9,7 @@ cloudpickle==2.1.0 cos-python-sdk-v5==1.9.10 Flask==2.0.3 gmpy2==2.0.8 -joblib==1.0.1 +joblib==1.2.0 kazoo==2.6.1 pyyaml==5.4.1 kfserving==0.5.1 @@ -21,7 +21,7 @@ peewee==3.9.3 psutil>=5.7.0 pycryptodomex==3.15.0 PyMySQL==0.9.3 -pyspark==3.1.3 +pyspark==3.3.1 python-dotenv==0.13.0 redis==3.5.3 urllib3==1.26.5 @@ -41,8 +41,7 @@ cryptography==3.3.2 sortedcontainers==2.2.2 pytorch-lightning>=1.6.5 filelock==3.3.1 
-pulsar-client==2.10.1; sys_platform == "linux" -pulsar-client==2.10.0; sys_platform == "darwin" +pulsar-client==2.10.2 fastavro==1.4.1 lightgbm==3.3.1 shortuuid==1.0.9 @@ -55,8 +54,7 @@ etaf-crypto grpcio==1.46.3 grpcio-tools==1.46.3 numba==0.53.0 -phe>=1.4.0 -protobuf==3.19.4 +protobuf==3.19.6 pyarrow==6.0.1 mmh3==3.0.0
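As a quick sanity check on the TweedieLoss changes in `regression_loss.py` above: with the log link (`predict(value) = exp(value)`), the new `compute_grad` / `compute_hess` are exactly the first and second derivatives of the per-sample loss with respect to the raw boosting score. The minimal standalone sketch below (plain NumPy, not FATE code; function names, `RHO`, and the test values are illustrative assumptions) verifies that numerically with central differences.

```python
# Numeric consistency check for the corrected Tweedie loss under a log link:
# grad/hess on the raw score should match finite differences of the loss.
import numpy as np

RHO = 1.5  # Tweedie variance power, assumed 1 < rho < 2

def tweedie_loss(y, score, rho=RHO):
    """Per-sample Tweedie loss evaluated on the raw boosting score."""
    pred = max(np.exp(score), 1e-10)  # log link, clipped as in the patch
    a = y * np.exp((1 - rho) * np.log(pred)) / (1 - rho)
    b = np.exp((2 - rho) * np.log(pred)) / (2 - rho)
    return -a + b

def grad(y, score, rho=RHO):
    # d(loss)/d(score), matching the patched compute_grad
    return -y * np.exp((1 - rho) * score) + np.exp((2 - rho) * score)

def hess(y, score, rho=RHO):
    # d^2(loss)/d(score)^2, matching the patched compute_hess
    return -y * (1 - rho) * np.exp((1 - rho) * score) + (2 - rho) * np.exp((2 - rho) * score)

if __name__ == "__main__":
    eps = 1e-6
    for y, f in [(0.0, 0.3), (2.5, -0.4), (7.0, 1.2)]:  # labels must be non-negative
        num_grad = (tweedie_loss(y, f + eps) - tweedie_loss(y, f - eps)) / (2 * eps)
        num_hess = (grad(y, f + eps) - grad(y, f - eps)) / (2 * eps)
        assert np.isclose(grad(y, f), num_grad, rtol=1e-4)
        assert np.isclose(hess(y, f), num_hess, rtol=1e-4)
    print("tweedie grad/hess consistent with the loss under the log link")
```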