module fundit::task_portfolioPerformance use fundit::sqlUtilities; use fundit::operationDataPuller; use fundit::performanceDataPuller; use fundit::portfolioDataPuller; use fundit::dataSaver; use fundit::navCalculator; use fundit::returnCalculator; use fundit::indicatorCalculator; /* * 计算组合历史净值(不存数据库) * * @param portfolio_ids : 组合IDS,为空时跑全集(但不建议,因为可能会很吃内存) * @param updatetime : 持仓证券净值更新时间,忽略时跑全历史 * * @return : portfolio_id, price_date, ret, nav * * Example:calPortfolioNAV([143109, 145041]); * calPortfolioNAV([143109, 145041], 2024.10.28); */ def calPortfolioNAV(portfolio_ids, updatetime=1900.01.01) { // portfolio_ids=[364743, 364744]; // updatetime=1900.01.01; port_info = get_portfolio_list_by_fund_nav_updatetime(portfolio_ids, updatetime, true); tb_nav = cal_portfolio_nav(port_info); return tb_nav; } /* * 计算组合历史收益和指标(不存数据库) * * @param navs
: NEED COLUMNS portfolio_id, price_date, ret, nav * * @return : * * Example:calPortfolioPerformance(calPortfolioNAV([143109, 145041])); */ def calPortfolioPerformance(navs) { if(navs.isVoid() || navs.size() == 0) return; tb_navs = navs; tb_navs.rename!(['portfolio_id'], ['entity_id']); port_ids = EXEC DISTINCT entity_id from tb_navs; port_info = get_entity_info('PF', port_ids); // 这个函数会根据情况加入成立日当月的初始净值,比直接用navs表中可能带的ret更方便 tb_navs.rename!('nav', 'cumulative_nav'); tb_month_ret = cal_monthly_returns_by_nav(port_info, tb_navs); tb_month_ret.rename!('cumulative_nav', 'nav'); indicators = cal_monthly_indicators('PF', 'PBI', tb_month_ret); return indicators; } /* * 计算组合历史收益和指标(不存数据库) * * @param navs
: NEED COLUMNS entity_id, price_date, ret, nav * * @return : * * Example:calEntityPerformance('PF', calPortfolioNAV([143109, 145041])); */ def calEntityPerformance(entity_type, navs) { if(navs.isVoid() || navs.size() == 0) return; tb_navs = navs; //tb_navs.rename!(['portfolio_id'], ['entity_id']); entity_ids = EXEC DISTINCT entity_id from tb_navs; entity_info = get_entity_info(entity_type, entity_ids); // 这个函数会根据情况加入成立日当月的初始净值,比直接用navs表中可能带的ret更方便 tb_navs.rename!('nav', 'cumulative_nav'); tb_month_ret = cal_monthly_returns_by_nav(entity_info, tb_navs); tb_month_ret.rename!('cumulative_nav', 'nav'); indicators = cal_monthly_indicators(entity_type, 'PBI', tb_month_ret); return indicators; } /* * 计算组合净值并存入数据库 * */ def cal_and_save_portfolio_nav(cal_portfolio_info, is_save_local) { rt = ''; // 准备类似MySQL结构的数据表 tb_portfolio_nav = create_entity_nav(true); // 分批跑 i = 0; batch_size = 1000; all_portfolio_id = EXEC DISTINCT portfolio_id FROM cal_portfolio_info; do { // 先把净值算出来存入数据库,落袋为安 portfolio_info = SELECT * FROM cal_portfolio_info WHERE portfolio_id IN all_portfolio_id[i : min(all_portfolio_id.size(), i+batch_size)]; if(portfolio_info.isVoid() || portfolio_info.size() == 0) break; // 30 sec per 1000 portfolios tb_ret = cal_portfolio_nav(portfolio_info); INSERT INTO tb_portfolio_nav SELECT entity_id, price_date, nav FROM tb_ret; i += batch_size; } while (i <= cal_portfolio_info.size()); if(! tb_portfolio_nav.isVoid() && tb_portfolio_nav.size() > 0) { // save data to MySQL (12 sec) try { tb_portfolio_nav.rename!('entity_id', 'portfolio_id'); save_and_sync(tb_portfolio_nav, 'raw_db.pf_portfolio_nav', 'raw_db.pf_portfolio_nav'); // 数据初始化时将指标存入本地 if(is_save_local == true) { save_table(tb_portfolio_nav, 'pfdb.pf_portfolio_nav', false); } } catch(ex) { //TODO: Log errors rt = ex; } } return rt; } /* * 通用计算标准指标并存入数据库 * * @param entity_type * @param cal_entity_info
: [COLUMNS] entity_id, start_cal_date * @param is_save_local * */ def cal_and_save_entity_indicators(entity_type, cal_entity_info, is_save_local) { // cal_entity_info = tb_cal_factors // entity_type = 'FA' rt = ''; is_id_interger = iif(entity_type == 'PF', true, false); // 准备类似MySQL结构的数据表 tb_entity_performance = create_entity_performance(is_id_interger); tb_entity_indicator = create_entity_indicator(is_id_interger); tb_entity_risk_stats = create_entity_risk_stats(is_id_interger); tb_entity_riskadjret_stats = create_entity_riskadjret_stats(is_id_interger); tb_entity_style_stats = create_entity_style_stats(is_id_interger); tb_entity_performance_weekly = create_entity_performance_weekly(is_id_interger); tb_entity_latest_performance = create_entity_latest_performance(is_id_interger); // 分批跑 i = 0; batch_size = 1000; all_entity_id = EXEC DISTINCT entity_id FROM cal_entity_info; do { cal_entity = SELECT * FROM cal_entity_info WHERE entity_id IN all_entity_id[i : min(all_entity_id.size(), i+batch_size)]; if(cal_entity.isVoid() || cal_entity.size() == 0) break; // 取数据库月度净值及前值 5 sec s_json = (SELECT entity_id, 1900.01.01 AS price_date FROM cal_entity GROUP BY entity_id).rename!('entity_id', 'sec_id').toStdJson(); tb_monthly_nav = get_nav_for_return_calculation(entity_type, 'm', s_json); // 把组合 entity id 字段从字符串换回整型,不然后面Join table的时候会出错 if(entity_type=='PF') { v_entity_id = tb_monthly_nav.sec_id$INT; tb_monthly_nav.replaceColumn!('sec_id', v_entity_id); } tb_monthly_nav.dropColumns!('nav').rename!(['sec_id', 'cumulative_nav'], ['entity_id', 'nav']); // 计算各标准指标 indicators = calEntityPerformance(entity_type, tb_monthly_nav); // 仿照MySQL的表结构准备好记录 (1s) entity_info = (SELECT entity_id, start_cal_date.min() AS price_date FROM cal_entity GROUP BY entity_id); generate_entity_performance(entity_info, indicators, true, tb_entity_performance); generate_entity_indicator(entity_info, indicators, true, tb_entity_indicator); generate_entity_risk_stats(entity_info, indicators, true, tb_entity_risk_stats); generate_entity_riskadjret_stats(entity_info, indicators, true, tb_entity_riskadjret_stats); generate_entity_style_stats(entity_info, indicators, true, tb_entity_style_stats); // 计算周收益 (49s) entity_info = SELECT * FROM ej(entity_info, get_entity_info(entity_type, all_entity_id[i : min(all_entity_id.size(), i+batch_size)]), 'entity_id') rets_w = cal_weekly_returns(entity_type, entity_info); if(! rets_w.isVoid() && rets_w.size() > 0) { // 把 entity id 字段从字符串换回整型,不然后面Join table的时候会出错 if(entity_type == 'PF') { v_entity_id = rets_w.entity_id$INT; rets_w.replaceColumn!('entity_id', v_entity_id); } generate_entity_performance_weekly(entity_info, rets_w, true, tb_entity_performance_weekly); } // 计算最新收益 (23s) perf_latest = cal_latest_performance(entity_type, entity_info, true); if(! perf_latest.isVoid() && perf_latest.size() > 0) { generate_entity_latest_performance(entity_info, perf_latest, true, tb_entity_latest_performance); } i += batch_size; } while (i <= cal_entity_info.size()); if(! tb_entity_performance.isVoid() && tb_entity_performance.size() > 0) { // save data to MySQL try { des = get_performance_table_description(entity_type)[0]; chg_columns_for_mysql(tb_entity_performance, des.sec_id_col); tb_entity_performance.rename!('cumulative_nav', des.cumulative_nav_col); save_and_sync(tb_entity_performance, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), ); if(is_save_local == true) save_table(tb_entity_performance, des.table_name, false); des = get_indicator_table_description(entity_type)[0]; chg_columns_for_mysql(tb_entity_indicator, des.sec_id_col); save_and_sync(tb_entity_indicator, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), ); if(is_save_local == true) save_table(tb_entity_indicator, des.table_name, false); des = get_risk_stats_table_description(entity_type)[0]; chg_columns_for_mysql(tb_entity_risk_stats, des.sec_id_col); save_and_sync(tb_entity_risk_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), ); if(is_save_local == true) save_table(tb_entity_risk_stats, des.table_name, false); des = get_riskadjret_stats_table_description(entity_type)[0]; chg_columns_for_mysql(tb_entity_riskadjret_stats, des.sec_id_col); save_and_sync(tb_entity_riskadjret_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), ); if(is_save_local == true) save_table(tb_entity_riskadjret_stats, des.table_name, false); des = get_capture_style_table_description(entity_type)[0]; chg_columns_for_mysql(tb_entity_style_stats, des.sec_id_col); save_and_sync(tb_entity_style_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), ); if(is_save_local == true) save_table(tb_entity_style_stats, des.table_name, false); des = get_performance_weekly_table_description(entity_type)[0]; tb_entity_performance_weekly.rename!('cumulative_nav', des.cumulative_nav_col); save_and_sync(tb_entity_performance_weekly, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), ); if(is_save_local == true) save_table(tb_entity_performance_weekly, des.table_name, false); des = get_latest_performance_table_description(entity_type)[0]; tb_entity_latest_performance.rename!('cumulative_nav', des.cumulative_nav_col); save_and_sync(tb_entity_latest_performance, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), ); if(is_save_local == true) save_table(tb_entity_latest_performance, des.table_name, false); } catch(ex) { //TODO: Log errors rt = ex; } } return rt; } /* * [定时任务]批量计算组合净值、收益及指标 * * @param updatetime : 持仓证券净值更新时间,忽略或传入1989.01.01及更早的日期被认为在做数据初始化 * * * Example: CalPortfolioPerformanceTask(2024.10.28); * CalPortfolioPerformanceTask(1989.01.01); -- 【初始化专用】 (45min) */ def CalPortfolioPerformanceTask(updatetime) { rt = ''; // 3 min tb_cal_ports = get_portfolio_list_by_fund_nav_updatetime(NULL, updatetime, true); if(tb_cal_ports.isVoid() || tb_cal_ports.size() == 0) return; is_save_local = iif(updatetime <= get_ini_data_const()['date'], true, false); // 26 min rt = cal_and_save_portfolio_nav(tb_cal_ports, is_save_local); // 9 min tb_cal_ports.rename!('portfolio_id', 'entity_id'); rt = rt + '; ' + cal_and_save_entity_indicators('PF', tb_cal_ports, is_save_local); return rt; } /* * 批量计算BFI因子净值 * * Example: cal_and_save_factor_nav(get_bfi_factor_list_by_index_nav_updatetime(['FA00000VMJ'], updatetime, true);, false); * */ def cal_and_save_factor_nav(cal_factor_info, is_save_local) { ret = '' t_factor_value = table(100:0, ['factor_id', 'price_date', 'factor_value'], [SYMBOL, DATE, DOUBLE]); // 因子个数有限,用循环更简便 for(factor in cal_factor_info) { v_factor_id = array(STRING, 0).append!(factor.factor_id); // 取因子成分指数 tb_holdings = get_fixed_weight_portfolio_holding('FA', v_factor_id); UPDATE tb_holdings SET first_cal_date = first_cal_date, latest_cal_date = latest_cal_date FROM ej(tb_holdings, cal_factor_info, 'entity_id', 'factor_id'); s_json = (SELECT sec_id, first_cal_date.min() AS price_date FROM tb_holdings GROUP BY sec_id).toStdJson(); // 取含前值的成分指数点位 tb_nav = get_nav_for_return_calculation('MI', 'd', s_json).sortBy!(['sec_id', 'price_date'], [1, 1]); // 计算每期收益 UPDATE tb_nav SET ret = cumulative_nav.ratios() - 1 CONTEXT BY sec_id; t = SELECT h.entity_id, n.price_date, h.sec_id, n.ret, h.weight/100 AS weight FROM tb_holdings AS h INNER JOIN tb_nav AS n ON h.sec_id = n.sec_id ORDER BY h.entity_id, h.sec_id, n.price_date; t_factor = SELECT factor_id AS entity_id, first_cal_date, latest_cal_date FROM cal_factor_info WHERE factor_id = factor.factor_id; t_tmp = cal_nav_by_return('FA', t_factor, t); if(!t_tmp.isVoid() && t_tmp.size() > 0) { INSERT INTO t_factor_value SELECT entity_id AS factor_id, price_date, nav AS factor_value FROM t_tmp; } } if(! t_factor_value.isVoid() && t_factor_value.size() > 0) { save_and_sync(t_factor_value, 'raw_db.cm_factor_value', 'raw_db.cm_factor_value'); if(is_save_local == true) { save_table(t_factor_value, 'pfdb.cm_factor_value', false); } } } /* * 计算基于指数组合的因子日收益 * * */ def cal_and_save_synthesis_factor_nav(updatetime, is_save_local) { // factor_type = 5: 根据成分指数净值更新日期,取有影响的因子 tb_cal_factors = get_bfi_factor_list_by_index_nav_updatetime(NULL, updatetime, true); if(tb_cal_factors.isVoid() || tb_cal_factors.size() == 0) return null; // 26 min cal_and_save_factor_nav(tb_cal_factors, is_save_local); return tb_cal_factors; } /* * 计算基于指数移动收益的因子日收益 * * TODO: 算法暂时与Java相近,虽然看起来比较可疑 */ def cal_moving_avg_factor_ret(updatetime) { t_factor_return = table(100:0, ['factor', 'price_date', 'ret'], [SYMBOL, DATE, DOUBLE]); index_ids = ['IN0000007N']; // 中证全指 factor_momentum = 'FA000000MT'; // 动量因子 factor_reverse = 'FA000000RV'; // 反转因子 t_index_date = get_entity_list_by_nav_updatetime('MI', index_ids, updatetime, true); if(t_index_date.isVoid() || t_index_date.size() == 0) return t_factor_return; // 倒着多取1年的净值 s_json = (SELECT entity_id AS sec_id, price_date.temporalAdd(-1y) AS price_date FROM t_index_date).toStdJson(); t_index_nav = get_nav_for_return_calculation('MI', 'd', s_json, pre_nav_incld=2); if(t_index_nav.isVoid() || t_index_nav.size() == 0) return t_factor_return; // 取上交所交易日历 v_trade_day = getMarketCalendar('SSE', t_index_nav.price_date.min(), today()); t_index_nav = SELECT * FROM t_index_nav WHERE price_date in v_trade_day; t_index_ret = SELECT sec_id AS entity_id, price_date, cumulative_nav.ratios()-1 AS ret FROM t_index_nav.sortBy!(['sec_id', 'price_date']); // 反转因子:中证全指过去1个月(Java 是20日)的平均日收益 t_ret = SELECT factor_reverse AS factor_id , rt.price_date, ret FROM ( SELECT entity_id, price_date, tmavg(price_date, ret, 1M) AS ret FROM t_index_ret CONTEXT BY rt.entity_id ) rt INNER JOIN t_index_date dt ON rt.entity_id = dt.entity_id WHERE rt.price_date >= dt.price_date; t_factor_return.tableInsert(t_ret); // 动量因子:中证全指过去1年 (Java是220个交易日中200个交易日?没看明白)的平均日收益 t_ret = SELECT factor_momentum AS factor_id, rt.price_date, ret FROM ( SELECT entity_id, price_date, tmavg(price_date, ret, 1y) AS ret FROM t_index_ret CONTEXT BY rt.entity_id ) rt INNER JOIN t_index_date dt ON rt.entity_id = dt.entity_id WHERE rt.price_date >= dt.price_date; t_factor_return.tableInsert(t_ret); return t_factor_return; } /* * 计算基于债券的因子日收益 * * TODO: 算法看起来比较可疑,并且债指2022-03以后就不再有 duration, convexity 数据 */ def cal_bond_factor_ret(updatetime) { t_factor_return = table(100:0, ['factor', 'price_date', 'ret'], [SYMBOL, DATE, DOUBLE]); index_ids = ['IN0000007A', 'IN0000008I', 'IN0000008B', 'IN0000008C']; // 中证国债、长期国债、中高信用、中低信用 factor_term = 'FA000000ST'; // 期限因子 factor_credit_spread = 'FA000000CD'; // 信用利差因子 factor_hybond = 'FA00000HYB'; // 高收益债因子 t_index_date = get_entity_list_by_nav_updatetime('MI', index_ids, updatetime, true); if(t_index_date.isVoid() || t_index_date.size() == 0) return t_factor_return; // 倒着多取1年的净值 s_json = (SELECT entity_id AS sec_id, price_date.temporalAdd(-1y) AS price_date FROM t_index_date).toStdJson(); t_index_nav = get_nav_for_return_calculation('MI', 'd', s_json, pre_nav_incld=2); if(t_index_nav.isVoid() || t_index_nav.size() == 0) return t_factor_return; // 取上交所交易日历 v_trade_day = getMarketCalendar('SSE', t_index_nav.price_date.min(), today()); t_index_nav = SELECT * FROM t_index_nav WHERE price_date in v_trade_day; t_index_ret = SELECT sec_id AS entity_id, price_date, cumulative_nav.ratios()-1 AS ret FROM t_index_nav.sortBy!(['sec_id', 'price_date']); // TODO: ret = R_1 * W_1 - R_2 * W_2; 其中: R_x 代表收益,W_x 代表权重,D_x 代表久期; // W_1 = D_2 /(D_2 - D_1), W_2 = D_1/(D_2 - D_1) ??? JAVA 就是这个权重逻辑 return t_factor_return; } /* * [定时任务]批量计算因子净值、收益及指标 * * @param updatetime : 成分指数净值更新时间,忽略或传入1989.01.01及更早的日期被认为在做数据初始化 * * TODO: 非BFI的因子还未涉及,需要参考 PerformanceAttributionFactorServiceImpl * * * Example: CalFactorPerformanceTask(2024.10.28); * CalFactorPerformanceTask(1989.01.01); -- 【初始化专用】 (1.3 min) */ def CalFactorPerformanceTask(updatetime) { //updatetime=2024.10.28 rt = ''; is_save_local = iif(updatetime <= get_ini_data_const()['date'], true, false); tb_cal_factors = cal_and_save_synthesis_factor_nav(updatetime, is_save_local); // 9 min tb_cal_factors.rename!(['factor_id', 'first_cal_date', 'latest_cal_date'], ['entity_id', 'start_cal_date', 'end_cal_date']); rt = rt + '; ' + cal_and_save_entity_indicators('FA', tb_cal_factors, is_save_local); return rt; } /* * [定时任务]批量计算各类平均指数的点位 * * @param updatetime : 成分指数净值更新时间,忽略或传入1989.01.01及更早的日期被认为在做数据初始化 * * TODO: M* category average, manager, company * * * Example: CalCategoryAverageNavTask(2024.11.01); (22min) * CalCategoryAverageNavTask(1989.01.01); -- 【初始化专用】 */ def CalCategoryAverageNavTask(updatetime) { rt = ''; v_category_type = ['strategy', 'substrategy', 'bfi']; // 取有周收益有更新的最早日期 date_hedge_fund = get_oldest_date_by_weekly_return_updatetime('PF', updatetime, true); date_mutual_fund = get_oldest_date_by_weekly_return_updatetime('MF', updatetime, true); if(date_hedge_fund.isNull() && date_mutual_fund.isNull()) return rt; is_save_local = iif(updatetime <= get_ini_data_const()['date'], true, false); // for(category_type in v_category_type) { oldest_date = min([date_hedge_fund, date_mutual_fund]); // it could take mysql a few minutes to get results t_ret = get_category_avg_weekly_return(category_type, oldest_date, 5, 30, true); if(t_ret.isVoid() || t_ret.size() == 0) continue; t_ret.rename!('index_id', 'entity_id'); t_tmp = cal_entity_nav_by_return('FI', t_ret, 'w'); if(! t_tmp.isVoid() && t_tmp.size() > 0) { t_index_value = SELECT entity_id AS index_id, price_date, nav AS index_value, incl_cal_count AS incl_cal_fund_count, total_cnt AS total_fund_count FROM ej(t_tmp, t_ret, ['entity_id', 'price_date']); save_and_sync(t_index_value, 'raw_db.indexes_ty_index', ); if(is_save_local == true) { save_table(t_index_value, 'mfdb.indexes_ty_index', false); } } } return rt; }