module fundit::task_portfolioPerformance

use fundit::dataPuller;
use fundit::dataSaver;
use fundit::navCalculator;
use fundit::returnCalculator;
use fundit::indicatorCalculator;


/*
 * Compute the historical NAV of portfolios (nothing is written to the database).
 *
 * @param portfolio_ids : portfolio ids; when empty the full universe is processed
 *                        (not recommended, it may consume a lot of memory)
 * @param updatetime    : update time of the holdings' NAV; when omitted the full
 *                        history is processed
 *
 * @return : portfolio_id, price_date, ret, nav
 *
 * Example: calPortfolioNAV([143109, 145041]);
 *          calPortfolioNAV([143109, 145041], 2024.10.28);
 */
def calPortfolioNAV(portfolio_ids, updatetime=1900.01.01) {

    // Handy values for interactive debugging:
    // portfolio_ids=[364743, 364744];
    // updatetime=1900.01.01;

    port_info = get_portfolio_list_by_fund_nav_updatetime(portfolio_ids, updatetime, true);

    tb_nav = cal_portfolio_nav(port_info);

    return tb_nav;
}


/*
 * Compute historical returns and indicators of portfolios (nothing is written
 * to the database).
 *
 * @param navs : NEED COLUMNS portfolio_id, price_date, ret, nav
 *
 * @return : the result of cal_monthly_indicators('PF', 'PBI', ...);
 *           nothing (void) when navs is void or empty
 *
 * Example: calPortfolioPerformance(calPortfolioNAV([143109, 145041]));
 */
def calPortfolioPerformance(navs) {

    if(navs.isVoid() || navs.size() == 0) return;

    // Work on a local table so the columns can be renamed in place.
    tb_navs = navs;
    tb_navs.rename!(['portfolio_id'], ['entity_id']);

    port_ids = EXEC DISTINCT entity_id from tb_navs;
    port_info = get_entity_info('PF', port_ids);

    // cal_monthly_returns_by_nav adds the initial NAV of the inception month
    // when needed, which is more convenient than relying on the `ret` column
    // that may come with the navs table. It expects the NAV column to be
    // called `cumulative_nav`, hence the rename before and after the call.
    tb_navs.rename!('nav', 'cumulative_nav');
    tb_month_ret = cal_monthly_returns_by_nav(port_info, tb_navs);
    tb_month_ret.rename!('cumulative_nav', 'nav');

    indicators = cal_monthly_indicators('PF', 'PBI', tb_month_ret);

    return indicators;
}


/*
 * Compute portfolio NAVs and save them to the database.
 *
 * @param cal_portfolio_info : table of portfolios to calculate; must contain a
 *                             portfolio_id column (may hold several rows per
 *                             portfolio)
 *
 * @return : '' on success, otherwise the exception caught while saving
 *
 * TODO: switch the sync target table to the production table at release
 */
def cal_and_save_portfolio_nav(cal_portfolio_info) {

    rt = '';

    // Prepare a table whose structure mirrors the MySQL table
    tb_portfolio_nav = create_entity_nav();

    // Run in batches of distinct portfolio ids
    i = 0;
    batch_size = 1000;
    all_portfolio_id = EXEC DISTINCT portfolio_id FROM cal_portfolio_info;

    do {

        batch_ids = all_portfolio_id[i : min(all_portfolio_id.size(), i + batch_size)];

        // Compute the NAVs first and collect them so they can be persisted
        // before anything else is attempted.
        portfolio_info = SELECT * FROM cal_portfolio_info
                         WHERE portfolio_id IN batch_ids;

        if(portfolio_info.isVoid() || portfolio_info.size() == 0) break;

        // ~30 sec
        tb_ret = cal_portfolio_nav(portfolio_info);

        INSERT INTO tb_portfolio_nav SELECT portfolio_id$STRING, price_date, nav FROM tb_ret;

        i += batch_size;

    // The cursor `i` indexes all_portfolio_id, so the loop must stop at the
    // number of distinct ids, not at the number of rows in cal_portfolio_info
    // (which can be larger and would push the slice start past the vector end).
    } while (i < all_portfolio_id.size());

    if(! tb_portfolio_nav.isVoid() && tb_portfolio_nav.size() > 0) {

        // Save data to MySQL (~12 sec)
        try {

            tb_portfolio_nav.rename!('entity_id', 'portfolio_id');
            save_and_sync(tb_portfolio_nav, 'raw_db.pf_portfolio_nav', 'raw_db.pf_portfolio_nav');

        } catch(ex) {

            //TODO: Log errors
            rt = ex;
        }
    }

    return rt;
}


/*
 * Compute the standard portfolio indicators and save them to the database.
 *
 * @param cal_portfolio_info : table of portfolios to calculate; must contain
 *                             portfolio_id and start_cal_date columns
 *
 * @return : '' on success, otherwise the exception caught while saving
 *
 * TODO: switch the sync target table to the production table at release
 */
def cal_and_save_portfolio_indicators(cal_portfolio_info) {

    rt = '';

    // Prepare tables whose structure mirrors the MySQL tables
    tb_portfolio_performance = create_entity_performance(true);
    tb_portfolio_indicator = create_entity_indicator(true);
    tb_portfolio_risk_stats = create_entity_risk_stats(true);
    tb_portfolio_riskadjret_stats = create_entity_riskadjret_stats(true);
    tb_portfolio_style_stats = create_entity_style_stats(true);

    tb_portfolio_performance_weekly = create_entity_performance_weekly(true);
    tb_portfolio_latest_performance = create_entity_latest_performance(true);

    // Run in batches of distinct portfolio ids
    i = 0;
    batch_size = 1000;
    all_portfolio_id = EXEC DISTINCT portfolio_id FROM cal_portfolio_info;

    do {

        // Ids of the current batch; reused below for the entity-info lookup.
        batch_ids = all_portfolio_id[i : min(all_portfolio_id.size(), i + batch_size)];

        cal_port = SELECT * FROM cal_portfolio_info
                   WHERE portfolio_id IN batch_ids;

        if(cal_port.isVoid() || cal_port.size() == 0) break;

        // Fetch monthly NAVs (and preceding values) from the database (~5 sec)
        s_json = (SELECT portfolio_id, 1900.01.01 AS price_date
                  FROM cal_port
                  GROUP BY portfolio_id).rename!('portfolio_id', 'sec_id').toStdJson();
        tb_monthly_nav = get_nav_for_return_calculation('PF', 'm', s_json);

        // Convert the portfolio id column from string back to INT, otherwise
        // the table joins further down fail.
        v_portfolio_id = tb_monthly_nav.sec_id$INT;
        tb_monthly_nav.replaceColumn!('sec_id', v_portfolio_id);
        tb_monthly_nav.dropColumns!('nav').rename!(['sec_id', 'cumulative_nav'], ['portfolio_id', 'nav']);

        // Compute the standard indicators
        indicators = calPortfolioPerformance(tb_monthly_nav);

        // Prepare the records following the MySQL table structures (~1 sec)
        port_info = (SELECT portfolio_id, start_cal_date.min() AS price_date
                     FROM cal_port
                     GROUP BY portfolio_id).rename!('portfolio_id', 'entity_id');

        generate_entity_performance(port_info, indicators, true, tb_portfolio_performance);
        generate_entity_indicator(port_info, indicators, true, tb_portfolio_indicator);
        generate_entity_risk_stats(port_info, indicators, true, tb_portfolio_risk_stats);
        generate_entity_riskadjret_stats(port_info, indicators, true, tb_portfolio_riskadjret_stats);
        generate_entity_style_stats(port_info, indicators, true, tb_portfolio_style_stats);

        // Weekly returns (~49 sec)
        port_info = SELECT * FROM ej(port_info, get_entity_info('PF', batch_ids), 'entity_id');
        rets_w = cal_weekly_returns('PF', port_info);

        if(! rets_w.isVoid() && rets_w.size() > 0) {

            // Convert the portfolio id column from string back to INT,
            // otherwise the table joins further down fail.
            v_portfolio_id = rets_w.entity_id$INT;
            rets_w.replaceColumn!('entity_id', v_portfolio_id);
            generate_entity_performance_weekly(port_info, rets_w, true, tb_portfolio_performance_weekly);
        }

        // Latest performance (~23 sec)
        perf_latest = cal_latest_performance('PF', port_info, true);

        if(! perf_latest.isVoid() && perf_latest.size() > 0) {
            generate_entity_latest_performance(port_info, perf_latest, true, tb_portfolio_latest_performance);
        }

        i += batch_size;

    // The cursor `i` indexes all_portfolio_id, so the loop must stop at the
    // number of distinct ids, not at the number of rows in cal_portfolio_info
    // (which can be larger and would push the slice start past the vector end).
    } while (i < all_portfolio_id.size());

    // All result tables are saved together, gated on the performance table
    // having at least one row.
    if(! tb_portfolio_performance.isVoid() && tb_portfolio_performance.size() > 0) {

        // Save data to MySQL
        try {

            chg_columns_for_mysql(tb_portfolio_performance, 'portfolio_id');
            save_and_sync(tb_portfolio_performance, 'raw_db.pf_portfolio_performance', 'raw_db.pf_portfolio_performance');

            chg_columns_for_mysql(tb_portfolio_indicator, 'portfolio_id');
            save_and_sync(tb_portfolio_indicator, 'raw_db.pf_portfolio_indicator', 'raw_db.pf_portfolio_indicator');

            chg_columns_for_mysql(tb_portfolio_risk_stats, 'portfolio_id');
            save_and_sync(tb_portfolio_risk_stats, 'raw_db.pf_portfolio_risk_stats', 'raw_db.pf_portfolio_risk_stats');

            chg_columns_for_mysql(tb_portfolio_riskadjret_stats, 'portfolio_id');
            save_and_sync(tb_portfolio_riskadjret_stats, 'raw_db.pf_portfolio_riskadjret_stats', 'raw_db.pf_portfolio_riskadjret_stats');

            chg_columns_for_mysql(tb_portfolio_style_stats, 'portfolio_id');
            save_and_sync(tb_portfolio_style_stats, 'raw_db.pf_portfolio_style_stats', 'raw_db.pf_portfolio_style_stats');

            save_and_sync(tb_portfolio_performance_weekly, 'raw_db.pf_portfolio_performance_weekly', 'raw_db.pf_portfolio_performance_weekly');
            save_and_sync(tb_portfolio_latest_performance, 'raw_db.pf_portfolio_latest_performance', 'raw_db.pf_portfolio_latest_performance');

        } catch(ex) {

            //TODO: Log errors
            rt = ex;
        }
    }

    return rt;
}


/*
 * [Scheduled task] Batch-compute portfolio NAVs, returns and indicators.
 *
 * @param updatetime : update time of the holdings' NAV; when omitted the full
 *                     history is processed
 *
 * @return : the results of the NAV step and the indicator step joined by '; ';
 *           nothing (void) when there is no portfolio to calculate
 *
 * Example: CalPortfolioPerformanceTask(2024.10.28);
 */
def CalPortfolioPerformanceTask(updatetime=1900.01.01) {

    rt = '';

    // ~3 min
    tb_cal_ports = get_portfolio_list_by_fund_nav_updatetime(NULL, updatetime, true);

    if(tb_cal_ports.isVoid() || tb_cal_ports.size() == 0) return;

    // ~26 min
    rt = cal_and_save_portfolio_nav(tb_cal_ports);

    // ~9 min
    rt = rt + '; ' + cal_and_save_portfolio_indicators(tb_cal_ports);

    return rt;
}