task_weeklyPerformance.dos 14 KB


  1. module fundit::task_weeklyPerformance
  2. use fundit::sqlUtilities;
  3. use fundit::operationDataPuller;
  4. use fundit::performanceDataPuller;
  5. use fundit::indicatorCalculator;
  6. use fundit::rbsaCalculator;
  7. use fundit::navCalculator;
  8. use fundit::bfiMatcher;
  9. use fundit::dataSaver;
  10. /*
  11. * 根据收益更新日期计算 RBSA (1h for funds; for portfolios)
  12. *
  13. * @param entityType <STRING>: MF, HF, PF (MF和HF等效)
  14. *
  15. * TODO: 是否要 monthly 运行,并写入 history 表?
  16. * portfolio 未测试
  17. *
  18. * Example: CalEntityRBSATask('MF', ['MF00003PW1'], 2024.10.14T10:00:00);
  19. * CalEntityRBSATask('MF', NULL, 2025.02.20);
  20. * CalEntityRBSATask('PF', NULL, 2025.01.23);
  21. */
  22. def CalEntityRBSATask(entityType, entityIds, updateTime) {
  23. // entityType = 'MF'
  24. //entityIds = NULL
  25. //updateTime = 2025.03.21T10:00:00
  26. t_cal = get_entity_list_by_latest_return_updatetime(entityType, entityIds, updateTime, true);
  27. window = 48;
  28. step = 13;
  29. if(t_cal.isVoid() || t_cal.size() == 0) return;
  30. // 拿到所有指数ID
  31. t_asset_index_map = get_asset_type_index_mapping(NULL, NULL, NULL);
  32. if(t_asset_index_map.isVoid() || t_asset_index_map.size() == 0) return;
  33. v_asset = t_asset_index_map.asset_type_id.distinct();
  34. v_index = t_asset_index_map.index_id.distinct();
  35. // 因为用来做基准指数的可能是指数、因子、基金等等任何时间序列数据,所以不用填 entity_type
  36. t_index_ret = get_entity_return(NULL, v_index, 'w', t_cal.price_date.min().temporalAdd(-window, 'w'), today(), true);
  37. i = 0;
  38. batch_size = 200; // 1 hour total
  39. total_cnt = t_cal.size();
  40. do {
  41. tb_result = table(1000:0,
  42. ["entity_id", "asset_type_id", "index_id", "effective_date", "level", "alternative_id", "weighting"],
  43. [iif(entityType=='PF', INT, STRING), STRING, STRING, STRING, INT, STRING, DOUBLE]);
  44. t = t_cal[i: min(total_cnt, i+batch_size)];
  45. // 起始日期是最早更新的净值日期再向前推一个时间窗口+10(防止数据缺失的冗余)
  46. s_json = (SELECT entity_id, price_date.temporalAdd(-window-10, 'w') AS price_date FROM t).toStdJson();
  47. t_entity_ret = get_entity_ret_by_date(entityType, s_json, 'w', true);
  48. if(entityType == 'PF') {
  49. v_entity_id = t_entity_ret.entity_id$INT;
  50. t_entity_ret.replaceColumn!('entity_id', v_entity_id);
  51. }
  52. for(entity in t) {
  53. //entity=t[0]
  54. entity_ret = SELECT * FROM t_entity_ret WHERE entity_id = entity.entity_id;
  55. for(asset in v_asset) {
  56. //asset=v_asset[0]
  57. index_ret = SELECT entity_id, price_date, ret
  58. FROM t_index_ret r
  59. INNER JOIN t_asset_index_map m ON r.entity_id = m.index_id
  60. WHERE m.asset_type_id = asset
  61. AND r.price_date IS NOT NULL;
  62. // 起始日期是最早更新日期再向前推一个时间窗口
  63. res = cal_entity_RBSA(entityType, entity_ret, index_ret, 'w',
  64. entity.price_date.temporalAdd(-window-10, 'w')[0], today(), true, window, step);
  65. if(res.isVoid() || res.size() == 0) continue;
  66. // 每日任务只负责更新最新的rbsa结果
  67. latest_date = (EXEC price_date.max() AS price_date FROM res)[0];
  68. tb_result.tableInsert(SELECT entity_id, asset, index_id, price_date, level, alternative_id, weights
  69. FROM res WHERE price_date = latest_date);
  70. }
  71. }
  72. if(entityType IN ['MF', 'HF'])
  73. save_and_sync(tb_result, 'raw_db.pf_fund_rbsa_breakdown', 'raw_db.pf_fund_rbsa_breakdown', 'fund_id', 'effective_date');
  74. else
  75. save_and_sync(tb_result, 'raw_db.pf_portfolio_rbsa_breakdown', 'raw_db.pf_portfolio_rbsa_breakdown', 'portfolio_id', 'effective_date');
  76. i += batch_size;
  77. } while (i < total_cnt);
  78. }
  79. /*
  80. * [定时任务] 计算基金和组合的BFI
  81. *
  82. * @param entityType <STRING>: MF, HF, PF
  83. *
  84. *
  85. * TODO: max_r2 表在哪里被用到了?应该和基金推荐有关系
  86. *
  87. * Example: MatchEntityBFITask('MF', 2025.02.21);
  88. */
  89. def MatchEntityBFITask(entityType, date) {
  90. //entityType = 'MF'
  91. //date = 2025.02.21
  92. rt = '';
  93. if(find(['HF', 'MF', 'PF'], entityType) < 0) return null;
  94. // 取有最新周收益变动的基金列表 (1s)
  95. tb_cal_entity = get_entity_list_by_weekly_return_updatetime(entityType, NULL, date, true);
  96. if(tb_cal_entity.isVoid() || tb_cal_entity.size() == 0 ) return;
  97. i = 0;
  98. size = tb_cal_entity.size();
  99. batch_size = 100;
  100. do {
  101. t_tmp_entity = tb_cal_entity[i : min(size, i+batch_size)];
  102. i = i + batch_size;
  103. // 4 min per 1000 funds or 2 min per 1000 portfolios
  104. coe = cal_entity_index_coe(entityType, t_tmp_entity);
  105. if(coe.isVoid() || coe.size() == 0) continue;
  106. entity_info = get_entity_info(entityType, t_tmp_entity.entity_id);
  107. bfi_raw = match_entity_bfi(entityType, entity_info, coe);
  108. // 先存到数据库,落袋为安
  109. try {
  110. // 筛掉 correlation 绝对值不够阈值的记录
  111. t_coe = SELECT entity_id, end_date, index_id,
  112. iif(coe_1y.abs() < get_min_threshold('correlation'), double(NULL), coe_1y) AS coe_1y,
  113. iif(coe_3y.abs() < get_min_threshold('correlation'), double(NULL), coe_3y) AS coe_3y,
  114. iif(coe_5y.abs() < get_min_threshold('correlation'), double(NULL), coe_5y) AS coe_5y,
  115. t_value_1y, t_value_3y, t_value_5y, beta_1y, beta_3y, beta_5y
  116. FROM coe;
  117. DELETE FROM t_coe WHERE coe_1y IS NULL AND coe_3y IS NULL AND coe_5y IS NULL;
  118. // 候选因子
  119. t_bfi_candidates = SELECT entity_id, end_date, index_id AS factor_id, coe_1y AS coe, coe_1y.square() AS r2, 'w' AS performance_flag, t_value_1y, beta_1y
  120. FROM t_coe WHERE index_id LIKE 'FA%';
  121. chg_columns_for_mysql(t_coe, iif(entityType == 'PF', 'portfolio_id', 'fund_id'));
  122. // 只有基金需要存 index_coe 表
  123. if(entityType IN ['MF', 'HF']) save_and_sync(t_coe, 'raw_db.pf_fund_index_coe', , 'fund_id', 'end_date');
  124. // 所有的 factors 存到 xxx_factor_bfi 表;NOTE: Java 把所有 factor 的数据都存起来,这里只存 correlation 达标的记录 (反正这个表没啥用?)
  125. entity_id_col = iif(entityType == 'PF', 'portfolio_id', 'fund_id');
  126. chg_columns_for_mysql(t_bfi_candidates, entity_id_col);
  127. save_and_sync(t_bfi_candidates, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi', 'raw_db.cm_fund_factor_bfi'), , entity_id_col, 'end_date');
  128. if(bfi_raw.isVoid() || bfi_raw.size() == 0) continue;
  129. // 有效因子
  130. t_bfi = SELECT entity_id, end_date, factor_id, coe_1y AS coe, r2, performance_flag, t_value_1y, beta_1y
  131. FROM bfi_raw ORDER BY entity_id, end_date, r2 DESC;
  132. // 最大R2因子及所有有效因子标签
  133. t_max_r2 = SELECT entity_id, factor_id.first() AS factor_id, end_date,
  134. string(NULL) AS performance_flag, coe.first() AS coe, r2.first() AS r2, concat(factor_name, ",") AS rz_portrait
  135. FROM ej(t_bfi, get_bfi_index_list(), 'factor_id')
  136. GROUP BY entity_id, end_date;
  137. // 有效 factors 存到 xxx_factor_bfi_by_category_group 表
  138. chg_columns_for_mysql(t_bfi, entity_id_col);
  139. save_and_sync(t_bfi, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_by_category_group', 'raw_db.pf_fund_factor_bfi_by_category_group'), , entity_id_col, 'end_date');
  140. // 有效因子中 R2 最大的因子存 xxx_max_r2
  141. chg_columns_for_mysql(t_max_r2, entity_id_col);
  142. save_and_sync(t_max_r2, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_max_r2', 'raw_db.pf_fund_factor_bfi_by_category_group_max_r2'), , entity_id_col, 'end_date');
  143. } catch (ex) {
  144. //TODO: Log errors
  145. rt += ex;
  146. }
  147. } while (i<size)
  148. return rt;
  149. }
  150. /*
  151. * [定时任务] 计算BFI指标并存入数据库
  152. *
  153. * @param entityType <STRING>: 'MF', 'HF', 'PF' (MF和HF等效)
  154. * @param date <DATETIME>: BFI更新时间, 为空时缺省为当前时间的前1天;为1989.01.01或更早日期时代表初始化,指标会被存入本地数据库
  155. *
  156. *
  157. * Example: calEntityBfiIndicatorTask('MF', 2024.10.28);
  158. * calEntityBfiIndicatorTask('PF', 2024.10.28);
  159. */
  160. def calEntityBfiIndicatorTask(entityType, date) {
  161. // entityType = 'MF'
  162. // date = 2024.10.01
  163. rt = '';
  164. if(!(entityType IN ['MF', 'HF', 'PF'])) return null;
  165. very_old_day = 1900.01.01;
  166. if(date.isNothing() || date.isNull())
  167. end_day = temporalAdd(now(), -1d);
  168. else
  169. end_day = date;
  170. // 1989.01.01及以前的日期被认为从本地读数据
  171. isFromMySQL = iif(end_day <= 1989.01.01, false, true);
  172. // 取有最新bfi变动的基金列表 (1s)
  173. tb_cal_entities = get_entity_bfi_factors(entityType, NULL, very_old_day.month(), today().month(), end_day);
  174. if(tb_cal_entities.isVoid() || tb_cal_entities.size() == 0 ) return;
  175. v_uniq_entity_id = EXEC DISTINCT entity_id FROM tb_cal_entities;
  176. // 按照 MySQL 建好各表
  177. tb_bfi_indicator = create_entity_bfi_indicator(iif(entityType=='PF', true, false));
  178. // 分批跑
  179. i = 0;
  180. batch_size = 100;
  181. do {
  182. entities = SELECT * FROM tb_cal_entities WHERE entity_id IN v_uniq_entity_id[i : min(v_uniq_entity_id.size(), i+batch_size)];
  183. if(entities.isVoid() || entities.size() == 0) break;
  184. // 200ms
  185. entity_info = SELECT entity_id, end_date.temporalParse('yyyy-MM') AS end_date, inception_date, factor_id AS benchmark_id, ini_value
  186. FROM ej(entities, get_entity_info(entityType, entities.entity_id), 'entity_id');
  187. // 取月收益 (12s)
  188. rets = get_monthly_ret(entityType, entity_info.entity_id, very_old_day, entity_info.end_date.max().temporalFormat('yyyy-MM-dd').temporalParse('yyyy-MM-dd').monthEnd(), isFromMySQL);
  189. // 把 yyyy-MM 格式的 end_date 改成 dolphin 的 MONTH
  190. v_end_date = rets.end_date.temporalParse('yyyy-MM');
  191. rets.replaceColumn!('end_date', v_end_date);
  192. if(!rets.isVoid() && rets.size() > 0) {
  193. // 计算月度指标 (5s)
  194. indicators = cal_monthly_indicators(entityType, 'BFI', rets);
  195. // 仿照MySQL的表结构准备好记录 (1s)
  196. generate_entity_bfi_indicator(entity_info, indicators, true, tb_bfi_indicator);
  197. }
  198. i += batch_size;
  199. } while (i <= v_uniq_entity_id.size());
  200. if(! tb_bfi_indicator.isVoid() && tb_bfi_indicator.size() > 0) {
  201. // save data to MySQL
  202. try {
  203. t_desc = get_bfi_indicator_table_description(entityType);
  204. chg_columns_for_mysql(tb_bfi_indicator, t_desc.sec_id_col[0]);
  205. db_name = t_desc.table_name[0].split('.')[0];
  206. save_and_sync(tb_bfi_indicator, t_desc.table_name[0].strReplace(db_name, 'raw_db'), t_desc.table_name[0].strReplace(db_name, 'raw_db'), t_desc.sec_id_col[0], 'end_date');
  207. // 数据初始化时将指标存入本地,做排名之用
  208. if(end_day <= get_ini_data_const()['date'])
  209. save_table(tb_bfi_indicator, t_desc.table_name[0], false);
  210. } catch(ex) {
  211. //TODO: Log errors
  212. rt = ex;
  213. }
  214. }
  215. return rt;
  216. }
  217. /*
  218. * 计算并存储基金经理和公司月度净值
  219. *
  220. * @return <TABLE>: [COLUMNS] entity_id, curve_type, strategy, end_date, price_date, ret, nav
  221. *
  222. * NOTE: 基本上和 cal_and_save_mc_monthly_nav 一样
  223. *
  224. */
  225. def cal_and_save_mc_weekly_nav(entity_type, entity_date, is_save_local) {
  226. rt = '';
  227. if( !(entity_type in ['PL', 'CO']) ) return rt;
  228. if(entity_date.isVoid() || entity_date.size() == 0) return rt;
  229. // 准备类似MySQL结构的数据表
  230. tb_entity_nav = create_mc_nav();
  231. // 暂时与 MySQL 保持一致,只计算公募,私募,公私募综合三条时间序列。未来可细化至公、私募+主策略
  232. d_curve_type = dict(INT, INT);
  233. d_curve_type[1] = 1; // 私募
  234. d_curve_type[4] = 2; // 公募
  235. d_curve_type[7] = -99; // 公私募综合
  236. // 分批跑
  237. i = 0;
  238. batch_size = 1000;
  239. all_entity_id = entity_date.entity_id.distinct();
  240. do { // 14 sec
  241. tb_entity = SELECT entity_id, effective_date FROM entity_date
  242. WHERE entity_id IN all_entity_id[i : min(all_entity_id.size(), i+batch_size)];
  243. if(tb_entity.isVoid() || tb_entity.size() == 0) break;
  244. s_json = tb_entity.toStdJson();
  245. t_ret = get_mc_average_return(entity_type, 'w', s_json, 0, 1, true);
  246. for(cur in d_curve_type.keys()) {
  247. //cur=7
  248. tmp = SELECT entity_id, cur AS curve_type, 0 AS strategy, effective_date, price_date, ret, incl_cal_cnt
  249. FROM t_ret WHERE raise_type = d_curve_type[cur] AND strategy = -99; // 目前只需要全策略
  250. // 计算周收益
  251. tb_nav = cal_mc_nav_by_return(entity_type, tmp, 'w');
  252. INSERT INTO tb_entity_nav
  253. SELECT entity_id, curve_type, strategy, effective_date AS year_week, price_date, nav, ret, incl_cal_cnt
  254. FROM ej(tb_nav, tmp, ['entity_id', 'curve_type', 'strategy', 'effective_date']);
  255. }
  256. i += batch_size;
  257. } while (i <= all_entity_id.size());
  258. if(! tb_entity_nav.isVoid() && tb_entity_nav.size() > 0) {
  259. // save data to MySQL (12 sec)
  260. try {
  261. entity_id_col = iif(entity_type == 'PL', 'manager_id', 'company_id');
  262. tb_entity_nav.rename!('entity_id', entity_id_col);
  263. save_and_sync(tb_entity_nav, iif(entity_type == 'PL', 'raw_db.manager_nav', 'raw_db.company_nav'), , entity_id_col, 'year_week');
  264. // 数据初始化时将指标存入本地
  265. if(is_save_local == true) {
  266. save_table(tb_entity_nav, iif(entity_type == 'PL', 'mfdb.manager_nav', 'mfdb.company_nav'), false);
  267. }
  268. } catch(ex) {
  269. //TODO: Log errors
  270. rt += ex;
  271. }
  272. }
  273. return rt;
  274. }
  275. /*
  276. * [定时任务]: 基金经理/公司周净值计算
  277. *
  278. * Example: CalMCWeeklyNavTask('PL', 2025.01.23);
  279. */
  280. def CalMCWeeklyNavTask(entityType, updatetime) {
  281. //updatetime = 2024.11.05;
  282. //entityType = 'CO';
  283. if(!(entityType IN ['PL', 'CO'])) return;
  284. is_save_local = iif(updatetime <= get_ini_data_const()['updatetime'], true, false);
  285. // 60 sec 简化起见,不区分curve_type, strategy; TODO: 性能能否优化?
  286. if(entityType == 'PL') {
  287. entity_date = get_manager_list_by_fund_updatetime(updatetime, 'w');
  288. entity_date.rename!('manager_id', 'entity_id');
  289. }
  290. else {
  291. entity_date = get_company_list_by_fund_updatetime(updatetime, 'w');
  292. entity_date.rename!('company_id', 'entity_id');
  293. }
  294. // 15 sec
  295. cal_and_save_mc_weekly_nav(entityType, entity_date, is_save_local);
  296. entity_date = null;
  297. }