module fundit::task_monthlyPerformance

use fundit::sqlUtilities;
use fundit::operationDataPuller;
use fundit::performanceDataPuller;
use fundit::indicatorCalculator;
use fundit::dataSaver;
use fundit::bfiMatcher;
use fundit::rankingCalculator;
use fundit::navCalculator;

/*
 *  [Scheduled task] Compute the level-1 / level-2 category (strategy) rankings of
 *  entities and save them to the database.
 *
 *  @param entityType  : 'MF', 'HF' (original note: MF=HF), 'PL' or 'CO';
 *                       any other value makes the task return NULL without doing anything
 *  @param endDate     : month to rank, e.g. 2024.09M
 *  @param isFromMySQL : when false, the return/indicator tables local to DolphinDB are read
 *                       instead (used for data initialization)
 *
 *  NOTE: unlike MySQL, the smallest rank here is 0, not 1
 *
 *  Example: CalEntityRankingTask('MF', 2024.09M, true);
 *           CalEntityRankingTask('CO', 2024.10M, true);
 */
def CalEntityRankingTask(entityType, endDate, isFromMySQL=true) {

    // debug defaults:
    //   entityType = 'MF'
    //   endDate = 2024.11M
    //   isFromMySQL = true

    if(!(entityType in ['MF', 'HF', 'PL', 'CO'])) return NULL;

    entity_info = get_entity_info(entityType, NULL);

    v_ranking_tables = cal_indicator_ranking('strategy', entityType, entity_info, endDate, isFromMySQL);

    save_ranking_tables(entityType, 'strategy', v_ranking_tables);
}

/*
 *  [Scheduled task] Compute the BFI rankings of entities and save them to the database.
 *
 *  @param entityType  : 'MF', 'HF' (original note: MF=HF) or 'PL';
 *                       any other value makes the task return NULL without doing anything
 *  @param endDate     : month to rank, e.g. 2024.09M
 *  @param isFromMySQL : when false, the return/indicator tables local to DolphinDB are read
 *                       instead (used for data initialization)
 *
 *  Example: CalEntityBfiRankingTask('MF', 2024.09M, true);
 *           CalEntityBfiRankingTask('PL', 2024.10M, true);
 */
def CalEntityBfiRankingTask(entityType, endDate, isFromMySQL=true) {

    if(!(entityType in ['MF', 'HF', 'PL'])) return NULL;

    entity_info = get_entity_info(entityType, NULL);

    // ~16 sec
    v_ranking_tables = cal_indicator_ranking('bfi', entityType, entity_info, endDate, isFromMySQL);

    // ~39 sec
    save_ranking_tables(entityType, 'bfi', v_ranking_tables);
}

/*
 *  Private method: compute the ranking of entities relative to a benchmark ranking
 *  and save the result.
 *
 *  @param entity_type       : passed through to save_relative_ranking_table
 *  @param benchmark_ranking : benchmark ranking table (e.g. tb_fund_ranking)
 *  @param entity_ranking    : entity table to be ranked; a separate variable is assigned
 *                             before it is handed to cal_relative_ranking
 *  @param ranking_by        : 'strategy', 'substrategy' or 'bfi'; decides the final name
 *                             of the 'category_id' column ('bfi' -> 'factor_id')
 *  @param isFromMySQL       : passed through to cal_relative_ranking
 */
def cal_and_save_relative_ranking(entity_type, benchmark_ranking, entity_ranking, ranking_by, isFromMySQL=true) {

    // debug default: benchmark_ranking = tb_fund_ranking

    t_entity_ranking = entity_ranking;

    cal_relative_ranking(benchmark_ranking, t_entity_ranking, isFromMySQL);

    // the generic 'category_id' column is renamed to the column name used by the target table
    t_entity_ranking.rename!('category_id', iif(ranking_by=='bfi', 'factor_id', ranking_by));

    save_relative_ranking_table(entity_type, t_entity_ranking, ranking_by);
}

/*
 *  [Scheduled task] Using mutual funds as the rating reference, compute the return and
 *  indicator rankings of portfolios and private funds.
 *
 *  @param entity_type : 'PF'
 *  @param entity_ids  : entity id(s), or NULL
 *  @param end_date    : month to rank, e.g. 2024.09M
 *  @param isFromMySQL : passed through to the data pullers
 *
 *  TODO: customer fund
 *  TODO: a single portfolio takes ~1.5 min in total, most of it spent pulling MySQL data
 *
 *  Example: CalRelativeRankingTask('PF', NULL, 2024.09M, true);
 *           CalRelativeRankingTask('PF', 143109, 2024.09M, true);
 */
def CalRelativeRankingTask(entity_type, entity_ids, end_date, isFromMySQL=true) {

    // debug defaults:
    //   entity_type = 'PF'
    //   end_date = 2024.09M
    //   isFromMySQL = true
    //   ranking_by = 'bfi'
    //   entity_ids = 143109

    entity_info = get_entity_info(entity_type, entity_ids);

    // 1: user portfolio, 2: client real portfolio; virtual portfolios such as
    // client-recommended portfolios and overview composites are ignored
    if(entity_type == 'PF')
        entity_info = SELECT * FROM entity_info WHERE portfolio_type IN (1, 2);

    v_ranking_by = ['strategy', 'substrategy', 'bfi'];

    // for now, mutual (hybrid) funds are used as the ranking reference
    for(ranking_by in v_ranking_by) {

        if(ranking_by == 'strategy') {

            v_category = EXEC DISTINCT strategy FROM entity_info WHERE strategy IS NOT NULL;
            tb_fund_ranking = get_fund_indicator_ranking(NULL, end_date, v_category, isFromMySQL);
            UPDATE tb_fund_ranking SET category_id = strategy$STRING;

        } else if(ranking_by == 'substrategy') {

            v_category = EXEC DISTINCT substrategy FROM entity_info WHERE substrategy IS NOT NULL;
            tb_fund_ranking = get_fund_indicator_substrategy_ranking(NULL, end_date, v_category, isFromMySQL);
            UPDATE tb_fund_ranking SET category_id = substrategy$STRING;

        } else if(ranking_by == 'bfi') {

            // NOTE(review): the condition is kept exactly as written. It reads as
            // "entity_ids was supplied"; whether the '||' is intended depends on how
            // NULL and '' compare in DolphinDB - confirm before changing it.
            if(entity_ids != NULL || entity_ids != '')
                v_category = EXEC DISTINCT factor_id FROM get_entity_bfi_factors(entity_type, entity_ids, end_date, end_date);
            else
                v_category = NULL;

            tb_fund_ranking = get_fund_bfi_bm_indicator_ranking(NULL, end_date, v_category, isFromMySQL);
            UPDATE tb_fund_ranking SET category_id = factor_id;
        }

        // No benchmark ranking for this kind: skip it, but still process the remaining
        // kinds (a plain return here would drop them).
        if(tb_fund_ranking.isVoid() || tb_fund_ranking.size() == 0) continue;

        entity_ranking = transform_data_for_ranking(entity_type, entity_info, end_date, ranking_by, isFromMySQL);

        cal_and_save_relative_ranking(entity_type, tb_fund_ranking, entity_ranking, ranking_by, isFromMySQL);
    }
}

/*
 *  Compute and store the monthly NAV of fund managers / companies.
 *
 *  @param entity_type   : 'PL' or 'CO'; anything else returns '' immediately
 *  @param entity_date   : table with at least the columns entity_id, effective_date
 *  @param is_save_local : when true, the result is also written to the local mfdb table
 *
 *  @return : '' when nothing failed (or nothing was done); otherwise the exception
 *            caught while saving, accumulated into the result
 */
def cal_and_save_mc_monthly_nav(entity_type, entity_date, is_save_local) {

    rt = '';

    if( !(entity_type in ['PL', 'CO']) ) return rt;

    if(entity_date.isVoid() || entity_date.size() == 0) return rt;

    // table shaped like the MySQL fitted-curve table
    tb_entity_nav = create_mc_fitted_curve();

    // Kept in line with MySQL for now: only three series are computed.
    // May be refined to (mutual / private) x main strategy in the future.
    // key = curve_type written to the output, value = raise_type filtered in the returns
    d_curve_type = dict(INT, INT);
    d_curve_type[1] = 1;    // private funds
    d_curve_type[4] = 2;    // mutual funds
    d_curve_type[7] = -99;  // mutual + private combined

    // run in batches
    i = 0;
    batch_size = 1000;
    all_entity_id = entity_date.entity_id.distinct();
    max_cnt = all_entity_id.size();

    do {

        // ~14 sec
        tb_entity = SELECT entity_id, effective_date
                    FROM entity_date
                    WHERE entity_id IN all_entity_id[i : min(max_cnt, i + batch_size)];

        if(tb_entity.isVoid() || tb_entity.size() == 0) break;

        s_json = tb_entity.toStdJson();

        t_ret = get_mc_average_return(entity_type, 'm', s_json, 0, 1, true);

        for(cur in d_curve_type.keys()) {

            tmp = SELECT entity_id, cur AS curve_type, 0 AS strategy, effective_date, price_date, ret, incl_cal_cnt
                  FROM t_ret
                  WHERE raise_type = d_curve_type[cur]
                    AND strategy = -99; // only the all-strategy series is needed for now

            // turn the monthly returns into a NAV series
            tb_nav = cal_mc_nav_by_return(entity_type, tmp, 'm');

            INSERT INTO tb_entity_nav
                SELECT entity_id, curve_type, strategy, effective_date AS end_date, nav, incl_cal_cnt
                FROM ej(tb_nav, tmp, ['entity_id', 'curve_type', 'strategy', 'effective_date']);
        }

        i += batch_size;

    // '<' (not '<='): when the id count is an exact multiple of batch_size, '<=' only
    // produced one more, empty, query before hitting the break above
    } while (i < max_cnt);

    if(! tb_entity_nav.isVoid() && tb_entity_nav.size() > 0) {

        // save data to MySQL (~12 sec)
        try {

            tb_entity_nav.rename!('entity_id', iif(entity_type == 'PL', 'fund_manager_id', 'company_id'));
            save_and_sync(tb_entity_nav, iif(entity_type == 'PL', 'raw_db.fund_manager_fitted_curve', 'raw_db.company_fitted_curve'), );

            // during data initialization the result is also stored locally
            if(is_save_local == true) {
                save_table(tb_entity_nav, iif(entity_type == 'PL', 'mfdb.fund_manager_fitted_curve', 'mfdb.company_fitted_curve'), false);
            }

        } catch(ex) {

            //TODO: Log errors
            rt += ex;
        }
    }

    return rt;
}

/*
 *  Compute and store the monthly returns and indicators of fund managers / companies.
 *
 *  @param entity_type     : 'PL' or 'CO'
 *  @param entity_date     : entity_id, curve_type, strategy, price_date
 *                           (for 'BFI' the code reads end_date and factor_id instead of price_date)
 *  @param monthly_returns : entity_id, curve_type, strategy, end_date, price_date, nav, ret
 *  @param indicator_type  : 'PBI' or 'BFI'
 *  @param is_save_local   : when true, results are also written to the local mfdb tables
 *
 *  @return : '' when nothing failed (or nothing was done); otherwise the exception
 *            caught while saving, accumulated into the result
 */
def cal_and_save_mc_indicator(entity_type, entity_date, monthly_returns, indicator_type, is_save_local) {

    rt = '';

    if(!(entity_type IN ['PL', 'CO'] && indicator_type IN ['PBI', 'BFI'])) return rt;

    if(entity_date.isVoid() || entity_date.size() == 0) return rt;

    d_indicators = cal_mc_monthly_indicators(entity_type, indicator_type, monthly_returns);

    // nothing computed: leave (the original used 'break' here, which is not inside any loop)
    if(d_indicators.isVoid() || d_indicators["7"].isVoid()) return rt;

    // cal_mc_monthly_indicators returns a two-level dictionary: the first level is keyed by
    // curve_type, the second by period. Tables of the same period (but different curve_type)
    // are merged into the curve_type "7" tables.
    // curve_type: 1 = private, 4 = mutual, 7 = mutual + private combined
    for(k in d_indicators["7"].keys()) {
        if(!d_indicators["1"].isVoid())
            d_indicators["7"][k].append!(d_indicators["1"][k]);
        if(!d_indicators["4"].isVoid())
            d_indicators["7"][k].append!(d_indicators["4"][k]);
    }

    indicators = d_indicators["7"];
    d_indicators = null;

    // id column name used by the MySQL tables
    id_col = iif(entity_type == 'PL', 'manager_id', 'company_id');

    if(indicator_type == 'PBI') {

        entity_info = SELECT entity_id, curve_type, strategy, price_date.temporalParse('yyyy-MM') AS price_date
                      FROM entity_date;

        // create the tables following the MySQL layout
        tb_mc_performance = create_mc_performance();
        tb_mc_indicator = create_mc_indicator();
        tb_mc_risk_stats = create_mc_risk_stats();
        tb_mc_riskadjret_stats = create_mc_riskadjret_stats();
        tb_mc_style_stats = create_mc_style_stats();

        // fill the tables
        generate_entity_performance(entity_info, indicators, true, tb_mc_performance, ['curve_type', 'strategy']);
        generate_entity_indicator(entity_info, indicators, true, tb_mc_indicator, ['curve_type', 'strategy']);
        generate_entity_risk_stats(entity_info, indicators, true, tb_mc_risk_stats, ['curve_type', 'strategy']);
        generate_entity_riskadjret_stats(entity_info, indicators, true, tb_mc_riskadjret_stats, ['curve_type', 'strategy']);
        generate_entity_style_stats(entity_info, indicators, true, tb_mc_style_stats, ['curve_type', 'strategy']);

        if(! tb_mc_performance.isVoid() && tb_mc_performance.size() > 0) {

            // save data to MySQL (~13 sec)
            try {

                chg_columns_for_mysql(tb_mc_performance, id_col);
                save_and_sync(tb_mc_performance, iif(entity_type == 'PL', 'raw_db.manager_performance', 'raw_db.company_performance'), );

                chg_columns_for_mysql(tb_mc_indicator, id_col);
                save_and_sync(tb_mc_indicator, iif(entity_type == 'PL', 'raw_db.manager_indicator', 'raw_db.company_indicator'), );

                chg_columns_for_mysql(tb_mc_risk_stats, id_col);
                save_and_sync(tb_mc_risk_stats, iif(entity_type == 'PL', 'raw_db.manager_risk_stats', 'raw_db.company_risk_stats'), );

                chg_columns_for_mysql(tb_mc_riskadjret_stats, id_col);
                save_and_sync(tb_mc_riskadjret_stats, iif(entity_type == 'PL', 'raw_db.manager_riskadjret_stats', 'raw_db.company_riskadjret_stats'), );

                chg_columns_for_mysql(tb_mc_style_stats, id_col);
                save_and_sync(tb_mc_style_stats, iif(entity_type == 'PL', 'raw_db.manager_style_stats', 'raw_db.company_style_stats'), );

                // during data initialization the indicators are also stored locally
                if(is_save_local == true) {
                    save_table(tb_mc_performance, iif(entity_type == 'PL', 'mfdb.manager_performance', 'mfdb.company_performance'), false);
                    save_table(tb_mc_indicator, iif(entity_type == 'PL', 'mfdb.manager_indicator', 'mfdb.company_indicator'), false);
                    save_table(tb_mc_risk_stats, iif(entity_type == 'PL', 'mfdb.manager_risk_stats', 'mfdb.company_risk_stats'), false);
                    save_table(tb_mc_riskadjret_stats, iif(entity_type == 'PL', 'mfdb.manager_riskadjret_stats', 'mfdb.company_riskadjret_stats'), false);
                    save_table(tb_mc_style_stats, iif(entity_type == 'PL', 'mfdb.manager_style_stats', 'mfdb.company_style_stats'), false);
                }

            } catch(ex) {

                //TODO: Log errors
                rt += ex;
            }
        }

    } else {

        entity_info = SELECT entity_id, curve_type, strategy, end_date.temporalParse('yyyy-MM') AS end_date, factor_id AS benchmark_id
                      FROM entity_date;

        tb_mc_bfi_indicator = create_mc_bfi_indicator();
        //tb_mc_bfi_indicator.rename!('factor_id', 'benchmark_id');

        generate_entity_bfi_indicator(entity_info, indicators, true, tb_mc_bfi_indicator, ['curve_type', 'strategy']);

        if(! tb_mc_bfi_indicator.isVoid() && tb_mc_bfi_indicator.size() > 0) {

            // save data to MySQL
            try {

                chg_columns_for_mysql(tb_mc_bfi_indicator, id_col);
                save_and_sync(tb_mc_bfi_indicator, iif(entity_type == 'PL', 'raw_db.manager_ty_bfi_bm_indicator', 'raw_db.company_ty_bfi_bm_indicator'), );

                // during data initialization the indicators are also stored locally
                if(is_save_local == true)
                    save_table(tb_mc_bfi_indicator, iif(entity_type == 'PL', 'mfdb.manager_ty_bfi_bm_indicator', 'mfdb.company_ty_bfi_bm_indicator'), false);

            } catch(ex) {

                //TODO: Log errors
                rt += ex;
            }
        }
    }

    return rt;
}

/*
 *  [Scheduled task] Monthly NAV calculation for fund managers / companies.
 *
 *  @param entity_type : 'PL' or 'CO'; anything else returns without doing anything
 *  @param updatetime  : used to select the managers / companies to process
 *
 *  @return : the result of cal_and_save_mc_monthly_nav ('' when nothing failed)
 *
 *  Example: CalMCMonthlyNavTask('CO', 2024.12.04);
 */
def CalMCMonthlyNavTask(entity_type, updatetime) {

    // debug defaults:
    //   updatetime = 2024.11.05;
    //   entity_type = 'CO';

    if(!(entity_type IN ['PL', 'CO'])) return;

    is_save_local = iif(updatetime <= get_ini_data_const()['updatetime'], true, false);

    // ~31 sec. For simplicity curve_type / strategy are not distinguished.
    // TODO: can the performance be improved?
    if(entity_type == 'PL') {
        entity_date = get_manager_list_by_fund_updatetime(updatetime);
        entity_date.rename!('manager_id', 'entity_id');
    } else {
        entity_date = get_company_list_by_fund_updatetime(updatetime);
        entity_date.rename!('company_id', 'entity_id');
    }

    // ~15 sec
    rt = cal_and_save_mc_monthly_nav(entity_type, entity_date, is_save_local);

    entity_date = null;

    return rt;
}

/*
 *  [Scheduled task] Monthly return and standard ('PBI') indicator calculation for fund
 *  managers / companies. BFI indicators are not computed here.
 *
 *  @param entity_type : 'PL' or 'CO'; anything else returns '' without doing anything
 *  @param updatetime  : used to select the entities to process
 *
 *  @return : '' when nothing failed; otherwise the errors reported by
 *            cal_and_save_mc_indicator, accumulated over all batches
 *
 *  Example: CalMCIndicatorTask('CO', 2024.11.04);
 *           CalMCIndicatorTask('PL', 2024.11.04);
 */
def CalMCIndicatorTask(entity_type, updatetime) {

    // debug defaults:
    //   entity_type = 'PL';
    //   updatetime = 2024.11.01

    rt = '';

    // cal_and_save_mc_indicator would reject other types anyway; bail out before the pulls
    if(!(entity_type IN ['PL', 'CO'])) return rt;

    is_save_local = iif(updatetime <= get_ini_data_const()['updatetime'], true, false);

    // ~3 sec
    entity_date = get_entity_list_by_nav_updatetime(entity_type, NULL, updatetime, true);

    if(entity_date.isVoid() || entity_date.size() == 0) return rt;

    i = 0;
    batch_size = 1000;
    v_entity_id = entity_date.entity_id.distinct();
    max_cnt = v_entity_id.size();

    // a very old month is sent as end_date so that the full NAV history is fetched
    ver_old_month = 1900.01M;

    do {

        // fetch the full monthly NAV
        tb_entity_date = SELECT entity_id, curve_type, strategy, ver_old_month AS end_date
                         FROM entity_date
                         WHERE entity_id in v_entity_id[i : min(i + batch_size, max_cnt)];

        if(tb_entity_date.isVoid() || tb_entity_date.size() == 0) break;

        s_json = tb_entity_date.toStdJson();

        // use the requested entity type (was hard-coded to 'PL', which is wrong for 'CO')
        tb_nav = get_mc_nav_for_return_calculation(entity_type, s_json, 0);

        v_end_date = tb_nav.end_date.temporalParse('yyyy-MM');
        tb_nav.replaceColumn!('end_date', v_end_date);
        tb_nav.join!(v_end_date.temporalFormat('yyyy-MM-dd').temporalParse('yyyy-MM-dd').businessMonthEnd() AS price_date);
        tb_nav.sortBy!(['entity_id', 'curve_type', 'strategy', 'end_date']);

        // compute the monthly returns
        tb_monthly_ret = SELECT entity_id, curve_type, strategy, end_date, price_date, cumulative_nav AS nav, cumulative_nav.ratios()-1 AS ret
                         FROM tb_nav
                         CONTEXT BY entity_id, curve_type, strategy;

        // 40+ min
        rt += cal_and_save_mc_indicator(entity_type, entity_date, tb_monthly_ret, 'PBI', is_save_local);

        i += batch_size;

    } while (i < max_cnt);

    return rt;
}

/*
 *  [Scheduled task] Monthly BFI indicator calculation for fund managers.
 *
 *  @param updatetime : used to select the BFI matching records to process
 *
 *  @return : '' when nothing failed; otherwise the errors reported by
 *            cal_and_save_mc_indicator, accumulated over all batches
 *
 *  Example: CalManagerBfiIndicatorTask(2024.11.04);
 */
def CalManagerBfiIndicatorTask(updatetime) {

    // debug default: updatetime = 2024.11.01

    rt = '';

    entity_type = 'PL';

    is_save_local = iif(updatetime <= get_ini_data_const()['updatetime'], true, false);

    // the BFI indicator calculation is driven by updates of the bfi matching table
    entity_date = get_mc_bfi_factors(entity_type, NULL, 1990.01M, today().month(), updatetime);

    // nothing to do: avoid calling get_monthly_ret with an empty id list
    if(entity_date.isVoid() || entity_date.size() == 0) return rt;

    entity_date.join!(entity_date.end_date AS price_date);

    i = 0;
    batch_size = 1000;
    v_entity_id = entity_date.entity_id.distinct();
    max_cnt = v_entity_id.size();

    do {

        // fetch the full monthly returns, 1+ min
        tb_monthly_ret = get_monthly_ret(entity_type, v_entity_id[i : min(i + batch_size, max_cnt)], 1900.01.01, today(), true);

        v_end_date = tb_monthly_ret.end_date.temporalParse('yyyy-MM');
        tb_monthly_ret.replaceColumn!('end_date', v_end_date);

        // fill a missing price_date with the business month end of end_date
        UPDATE tb_monthly_ret
        SET price_date = end_date.temporalFormat('yyyy-MM-dd').temporalParse('yyyy-MM-dd').businessMonthEnd()
        WHERE price_date IS NULL;

        // 40+ min
        rt += cal_and_save_mc_indicator(entity_type, entity_date, tb_monthly_ret, 'BFI', is_save_local);

        i += batch_size;

    } while (i < max_cnt);

    return rt;
}

/*
 *  [Scheduled task] BFI matching for fund managers.
 *
 *  @param updatetime : used to select the manager performance records to match
 *
 *  @return : '' when nothing failed; otherwise the exceptions caught while saving,
 *            accumulated over all batches
 *
 *  It takes 9.5 hours.
 */
def MatchManagerBFITask(updatetime) {

    rt = '';

    entity_type = 'PL';

    // ~31 sec
    entity_date = get_mc_performance_by_updatetime(entity_type, updatetime);

    if(entity_date.isVoid() || entity_date.size() == 0) return rt;

    i = 0;
    batch_size = 1000;
    max_cnt = entity_date.size();

    do {

        t_entity_date = entity_date[i : min(max_cnt, i + batch_size)];

        // 22 min per 1000 records, way too slow
        t_bfi = match_mc_bfi(entity_type, t_entity_date);

        // one row per (entity, curve_type, strategy, end_date): the first factor with its
        // coe / r2, plus the names of all matched factors joined with ","
        t_max_r2 = SELECT entity_id , curve_type, strategy, factor_id.first() AS factor_id, end_date, string(NULL) AS performance_flag,
                          coe.first() AS coe, r2.first() AS r2, concat(factor_name, ",") AS rz_portrait
                   FROM ej(t_bfi, get_bfi_index_list(), 'factor_id')
                   GROUP BY entity_id, curve_type, strategy, end_date;

        try {

            // pf_manager_factor_bfi is strongly suspected to be a useless intermediate table,
            // so it is not saved here

            // the effective factors go to xxx_factor_bfi_by_category_group
            chg_columns_for_mysql(t_bfi, 'manager_id');
            save_and_sync(t_bfi, 'raw_db.pf_manager_factor_bfi_by_category_group', );

            // among the effective factors, the one with the largest R2 goes to xxx_max_r2
            chg_columns_for_mysql(t_max_r2, 'manager_id');
            save_and_sync(t_max_r2, 'raw_db.pf_manager_factor_bfi_max_r2', );

        } catch (ex) {

            //TODO: Log errors
            rt += ex;
        }

        i += batch_size;

    } while (i < max_cnt);

    return rt;
}