task_weeklyPerformnce.dos 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. module fundit::task_weeklyPerformance
  2. use fundit::sqlUtilities;
  3. use fundit::operationDataPuller;
  4. use fundit::performanceDataPuller;
  5. use fundit::indicatorCalculator;
  6. use fundit::rbsaCalculator;
  7. use fundit::bfiMatcher;
  8. use fundit::dataSaver;
  /*
   * Compute RBSA results for entities whose returns were updated, and persist
   * only the most recent result row set per entity and asset type.
   *
   * @param entityType <STRING>: 'MF', 'HF' or 'PF' ('MF' and 'HF' are treated the same)
   * @param entityIds: entity ids forwarded to get_entity_list_by_latest_return_updatetime (NULL in the examples below)
   * @param updateTime: update-time cutoff forwarded to get_entity_list_by_latest_return_updatetime
   *
   * TODO: should this also run monthly and write to the history table?
   *       The portfolio ('PF') path has not been tested.
   *
   * Example: CalEntityRBSATask('MF', ['MF00003PW1'], 2024.10.14T10:00:00);
   *          CalEntityRBSATask('MF', NULL, 2024.11.25T10:00:00);
   *          CalEntityRBSATask('PF', NULL, 2024.11.25T10:00:00);
   */
  def CalEntityRBSATask(entityType, entityIds, updateTime) {
      lookback_weeks = 48;
      step_weeks = 13;
      chunk_len = 200;

      pending = get_entity_list_by_latest_return_updatetime(entityType, entityIds, updateTime, true);
      if(pending.isVoid() || pending.size() == 0) return;

      // Asset type -> index ids; the union of all index ids is loaded once up front.
      rbsa_index_map = get_rbsa_index();
      all_index_ids = distinct(flatten(values(rbsa_index_map)));

      // The benchmark series may be an index, a factor, a fund or any other time series,
      // so no entity type is passed here.
      earliest_start = temporalAdd(min(pending.price_date), -lookback_weeks, 'w');
      t_index_ret = get_entity_return(NULL, all_index_ids, 'w', earliest_start, today(), true);

      // Funds and portfolios are written to different breakdown tables.
      if(entityType IN ['MF', 'HF'])
          target_table = 'raw_db.pf_fund_rbsa_breakdown';
      else
          target_table = 'raw_db.pf_portfolio_rbsa_breakdown';

      result_cols = ["entity_id", "asset_type_id", "index_id", "effective_date", "level", "alternative_id", "weighting"];
      pending_cnt = pending.size();
      offset = 0;
      do {
          result_types = [iif(entityType=='PF', INT, STRING), STRING, STRING, STRING, INT, STRING, DOUBLE];
          tb_result = table(1000:0, result_cols, result_types);

          chunk = pending[offset : min(pending_cnt, offset + chunk_len)];

          // Start date per entity = its updated price date moved back by one window.
          start_dates = SELECT entity_id, temporalAdd(price_date, -lookback_weeks, 'w') AS price_date FROM chunk;
          t_entity_ret = get_entity_ret_by_date(entityType, start_dates.toStdJson(), 'w', true);

          if(entityType == 'PF') {
              // Portfolio ids are cast to INT to match the INT entity_id column of the result table.
              pf_ids = t_entity_ret.entity_id$INT;
              replaceColumn!(t_entity_ret, 'entity_id', pf_ids);
          }

          for(ent in chunk) {
              entity_ret = SELECT * FROM t_entity_ret WHERE entity_id = ent.entity_id;

              for(asset_type in rbsa_index_map.keys()) {
                  index_ret = SELECT entity_id, price_date, ret FROM t_index_ret
                              WHERE entity_id IN rbsa_index_map[asset_type] AND price_date IS NOT NULL;

                  // Start date = the entity's updated price date moved back by one window.
                  res = cal_entity_RBSA(entityType, entity_ret, index_ret, 'w',
                                        ent.price_date.temporalAdd(-lookback_weeks, 'w')[0], today(), true,
                                        lookback_weeks, step_weeks);
                  if(res.isVoid() || res.size() == 0) continue;

                  // This task only refreshes the latest RBSA result.
                  latest_price_date = (EXEC max(price_date) AS price_date FROM res)[0];
                  tb_result.tableInsert(SELECT entity_id, asset_type, index_id, price_date, level, alternative_id, weights
                                        FROM res WHERE price_date = latest_price_date);
              }
          }

          save_and_sync(tb_result, target_table, target_table);

          offset += chunk_len;
      } while (offset < pending_cnt);
  }
  /*
   * [Scheduled task] Match best-fit indices (BFI) for funds or portfolios.
   *
   * Loads the entities returned by get_entity_list_by_weekly_return_updatetime for `date`,
   * then, batch by batch, computes entity/index coefficients, matches BFI factors and
   * saves the results via save_and_sync.
   *
   * @param entityType <STRING>: 'MF', 'HF' or 'PF'
   * @param date: update-date cutoff forwarded to get_entity_list_by_weekly_return_updatetime
   *
   * @return null for an unsupported entityType; nothing when there is no entity to process;
   *         otherwise the accumulated exception info of failed batches ('' when none failed)
   *
   * TODO: where is the max_r2 table consumed? Probably related to fund recommendation.
   *
   * Example: MatchEntityBFITask('MF', 2024.11.20);
   */
  def MatchEntityBFITask(entityType, date) {
      rt = '';
      if(find(['HF', 'MF', 'PF'], entityType) < 0) return null;

      // Entities whose weekly returns changed recently (author's note: ~1s)
      tb_cal_entity = get_entity_list_by_weekly_return_updatetime(entityType, NULL, date, true);
      if(tb_cal_entity.isVoid() || tb_cal_entity.size() == 0 ) return;

      i = 0;
      size = tb_cal_entity.size();
      batch_size = 1000;
      do {
          batch = tb_cal_entity[i : min(size, i+batch_size)];
          // Advance the cursor BEFORE any `continue`: in a do-while, `continue` jumps straight
          // to the loop condition, so incrementing at the bottom would re-run the same batch forever.
          i = i + batch_size;

          // Author's note: ~4 min per 1000 funds or ~2 min per 1000 portfolios
          coe = cal_entity_index_coe(entityType, batch);
          if(coe.isVoid() || coe.size() == 0) continue;

          entity_info = get_entity_info(entityType, batch.entity_id);
          bfi_raw = match_entity_bfi(entityType, entity_info, coe);

          // Persist to the database first so that finished work is not lost.
          try {
              corr_threshold = get_min_threshold('correlation');

              // Blank out correlations whose absolute value is below the threshold ...
              t_coe = SELECT entity_id, end_date, index_id,
                             iif(coe_1y.abs() < corr_threshold, double(NULL), coe_1y) AS coe_1y,
                             iif(coe_3y.abs() < corr_threshold, double(NULL), coe_3y) AS coe_3y,
                             iif(coe_5y.abs() < corr_threshold, double(NULL), coe_5y) AS coe_5y,
                             info_ratio_1y, info_ratio_3y, info_ratio_5y, t_value_1y, t_value_3y, t_value_5y, beta_1y, beta_3y, beta_5y
                      FROM coe;
              // ... and drop rows where no horizon passed the threshold.
              DELETE FROM t_coe WHERE coe_1y IS NULL AND coe_3y IS NULL AND coe_5y IS NULL;

              // Candidate factors: rows whose index id starts with 'FA'
              t_bfi_candidates = SELECT entity_id, end_date, index_id AS factor_id, coe_1y AS coe, coe_1y.square() AS r2, 'w' AS performance_flag, t_value_1y, beta_1y
                                 FROM t_coe WHERE index_id LIKE 'FA%';

              chg_columns_for_mysql(t_coe, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
              // Only funds need the index_coe table.
              if(entityType IN ['MF', 'HF']) save_and_sync(t_coe, 'raw_db.pf_fund_index_coe', );

              // All candidate factors go to the xxx_factor_bfi table.
              // NOTE: the Java implementation stores every factor; here only rows passing the
              // correlation threshold are stored (the table may not be used anyway?).
              chg_columns_for_mysql(t_bfi_candidates, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
              save_and_sync(t_bfi_candidates, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi', 'raw_db.cm_fund_factor_bfi'), );

              if(bfi_raw.isVoid() || bfi_raw.size() == 0) continue;

              // Valid factors, best R2 first within each entity/end_date
              t_bfi = SELECT entity_id, end_date, factor_id, coe_1y AS coe, r2, performance_flag, t_value_1y, beta_1y
                      FROM bfi_raw ORDER BY entity_id, end_date, r2 DESC;

              // Factor with the largest R2 plus the labels of all valid factors
              t_max_r2 = SELECT entity_id, factor_id.first() AS factor_id, end_date,
                                string(NULL) AS performance_flag, coe.first() AS coe, r2.first() AS r2, concat(factor_name, ",") AS rz_portrait
                         FROM ej(t_bfi, get_bfi_index_list(), 'factor_id')
                         GROUP BY entity_id, end_date;

              // Valid factors go to the xxx_factor_bfi_by_category_group table.
              chg_columns_for_mysql(t_bfi, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
              save_and_sync(t_bfi, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_by_category_group', 'raw_db.pf_fund_factor_bfi_by_category_group'), );

              // The max-R2 factor among the valid factors goes to the xxx_max_r2 table.
              chg_columns_for_mysql(t_max_r2, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
              save_and_sync(t_max_r2, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_max_r2', 'raw_db.pf_fund_factor_bfi_by_category_group_max_r2'), );
          } catch (ex) {
              //TODO: Log errors
              rt += ex;
          }
      } while (i < size);

      return rt;
  }
  /*
   * [Scheduled task] Compute BFI indicators and store them in the database.
   *
   * @param entityType <STRING>: 'MF', 'HF' or 'PF' ('MF' and 'HF' are treated the same)
   * @param date <DATETIME>: BFI update time. When empty it defaults to one day before now.
   *                         A value of 1989.01.01 or earlier makes the monthly returns be read
   *                         with isFromMySQL = false; when the value is not later than
   *                         get_ini_data_const()['date'] the indicators are additionally saved
   *                         with save_table (initialisation run, used for ranking).
   *
   * @return null for an unsupported entityType; nothing when there is no entity to process;
   *         otherwise '' on success or the exception info raised while saving
   *
   * Example: calEntityBfiIndicatorTask('MF', 2024.10.28);
   *          calEntityBfiIndicatorTask('PF', 2024.10.28);
   */
  def calEntityBfiIndicatorTask(entityType, date) {
      rt = '';
      if(!(entityType IN ['MF', 'HF', 'PF'])) return null;

      very_old_day = 1900.01.01;
      if(date.isNothing() || date.isNull())
          end_day = temporalAdd(now(), -1d);
      else
          end_day = date;

      // Dates of 1989.01.01 or earlier mean "read from the local store" instead of MySQL.
      isFromMySQL = iif(end_day <= 1989.01.01, false, true);

      // Entities with recent BFI changes (author's note: ~1s)
      tb_cal_entities = get_entity_bfi_factors(entityType, NULL, very_old_day.month(), today().month(), end_day);
      if(tb_cal_entities.isVoid() || tb_cal_entities.size() == 0 ) return;

      v_uniq_entity_id = EXEC DISTINCT entity_id FROM tb_cal_entities;
      id_cnt = v_uniq_entity_id.size();

      // Result table shaped like the MySQL table
      tb_bfi_indicator = create_entity_bfi_indicator(iif(entityType=='PF', true, false));

      // Process in batches
      i = 0;
      batch_size = 100;
      do {
          batch_ids = v_uniq_entity_id[i : min(id_cnt, i+batch_size)];
          entities = SELECT * FROM tb_cal_entities WHERE entity_id IN batch_ids;
          if(entities.isVoid() || entities.size() == 0) break;

          // Author's note: ~200ms
          entity_info = SELECT entity_id, end_date.temporalParse('yyyy-MM') AS end_date, inception_date, factor_id AS benchmark_id, ini_value
                        FROM ej(entities, get_entity_info(entityType, entities.entity_id), 'entity_id');

          // Monthly returns up to the month end of the latest end_date (author's note: ~12s)
          rets = get_monthly_ret(entityType, entity_info.entity_id, very_old_day, entity_info.end_date.max().temporalFormat('yyyy-MM-dd').temporalParse('yyyy-MM-dd').monthEnd(), isFromMySQL);

          // Only touch `rets` after confirming it is a non-empty table; dereferencing
          // rets.end_date on a void result would fail before the guard could help.
          if(!rets.isVoid() && rets.size() > 0) {
              // Convert the 'yyyy-MM' formatted end_date into a DolphinDB MONTH
              v_end_date = rets.end_date.temporalParse('yyyy-MM');
              rets.replaceColumn!('end_date', v_end_date);

              // Monthly indicators (author's note: ~5s)
              indicators = cal_monthly_indicators(entityType, 'BFI', rets);

              // Prepare records following the MySQL table layout (author's note: ~1s);
              // presumably appends into tb_bfi_indicator - confirm against generate_entity_bfi_indicator.
              generate_entity_bfi_indicator(entity_info, indicators, true, tb_bfi_indicator);
          }

          i += batch_size;
      } while (i < id_cnt);

      if(! tb_bfi_indicator.isVoid() && tb_bfi_indicator.size() > 0) {
          // Save data to MySQL
          try {
              t_desc = get_bfi_indicator_table_description(entityType);
              chg_columns_for_mysql(tb_bfi_indicator, t_desc.sec_id_col[0]);

              // Same table name, with the database part swapped for 'raw_db'
              db_name = t_desc.table_name[0].split('.')[0];
              raw_table_name = t_desc.table_name[0].strReplace(db_name, 'raw_db');
              save_and_sync(tb_bfi_indicator, raw_table_name, raw_table_name);

              // On data initialisation also keep the indicators locally, for ranking.
              if(end_day <= get_ini_data_const()['date'])
                  save_table(tb_bfi_indicator, t_desc.table_name[0], false);
          } catch(ex) {
              //TODO: Log errors
              rt = ex;
          }
      }

      return rt;
  }