task_fundPerformance.dos 19 KB


  1. module fundit::task_fundPerformance
  2. use fundit::sqlUtilities;
  3. use fundit::operationDataPuller;
  4. use fundit::performanceDataPuller;
  5. use fundit::dataSaver;
  6. use fundit::returnCalculator;
  7. use fundit::indicatorCalculator;
  8. use fundit::rbsaCalculator;
  9. use fundit::bfiMatcher;
  10. use fundit::ms_dataPuller;
  /*
   *  [Scheduled task] Calculate performance indicators for entities whose NAV was recently updated.
   *
   *  @param entityType <STRING>: 'MF', 'HF', 'MI' or 'FI'; any other value makes the task return null
   *  @param date <DATETIME>: NAV update time; defaults to (now - 1 day) when empty. When it is not later
   *                          than get_ini_data_const()['date'] (described by the original author as
   *                          1989.01.01 or earlier) the run is treated as data initialization and the
   *                          results are additionally written to the local mfdb tables
   *
   *  @return '' when everything was saved, or the caught exception when saving failed;
   *          nothing is returned when no entity has a NAV update
   *
   *  NOTE: unlike the Java implementation, current-month indicators are recalculated on every daily run
   *        instead of waiting for the month-end production
   *
   *  Example: calFundPerformanceTask('MF', 2024.10.28);
   *           calFundPerformanceTask('MI', 2024.10.28);
   *           calFundPerformanceTask('MF', get_ini_data_const()['date']);   -- data initialization only (~70 min)
   */
  def calFundPerformanceTask(entityType, date) {

      rt = '';

      if(!(entityType IN ['MF', 'HF', 'MI', 'FI'])) return null;

      if(date.isNothing() || date.isNull())
          end_day = temporalAdd(now(), -1d);
      else
          end_day = date;

      // entities whose NAV changed since end_day (~1s)
      tb_cal_funds = get_entity_list_by_nav_updatetime(entityType, NULL, end_day, true);

      if(tb_cal_funds.isVoid() || tb_cal_funds.size() == 0 ) return;

      // result tables laid out like their MySQL counterparts
      tb_fund_performance = create_entity_performance();
      tb_fund_indicator = create_entity_indicator();
      tb_fund_risk_stats = create_entity_risk_stats();
      tb_fund_riskadjret_stats = create_entity_riskadjret_stats();
      tb_fund_style_stats = create_entity_style_stats();
      tb_fund_performance_weekly = create_entity_performance_weekly();
      tb_fund_latest_performance = create_entity_latest_performance();

      // process the entities in batches
      i = 0;
      batch_size = 1000;
      total = tb_cal_funds.size();

      do {
          funds = tb_cal_funds[i : min(total, i + batch_size)];

          if(funds.isVoid() || funds.size() == 0) break;

          // ~200ms
          fund_info = SELECT entity_id, price_date, inception_date, benchmark_id, ini_value
                      FROM ej(funds, get_entity_info(entityType, funds.entity_id), 'entity_id');

          // monthly returns (~12s)
          rets = mix_monthly_returns(entityType, fund_info);

          if(!rets.isVoid() && rets.size() > 0) {

              // monthly indicators (~56s); the indicator calculator reads the column named 'nav'
              rets.rename!('cumulative_nav', 'nav');
              indicators = cal_monthly_indicators(entityType, 'PBI', rets);

              // append records shaped like the MySQL tables (~1s)
              generate_entity_performance(fund_info, indicators, true, tb_fund_performance);
              generate_entity_indicator(fund_info, indicators, true, tb_fund_indicator);
              generate_entity_risk_stats(fund_info, indicators, true, tb_fund_risk_stats);
              generate_entity_riskadjret_stats(fund_info, indicators, true, tb_fund_riskadjret_stats);
              generate_entity_style_stats(fund_info, indicators, true, tb_fund_style_stats);
          }

          // weekly returns (~8s)
          rets_w = cal_weekly_returns(entityType, fund_info);

          if(! rets_w.isVoid() && rets_w.size() > 0) {
              generate_entity_performance_weekly(fund_info, rets_w, true, tb_fund_performance_weekly);
          }

          // latest performance (~69s)
          perf_latest = cal_latest_performance(entityType, fund_info, true);

          if(! perf_latest.isVoid() && perf_latest.size() > 0) {
              generate_entity_latest_performance(fund_info, perf_latest, true, tb_fund_latest_performance);
          }

          i += batch_size;

      } while (i < total);

      if(! tb_fund_performance.isVoid() && tb_fund_performance.size() > 0) {

          // save data to MySQL (~13s)
          try {

              chg_columns_for_mysql(tb_fund_performance, 'fund_id');
              save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance');

              chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
              save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator');

              chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
              // Author's note: in mfdb.fund_performance, maxdrawdown_6m & maxdrawdown_ytd are virtual columns;
              // the values are written by column position into the real columns 6m_maxdrawdown & ytd_maxdrawdown
              // (DolphinDB does not allow column names that start with a digit)
              save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats');

              chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
              save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats');

              chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
              save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats');

              save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly');

              save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance');

              // data initialization: also keep the indicators in the local database.
              // get_ini_data_const must be CALLED before indexing (the original indexed the function itself).
              if(end_day <= get_ini_data_const()['date']) {
                  save_table(tb_fund_performance, 'mfdb.fund_performance', false);
                  save_table(tb_fund_indicator, 'mfdb.fund_indicator', false);
                  save_table(tb_fund_risk_stats, 'mfdb.fund_risk_stats', false);
                  save_table(tb_fund_riskadjret_stats, 'mfdb.fund_riskadjret_stats', false);
                  save_table(tb_fund_style_stats, 'mfdb.fund_style_stats', false);
                  save_table(tb_fund_performance_weekly, 'mfdb.fund_performance_weekly', false);
                  save_table(tb_fund_latest_performance, 'mfdb.fund_latest_performance', false);
              }

          } catch(ex) {

              //TODO: Log errors
              rt = ex;
          }
      }

      return rt;
  }
  /*
   *  [Scheduled task] Match BFI and save the result to the database.
   *
   *  NOTE(review): placeholder only - the body is empty, so calling it does nothing.
   *                This file also defines MatchEntityBFI (capital "M"), which contains the actual
   *                matching logic; confirm which of the two names callers are expected to use.
   *
   *  @param entityType <STRING>: not used by the placeholder
   *  @param date <DATETIME>: not used by the placeholder
   */
  def matchEntityBFI(entityType, date) {
  }
  /*
   *  [Scheduled task] Calculate BFI indicators and save them to the database.
   *
   *  @param entityType <STRING>: 'MF', 'HF' or 'PF' (per the original author, the first two behave the same);
   *                              any other value makes the task return null
   *  @param date <DATETIME>: BFI update time; defaults to (now - 1 day) when empty.
   *                          1989.01.01 or earlier: monthly returns are read from the local database
   *                          instead of MySQL. Not later than get_ini_data_const()['date']: the run is
   *                          treated as data initialization and the indicators are also saved locally
   *
   *  @return '' when everything was saved, or the caught exception when saving failed;
   *          nothing is returned when no entity has a BFI update
   *
   *  Example: calEntityBfiIndicatorTask('MF', 2024.10.28);
   *           calEntityBfiIndicatorTask('PF', 2024.10.28);
   */
  def calEntityBfiIndicatorTask(entityType, date) {

      rt = '';

      if(!(entityType IN ['MF', 'HF', 'PF'])) return null;

      very_old_day = 1900.01.01;

      if(date.isNothing() || date.isNull())
          end_day = temporalAdd(now(), -1d);
      else
          end_day = date;

      // dates up to 1989.01.01 mean "read the returns from the local database"
      isFromMySQL = end_day > 1989.01.01;

      // entities whose BFI changed since end_day (~1s)
      tb_cal_entities = get_entity_bfi_factors(entityType, NULL, very_old_day.month(), today().month(), end_day);

      if(tb_cal_entities.isVoid() || tb_cal_entities.size() == 0 ) return;

      v_uniq_entity_id = EXEC DISTINCT entity_id FROM tb_cal_entities;

      // result table laid out like its MySQL counterpart
      tb_bfi_indicator = create_entity_bfi_indicator(entityType == 'PF');

      // process the entities in batches
      i = 0;
      batch_size = 100;
      total = v_uniq_entity_id.size();

      do {
          entities = SELECT * FROM tb_cal_entities WHERE entity_id IN v_uniq_entity_id[i : min(total, i + batch_size)];

          if(entities.isVoid() || entities.size() == 0) break;

          // ~200ms
          entity_info = SELECT entity_id, end_date.temporalParse('yyyy-MM') AS end_date, inception_date, factor_id AS benchmark_id, ini_value
                        FROM ej(entities, get_entity_info(entityType, entities.entity_id), 'entity_id');

          // monthly returns up to the end of the latest month in this batch (~12s)
          last_month_end = entity_info.end_date.max().temporalFormat('yyyy-MM-dd').temporalParse('yyyy-MM-dd').monthEnd();
          rets = get_monthly_ret(entityType, entity_info.entity_id, very_old_day, last_month_end, isFromMySQL);

          if(!rets.isVoid() && rets.size() > 0) {

              // convert the 'yyyy-MM' formatted end_date into a DolphinDB MONTH;
              // done inside the guard so that a void/empty result is never dereferenced
              v_end_date = rets.end_date.temporalParse('yyyy-MM');
              rets.replaceColumn!('end_date', v_end_date);

              // monthly indicators (~5s)
              indicators = cal_monthly_indicators(entityType, 'BFI', rets);

              // append records shaped like the MySQL table (~1s)
              generate_entity_bfi_indicator(entity_info, indicators, true, tb_bfi_indicator);
          }

          i += batch_size;

      } while (i < total);

      if(! tb_bfi_indicator.isVoid() && tb_bfi_indicator.size() > 0) {

          // save data to MySQL
          try {

              t_desc = get_bfi_indicator_table_description(entityType);

              chg_columns_for_mysql(tb_bfi_indicator, t_desc.sec_id_col[0]);

              // same table name, but with the database part replaced by raw_db
              db_name = t_desc.table_name[0].split('.')[0];
              raw_table_name = t_desc.table_name[0].strReplace(db_name, 'raw_db');

              save_and_sync(tb_bfi_indicator, raw_table_name, raw_table_name);

              // data initialization: also keep the indicators locally (used for ranking).
              // get_ini_data_const must be CALLED before indexing (the original indexed the function itself).
              if(end_day <= get_ini_data_const()['date']) {
                  save_table(tb_bfi_indicator, t_desc.table_name[0], false);
              }

          } catch(ex) {

              //TODO: Log errors
              rt = ex;
          }
      }

      return rt;
  }
  /*
   *  Calculate RBSA for entities whose weekly returns were updated, keeping only the latest result per
   *  entity and asset type, and save it to raw_db.pf_fund_rbsa_breakdown.
   *
   *  @param entityType <STRING>: entity type, e.g. 'MF'; 'PF' switches the entity_id column to INT
   *  @param entityIds <VECTOR>: entity ids to consider
   *  @param updateTime <DATETIME>: weekly-return update time used to pick the entities
   *
   *  Example: CalFundRBSATask('MF', ['MF00003PW1'], 2024.10.14T10:00:00);
   */
  def CalFundRBSATask(entityType, entityIds, updateTime) {

      t = get_entity_list_by_weekly_return_updatetime(entityType, entityIds, updateTime, true);

      if(t.isVoid() || t.size() == 0) return;

      tb_result = table(100:0,
                        ["entity_id", "asset_type_id", "index_id", "effective_date", "level", "alternative_id", "weighting"],
                        [iif(entityType=='PF', INT, STRING), STRING, STRING, STRING, INT, STRING, DOUBLE]);

      // both values are passed to cal_entity_RBSA; window is also the look-back in weeks
      window = 48;
      step = 13;

      d_rbsa = get_rbsa_index();

      for(entity in t) {

          // start one window before THIS entity's update date
          // (the original used the first row of t for every entity)
          start_day = temporalAdd(entity.price_date, -window, 'w');

          for(asset_type in d_rbsa.keys()) {

              res = cal_entity_RBSA(entityType, entity.entity_id, d_rbsa[asset_type], 'w',
                                    start_day, today(), true, window, step);

              // nothing calculated for this asset type: skip instead of querying a void table
              if(res.isVoid() || res.size() == 0) continue;

              // the daily task only refreshes the most recent RBSA result
              latest_date = (EXEC price_date.max() AS price_date FROM res)[0];

              tb_result.tableInsert(SELECT entity_id, asset_type, index_id, price_date, level, alternative_id, weights
                                    FROM res WHERE price_date = latest_date);
          }
      }

      // skip the database round trip when there is nothing to save
      if(tb_result.size() > 0) {
          save_and_sync(tb_result, 'raw_db.pf_fund_rbsa_breakdown', 'raw_db.pf_fund_rbsa_breakdown');
      }
  }
  /*
   *  [Temporary] Data-initialization helper: computes monthly indicators, weekly returns and latest
   *  performance for the funds returned by ms_get_fund_list_by_nav_updatetime and saves them to raw_db.
   *
   *  Takes no parameters; the entity type passed to the calculators is fixed to 'MF' and the fund list
   *  is requested with the date 1990.01.01.
   *
   *  @return '' when everything was saved, or the caught exception when saving failed;
   *          nothing is returned when the fund list is empty
   */
  def ms_calFundReturns() {

      result = '';
      earliest_day = 1990.01.01;

      // fund list (~27s)
      fund_list = ms_get_fund_list_by_nav_updatetime(NULL, earliest_day);

      if(isVoid(fund_list) || size(fund_list) == 0) return;

      // accumulators, one per target table
      perf_tb = create_entity_performance();
      indicator_tb = create_entity_indicator();
      risk_tb = create_entity_risk_stats();
      riskadjret_tb = create_entity_riskadjret_stats();
      style_tb = create_entity_style_stats();
      weekly_tb = create_entity_performance_weekly();
      latest_tb = create_entity_latest_performance();

      // walk through the list one chunk at a time
      chunk = 1000;
      offset = 0;

      do {
          batch = fund_list[offset : min(size(fund_list), offset + chunk)];

          if(isVoid(batch) || size(batch) == 0) break;

          // ~200ms
          info = SELECT entity_id, price_date, inception_date, benchmark_id, ini_value
                 FROM ej(batch, ms_get_fund_info(batch.entity_id), 'entity_id', 'fund_id');

          // monthly returns derived from monthly NAV (~19s)
          monthly_nav = ms_get_fund_monthly_nav(info.entity_id);
          monthly_rets = cal_monthly_returns_by_nav(info, monthly_nav);

          if(!isVoid(monthly_rets) && size(monthly_rets) > 0) {

              // monthly indicators (~67s)
              rename!(monthly_rets, 'cumulative_nav', 'nav');
              monthly_ind = cal_monthly_indicators('MF', 'PBI', monthly_rets);

              // append records shaped like the MySQL tables (~1s)
              generate_entity_performance(info, monthly_ind, true, perf_tb);
              generate_entity_indicator(info, monthly_ind, true, indicator_tb);
              generate_entity_risk_stats(info, monthly_ind, true, risk_tb);
              generate_entity_riskadjret_stats(info, monthly_ind, true, riskadjret_tb);
              generate_entity_style_stats(info, monthly_ind, true, style_tb);
          }

          // weekly returns (~49s)
          weekly_rets = cal_weekly_returns('MF', info);

          if(!isVoid(weekly_rets) && size(weekly_rets) > 0) {
              generate_entity_performance_weekly(info, weekly_rets, true, weekly_tb);
          }

          // latest performance (~23s)
          latest_perf = cal_latest_performance('MF', info, true);

          if(!isVoid(latest_perf) && size(latest_perf) > 0) {
              generate_entity_latest_performance(info, latest_perf, true, latest_tb);
          }

          offset += chunk;

      } while (offset <= size(fund_list));

      if(!isVoid(perf_tb) && size(perf_tb) > 0) {

          // save data to MySQL (~26 min)
          try {

              chg_columns_for_mysql(perf_tb, 'fund_id');
              save_and_sync(perf_tb, 'raw_db.fund_performance', 'raw_db.fund_performance');

              chg_columns_for_mysql(indicator_tb, 'fund_id');
              save_and_sync(indicator_tb, 'raw_db.fund_indicator', 'raw_db.fund_indicator');

              chg_columns_for_mysql(risk_tb, 'fund_id');
              // Author's note: in mfdb.fund_performance, maxdrawdown_6m & maxdrawdown_ytd are virtual columns;
              // the values are written by column position into the real columns 6m_maxdrawdown & ytd_maxdrawdown
              // (DolphinDB does not allow column names that start with a digit)
              save_and_sync(risk_tb, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats');

              chg_columns_for_mysql(riskadjret_tb, 'fund_id');
              save_and_sync(riskadjret_tb, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats');

              chg_columns_for_mysql(style_tb, 'fund_id');
              save_and_sync(style_tb, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats');

              save_and_sync(weekly_tb, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly');

              save_and_sync(latest_tb, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance');

          } catch(ex) {

              //TODO: Log errors
              result = ex;
          }
      }

      return result;
  }
  /*
   *  [Scheduled task] Calculate (match) the BFI factors of funds and portfolios and save them to raw_db.
   *
   *  @param entityType <STRING>: 'HF', 'MF' or 'PF'; any other value makes the task return null
   *  @param date <DATETIME>: NAV update time used to pick the entities to process
   *
   *  @return '' when every batch was saved, otherwise the accumulated exceptions of the failed batches;
   *          nothing is returned when no entity has a NAV update
   *
   *  TODO: where is the max_r2 table used? Probably related to fund recommendation.
   */
  def MatchEntityBFI(entityType, date) {

      rt = '';

      if(find(['HF', 'MF', 'PF'], entityType) < 0) return null;

      // entities whose NAV changed since date (~1s)
      tb_cal_entity = get_entity_list_by_nav_updatetime(entityType, NULL, date, true);

      if(tb_cal_entity.isVoid() || tb_cal_entity.size() == 0 ) return;

      i = 0;
      total = tb_cal_entity.size();
      batch_size = 1000;

      // id column name expected by the MySQL tables
      sec_id_col = iif(entityType == 'PF', 'portfolio_id', 'fund_id');

      do {
          batch = tb_cal_entity[i : min(total, i + batch_size)];

          // Advance the cursor BEFORE any `continue`: the original incremented at the end of the body,
          // so a skipped batch was re-processed forever.
          i += batch_size;

          // ~4 min per 1000 funds
          coe = cal_entity_index_coe(entityType, batch);

          if(coe.isVoid() || coe.size() == 0) continue;

          entity_info = get_entity_info(entityType, batch.entity_id);

          bfi_raw = match_entity_bfi(entityType, entity_info, coe);

          // persist each batch right away so finished work is not lost if a later batch fails
          try {

              v_min_correlation = get_min_threshold('correlation');

              // blank out correlations whose absolute value is below the threshold
              t_coe = SELECT entity_id, end_date, index_id,
                             iif(coe_1y.abs() < v_min_correlation, double(NULL), coe_1y) AS coe_1y,
                             iif(coe_3y.abs() < v_min_correlation, double(NULL), coe_3y) AS coe_3y,
                             iif(coe_5y.abs() < v_min_correlation, double(NULL), coe_5y) AS coe_5y,
                             info_ratio_1y, info_ratio_3y, info_ratio_5y, t_value_1y, t_value_3y, t_value_5y, beta_1y, beta_3y, beta_5y
                      FROM coe;

              DELETE FROM t_coe WHERE coe_1y IS NULL AND coe_3y IS NULL AND coe_5y IS NULL;

              // candidate factors
              t_bfi_candidates = SELECT entity_id, end_date, index_id AS factor_id, coe_1y AS coe, coe_1y.square() AS r2, 'w' AS performance_flag, t_value_1y, beta_1y
                                 FROM t_coe WHERE index_id LIKE 'FA%';

              chg_columns_for_mysql(t_coe, sec_id_col);

              // only funds are stored in the index_coe table
              if(entityType IN ['MF', 'HF']) save_and_sync(t_coe, 'raw_db.pf_fund_index_coe', );

              // All candidates go to xxx_factor_bfi. NOTE: Java stores every factor, here only the rows
              // that pass the correlation threshold are stored (the table seems to be rarely used).
              chg_columns_for_mysql(t_bfi_candidates, sec_id_col);
              save_and_sync(t_bfi_candidates, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi', 'raw_db.cm_fund_factor_bfi'), );

              if(!(bfi_raw.isVoid() || bfi_raw.size() == 0)) {

                  // effective factors
                  t_bfi = SELECT entity_id, end_date, factor_id, coe_1y AS coe, r2, performance_flag, t_value_1y, beta_1y
                          FROM bfi_raw ORDER BY entity_id, end_date, r2 DESC;

                  // factor with the largest R2 plus the names of all effective factors
                  t_max_r2 = SELECT entity_id, factor_id.first() AS factor_id, end_date,
                                    int(NULL) AS performance_flag, coe.first() AS coe, r2.first() AS r2, concat(factor_name, ",") AS rz_portrait
                             FROM ej(t_bfi, get_bfi_index_list(), 'factor_id')
                             GROUP BY entity_id, end_date;

                  // effective factors go to xxx_factor_bfi_by_category_group
                  // (the original saved t_bfi_candidates here instead of the prepared t_bfi)
                  chg_columns_for_mysql(t_bfi, sec_id_col);
                  save_and_sync(t_bfi, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_by_category_group', 'raw_db.pf_fund_factor_bfi_by_category_group'), );

                  // the effective factor with the largest R2 goes to xxx_max_r2
                  chg_columns_for_mysql(t_max_r2, sec_id_col);
                  save_and_sync(t_max_r2, iif(entityType == 'PF', 'raw_db.pf_portfolio_factor_bfi_max_r2', 'raw_db.pf_fund_factor_bfi_by_category_group_max_r2'), );
              }

          } catch (ex) {

              //TODO: Log errors
              rt += ex + '\n';
          }

      } while (i < total);

      return rt;
  }