task_portfolioPerformance.dos 12 KB


  1. module fundit::task_portfolioPerformance
  2. use fundit::sqlUtilities;
  3. use fundit::operationDataPuller;
  4. use fundit::performanceDataPuller;
  5. use fundit::portfolioDataPuller;
  6. use fundit::dataSaver;
  7. use fundit::navCalculator;
  8. use fundit::returnCalculator;
  9. use fundit::indicatorCalculator;
  /*
   * Compute the historical NAV of portfolios (nothing is written to the database).
   *
   * @param portfolio_ids <STRING|VECTOR>: portfolio ids; when empty the whole universe is processed
   *                                       (not recommended, it may consume a lot of memory)
   * @param updatetime <DATETIME>: update time of the held securities' NAV; when omitted the
   *                               default 1900.01.01 is used, i.e. the full history is computed
   *
   * @return <TABLE>: portfolio_id, price_date, ret, nav
   *
   * Example: calPortfolioNAV([143109, 145041]);
   *          calPortfolioNAV([143109, 145041], 2024.10.28);
   */
  def calPortfolioNAV(portfolio_ids, updatetime=1900.01.01) {
      // Look up the portfolios affected since `updatetime`, then hand them to the NAV calculator.
      return cal_portfolio_nav(get_portfolio_list_by_fund_nav_updatetime(portfolio_ids, updatetime, true));
  }
  /*
   * Compute historical monthly returns and indicators of portfolios (nothing is written to the database).
   *
   * @param navs <TABLE>: required columns portfolio_id, price_date, ret, nav
   *
   * @return <DICTIONARY>: whatever cal_monthly_indicators returns; nothing when `navs` is void or empty
   *
   * Example: calPortfolioPerformance(calPortfolioNAV([143109, 145041]));
   */
  def calPortfolioPerformance(navs) {
      if(navs.isVoid() || navs.size() == 0) return;

      // Work on a copy so the caller's table keeps its column names.
      nav_tb = navs;
      // The return calculator expects entity_id / cumulative_nav column names.
      nav_tb.rename!(['portfolio_id', 'nav'], ['entity_id', 'cumulative_nav']);

      entity_ids = EXEC DISTINCT entity_id FROM nav_tb;
      entity_info = get_entity_info('PF', entity_ids);

      // This helper adds the initial NAV of the inception month when needed, which is more
      // convenient than relying on the `ret` column that `navs` may carry.
      monthly_ret = cal_monthly_returns_by_nav(entity_info, nav_tb);
      monthly_ret.rename!('cumulative_nav', 'nav');

      return cal_monthly_indicators('PF', 'PBI', monthly_ret);
  }
  /*
   * Compute portfolio NAV in batches and store it in the database.
   *
   * @param cal_portfolio_info <TABLE>: portfolios to calculate; must contain a portfolio_id column
   * @param is_save_local <BOOL>: when true the result is also saved to 'pfdb.pf_portfolio_nav'
   *
   * @return '' when saving succeeded or there was nothing to save; otherwise the exception
   *         caught while saving
   *
   * TODO: switch the sync target table to the production table at release.
   */
  def cal_and_save_portfolio_nav(cal_portfolio_info, is_save_local) {
      rt = '';
      // Accumulator table shaped like the MySQL table.
      tb_portfolio_nav = create_entity_nav();

      // Run in batches of distinct portfolio ids.
      batch_size = 1000;
      all_portfolio_id = EXEC DISTINCT portfolio_id FROM cal_portfolio_info;
      id_cnt = all_portfolio_id.size();
      i = 0;
      do {
          // Nothing (left) to process; also covers an empty input on the first pass.
          if(i >= id_cnt) break;
          batch_ids = all_portfolio_id[i : min(id_cnt, i + batch_size)];
          portfolio_info = SELECT * FROM cal_portfolio_info WHERE portfolio_id IN batch_ids;
          if(portfolio_info.isVoid() || portfolio_info.size() == 0) break;
          // ~30 sec per batch according to the original author's note
          tb_ret = cal_portfolio_nav(portfolio_info);
          // Skip batches that produced no NAV instead of failing on the INSERT.
          if(!tb_ret.isVoid() && tb_ret.size() > 0) {
              INSERT INTO tb_portfolio_nav SELECT portfolio_id$STRING, price_date, nav FROM tb_ret;
          }
          i += batch_size;
      // The bound is the number of distinct ids (the vector being sliced), not the row count of
      // cal_portfolio_info: the latter allowed an extra pass with an out-of-range slice.
      } while (i < id_cnt);

      if(! tb_portfolio_nav.isVoid() && tb_portfolio_nav.size() > 0) {
          // save data to MySQL (~12 sec according to the original author's note)
          try {
              tb_portfolio_nav.rename!('entity_id', 'portfolio_id');
              save_and_sync(tb_portfolio_nav, 'raw_db.pf_portfolio_nav', 'raw_db.pf_portfolio_nav');
              // During data initialization the result is stored locally as well.
              if(is_save_local == true) {
                  save_table(tb_portfolio_nav, 'pfdb.pf_portfolio_nav', false);
              }
          } catch(ex) {
              //TODO: Log errors
              rt = ex;
          }
      }
      return rt;
  }
  /*
   * Compute the standard portfolio indicators in batches and store them in the database.
   *
   * @param cal_portfolio_info <TABLE>: portfolios to calculate; the code reads the columns
   *                                    portfolio_id and start_cal_date
   * @param is_save_local <BOOL>: when true the results are also saved to the 'pfdb.*' tables
   *
   * @return '' when saving succeeded or there was nothing to save; otherwise the exception
   *         caught while saving
   *
   * TODO: switch the sync target tables to the production tables at release.
   */
  def cal_and_save_portfolio_indicators(cal_portfolio_info, is_save_local) {
      rt = '';
      // Accumulator tables shaped like the MySQL tables.
      tb_portfolio_performance = create_entity_performance(true);
      tb_portfolio_indicator = create_entity_indicator(true);
      tb_portfolio_risk_stats = create_entity_risk_stats(true);
      tb_portfolio_riskadjret_stats = create_entity_riskadjret_stats(true);
      tb_portfolio_style_stats = create_entity_style_stats(true);
      tb_portfolio_performance_weekly = create_entity_performance_weekly(true);
      tb_portfolio_latest_performance = create_entity_latest_performance(true);

      // Run in batches of distinct portfolio ids.
      batch_size = 1000;
      all_portfolio_id = EXEC DISTINCT portfolio_id FROM cal_portfolio_info;
      id_cnt = all_portfolio_id.size();
      i = 0;
      do {
          // Nothing (left) to process; also covers an empty input on the first pass.
          if(i >= id_cnt) break;
          // Slice once and reuse it for both the filter and the entity-info lookup below.
          batch_ids = all_portfolio_id[i : min(id_cnt, i + batch_size)];
          cal_port = SELECT * FROM cal_portfolio_info WHERE portfolio_id IN batch_ids;
          if(cal_port.isVoid() || cal_port.size() == 0) break;

          // Fetch monthly NAV (including the previous value) from the database
          // (~5 sec according to the original author's note).
          s_json = (SELECT portfolio_id, 1900.01.01 AS price_date FROM cal_port GROUP BY portfolio_id).rename!('portfolio_id', 'sec_id').toStdJson();
          tb_monthly_nav = get_nav_for_return_calculation('PF', 'm', s_json);
          // Cast the portfolio id from string back to int, otherwise the later table joins fail.
          v_portfolio_id = tb_monthly_nav.sec_id$INT;
          tb_monthly_nav.replaceColumn!('sec_id', v_portfolio_id);
          tb_monthly_nav.dropColumns!('nav').rename!(['sec_id', 'cumulative_nav'], ['portfolio_id', 'nav']);

          // Compute the standard indicators.
          // NOTE(review): calPortfolioPerformance returns nothing for an empty NAV table; confirm
          // that the generate_* helpers below tolerate that.
          indicators = calPortfolioPerformance(tb_monthly_nav);

          // Build records following the MySQL table structure (~1 s per the original note).
          port_info = (SELECT portfolio_id, start_cal_date.min() AS price_date FROM cal_port GROUP BY portfolio_id).rename!('portfolio_id', 'entity_id');
          generate_entity_performance(port_info, indicators, true, tb_portfolio_performance);
          generate_entity_indicator(port_info, indicators, true, tb_portfolio_indicator);
          generate_entity_risk_stats(port_info, indicators, true, tb_portfolio_risk_stats);
          generate_entity_riskadjret_stats(port_info, indicators, true, tb_portfolio_riskadjret_stats);
          generate_entity_style_stats(port_info, indicators, true, tb_portfolio_style_stats);

          // Weekly returns (~49 s per the original note).
          port_info = SELECT * FROM ej(port_info, get_entity_info('PF', batch_ids), 'entity_id');
          rets_w = cal_weekly_returns('PF', port_info);
          if(! rets_w.isVoid() && rets_w.size() > 0) {
              // Cast the portfolio id from string back to int, otherwise the later table joins fail.
              v_portfolio_id = rets_w.entity_id$INT;
              rets_w.replaceColumn!('entity_id', v_portfolio_id);
              generate_entity_performance_weekly(port_info, rets_w, true, tb_portfolio_performance_weekly);
          }

          // Latest performance (~23 s per the original note).
          perf_latest = cal_latest_performance('PF', port_info, true);
          if(! perf_latest.isVoid() && perf_latest.size() > 0) {
              generate_entity_latest_performance(port_info, perf_latest, true, tb_portfolio_latest_performance);
          }
          i += batch_size;
      // The bound is the number of distinct ids (the vector being sliced), not the row count of
      // cal_portfolio_info: the latter allowed an extra pass with an out-of-range slice.
      } while (i < id_cnt);

      // NOTE(review): every table is saved only when the monthly performance table is non-empty.
      if(! tb_portfolio_performance.isVoid() && tb_portfolio_performance.size() > 0) {
          // save data to MySQL
          try {
              chg_columns_for_mysql(tb_portfolio_performance, 'portfolio_id');
              save_and_sync(tb_portfolio_performance, 'raw_db.pf_portfolio_performance', 'raw_db.pf_portfolio_performance');
              chg_columns_for_mysql(tb_portfolio_indicator, 'portfolio_id');
              save_and_sync(tb_portfolio_indicator, 'raw_db.pf_portfolio_indicator', 'raw_db.pf_portfolio_indicator');
              chg_columns_for_mysql(tb_portfolio_risk_stats, 'portfolio_id');
              save_and_sync(tb_portfolio_risk_stats, 'raw_db.pf_portfolio_risk_stats', 'raw_db.pf_portfolio_risk_stats');
              chg_columns_for_mysql(tb_portfolio_riskadjret_stats, 'portfolio_id');
              save_and_sync(tb_portfolio_riskadjret_stats, 'raw_db.pf_portfolio_riskadjret_stats', 'raw_db.pf_portfolio_riskadjret_stats');
              chg_columns_for_mysql(tb_portfolio_style_stats, 'portfolio_id');
              save_and_sync(tb_portfolio_style_stats, 'raw_db.pf_portfolio_style_stats', 'raw_db.pf_portfolio_style_stats');
              // NOTE(review): the weekly and latest tables are not passed through
              // chg_columns_for_mysql, unlike the tables above; confirm this is intended.
              save_and_sync(tb_portfolio_performance_weekly, 'raw_db.pf_portfolio_performance_weekly', 'raw_db.pf_portfolio_performance_weekly');
              save_and_sync(tb_portfolio_latest_performance, 'raw_db.pf_portfolio_latest_performance', 'raw_db.pf_portfolio_latest_performance');
              // During data initialization the indicators are stored locally as well.
              if(is_save_local == true) {
                  save_table(tb_portfolio_performance, 'pfdb.pf_portfolio_performance', false);
                  save_table(tb_portfolio_indicator, 'pfdb.pf_portfolio_indicator', false);
                  save_table(tb_portfolio_risk_stats, 'pfdb.pf_portfolio_risk_stats', false);
                  save_table(tb_portfolio_riskadjret_stats, 'pfdb.pf_portfolio_riskadjret_stats', false);
                  save_table(tb_portfolio_style_stats, 'pfdb.pf_portfolio_style_stats', false);
                  save_table(tb_portfolio_performance_weekly, 'pfdb.pf_portfolio_performance_weekly', false);
                  save_table(tb_portfolio_latest_performance, 'pfdb.pf_portfolio_latest_performance', false);
              }
          } catch(ex) {
              //TODO: Log errors
              rt = ex;
          }
      }
      return rt;
  }
  /*
   * [Scheduled task] Batch-compute portfolio NAV, returns and indicators.
   *
   * @param updatetime <DATETIME>: update time of the held securities' NAV (required, the
   *                               signature has no default); a value on or before
   *                               get_ini_data_const()['date'] is treated as a data
   *                               initialization run and the results are also saved locally
   *
   * @return the results of the NAV step and of the indicator step joined with '; ';
   *         nothing when there is no portfolio to calculate
   *
   * Example: CalPortfolioPerformanceTask(2024.10.28);
   *          CalPortfolioPerformanceTask(1989.01.01); -- [initialization only] (45 min)
   */
  def CalPortfolioPerformanceTask(updatetime) {
      // ~3 min according to the original author's note
      ports_to_cal = get_portfolio_list_by_fund_nav_updatetime(NULL, updatetime, true);
      if(ports_to_cal.isVoid() || ports_to_cal.size() == 0) return;

      // Initialization runs also persist their results locally.
      save_local = updatetime <= get_ini_data_const()['date'];

      // ~26 min according to the original author's note
      nav_result = cal_and_save_portfolio_nav(ports_to_cal, save_local);
      // ~9 min according to the original author's note
      indicator_result = cal_and_save_portfolio_indicators(ports_to_cal, save_local);

      return nav_result + '; ' + indicator_result;
  }
  /*
   * Batch-compute the NAV (factor value) of BFI factors and store it in the database.
   *
   * @param updatetime: passed to get_bfi_factor_list_by_index_nav_updatetime to select the
   *                    factors affected by updates of their component indexes
   * @param is_save_local <BOOL>: when true the result is also saved to 'pfdb.cm_factor_value'
   *
   * @return nothing
   *
   * Example: cal_and_save_factor_nav(2024.11.15, false);
   *          cal_and_save_factor_nav(1989.01.01, true);
   */
  def cal_and_save_factor_nav(updatetime, is_save_local) {
      // Pick the affected factors based on the NAV update time of their component indexes.
      tb_cal_factors = get_bfi_factor_list_by_index_nav_updatetime(NULL, updatetime, true);
      if(tb_cal_factors.isVoid() || tb_cal_factors.size() == 0) return;

      t_factor_value = table(100:0, ['factor_id', 'price_date', 'factor_value'], [SYMBOL, DATE, DOUBLE]);

      // The number of factors is limited, so a plain loop is the simplest approach.
      for(factor in tb_cal_factors) {
          v_factor_id = array(STRING, 0).append!(factor.factor_id);
          // Component indexes of the factor.
          tb_holdings = get_fixed_weight_portfolio_holding('FA', v_factor_id);
          UPDATE tb_holdings SET first_cal_date = first_cal_date, latest_cal_date = latest_cal_date
          FROM ej(tb_holdings, tb_cal_factors, 'entity_id', 'factor_id');
          s_json = (SELECT sec_id, first_cal_date.min() AS price_date FROM tb_holdings GROUP BY sec_id).toStdJson();
          // Component index levels, including the previous value.
          tb_nav = get_nav_for_return_calculation('MI', 'd', s_json).sortBy!(['sec_id', 'price_date'], [1, 1]);
          // Per-period return of every component.
          UPDATE tb_nav SET ret = cumulative_nav.ratios() - 1 CONTEXT BY sec_id;
          t = SELECT h.entity_id, n.price_date, h.sec_id, n.ret, h.weight/100 AS weight
              FROM tb_holdings AS h
              INNER JOIN tb_nav AS n ON h.sec_id = n.sec_id
              ORDER BY h.entity_id, h.sec_id, n.price_date;
          t_factor = SELECT factor_id AS entity_id, first_cal_date, latest_cal_date FROM tb_cal_factors WHERE factor_id = factor.factor_id;
          // Compute the factor NAV once and reuse the result for both the check and the insert.
          t_tmp = cal_nav_by_return('FA', t_factor, t);
          if(!t_tmp.isVoid() && t_tmp.size() > 0) {
              INSERT INTO t_factor_value
              SELECT entity_id AS factor_id, price_date, nav AS factor_value FROM t_tmp;
          }
      }

      if(! t_factor_value.isVoid() && t_factor_value.size() > 0) {
          save_and_sync(t_factor_value, 'raw_db.cm_factor_value', 'raw_db.cm_factor_value');
          // During data initialization the result is stored locally as well.
          if(is_save_local == true) {
              save_table(t_factor_value, 'pfdb.cm_factor_value', false);
          }
      }
  }