Browse Source

支持清除旧数据的逻辑

Joey 2 weeks ago
parent
commit
86e02807f7

+ 2 - 0
modules/CalculationJobs.dos

@@ -10,6 +10,8 @@ use fundit::task_weeklyPerformance;
 /* 
 /* 
  *  定时任务
  *  定时任务
  *  
  *  
+ *  TODO: 需要使此脚本每次自动加载
+ *  
  * 查询任务运行; getRecentJobs();
  * 查询任务运行; getRecentJobs();
  * 查询任务列表:getScheduledJobs('weekly_%');
  * 查询任务列表:getScheduledJobs('weekly_%');
  * 删除任务:deleteScheduledJob('monthly_manager_nav');
  * 删除任务:deleteScheduledJob('monthly_manager_nav');

+ 80 - 18
modules/task_fundPerformance.dos

@@ -73,9 +73,9 @@ def GetEntityNavTask(date) {
  *   NOTE: 1) 与Java不同的是当月indicator计算每日触发,不必等到Month-end production
  *   NOTE: 1) 与Java不同的是当月indicator计算每日触发,不必等到Month-end production
  *         2) latest performance 尝试用 dolphin 本地数据
  *         2) latest performance 尝试用 dolphin 本地数据
  *   
  *   
- *   Example: calFundPerformanceTask('HF', 2020.01.23);
- *            calFundPerformanceTask('MI', 2020.10.28);
- *            calFundPerformanceTask('FI', 2024.10.28);
+ *   Example: calFundPerformanceTask('HF', 2025.01.23);
+ *            calFundPerformanceTask('MI', 2025.01.23);
+ *            calFundPerformanceTask('FI', 2025.01.23);
  *            calFundPerformanceTask('MF', get_ini_data_const()['date']);  -- 【初始化数据专用】(100min)
  *            calFundPerformanceTask('MF', get_ini_data_const()['date']);  -- 【初始化数据专用】(100min)
  */
  */
 def calFundPerformanceTask(entityType, date) {
 def calFundPerformanceTask(entityType, date) {
@@ -158,27 +158,27 @@ def calFundPerformanceTask(entityType, date) {
 	        try {
 	        try {
 	
 	
 	            chg_columns_for_mysql(tb_fund_performance, 'fund_id');
 	            chg_columns_for_mysql(tb_fund_performance, 'fund_id');
-	            save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance');
+	            save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance', 'fund_id', 'end_date');
 	
 	
 	            chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
 	            chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
-	            save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator');
+	            save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator', 'fund_id', 'end_date');
 	
 	
 	            chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
 	            chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
 	            // mfdb.fund_performance 表中 maxdrawdown_6m & maxdrawdown_ytd 是虚拟列,这里用数据列顺序强行写入真实列 6m_maxdrawdown & ytd_maxdrawdown (DolphinDB 不允许字段名以数字开头)
 	            // mfdb.fund_performance 表中 maxdrawdown_6m & maxdrawdown_ytd 是虚拟列,这里用数据列顺序强行写入真实列 6m_maxdrawdown & ytd_maxdrawdown (DolphinDB 不允许字段名以数字开头)
-	            save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats');
+	            save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats', 'fund_id', 'end_date');
 	
 	
 	            chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
 	            chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
-	            save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats');
+	            save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats', 'fund_id', 'end_date');
 	
 	
 	            chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
 	            chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
-	            save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats');
+	            save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats', 'fund_id', 'end_date');
 	
 	
 	            chg_columns_for_mysql(tb_fund_ms_stats, 'fund_id');
 	            chg_columns_for_mysql(tb_fund_ms_stats, 'fund_id');
-	            save_and_sync(tb_fund_ms_stats, 'raw_db.fund_ms_stats', 'mfdb.fund_ms_stats'); // new table, write into directly
+	            save_and_sync(tb_fund_ms_stats, 'raw_db.fund_ms_stats', 'mfdb.fund_ms_stats', 'fund_id', 'end_date'); // new table, write into directly
 	
 	
-	            save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly');
+	            save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly', 'fund_id', 'year_week');
 	
 	
-	            save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance');
+	            save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance', 'fund_id', 'price_date');
 	
 	
 	            // 数据初始化时将指标存入本地
 	            // 数据初始化时将指标存入本地
 	            if(end_day <= get_ini_data_const()['date']) {
 	            if(end_day <= get_ini_data_const()['date']) {
@@ -210,6 +210,68 @@ def calFundPerformanceTask(entityType, date) {
 
 
 
 
 /*
 /*
+ *   [批量测试]:最新净值触发的周收益计算 (31min)
+ * 
+ *   @param entityType <STRING>: 'MF', 'HF'...
+ *   @param date <DATETIME>: 净值更新时间, 为空时缺省为当前时间-1天;为1989.01.01或更早日期时代表初始化,指标会被存入本地数据库
+ *   
+ *   
+ *   Example: CalEntityWeeklyPerformance('FI', NULL, 2000.01.01);
+ *            CalEntityWeeklyPerformance('MF', ['MF00006AGP'], 2020.01.23);
+ *
+ */
+def CalEntityWeeklyPerformance(entityType, entityIds, date) {
+
+    rt = '';
+
+    if(!(entityType IN ['MF', 'HF', 'MI', 'FI'])) return null;
+
+    if(date.isNothing() || date.isNull())
+    	end_day = temporalAdd(now(), -1d);
+    else
+    	end_day = date;
+
+    // 取有最新净值变动的基金列表 (2.5 min)
+    tb_cal_funds = get_entity_list_by_nav_updatetime(entityType, entityIds, end_day, true);
+
+    if(tb_cal_funds.isVoid() || tb_cal_funds.size() == 0 ) return;
+
+    // 分批跑
+    i = 0;
+    batch_size = 1000;
+
+    do {
+
+	    // 按照 MySQL 建好各表
+	    tb_fund_performance_weekly = create_entity_performance_weekly();
+
+        funds = tb_cal_funds[i : min(tb_cal_funds.size(), i+batch_size)];
+// funds = SELECT * FROM tb_cal_funds WHERE entity_id = 'MF00004QR9'
+        if(funds.isVoid() || funds.size() == 0) break;
+
+        // 200ms
+        fund_info = SELECT entity_id, price_date, inception_date, benchmark_id, ini_value 
+                    FROM ej(funds, get_entity_info(entityType, funds.entity_id), 'entity_id');
+
+        // 计算周收益
+        rets_w = cal_weekly_returns(entityType, fund_info);
+//select * from rets_w where entity_id = 'MF00003PW1' and year_week = '200801'
+        if(! rets_w.isVoid() && rets_w.size() > 0) {
+            generate_entity_performance_weekly(fund_info, rets_w, true, tb_fund_performance_weekly);
+
+            save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly', 'fund_id', 'year_week');
+	    }
+
+        i += batch_size;
+
+    } while (i <= tb_cal_funds.size());
+
+    
+    return rt;
+	
+}
+
+/*
  *   【临时】用于数据初始化:只计算收益
  *   【临时】用于数据初始化:只计算收益
  * 
  * 
  *   @param entityType <STRING>: 'MF', 'HF'...
  *   @param entityType <STRING>: 'MF', 'HF'...
@@ -299,24 +361,24 @@ def ms_calFundReturns() {
         try {
         try {
 
 
             chg_columns_for_mysql(tb_fund_performance, 'fund_id');
             chg_columns_for_mysql(tb_fund_performance, 'fund_id');
-            save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance');
+            save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance', 'fund_id', 'end_date');
 
 
             chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
             chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
-            save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator');
+            save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator', 'fund_id', 'end_date');
 
 
             chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
             chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
             // mfdb.fund_performance 表中 maxdrawdown_6m & maxdrawdown_ytd 是虚拟列,这里用数据列顺序强行写入真实列 6m_maxdrawdown & ytd_maxdrawdown (DolphinDB 不允许字段名以数字开头)
             // mfdb.fund_performance 表中 maxdrawdown_6m & maxdrawdown_ytd 是虚拟列,这里用数据列顺序强行写入真实列 6m_maxdrawdown & ytd_maxdrawdown (DolphinDB 不允许字段名以数字开头)
-            save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats');
+            save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats', 'fund_id', 'end_date');
 
 
             chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
             chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
-            save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats');
+            save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats', 'fund_id', 'end_date');
 
 
             chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
             chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
-            save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats');
+            save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats', 'fund_id', 'end_date');
 
 
-            save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly');
+            save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly', 'fund_id', 'year_week');
 
 
-            save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance');
+            save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance', 'fund_id', 'price_date');
 
 
         } catch(ex) {
         } catch(ex) {
 
 

+ 19 - 13
modules/task_monthlyPerformance.dos

@@ -205,8 +205,10 @@ def cal_and_save_mc_monthly_nav(entity_type, entity_date, is_save_local) {
         // save data to MySQL  (12 sec)
         // save data to MySQL  (12 sec)
         try {
         try {
 
 
-            tb_entity_nav.rename!('entity_id', iif(entity_type == 'PL', 'fund_manager_id', 'company_id'));
-            save_and_sync(tb_entity_nav, iif(entity_type == 'PL', 'raw_db.fund_manager_fitted_curve', 'raw_db.company_fitted_curve'), );
+        	entity_id_col = iif(entity_type == 'PL', 'fund_manager_id', 'company_id');
+
+            tb_entity_nav.rename!('entity_id', entity_id_col);
+            save_and_sync(tb_entity_nav, iif(entity_type == 'PL', 'raw_db.fund_manager_fitted_curve', 'raw_db.company_fitted_curve'), , entity_id_col, 'end_date');
 
 
             // 数据初始化时将指标存入本地
             // 数据初始化时将指标存入本地
             if(is_save_local == true) {
             if(is_save_local == true) {
@@ -280,21 +282,23 @@ def cal_and_save_mc_indicator(entity_type, entity_date, monthly_returns, indicat
 	
 	
 	        // save data to MySQL  (13s)
 	        // save data to MySQL  (13s)
 	        try {
 	        try {
-	
-	            chg_columns_for_mysql(tb_mc_performance, iif(entity_type == 'PL', 'manager_id', 'company_id'));
-	            save_and_sync(tb_mc_performance, iif(entity_type == 'PL', 'raw_db.manager_performance', 'raw_db.company_performance'), );
+
+				entity_id_col = iif(entity_type == 'PL', 'manager_id', 'company_id');
+
+	            chg_columns_for_mysql(tb_mc_performance, entity_id_col);
+	            save_and_sync(tb_mc_performance, iif(entity_type == 'PL', 'raw_db.manager_performance', 'raw_db.company_performance'), , entity_id_col, 'end_date');
 	
 	
 	            chg_columns_for_mysql(tb_mc_indicator, iif(entity_type == 'PL', 'manager_id', 'company_id'));
 	            chg_columns_for_mysql(tb_mc_indicator, iif(entity_type == 'PL', 'manager_id', 'company_id'));
-	            save_and_sync(tb_mc_indicator, iif(entity_type == 'PL', 'raw_db.manager_indicator', 'raw_db.company_indicator'), );
+	            save_and_sync(tb_mc_indicator, iif(entity_type == 'PL', 'raw_db.manager_indicator', 'raw_db.company_indicator'), , entity_id_col, 'end_date');
 	
 	
 	            chg_columns_for_mysql(tb_mc_risk_stats, iif(entity_type == 'PL', 'manager_id', 'company_id'));
 	            chg_columns_for_mysql(tb_mc_risk_stats, iif(entity_type == 'PL', 'manager_id', 'company_id'));
-	            save_and_sync(tb_mc_risk_stats, iif(entity_type == 'PL', 'raw_db.manager_risk_stats', 'raw_db.company_risk_stats'), );
+	            save_and_sync(tb_mc_risk_stats, iif(entity_type == 'PL', 'raw_db.manager_risk_stats', 'raw_db.company_risk_stats'), , entity_id_col, 'end_date');
 	
 	
 	            chg_columns_for_mysql(tb_mc_riskadjret_stats, iif(entity_type == 'PL', 'manager_id', 'company_id'));
 	            chg_columns_for_mysql(tb_mc_riskadjret_stats, iif(entity_type == 'PL', 'manager_id', 'company_id'));
-	            save_and_sync(tb_mc_riskadjret_stats, iif(entity_type == 'PL', 'raw_db.manager_riskadjret_stats', 'raw_db.company_riskadjret_stats'), );
+	            save_and_sync(tb_mc_riskadjret_stats, iif(entity_type == 'PL', 'raw_db.manager_riskadjret_stats', 'raw_db.company_riskadjret_stats'), , entity_id_col, 'end_date');
 	
 	
 	            chg_columns_for_mysql(tb_mc_style_stats, iif(entity_type == 'PL', 'manager_id', 'company_id'));
 	            chg_columns_for_mysql(tb_mc_style_stats, iif(entity_type == 'PL', 'manager_id', 'company_id'));
-	            save_and_sync(tb_mc_style_stats, iif(entity_type == 'PL', 'raw_db.manager_style_stats', 'raw_db.company_style_stats'), );
+	            save_and_sync(tb_mc_style_stats, iif(entity_type == 'PL', 'raw_db.manager_style_stats', 'raw_db.company_style_stats'), , entity_id_col, 'end_date');
 	
 	
 	  
 	  
 	            // 数据初始化时将指标存入本地
 	            // 数据初始化时将指标存入本地
@@ -326,9 +330,11 @@ def cal_and_save_mc_indicator(entity_type, entity_date, monthly_returns, indicat
 	
 	
 	        // save data to MySQL
 	        // save data to MySQL
 	        try {
 	        try {
+
+	        	entity_id_col = iif(entity_type == 'PL', 'manager_id', 'company_id');
 	
 	
-	            chg_columns_for_mysql(tb_mc_bfi_indicator, iif(entity_type == 'PL', 'manager_id', 'company_id'));
-	            save_and_sync(tb_mc_bfi_indicator, iif(entity_type == 'PL', 'raw_db.manager_ty_bfi_bm_indicator', 'raw_db.company_ty_bfi_bm_indicator'), );
+	            chg_columns_for_mysql(tb_mc_bfi_indicator, entity_id_col);
+	            save_and_sync(tb_mc_bfi_indicator, iif(entity_type == 'PL', 'raw_db.manager_ty_bfi_bm_indicator', 'raw_db.company_ty_bfi_bm_indicator'), , entity_id_col, 'end_date');
 	
 	
 	            // 数据初始化时将指标存入本地
 	            // 数据初始化时将指标存入本地
 	            if(is_save_local == true)
 	            if(is_save_local == true)
@@ -508,11 +514,11 @@ def MatchManagerBFITask(updatetime) {
 
 
 			// 有效 factors 存到 xxx_factor_bfi_by_category_group 表
 			// 有效 factors 存到 xxx_factor_bfi_by_category_group 表
 			chg_columns_for_mysql(t_bfi, 'manager_id');
 			chg_columns_for_mysql(t_bfi, 'manager_id');
-			save_and_sync(t_bfi, 'raw_db.pf_manager_factor_bfi_by_category_group', );
+			save_and_sync(t_bfi, 'raw_db.pf_manager_factor_bfi_by_category_group', , 'manager_id', 'end_date');
 
 
 			// 有效因子中 R2 最大的因子存 xxx_max_r2 
 			// 有效因子中 R2 最大的因子存 xxx_max_r2 
 			chg_columns_for_mysql(t_max_r2, 'manager_id');
 			chg_columns_for_mysql(t_max_r2, 'manager_id');
-			save_and_sync(t_max_r2, 'raw_db.pf_manager_factor_bfi_max_r2', );
+			save_and_sync(t_max_r2, 'raw_db.pf_manager_factor_bfi_max_r2', , 'manager_id', 'end_date');
 			
 			
 		} catch (ex) {
 		} catch (ex) {
             //TODO: Log errors
             //TODO: Log errors

+ 12 - 12
modules/task_portfolioPerformance.dos

@@ -127,7 +127,7 @@ def cal_and_save_portfolio_nav(cal_portfolio_info, is_save_local) {
 	        try {
 	        try {
 	
 	
 	            tb_portfolio_nav.rename!('entity_id', 'portfolio_id');
 	            tb_portfolio_nav.rename!('entity_id', 'portfolio_id');
-	            save_and_sync(tb_portfolio_nav, 'raw_db.pf_portfolio_nav', 'raw_db.pf_portfolio_nav');
+	            save_and_sync(tb_portfolio_nav, 'raw_db.pf_portfolio_nav', 'raw_db.pf_portfolio_nav', 'portfolio_id', 'price_date');
 	
 	
 	            // 数据初始化时将指标存入本地
 	            // 数据初始化时将指标存入本地
 	            if(is_save_local == true) {
 	            if(is_save_local == true) {
@@ -245,37 +245,37 @@ def cal_and_save_entity_indicators(entity_type, cal_entity_info, is_save_local)
 				des = get_performance_table_description(entity_type)[0];
 				des = get_performance_table_description(entity_type)[0];
 	            chg_columns_for_mysql(tb_entity_performance, des.sec_id_col);
 	            chg_columns_for_mysql(tb_entity_performance, des.sec_id_col);
 	            tb_entity_performance.rename!('cumulative_nav', des.cumulative_nav_col);
 	            tb_entity_performance.rename!('cumulative_nav', des.cumulative_nav_col);
-	            save_and_sync(tb_entity_performance, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
+	            save_and_sync(tb_entity_performance, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), , des.sec_id_col, 'end_date');
 	            if(is_save_local == true) save_table(tb_entity_performance, des.table_name, false);
 	            if(is_save_local == true) save_table(tb_entity_performance, des.table_name, false);
 	
 	
 				des = get_indicator_table_description(entity_type)[0];
 				des = get_indicator_table_description(entity_type)[0];
 	            chg_columns_for_mysql(tb_entity_indicator, des.sec_id_col);
 	            chg_columns_for_mysql(tb_entity_indicator, des.sec_id_col);
-	            save_and_sync(tb_entity_indicator, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
+	            save_and_sync(tb_entity_indicator, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), , des.sec_id_col, 'end_date');
 	            if(is_save_local == true) save_table(tb_entity_indicator, des.table_name, false);
 	            if(is_save_local == true) save_table(tb_entity_indicator, des.table_name, false);
 	
 	
 				des = get_risk_stats_table_description(entity_type)[0];
 				des = get_risk_stats_table_description(entity_type)[0];
 	            chg_columns_for_mysql(tb_entity_risk_stats, des.sec_id_col);
 	            chg_columns_for_mysql(tb_entity_risk_stats, des.sec_id_col);
-	            save_and_sync(tb_entity_risk_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
+	            save_and_sync(tb_entity_risk_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), , des.sec_id_col, 'end_date');
 	            if(is_save_local == true) save_table(tb_entity_risk_stats, des.table_name, false);
 	            if(is_save_local == true) save_table(tb_entity_risk_stats, des.table_name, false);
 	
 	
 				des = get_riskadjret_stats_table_description(entity_type)[0];
 				des = get_riskadjret_stats_table_description(entity_type)[0];
 	            chg_columns_for_mysql(tb_entity_riskadjret_stats, des.sec_id_col);
 	            chg_columns_for_mysql(tb_entity_riskadjret_stats, des.sec_id_col);
-	            save_and_sync(tb_entity_riskadjret_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
+	            save_and_sync(tb_entity_riskadjret_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), , des.sec_id_col, 'end_date');
 	            if(is_save_local == true) save_table(tb_entity_riskadjret_stats, des.table_name, false);
 	            if(is_save_local == true) save_table(tb_entity_riskadjret_stats, des.table_name, false);
 	
 	
 				des = get_capture_style_table_description(entity_type)[0];
 				des = get_capture_style_table_description(entity_type)[0];
 	            chg_columns_for_mysql(tb_entity_style_stats, des.sec_id_col);
 	            chg_columns_for_mysql(tb_entity_style_stats, des.sec_id_col);
-	            save_and_sync(tb_entity_style_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
+	            save_and_sync(tb_entity_style_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), , des.sec_id_col, 'end_date');
 	            if(is_save_local == true) save_table(tb_entity_style_stats, des.table_name, false);
 	            if(is_save_local == true) save_table(tb_entity_style_stats, des.table_name, false);
 	
 	
 				des = get_performance_weekly_table_description(entity_type)[0];
 				des = get_performance_weekly_table_description(entity_type)[0];
 				tb_entity_performance_weekly.rename!('cumulative_nav', des.cumulative_nav_col);
 				tb_entity_performance_weekly.rename!('cumulative_nav', des.cumulative_nav_col);
-	            save_and_sync(tb_entity_performance_weekly, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
+	            save_and_sync(tb_entity_performance_weekly, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), , des.sec_id_col, 'year_week');
 	            if(is_save_local == true) save_table(tb_entity_performance_weekly, des.table_name, false);
 	            if(is_save_local == true) save_table(tb_entity_performance_weekly, des.table_name, false);
 	
 	
 				des = get_latest_performance_table_description(entity_type)[0];
 				des = get_latest_performance_table_description(entity_type)[0];
 				tb_entity_latest_performance.rename!('cumulative_nav', des.cumulative_nav_col);
 				tb_entity_latest_performance.rename!('cumulative_nav', des.cumulative_nav_col);
-	            save_and_sync(tb_entity_latest_performance, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
+	            save_and_sync(tb_entity_latest_performance, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), , des.sec_id_col, 'price_date');
 	            if(is_save_local == true) save_table(tb_entity_latest_performance, des.table_name, false);
 	            if(is_save_local == true) save_table(tb_entity_latest_performance, des.table_name, false);
 	
 	
 	
 	
@@ -299,7 +299,7 @@ def cal_and_save_entity_indicators(entity_type, cal_entity_info, is_save_local)
  *   @param updatetime <DATETIME>: 持仓证券净值更新时间,忽略或传入1989.01.01及更早的日期被认为在做数据初始化
  *   @param updatetime <DATETIME>: 持仓证券净值更新时间,忽略或传入1989.01.01及更早的日期被认为在做数据初始化
  * 
  * 
  * 
  * 
- *   Example: CalPortfolioPerformanceTask(2024.10.28);
+ *   Example: CalPortfolioPerformanceTask(2025.01.23);
  *            CalPortfolioPerformanceTask(1989.01.01);  -- 【初始化专用】 (45min)
  *            CalPortfolioPerformanceTask(1989.01.01);  -- 【初始化专用】 (45min)
  */
  */
 def CalPortfolioPerformanceTask(updatetime) {
 def CalPortfolioPerformanceTask(updatetime) {
@@ -367,7 +367,7 @@ def cal_and_save_factor_nav(cal_factor_info, is_save_local) {
 	}
 	}
 
 
 	if(! t_factor_value.isVoid() && t_factor_value.size() > 0) {
 	if(! t_factor_value.isVoid() && t_factor_value.size() > 0) {
-		save_and_sync(t_factor_value, 'raw_db.cm_factor_value', 'raw_db.cm_factor_value');
+		save_and_sync(t_factor_value, 'raw_db.cm_factor_value', 'raw_db.cm_factor_value', 'factor_id', 'price_date');
 
 
 		if(is_save_local == true) {
 		if(is_save_local == true) {
 			save_table(t_factor_value, 'pfdb.cm_factor_value', false);
 			save_table(t_factor_value, 'pfdb.cm_factor_value', false);
@@ -501,7 +501,7 @@ def cal_bond_factor_ret(updatetime) {
  *            CalFactorPerformanceTask(1989.01.01);  -- 【初始化专用】 (1.3 min)
  *            CalFactorPerformanceTask(1989.01.01);  -- 【初始化专用】 (1.3 min)
  */
  */
 def CalFactorPerformanceTask(updatetime) {
 def CalFactorPerformanceTask(updatetime) {
-//updatetime=2024.10.28
+//updatetime=2025.01.23
 	rt = '';
 	rt = '';
     is_save_local = iif(updatetime <= get_ini_data_const()['date'], true, false);
     is_save_local = iif(updatetime <= get_ini_data_const()['date'], true, false);
 
 
@@ -560,7 +560,7 @@ def CalCategoryAverageNavTask(updatetime) {
 			t_index_value = SELECT entity_id AS index_id, price_date, nav AS index_value, incl_cal_count AS incl_cal_fund_count, total_cnt AS total_fund_count
 			t_index_value = SELECT entity_id AS index_id, price_date, nav AS index_value, incl_cal_count AS incl_cal_fund_count, total_cnt AS total_fund_count
 			                FROM ej(t_tmp, t_ret, ['entity_id', 'price_date']);
 			                FROM ej(t_tmp, t_ret, ['entity_id', 'price_date']);
 
 
-			save_and_sync(t_index_value, 'raw_db.indexes_ty_index', );
+			save_and_sync(t_index_value, 'raw_db.indexes_ty_index', ,'index_id', 'price_date');
 	
 	
 			if(is_save_local == true) {
 			if(is_save_local == true) {
 
 

+ 24 - 21
modules/task_weeklyPerformance.dos

@@ -20,8 +20,8 @@ use fundit::dataSaver;
  *         portfolio 未测试
  *         portfolio 未测试
  * 
  * 
  *   Example: CalEntityRBSATask('MF', ['MF00003PW1'], 2024.10.14T10:00:00);
  *   Example: CalEntityRBSATask('MF', ['MF00003PW1'], 2024.10.14T10:00:00);
- *            CalEntityRBSATask('MF', NULL, 2025.01.10);
- *            CalEntityRBSATask('PF', NULL, 2025.01.10);
+ *            CalEntityRBSATask('MF', NULL, 2025.01.23);
+ *            CalEntityRBSATask('PF', NULL, 2025.01.23);
  */
  */
 def CalEntityRBSATask(entityType, entityIds, updateTime) {
 def CalEntityRBSATask(entityType, entityIds, updateTime) {
 // entityType = 'PF'
 // entityType = 'PF'
@@ -87,9 +87,9 @@ def CalEntityRBSATask(entityType, entityIds, updateTime) {
 		}
 		}
 
 
 		if(entityType IN ['MF', 'HF'])
 		if(entityType IN ['MF', 'HF'])
-			save_and_sync(tb_result, 'raw_db.pf_fund_rbsa_breakdown', 'raw_db.pf_fund_rbsa_breakdown');
+			save_and_sync(tb_result, 'raw_db.pf_fund_rbsa_breakdown', 'raw_db.pf_fund_rbsa_breakdown', 'fund_id', 'effective_date');
 		else
 		else
-			save_and_sync(tb_result, 'raw_db.pf_portfolio_rbsa_breakdown', 'raw_db.pf_portfolio_rbsa_breakdown');
+			save_and_sync(tb_result, 'raw_db.pf_portfolio_rbsa_breakdown', 'raw_db.pf_portfolio_rbsa_breakdown', 'portfolio_id', 'effective_date');
 
 
 		i += batch_size;
 		i += batch_size;
 
 
@@ -110,7 +110,7 @@ def CalEntityRBSATask(entityType, entityIds, updateTime) {
  */
  */
 def MatchEntityBFITask(entityType, date) {
 def MatchEntityBFITask(entityType, date) {
 //entityType = 'MF'
 //entityType = 'MF'
-//date = 2024.12.01
+//date = 2025.01.23
 
 
 	rt = '';
 	rt = '';
 
 
@@ -160,11 +160,12 @@ def MatchEntityBFITask(entityType, date) {
 			chg_columns_for_mysql(t_coe, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
 			chg_columns_for_mysql(t_coe, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
 
 
 			// 只有基金需要存 index_coe 表
 			// 只有基金需要存 index_coe 表
-		    if(entityType IN ['MF', 'HF']) save_and_sync(t_coe, 'raw_db.pf_fund_index_coe', );
+		    if(entityType IN ['MF', 'HF']) save_and_sync(t_coe, 'raw_db.pf_fund_index_coe', , 'fund_id', 'end_date');
 
 
 			// 所有的 factors 存到 xxx_factor_bfi 表;NOTE: Java 把所有 factor 的数据都存起来,这里只存 correlation 达标的记录 (反正这个表没啥用?)
 			// 所有的 factors 存到 xxx_factor_bfi 表;NOTE: Java 把所有 factor 的数据都存起来,这里只存 correlation 达标的记录 (反正这个表没啥用?)
-			chg_columns_for_mysql(t_bfi_candidates, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
-	        save_and_sync(t_bfi_candidates, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi', 'raw_db.cm_fund_factor_bfi'), );
+			entity_id_col = iif(entityType == 'PF', 'portfolio_id', 'fund_id');
+			chg_columns_for_mysql(t_bfi_candidates, entity_id_col);
+	        save_and_sync(t_bfi_candidates, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi', 'raw_db.cm_fund_factor_bfi'), , entity_id_col, 'end_date');
 
 
 			if(bfi_raw.isVoid() || bfi_raw.size() == 0) continue;
 			if(bfi_raw.isVoid() || bfi_raw.size() == 0) continue;
 
 
@@ -179,12 +180,12 @@ def MatchEntityBFITask(entityType, date) {
 			           GROUP BY entity_id, end_date;
 			           GROUP BY entity_id, end_date;
 
 
 			// 有效 factors 存到 xxx_factor_bfi_by_category_group 表
 			// 有效 factors 存到 xxx_factor_bfi_by_category_group 表
-			chg_columns_for_mysql(t_bfi, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
-			save_and_sync(t_bfi, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_by_category_group', 'raw_db.pf_fund_factor_bfi_by_category_group'), );
+			chg_columns_for_mysql(t_bfi, entity_id_col);
+			save_and_sync(t_bfi, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_by_category_group', 'raw_db.pf_fund_factor_bfi_by_category_group'), , entity_id_col, 'end_date');
 
 
 			// 有效因子中 R2 最大的因子存 xxx_max_r2 
 			// 有效因子中 R2 最大的因子存 xxx_max_r2 
-			chg_columns_for_mysql(t_max_r2, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
-			save_and_sync(t_max_r2, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_max_r2', 'raw_db.pf_fund_factor_bfi_by_category_group_max_r2'), );
+			chg_columns_for_mysql(t_max_r2, entity_id_col);
+			save_and_sync(t_max_r2, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_max_r2', 'raw_db.pf_fund_factor_bfi_by_category_group_max_r2'), , entity_id_col, 'end_date');
 
 
 
 
 		} catch (ex) {
 		} catch (ex) {
@@ -284,7 +285,7 @@ def calEntityBfiIndicatorTask(entityType, date) {
 		    
 		    
             chg_columns_for_mysql(tb_bfi_indicator, t_desc.sec_id_col[0]);
             chg_columns_for_mysql(tb_bfi_indicator, t_desc.sec_id_col[0]);
             db_name = t_desc.table_name[0].split('.')[0];
             db_name = t_desc.table_name[0].split('.')[0];
-            save_and_sync(tb_bfi_indicator, t_desc.table_name[0].strReplace(db_name, 'raw_db'), t_desc.table_name[0].strReplace(db_name, 'raw_db'));
+            save_and_sync(tb_bfi_indicator, t_desc.table_name[0].strReplace(db_name, 'raw_db'), t_desc.table_name[0].strReplace(db_name, 'raw_db'), t_desc.sec_id_col[0], 'end_date');
 
 
             // 数据初始化时将指标存入本地,做排名之用
             // 数据初始化时将指标存入本地,做排名之用
             if(end_day <= get_ini_data_const()['date'])
             if(end_day <= get_ini_data_const()['date'])
@@ -367,8 +368,10 @@ def cal_and_save_mc_weekly_nav(entity_type, entity_date, is_save_local) {
         // save data to MySQL  (12 sec)
         // save data to MySQL  (12 sec)
         try {
         try {
 
 
-            tb_entity_nav.rename!('entity_id', iif(entity_type == 'PL', 'manager_id', 'company_id'));
-            save_and_sync(tb_entity_nav, iif(entity_type == 'PL', 'raw_db.manager_nav', 'raw_db.company_nav'), );
+			entity_id_col = iif(entity_type == 'PL', 'manager_id', 'company_id');
+
+            tb_entity_nav.rename!('entity_id', entity_id_col);
+            save_and_sync(tb_entity_nav, iif(entity_type == 'PL', 'raw_db.manager_nav', 'raw_db.company_nav'), , entity_id_col, 'year_week');
 
 
             // 数据初始化时将指标存入本地
             // 数据初始化时将指标存入本地
             if(is_save_local == true) {
             if(is_save_local == true) {
@@ -390,17 +393,17 @@ def cal_and_save_mc_weekly_nav(entity_type, entity_date, is_save_local) {
 /*
 /*
  *  [定时任务]: 基金经理/公司周净值计算
  *  [定时任务]: 基金经理/公司周净值计算
  * 
  * 
- *   Example: CalMCWeeklyNavTask('CO', 2024.11.04);
+ *   Example: CalMCWeeklyNavTask('PL', 2025.01.23);
  */
  */
-def CalMCWeeklyNavTask(entity_type, updatetime) {
+def CalMCWeeklyNavTask(entityType, updatetime) {
 //updatetime = 2024.11.05;
 //updatetime = 2024.11.05;
-//entity_type = 'PL';
-	if(!(entity_type IN ['PL', 'CO'])) return;
+//entityType = 'CO';
+	if(!(entityType IN ['PL', 'CO'])) return;
 
 
 	is_save_local = iif(updatetime <= get_ini_data_const()['updatetime'], true, false);
 	is_save_local = iif(updatetime <= get_ini_data_const()['updatetime'], true, false);
 
 
 	// 60 sec  简化起见,不区分curve_type, strategy; TODO: 性能能否优化?
 	// 60 sec  简化起见,不区分curve_type, strategy; TODO: 性能能否优化?
-	if(entity_type == 'PL') {
+	if(entityType == 'PL') {
 		entity_date = get_manager_list_by_fund_updatetime(updatetime, 'w');
 		entity_date = get_manager_list_by_fund_updatetime(updatetime, 'w');
 		entity_date.rename!('manager_id', 'entity_id');
 		entity_date.rename!('manager_id', 'entity_id');
 	}
 	}
@@ -410,7 +413,7 @@ def CalMCWeeklyNavTask(entity_type, updatetime) {
 	}
 	}
 	
 	
 	// 15 sec
 	// 15 sec
-	cal_and_save_mc_weekly_nav(entity_type, entity_date, is_save_local);
+	cal_and_save_mc_weekly_nav(entityType, entity_date, is_save_local);
 
 
 	entity_date = null;
 	entity_date = null;