task_portfolioPerformance.dos 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. module fundit::task_portfolioPerformance
  2. use fundit::operationDataPuller;
  3. use fundit::performanceDataPuller;
  4. use fundit::portfolioDataPuller;
  5. use fundit::dataSaver;
  6. use fundit::navCalculator;
  7. use fundit::returnCalculator;
  8. use fundit::indicatorCalculator;
  9. /*
  10. * 计算组合历史净值(不存数据库)
  11. *
  12. * @param portfolio_ids <STRING|VECTOR>: 组合IDS,为空时跑全集(但不建议,因为可能会很吃内存)
  13. * @param updatetime <DATETIME>: 持仓证券净值更新时间,忽略时跑全历史
  14. *
  15. * @return <TABLE>: portfolio_id, price_date, ret, nav
  16. *
  17. * Example:calPortfolioNAV([143109, 145041]);
  18. * calPortfolioNAV([143109, 145041], 2024.10.28);
  19. */
  20. def calPortfolioNAV(portfolio_ids, updatetime=1900.01.01) {
  21. // portfolio_ids=[364743, 364744];
  22. // updatetime=1900.01.01;
  23. port_info = get_portfolio_list_by_fund_nav_updatetime(portfolio_ids, updatetime, true);
  24. tb_nav = cal_portfolio_nav(port_info);
  25. return tb_nav;
  26. }
  27. /*
  28. * 计算组合历史收益和指标(不存数据库)
  29. *
  30. * @param navs <TABLE>: NEED COLUMNS portfolio_id, price_date, ret, nav
  31. *
  32. * @return <DICTIONARY>:
  33. *
  34. * Example:calPortfolioPerformance(calPortfolioNAV([143109, 145041]));
  35. */
  36. def calPortfolioPerformance(navs) {
  37. if(navs.isVoid() || navs.size() == 0) return;
  38. tb_navs = navs;
  39. tb_navs.rename!(['portfolio_id'], ['entity_id']);
  40. port_ids = EXEC DISTINCT entity_id from tb_navs;
  41. port_info = get_entity_info('PF', port_ids);
  42. // 这个函数会根据情况加入成立日当月的初始净值,比直接用navs表中可能带的ret更方便
  43. tb_navs.rename!('nav', 'cumulative_nav');
  44. tb_month_ret = cal_monthly_returns_by_nav(port_info, tb_navs);
  45. tb_month_ret.rename!('cumulative_nav', 'nav');
  46. indicators = cal_monthly_indicators('PF', 'PBI', tb_month_ret);
  47. return indicators;
  48. }
  49. /*
  50. * 计算组合净值并存入数据库
  51. *
  52. * TODO: release 时改变同步目标表为正式表
  53. */
  54. def cal_and_save_portfolio_nav(cal_portfolio_info) {
  55. rt = '';
  56. // 准备类似MySQL结构的数据表
  57. tb_portfolio_nav = create_entity_nav();
  58. // 分批跑
  59. i = 0;
  60. batch_size = 1000;
  61. all_portfolio_id = EXEC DISTINCT portfolio_id FROM cal_portfolio_info;
  62. do { // 先把净值算出来存入数据库,落袋为安
  63. portfolio_info = SELECT * FROM cal_portfolio_info
  64. WHERE portfolio_id IN all_portfolio_id[i : min(all_portfolio_id.size(), i+batch_size)];
  65. if(portfolio_info.isVoid() || portfolio_info.size() == 0) break;
  66. // 30 sec
  67. tb_ret = cal_portfolio_nav(portfolio_info);
  68. INSERT INTO tb_portfolio_nav SELECT portfolio_id$STRING, price_date, nav FROM tb_ret;
  69. i += batch_size;
  70. } while (i <= cal_portfolio_info.size());
  71. if(! tb_portfolio_nav.isVoid() && tb_portfolio_nav.size() > 0) {
  72. // save data to MySQL (12 sec)
  73. try {
  74. tb_portfolio_nav.rename!('entity_id', 'portfolio_id');
  75. save_and_sync(tb_portfolio_nav, 'raw_db.pf_portfolio_nav', 'raw_db.pf_portfolio_nav');
  76. } catch(ex) {
  77. //TODO: Log errors
  78. rt = ex;
  79. }
  80. }
  81. return rt;
  82. }
  83. /*
  84. * 计算组合标准指标并存入数据库
  85. *
  86. * TODO: release 时改变同步目标表为正式表
  87. */
  88. def cal_and_save_portfolio_indicators(cal_portfolio_info) {
  89. rt = '';
  90. // 准备类似MySQL结构的数据表
  91. tb_portfolio_performance = create_entity_performance(true);
  92. tb_portfolio_indicator = create_entity_indicator(true);
  93. tb_portfolio_risk_stats = create_entity_risk_stats(true);
  94. tb_portfolio_riskadjret_stats = create_entity_riskadjret_stats(true);
  95. tb_portfolio_style_stats = create_entity_style_stats(true);
  96. tb_portfolio_performance_weekly = create_entity_performance_weekly(true);
  97. tb_portfolio_latest_performance = create_entity_latest_performance(true);
  98. // 分批跑
  99. i = 0;
  100. batch_size = 1000;
  101. all_portfolio_id = EXEC DISTINCT portfolio_id FROM cal_portfolio_info;
  102. do {
  103. cal_port = SELECT * FROM cal_portfolio_info
  104. WHERE portfolio_id IN all_portfolio_id[i : min(all_portfolio_id.size(), i+batch_size)];
  105. if(cal_port.isVoid() || cal_port.size() == 0) break;
  106. // 取数据库月度净值及前值 5 sec
  107. s_json = (SELECT portfolio_id, 1900.01.01 AS price_date FROM cal_port GROUP BY portfolio_id).rename!('portfolio_id', 'sec_id').toStdJson();
  108. tb_monthly_nav = get_nav_for_return_calculation('PF', 'm', s_json);
  109. // 把 portfolio id 字段从字符串换回整型,不然后面Join table的时候会出错
  110. v_portfolio_id = tb_monthly_nav.sec_id$INT;
  111. tb_monthly_nav.replaceColumn!('sec_id', v_portfolio_id);
  112. tb_monthly_nav.dropColumns!('nav').rename!(['sec_id', 'cumulative_nav'], ['portfolio_id', 'nav']);
  113. // 计算各标准指标
  114. indicators = calPortfolioPerformance(tb_monthly_nav);
  115. // 仿照MySQL的表结构准备好记录 (1s)
  116. port_info = (SELECT portfolio_id, start_cal_date.min() AS price_date FROM cal_port GROUP BY portfolio_id).rename!('portfolio_id', 'entity_id');
  117. generate_entity_performance(port_info, indicators, true, tb_portfolio_performance);
  118. generate_entity_indicator(port_info, indicators, true, tb_portfolio_indicator);
  119. generate_entity_risk_stats(port_info, indicators, true, tb_portfolio_risk_stats);
  120. generate_entity_riskadjret_stats(port_info, indicators, true, tb_portfolio_riskadjret_stats);
  121. generate_entity_style_stats(port_info, indicators, true, tb_portfolio_style_stats);
  122. // 计算周收益 (49s)
  123. port_info = SELECT * FROM ej(port_info, get_entity_info('PF', all_portfolio_id[i : min(all_portfolio_id.size(), i+batch_size)]), 'entity_id')
  124. rets_w = cal_weekly_returns('PF', port_info);
  125. if(! rets_w.isVoid() && rets_w.size() > 0) {
  126. // 把 portfolio id 字段从字符串换回整型,不然后面Join table的时候会出错
  127. v_portfolio_id = rets_w.entity_id$INT;
  128. rets_w.replaceColumn!('entity_id', v_portfolio_id);
  129. generate_entity_performance_weekly(port_info, rets_w, true, tb_portfolio_performance_weekly);
  130. }
  131. // 计算最新收益 (23s)
  132. perf_latest = cal_latest_performance('PF', port_info, true);
  133. if(! perf_latest.isVoid() && perf_latest.size() > 0) {
  134. generate_entity_latest_performance(port_info, perf_latest, true, tb_portfolio_latest_performance);
  135. }
  136. i += batch_size;
  137. } while (i <= cal_portfolio_info.size());
  138. if(! tb_portfolio_performance.isVoid() && tb_portfolio_performance.size() > 0) {
  139. // save data to MySQL
  140. try {
  141. chg_columns_for_mysql(tb_portfolio_performance, 'portfolio_id');
  142. save_and_sync(tb_portfolio_performance, 'raw_db.pf_portfolio_performance', 'raw_db.pf_portfolio_performance');
  143. chg_columns_for_mysql(tb_portfolio_indicator, 'portfolio_id');
  144. save_and_sync(tb_portfolio_indicator, 'raw_db.pf_portfolio_indicator', 'raw_db.pf_portfolio_indicator');
  145. chg_columns_for_mysql(tb_portfolio_risk_stats, 'portfolio_id');
  146. save_and_sync(tb_portfolio_risk_stats, 'raw_db.pf_portfolio_risk_stats', 'raw_db.pf_portfolio_risk_stats');
  147. chg_columns_for_mysql(tb_portfolio_riskadjret_stats, 'portfolio_id');
  148. save_and_sync(tb_portfolio_riskadjret_stats, 'raw_db.pf_portfolio_riskadjret_stats', 'raw_db.pf_portfolio_riskadjret_stats');
  149. chg_columns_for_mysql(tb_portfolio_style_stats, 'portfolio_id');
  150. save_and_sync(tb_portfolio_style_stats, 'raw_db.pf_portfolio_style_stats', 'raw_db.pf_portfolio_style_stats');
  151. save_and_sync(tb_portfolio_performance_weekly, 'raw_db.pf_portfolio_performance_weekly', 'raw_db.pf_portfolio_performance_weekly');
  152. save_and_sync(tb_portfolio_latest_performance, 'raw_db.pf_portfolio_latest_performance', 'raw_db.pf_portfolio_latest_performance');
  153. } catch(ex) {
  154. //TODO: Log errors
  155. rt = ex;
  156. }
  157. }
  158. return rt;
  159. }
  160. /*
  161. * [定时任务]批量计算组合净值、收益及指标
  162. *
  163. * @param updatetime <DATETIME>: 持仓证券净值更新时间,忽略时跑全历史
  164. *
  165. *
  166. * Example: CalPortfolioPerformanceTask(2024.10.28);
  167. */
  168. def CalPortfolioPerformanceTask(updatetime=1900.01.01) {
  169. rt = '';
  170. // 3 min
  171. tb_cal_ports = get_portfolio_list_by_fund_nav_updatetime(NULL, updatetime, true);
  172. if(tb_cal_ports.isVoid() || tb_cal_ports.size() == 0) return;
  173. // 26 min
  174. rt = cal_and_save_portfolio_nav(tb_cal_ports);
  175. // 9 min
  176. rt = rt + '; ' + cal_and_save_portfolio_indicators(tb_cal_ports);
  177. return rt;
  178. }