module fundit::task_fundPerformance

use fundit::sqlUtilities;
use fundit::operationDataPuller;
use fundit::performanceDataPuller;
use fundit::dataSaver;
use fundit::returnCalculator;
use fundit::indicatorCalculator;
use fundit::rbsaCalculator;
use fundit::bfiMatcher;
use fundit::ms_dataPuller;

/*
 *  [Scheduled task] Calculate performance indicators for entities whose NAV was recently updated.
 *
 *  @param entityType : one of 'MF', 'HF', 'MI', 'FI'; any other value makes the task return null
 *  @param date       : NAV update time; when empty it defaults to (now - 1 day).
 *                      A date of 1989.01.01 or earlier means "initialization": the indicators
 *                      are additionally stored in the local database.
 *
 *  @return           : '' on success; the caught exception when saving fails;
 *                      null for an unsupported entityType; nothing when no entity has NAV updates.
 *
 *  NOTE: unlike the Java implementation, the current month's indicators are recalculated on every
 *        daily run; there is no need to wait for the month-end production.
 *
 *  Example: calFundPerformanceTask('MF', 2024.10.28);
 *           calFundPerformanceTask('MI', 2024.10.28);
 *           calFundPerformanceTask('MF', get_ini_data_const()['date']);  -- [data initialization only] (70min)
 */
def calFundPerformanceTask(entityType, date) {

    rt = '';

    if(!(entityType IN ['MF', 'HF', 'MI', 'FI'])) return null;

    if(date.isNothing() || date.isNull())
        end_day = temporalAdd(now(), -1d);
    else
        end_day = date;

    // entities whose NAV changed since end_day (1s)
    tb_cal_funds = get_entity_list_by_nav_updatetime(entityType, NULL, end_day, true);

    if(tb_cal_funds.isVoid() || tb_cal_funds.size() == 0 ) return;

    // result tables laid out like their MySQL counterparts
    tb_fund_performance = create_entity_performance();
    tb_fund_indicator = create_entity_indicator();
    tb_fund_risk_stats = create_entity_risk_stats();
    tb_fund_riskadjret_stats = create_entity_riskadjret_stats();
    tb_fund_style_stats = create_entity_style_stats();
    tb_fund_performance_weekly = create_entity_performance_weekly();
    tb_fund_latest_performance = create_entity_latest_performance();

    // process the entities in batches
    i = 0;
    batch_size = 1000;
    total = tb_cal_funds.size();

    do {

        funds = tb_cal_funds[i : min(total, i+batch_size)];

        if(funds.isVoid() || funds.size() == 0) break;

        // 200ms
        fund_info = SELECT entity_id, price_date, inception_date, benchmark_id, ini_value FROM ej(funds, get_entity_info(entityType, funds.entity_id), 'entity_id');

        // monthly returns (12s)
        rets = mix_monthly_returns(entityType, fund_info);

        if(!rets.isVoid() && rets.size() > 0) {

            // monthly indicators (56s)
            rets.rename!('cumulative_nav', 'nav');
            indicators = cal_monthly_indicators(entityType, 'PBI', rets);

            // append records shaped like the MySQL tables (1s)
            generate_entity_performance(fund_info, indicators, true, tb_fund_performance);
            generate_entity_indicator(fund_info, indicators, true, tb_fund_indicator);
            generate_entity_risk_stats(fund_info, indicators, true, tb_fund_risk_stats);
            generate_entity_riskadjret_stats(fund_info, indicators, true, tb_fund_riskadjret_stats);
            generate_entity_style_stats(fund_info, indicators, true, tb_fund_style_stats);
        }

        // weekly returns (8s)
        rets_w = cal_weekly_returns(entityType, fund_info);

        if(! rets_w.isVoid() && rets_w.size() > 0) {
            generate_entity_performance_weekly(fund_info, rets_w, true, tb_fund_performance_weekly);
        }

        // latest performance (69s)
        perf_latest = cal_latest_performance(entityType, fund_info, true);

        if(! perf_latest.isVoid() && perf_latest.size() > 0) {
            generate_entity_latest_performance(fund_info, perf_latest, true, tb_fund_latest_performance);
        }

        i += batch_size;

    // strict '<': with '<=' an extra, empty batch was sliced just to hit the break above
    } while (i < total);

    if(! tb_fund_performance.isVoid() && tb_fund_performance.size() > 0) {

        // save data to MySQL (13s)
        try {

            chg_columns_for_mysql(tb_fund_performance, 'fund_id');
            save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance');

            chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
            save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator');

            chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
            // In mfdb.fund_performance, maxdrawdown_6m & maxdrawdown_ytd are virtual columns; the values are
            // forced into the real columns 6m_maxdrawdown & ytd_maxdrawdown by column order
            // (DolphinDB does not allow column names that start with a digit)
            save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats');

            chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
            save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats');

            chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
            save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats');

            save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly');

            save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance');

            // during data initialization the indicators are also stored locally.
            // get_ini_data_const must be CALLED before indexing (see the Example in the header);
            // the previous code indexed the function itself.
            if(end_day <= get_ini_data_const()['date']) {
                save_table(tb_fund_performance, 'mfdb.fund_performance', false);
                save_table(tb_fund_indicator, 'mfdb.fund_indicator', false);
                save_table(tb_fund_risk_stats, 'mfdb.fund_risk_stats', false);
                save_table(tb_fund_riskadjret_stats, 'mfdb.fund_riskadjret_stats', false);
                save_table(tb_fund_style_stats, 'mfdb.fund_style_stats', false);
                save_table(tb_fund_performance_weekly, 'mfdb.fund_performance_weekly', false);
                save_table(tb_fund_latest_performance, 'mfdb.fund_latest_performance', false);
            }

        } catch(ex) {

            //TODO: Log errors
            rt = ex;
        }
    }

    return rt;
}

/*
 *  [Scheduled task] Match BFI and store the result in the database.
 *
 *  NOTE: empty placeholder; the body does nothing.
 */
def matchEntityBFI(entityType, date) {

}

/*
 *  [Scheduled task] Calculate BFI indicators and store them in the database.
 *
 *  @param entityType : 'MF', 'HF', 'PF'; the first two are handled the same way
 *  @param date       : BFI update time; when empty it defaults to (now - 1 day).
 *                      A date of 1989.01.01 or earlier means "initialization": the indicators
 *                      are additionally stored in the local database.
 *
 *  @return           : '' on success; the caught exception when saving fails;
 *                      null for an unsupported entityType; nothing when no entity has BFI updates.
 *
 *  Example: calEntityBfiIndicatorTask('MF', 2024.10.28);
 *           calEntityBfiIndicatorTask('PF', 2024.10.28);
 */
def calEntityBfiIndicatorTask(entityType, date) {

    rt = '';

    if(!(entityType IN ['MF', 'HF', 'PF'])) return null;

    very_old_day = 1900.01.01;

    if(date.isNothing() || date.isNull())
        end_day = temporalAdd(now(), -1d);
    else
        end_day = date;

    // dates of 1989.01.01 and earlier are treated as "read the data locally"
    isFromMySQL = iif(end_day <= 1989.01.01, false, true);

    // entities whose BFI changed since end_day (1s)
    tb_cal_entities = get_entity_bfi_factors(entityType, NULL, very_old_day.month(), today().month(), end_day);

    if(tb_cal_entities.isVoid() || tb_cal_entities.size() == 0 ) return;

    v_uniq_entity_id = EXEC DISTINCT entity_id FROM tb_cal_entities;
    total = v_uniq_entity_id.size();

    // result table laid out like its MySQL counterpart
    tb_bfi_indicator = create_entity_bfi_indicator(iif(entityType=='PF', true, false));

    // process the entities in batches
    i = 0;
    batch_size = 100;

    do {

        entities = SELECT * FROM tb_cal_entities WHERE entity_id IN v_uniq_entity_id[i : min(total, i+batch_size)];

        if(entities.isVoid() || entities.size() == 0) break;

        // 200ms
        entity_info = SELECT entity_id, end_date.temporalParse('yyyy-MM') AS end_date, inception_date, factor_id AS benchmark_id, ini_value FROM ej(entities, get_entity_info(entityType, entities.entity_id), 'entity_id');

        // monthly returns up to the month-end of the latest end_date in this batch (12s)
        rets = get_monthly_ret(entityType, entity_info.entity_id, very_old_day, entity_info.end_date.max().temporalFormat('yyyy-MM-dd').temporalParse('yyyy-MM-dd').monthEnd(), isFromMySQL);

        if(!rets.isVoid() && rets.size() > 0) {

            // turn the 'yyyy-MM' formatted end_date into a DolphinDB MONTH;
            // done inside the guard so a void result is never dereferenced
            v_end_date = rets.end_date.temporalParse('yyyy-MM');
            rets.replaceColumn!('end_date', v_end_date);

            // monthly indicators (5s)
            indicators = cal_monthly_indicators(entityType, 'BFI', rets);

            // append records shaped like the MySQL table (1s)
            generate_entity_bfi_indicator(entity_info, indicators, true, tb_bfi_indicator);
        }

        i += batch_size;

    // strict '<': with '<=' an extra, empty batch was queried just to hit the break above
    } while (i < total);

    if(! tb_bfi_indicator.isVoid() && tb_bfi_indicator.size() > 0) {

        // save data to MySQL
        try {

            t_desc = get_bfi_indicator_table_description(entityType);

            chg_columns_for_mysql(tb_bfi_indicator, t_desc.sec_id_col[0]);

            // same table name with its database part swapped for 'raw_db'
            db_name = t_desc.table_name[0].split('.')[0];
            raw_table_name = t_desc.table_name[0].strReplace(db_name, 'raw_db');

            save_and_sync(tb_bfi_indicator, raw_table_name, raw_table_name);

            // during data initialization the indicators are also stored locally, to be used for ranking.
            // get_ini_data_const must be CALLED before indexing; the previous code indexed the function itself.
            if(end_day <= get_ini_data_const()['date']) {
                save_table(tb_bfi_indicator, t_desc.table_name[0], false);
            }

        } catch(ex) {

            //TODO: Log errors
            rt = ex;
        }
    }

    return rt;
}

/*
 *  Calculate RBSA for entities selected by their weekly-return update time and save the latest result.
 *
 *  @param entityType : entity type, e.g. 'MF'; 'PF' makes the entity_id column of the result INT instead of STRING
 *  @param entityIds  : entity ids passed on to get_entity_list_by_weekly_return_updatetime
 *  @param updateTime : return update time passed on to get_entity_list_by_weekly_return_updatetime
 *
 *  @return           : nothing; results are written through save_and_sync
 *
 *  Example: CalFundRBSATask('MF', ['MF00003PW1'], 2024.10.14T10:00:00);
 */
def CalFundRBSATask(entityType, entityIds, updateTime) {

    tb_result = table(100:0, ["entity_id", "asset_type_id", "index_id", "effective_date", "level", "alternative_id", "weighting"], [iif(entityType=='PF', INT, STRING), STRING, STRING, STRING, INT, STRING, DOUBLE]);

    t = get_entity_list_by_weekly_return_updatetime(entityType, entityIds, updateTime, true);

    window = 48;
    step = 13;

    if(t.isVoid() || t.size() == 0) return;

    d_rbsa = get_rbsa_index();
    asset_types = d_rbsa.keys();

    // start date = price_date of the first row of t, moved back by one window (in weeks).
    // Loop-invariant, so computed once.
    // NOTE(review): the original comment says "earliest update date", but the code takes row 0,
    //               which is only the earliest if t is sorted by price_date - confirm.
    start_day = t.price_date.temporalAdd(-window, 'w')[0];

    for(entity in t) {

        for(asset_type in asset_types) {

            res = cal_entity_RBSA(entityType, entity.entity_id, d_rbsa[asset_type], 'w', start_day, today(), true, window, step);

            // nothing to record for this entity / asset type
            if(res.isVoid() || res.size() == 0) continue;

            // the daily task only updates the most recent RBSA result
            latest_date = (EXEC price_date.max() AS price_date FROM res)[0];

            tb_result.tableInsert(SELECT entity_id, asset_type, index_id, price_date, level, alternative_id, weights FROM res WHERE price_date = latest_date);
        }
    }

    save_and_sync(tb_result, 'raw_db.pf_fund_rbsa_breakdown', 'raw_db.pf_fund_rbsa_breakdown');
}

/*
 *  [Temporary] Data initialization helper: calculates returns and indicators for the funds
 *  returned by ms_get_fund_list_by_nav_updatetime (NAV updated since 1990.01.01) and syncs them to MySQL.
 *
 *  Takes no parameters; the entity type passed to the calculators is always 'MF'.
 *
 *  @return : '' on success; the caught exception when saving fails; nothing when the fund list is empty.
 */
def ms_calFundReturns() {

    rt = '';

    very_old_date = 1990.01.01;

    // fund list (27s)
    tb_cal_funds = ms_get_fund_list_by_nav_updatetime(NULL, very_old_date);

    if(tb_cal_funds.isVoid() || tb_cal_funds.size() == 0 ) return;

    // result tables laid out like their MySQL counterparts
    tb_fund_performance = create_entity_performance();
    tb_fund_indicator = create_entity_indicator();
    tb_fund_risk_stats = create_entity_risk_stats();
    tb_fund_riskadjret_stats = create_entity_riskadjret_stats();
    tb_fund_style_stats = create_entity_style_stats();
    tb_fund_performance_weekly = create_entity_performance_weekly();
    tb_fund_latest_performance = create_entity_latest_performance();

    // process the funds in batches
    i = 0;
    batch_size = 1000;
    total = tb_cal_funds.size();

    do {

        funds = tb_cal_funds[i : min(total, i+batch_size)];

        if(funds.isVoid() || funds.size() == 0) break;

        // 200ms
        fund_info = SELECT entity_id, price_date, inception_date, benchmark_id, ini_value FROM ej(funds, ms_get_fund_info(funds.entity_id), 'entity_id', 'fund_id');

        // monthly returns from monthly NAV (19s)
        tb_nav = ms_get_fund_monthly_nav(fund_info.entity_id);

        rets = cal_monthly_returns_by_nav(fund_info, tb_nav);

        if(!rets.isVoid() && rets.size() > 0) {

            // monthly indicators (67s)
            rets.rename!('cumulative_nav', 'nav');
            indicators = cal_monthly_indicators('MF', 'PBI', rets);

            // append records shaped like the MySQL tables (1s)
            generate_entity_performance(fund_info, indicators, true, tb_fund_performance);
            generate_entity_indicator(fund_info, indicators, true, tb_fund_indicator);
            generate_entity_risk_stats(fund_info, indicators, true, tb_fund_risk_stats);
            generate_entity_riskadjret_stats(fund_info, indicators, true, tb_fund_riskadjret_stats);
            generate_entity_style_stats(fund_info, indicators, true, tb_fund_style_stats);
        }

        // weekly returns (49s)
        rets_w = cal_weekly_returns('MF', fund_info);

        if(! rets_w.isVoid() && rets_w.size() > 0) {
            generate_entity_performance_weekly(fund_info, rets_w, true, tb_fund_performance_weekly);
        }

        // latest performance (23s)
        perf_latest = cal_latest_performance('MF', fund_info, true);

        if(! perf_latest.isVoid() && perf_latest.size() > 0) {
            generate_entity_latest_performance(fund_info, perf_latest, true, tb_fund_latest_performance);
        }

        i += batch_size;

    // strict '<': with '<=' an extra, empty batch was sliced just to hit the break above
    } while (i < total);

    if(! tb_fund_performance.isVoid() && tb_fund_performance.size() > 0) {

        // save data to MySQL (26m)
        try {

            chg_columns_for_mysql(tb_fund_performance, 'fund_id');
            save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance');

            chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
            save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator');

            chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
            // In mfdb.fund_performance, maxdrawdown_6m & maxdrawdown_ytd are virtual columns; the values are
            // forced into the real columns 6m_maxdrawdown & ytd_maxdrawdown by column order
            // (DolphinDB does not allow column names that start with a digit)
            save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats');

            chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
            save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats');

            chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
            save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats');

            save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly');

            save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance');

        } catch(ex) {

            //TODO: Log errors
            rt = ex;
        }
    }

    return rt;
}

/*
 *  [Scheduled task] Calculate the BFI of funds and portfolios.
 *
 *
 *  TODO: where is the max_r2 table used? It is probably related to fund recommendation.
 */
def MatchEntityBFI(entityType, date) {

//entityType = 'MF'
//date = 2024.11.20

    rt = '';

    if(find(['HF', 'MF', 'PF'], entityType) < 0) return null;

    // entities whose NAV changed since date (1s)
    tb_cal_entity = get_entity_list_by_nav_updatetime(entityType, NULL, date, true);

    if(tb_cal_entity.isVoid() || tb_cal_entity.size() == 0 ) return;

    i = 0;
    size = tb_cal_entity.size();
    batch_size = 1000;

    entity_index_coe = create_entity_index_coe();

    do {

        // 4 min per 1000 funds
        coe = cal_entity_index_coe(entityType, tb_cal_entity[i : min(size, i+batch_size)]);

        // NOTE(review): this `continue` skips the `i = i + batch_size` at the bottom of the loop body,
        //               so the same batch would presumably be retried - confirm.
        if(coe.isVoid() || coe.size() == 0) continue;

        entity_info = get_entity_info(entityType, tb_cal_entity[i : min(size, i+batch_size)].entity_id);

        bfi_raw = match_entity_bfi(entityType, entity_info, coe);

        // save to the database first, to secure what has been computed so far
        try {

            // blank out correlations whose absolute value is below the threshold
            t_coe = SELECT entity_id, end_date, index_id, iif(coe_1y.abs() < get_min_threshold('correlation'), double(NULL), coe_1y) AS coe_1y, iif(coe_3y.abs() < get_min_threshold('correlation'), double(NULL), coe_3y) AS coe_3y, iif(coe_5y.abs() < get_min_threshold('correlation'), double(NULL), coe_5y) AS coe_5y, info_ratio_1y, info_ratio_3y, info_ratio_5y, t_value_1y, t_value_3y, t_value_5y, beta_1y, beta_3y, beta_5y FROM coe;

            DELETE FROM t_coe WHERE coe_1y IS NULL AND coe_3y IS NULL AND coe_5y IS NULL;

            // candidate factors
            t_bfi_candidates = SELECT entity_id, end_date, index_id AS factor_id, coe_1y AS coe, coe_1y.square() AS r2, 'w' AS performance_flag, t_value_1y, beta_1y FROM t_coe WHERE index_id LIKE 'FA%';

            chg_columns_for_mysql(t_coe, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));

            // only funds need the index_coe table
            if(entityType IN ['MF', 'HF'])
                save_and_sync(t_coe, 'raw_db.pf_fund_index_coe', );

            // all factors go to the xxx_factor_bfi table; NOTE: Java stores the data of every factor,
            // here only records with a qualifying correlation are stored (this table may not be of much use anyway?)
            // NOTE(review): 'raw_db..pf_portfolio_factor_bfi' contains a double dot - looks like a typo, confirm.
            chg_columns_for_mysql(t_bfi_candidates, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
            save_and_sync(t_bfi_candidates, iif(entityType == 'PF', 'raw_db..pf_portfolio_factor_bfi', 'raw_db.cm_fund_factor_bfi'), );

            // NOTE(review): same concern as above - this `continue` also skips the increment of i.
            if(bfi_raw.isVoid() || bfi_raw.size() == 0) continue;

            // effective factors
            t_bfi = SELECT entity_id, end_date, factor_id, coe_1y AS coe, r2, performance_flag, t_value_1y, beta_1y FROM bfi_raw ORDER BY entity_id, end_date, r2 DESC;

            // factor with the largest R2, plus the labels of all effective factors
            t_max_r2 = SELECT entity_id, factor_id.first() AS factor_id, end_date, int(NULL) AS performance_flag, coe.first() AS coe, r2.first() AS r2, concat(factor_name, ",") AS rz_portrait FROM ej(t_bfi, get_bfi_index_list(), 'factor_id') GROUP BY entity_id, end_date;

            // effective factors go to the xxx_factor_bfi_by_category_group table
            // NOTE(review): t_bfi is prepared here but t_bfi_candidates is what gets saved - confirm which is intended.
            chg_columns_for_mysql(t_bfi, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
            save_and_sync(t_bfi_candidates, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_by_category_group', 'raw_db.pf_fund_factor_bfi_by_category_group'), );

            // among the effective factors, the one with the largest R2 goes to xxx_max_r2
            chg_columns_for_mysql(t_max_r2, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
            save_and_sync(t_max_r2, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_max_r2', 'raw_db.pf_fund_factor_bfi_by_category_group_max_r2'), );

        } catch (ex) {

            //TODO: Log errors
            rt += ex + '\n';
        }

        i = i + batch_size;

    } while (i