task_fundPerformance.dos 12 KB


  1. module fundit::task_fundPerformance
  2. use fundit::sqlUtilities;
  3. use fundit::operationDataPuller;
  4. use fundit::performanceDataPuller;
  5. use fundit::dataSaver;
  6. use fundit::returnCalculator;
  7. use fundit::indicatorCalculator;
  8. use fundit::ms_dataPuller;
  /*
   * [Scheduled task] Sync the NAV of mutual funds, private funds and market indexes
   * (entity types 'HF', 'MI', 'MF') and append it to the local database.
   *
   * For each entity type, the entities returned by get_entity_list_by_nav_updatetime
   * are processed in batches of 1000 distinct entity ids; each batch is serialized to
   * JSON, passed to sync_entity_nav_by_date, and the returned rows are appended to
   * the local NAV table of that entity type.
   *
   * @param date <DATETIME>: passed through to get_entity_list_by_nav_updatetime
   *                         (presumably the NAV update-time threshold -- confirm against that helper)
   *
   * Example: GetEntityNavTask(2024.09.01); // [for initialization only]
   */
  def GetEntityNavTask(date) {
      batch_size = 1000;
      for(entity_type in ['HF', 'MI', 'MF']) {
          des = get_nav_table_description(entity_type)[0];
          t_local_table = load_table_from_local('fundit', des.table_name);
          entity_date = get_entity_list_by_nav_updatetime(entity_type, null, date, true);
          // Nothing to sync for this entity type: move on to the next type.
          // (A plain `return` here would silently skip all remaining entity types.)
          if(entity_date.isVoid() || entity_date.size() == 0) continue;
          v_entity_id = entity_date.entity_id.distinct();
          max_cnt = v_entity_id.size();
          i = 0;
          do {
              // Rows of the current batch of entity ids; the upper bound of the range is exclusive.
              tb_entity = SELECT * FROM entity_date WHERE entity_id IN v_entity_id[i : min(max_cnt, i+batch_size)];
              s_json = tb_entity.toStdJson();
              tb_nav = sync_entity_nav_by_date(entity_type, s_json);
              try {
                  // Disabled alternative: build a keyed table and upsert instead of appending.
                  // t_local_table = keyedTable([des.sec_id_col, 'price_date'], load_table_from_local('fundit', des.table_name));
                  // cnt_nums += t_local_table.tableUpsert(tb_nav, keyColNames=['entity_id', 'price_date']);
                  t_local_table.append!(tb_nav);
                  // Disabled: explicit save to local storage.
                  // save_table(t_local_table, des.table_name, false);
                  // Report the row count only after the append actually succeeded.
                  writeLogLevel(INFO, des.table_name + ': UPDATE ' + tb_nav.size()$STRING);
              } catch(ex) {
                  // Best effort: log the failure and keep going with the next batch.
                  writeLogLevel(ERROR, ex);
              }
              i += batch_size;
          } while (i < max_cnt);
      }
  }
  /*
   * [Scheduled task] Performance-indicator calculation triggered by the latest NAV updates.
   *
   * Steps:
   *   1. Get the entities returned by get_entity_list_by_nav_updatetime for the given date.
   *   2. In batches of 1000 rows, compute monthly returns and monthly indicators,
   *      weekly returns and latest performance, accumulating the records in result tables.
   *   3. If any monthly performance records were produced, write all result tables via
   *      save_and_sync; when end_day <= get_ini_data_const()['date'] (data initialization),
   *      also store them locally via save_table.
   *
   * @param entityType <STRING>: one of 'MF', 'HF', 'MI', 'FI'
   * @param date <DATETIME>: NAV update time; when missing/NULL it defaults to now() minus 1 day.
   *                         A date not later than get_ini_data_const()['date'] means initialization,
   *                         in which case the indicators are also saved to the local database.
   *
   * @return NULL when entityType is not supported or there is nothing to calculate;
   *         '' on success; the caught exception when saving fails.
   *
   * NOTE: unlike the Java implementation, the current month's indicators are calculated on every
   *       daily trigger; there is no need to wait for the month-end production.
   *
   * Example: calFundPerformanceTask('HF', 2024.10.28);
   *          calFundPerformanceTask('MI', 2024.10.28);
   *          calFundPerformanceTask('FI', 2024.10.28);
   *          calFundPerformanceTask('MF', get_ini_data_const()['date']); -- [data initialization only] (70min)
   */
  def calFundPerformanceTask(entityType, date) {
      rt = '';
      if(!(entityType IN ['MF', 'HF', 'MI', 'FI'])) return null;
      if(date.isNothing() || date.isNull())
          end_day = temporalAdd(now(), -1d);
      else
          end_day = date;
      // Entities whose NAV changed recently (2.5 min)
      tb_cal_funds = get_entity_list_by_nav_updatetime(entityType, NULL, end_day, true);
      if(tb_cal_funds.isVoid() || tb_cal_funds.size() == 0 ) return;
      // Create the result tables following the MySQL schemas
      tb_fund_performance = create_entity_performance();
      tb_fund_indicator = create_entity_indicator();
      tb_fund_risk_stats = create_entity_risk_stats();
      tb_fund_riskadjret_stats = create_entity_riskadjret_stats();
      tb_fund_style_stats = create_entity_style_stats();
      tb_fund_ms_stats = create_entity_ms_stats();
      tb_fund_performance_weekly = create_entity_performance_weekly();
      tb_fund_latest_performance = create_entity_latest_performance();
      // Run in batches
      i = 0;
      batch_size = 1000;
      total_cnt = tb_cal_funds.size();
      do {
          funds = tb_cal_funds[i : min(total_cnt, i+batch_size)];
          if(funds.isVoid() || funds.size() == 0) break;
          // 200ms
          fund_info = SELECT entity_id, price_date, inception_date, benchmark_id, ini_value
                      FROM ej(funds, get_entity_info(entityType, funds.entity_id), 'entity_id');
          // Monthly returns (12s)
          rets = mix_monthly_returns(entityType, fund_info);
          if(!rets.isVoid() && rets.size() > 0) {
              // Monthly indicators (56s)
              rets.rename!('cumulative_nav', 'nav');
              indicators = cal_monthly_indicators(entityType, 'PBI', rets);
              // Prepare the records following the MySQL table structures (1s)
              generate_entity_performance(fund_info, indicators, true, tb_fund_performance);
              generate_entity_indicator(fund_info, indicators, true, tb_fund_indicator);
              generate_entity_risk_stats(fund_info, indicators, true, tb_fund_risk_stats);
              generate_entity_riskadjret_stats(fund_info, indicators, true, tb_fund_riskadjret_stats);
              generate_entity_style_stats(fund_info, indicators, true, tb_fund_style_stats);
              generate_entity_ms_stats(fund_info, indicators, true, tb_fund_ms_stats);
          }
          // Weekly returns (8s)
          rets_w = cal_weekly_returns(entityType, fund_info);
          if(! rets_w.isVoid() && rets_w.size() > 0) {
              generate_entity_performance_weekly(fund_info, rets_w, true, tb_fund_performance_weekly);
          }
          // Latest performance (69s)
          perf_latest = cal_latest_performance(entityType, fund_info, true);
          if(! perf_latest.isVoid() && perf_latest.size() > 0) {
              generate_entity_latest_performance(fund_info, perf_latest, true, tb_fund_latest_performance);
          }
          i += batch_size;
      // Strict '<': with '<=' an extra pass over an empty slice ran whenever the
      // row count was an exact multiple of batch_size.
      } while (i < total_cnt);
      if(! tb_fund_performance.isVoid() && tb_fund_performance.size() > 0) {
          // Save data to MySQL (13s)
          try {
              chg_columns_for_mysql(tb_fund_performance, 'fund_id');
              save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance');
              chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
              save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator');
              chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
              // In table mfdb.fund_performance, maxdrawdown_6m & maxdrawdown_ytd are virtual columns; the data is
              // force-written into the real columns 6m_maxdrawdown & ytd_maxdrawdown by column order
              // (DolphinDB does not allow column names starting with a digit).
              save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats');
              chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
              save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats');
              chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
              save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats');
              chg_columns_for_mysql(tb_fund_ms_stats, 'fund_id');
              save_and_sync(tb_fund_ms_stats, 'raw_db.fund_ms_stats', 'mfdb.fund_ms_stats'); // new table, write into directly
              save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly');
              save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance');
              // During data initialization the indicators are also stored locally
              if(end_day <= get_ini_data_const()['date']) {
                  save_table(tb_fund_performance, 'mfdb.fund_performance', false);
                  save_table(tb_fund_indicator, 'mfdb.fund_indicator', false);
                  save_table(tb_fund_risk_stats, 'mfdb.fund_risk_stats', false);
                  save_table(tb_fund_riskadjret_stats, 'mfdb.fund_riskadjret_stats', false);
                  save_table(tb_fund_style_stats, 'mfdb.fund_style_stats', false);
                  save_table(tb_fund_ms_stats, 'mfdb.fund_ms_stats', false);
                  save_table(tb_fund_performance_weekly, 'mfdb.fund_performance_weekly', false);
                  save_table(tb_fund_latest_performance, 'mfdb.fund_latest_performance', false);
              }
          } catch(ex) {
              // Record the failure in the server log instead of swallowing it,
              // and still hand the exception back to the caller.
              writeLogLevel(ERROR, ex);
              rt = ex;
          }
      }
      return rt;
  }
  /*
   * [Temporary] Used for data initialization: calculates returns and monthly indicators
   * for the funds returned by ms_get_fund_list_by_nav_updatetime and writes the
   * results via save_and_sync.
   *
   * Takes no parameters: the entity type is fixed to 'MF' and the NAV update-time
   * threshold is fixed to 1990.01.01.
   *
   * @return NULL when there is nothing to calculate; '' on success;
   *         the caught exception when saving fails.
   */
  def ms_calFundReturns() {
      rt = '';
      very_old_date = 1990.01.01;
      // Fund list (27s)
      tb_cal_funds = ms_get_fund_list_by_nav_updatetime(NULL, very_old_date);
      if(tb_cal_funds.isVoid() || tb_cal_funds.size() == 0 ) return;
      tb_fund_performance = create_entity_performance();
      tb_fund_indicator = create_entity_indicator();
      tb_fund_risk_stats = create_entity_risk_stats();
      tb_fund_riskadjret_stats = create_entity_riskadjret_stats();
      tb_fund_style_stats = create_entity_style_stats();
      tb_fund_performance_weekly = create_entity_performance_weekly();
      tb_fund_latest_performance = create_entity_latest_performance();
      // Run in batches
      i = 0;
      batch_size = 1000;
      total_cnt = tb_cal_funds.size();
      do {
          funds = tb_cal_funds[i : min(total_cnt, i+batch_size)];
          if(funds.isVoid() || funds.size() == 0) break;
          // 200ms; joins funds.entity_id with the fund_id column of the fund-info table
          fund_info = SELECT entity_id, price_date, inception_date, benchmark_id, ini_value
                      FROM ej(funds, ms_get_fund_info(funds.entity_id), 'entity_id', 'fund_id');
          // Monthly returns (19s)
          tb_nav = ms_get_fund_monthly_nav(fund_info.entity_id);
          rets = cal_monthly_returns_by_nav(fund_info, tb_nav);
          if(!rets.isVoid() && rets.size() > 0) {
              // Monthly indicators (67s)
              rets.rename!('cumulative_nav', 'nav');
              indicators = cal_monthly_indicators('MF', 'PBI', rets);
              // Prepare the records following the MySQL table structures (1s)
              generate_entity_performance(fund_info, indicators, true, tb_fund_performance);
              generate_entity_indicator(fund_info, indicators, true, tb_fund_indicator);
              generate_entity_risk_stats(fund_info, indicators, true, tb_fund_risk_stats);
              generate_entity_riskadjret_stats(fund_info, indicators, true, tb_fund_riskadjret_stats);
              generate_entity_style_stats(fund_info, indicators, true, tb_fund_style_stats);
          }
          // Weekly returns (49s)
          rets_w = cal_weekly_returns('MF', fund_info);
          if(! rets_w.isVoid() && rets_w.size() > 0) {
              generate_entity_performance_weekly(fund_info, rets_w, true, tb_fund_performance_weekly);
          }
          // Latest performance (23s)
          perf_latest = cal_latest_performance('MF', fund_info, true);
          if(! perf_latest.isVoid() && perf_latest.size() > 0) {
              generate_entity_latest_performance(fund_info, perf_latest, true, tb_fund_latest_performance);
          }
          i += batch_size;
      // Strict '<': with '<=' an extra pass over an empty slice ran whenever the
      // row count was an exact multiple of batch_size.
      } while (i < total_cnt);
      if(! tb_fund_performance.isVoid() && tb_fund_performance.size() > 0) {
          // Save data to MySQL (26m)
          try {
              chg_columns_for_mysql(tb_fund_performance, 'fund_id');
              save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance');
              chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
              save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator');
              chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
              // In table mfdb.fund_performance, maxdrawdown_6m & maxdrawdown_ytd are virtual columns; the data is
              // force-written into the real columns 6m_maxdrawdown & ytd_maxdrawdown by column order
              // (DolphinDB does not allow column names starting with a digit).
              save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats');
              chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
              save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats');
              chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
              save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats');
              save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly');
              save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance');
          } catch(ex) {
              // Record the failure in the server log instead of swallowing it,
              // and still hand the exception back to the caller.
              writeLogLevel(ERROR, ex);
              rt = ex;
          }
      }
      return rt;
  }