task_fundPerformance.dos 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. module fundit::task_fundPerformance
  2. use fundit::sqlUtilities
  3. use fundit::dataPuller
  4. use fundit::returnCalculator
  5. use fundit::indicatorCalculator
  6. use fundit::fundCalculator
  /*
   * Assemble rows for the fund_performance table and append them to the
   * caller-supplied table.
   *
   * The base rows come from indicators['PBI-3M'] inner-joined with fund_info
   * on entity_id; only rows whose end_date is not earlier than the month of
   * the fund's price_date are kept. Both ret_1m and ret_1m_a are taken from
   * the `ret` column. The remaining horizons (6M, 1Y, 2Y, 3Y, 4Y, 5Y, 10Y,
   * YTD, INCEP) are filled from the matching indicators['PBI-*'] table,
   * joined on (entity_id, end_date).
   *
   * @param fund_info <TABLE>: read here for entity_id and price_date
   * @param indicators <DICT>: indexed by 'PBI-3M', 'PBI-6M', ... 'PBI-INCEP'; each table is
   *                           read for entity_id, end_date, trailing_ret and trailing_ret_a
   *                           ('PBI-3M' additionally for nav and ret)
   * @param isToMySQL <BOOL>: when false the function does nothing
   * @param fund_performance <TABLE>: mutable output table; new rows are appended to it
   */
  def generate_fund_performance(fund_info, indicators, isToMySQL, mutable fund_performance) {
      // Only the MySQL-bound layout is implemented; any other request is a no-op.
      if(!isToMySQL) return;

      join_cols = ['entity_id', 'end_date'];
      base_ind = indicators['PBI-3M'];

      perf = SELECT entity_id, end_date, price_date, nav AS cumulative_nav, ret AS ret_1m, ret AS ret_1m_a, trailing_ret AS ret_3m, trailing_ret_a AS ret_3m_a
             FROM base_ind AS ind
             INNER JOIN fund_info fi ON ind.entity_id = fi.entity_id
             WHERE ind.end_date >= fi.price_date.month(); // skip old records that do not need updating

      UPDATE perf SET ret_6m = trailing_ret, ret_6m_a = trailing_ret_a
      FROM ej(perf, indicators['PBI-6M'], join_cols);

      UPDATE perf SET ret_1y = trailing_ret, ret_1y_a = trailing_ret_a
      FROM ej(perf, indicators['PBI-1Y'], join_cols);

      UPDATE perf SET ret_2y = trailing_ret, ret_2y_a = trailing_ret_a
      FROM ej(perf, indicators['PBI-2Y'], join_cols);

      UPDATE perf SET ret_3y = trailing_ret, ret_3y_a = trailing_ret_a
      FROM ej(perf, indicators['PBI-3Y'], join_cols);

      UPDATE perf SET ret_4y = trailing_ret, ret_4y_a = trailing_ret_a
      FROM ej(perf, indicators['PBI-4Y'], join_cols);

      UPDATE perf SET ret_5y = trailing_ret, ret_5y_a = trailing_ret_a
      FROM ej(perf, indicators['PBI-5Y'], join_cols);

      UPDATE perf SET ret_10y = trailing_ret, ret_10y_a = trailing_ret_a
      FROM ej(perf, indicators['PBI-10Y'], join_cols);

      UPDATE perf SET ret_ytd = trailing_ret, ret_ytd_a = trailing_ret_a
      FROM ej(perf, indicators['PBI-YTD'], join_cols);

      // ret_incep_a_all (meaningless) and ret_incep_a_gips (equal to ret_incep_a) were dropped.
      UPDATE perf SET ret_incep = trailing_ret, ret_incep_a = trailing_ret_a
      FROM ej(perf, indicators['PBI-INCEP'], join_cols);

      INSERT INTO fund_performance SELECT * FROM perf;
  }
  /*
   * Scheduled task: performance-indicator calculation triggered by the latest NAV updates.
   *
   * Fetches the entities returned by get_entity_list_by_nav_updatetime, processes them in
   * batches of `batch_size` (fund info -> monthly returns -> monthly 'PBI' indicators ->
   * fund_performance rows), then saves the accumulated rows to 'mfdb.fund_performance'.
   *
   * @param entityType <STRING>: 'MF', 'HF'...
   * @param date <DATETIME>: NAV update time
   *
   * @return null when entityType is neither 'HF' nor 'MF'; nothing when no fund is returned
   *         for the given date; '' after a successful save; the caught exception when
   *         save_table fails. Errors raised while computing a batch are NOT caught here.
   *
   * TODO: the returns table currently lives in MySQL, so the newly computed returns
   *       need to be merged with the historical data stored in MySQL.
   */
  def calFundPerformance(entityType, date) {
      rt = '';

      if(find(['HF', 'MF'], entityType) < 0) return null;

      // Funds whose NAV changed most recently.
      // NOTE(review): the meaning of the NULL / true arguments is defined by the helper - confirm there.
      tb_cal_funds = get_entity_list_by_nav_updatetime(entityType, NULL, date, true);

      if(tb_cal_funds.isVoid() || tb_cal_funds.size() == 0 ) return;

      // Accumulator for the rows of every batch; column order matches what
      // generate_fund_performance inserts.
      tb_fund_performance = table(1000:0,
          ['entity_id', 'end_date', 'price_date', 'cumulative_nav',
           'ret_1m', 'ret_1m_a', 'ret_3m', 'ret_3m_a', 'ret_6m', 'ret_6m_a',
           'ret_1y', 'ret_1y_a', 'ret_2y', 'ret_2y_a', 'ret_3y', 'ret_3y_a', 'ret_4y', 'ret_4y_a',
           'ret_5y', 'ret_5y_a', 'ret_10y', 'ret_10y_a', 'ret_ytd', 'ret_ytd_a', 'ret_incep', 'ret_incep_a'],
          [SYMBOL, MONTH, DATE, DOUBLE,
           DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE,
           DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE,
           DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]);

      // Run in batches over the WHOLE fund list.
      batch_size = 1000;
      total = tb_cal_funds.size();
      i = 0;

      do {
          // A pair a:b selects rows a..b-1, so the upper bound must be an end index
          // (not a count) and must be clamped to the table size for the last batch.
          batch_end = i + batch_size;
          if(batch_end > total) batch_end = total;

          funds = tb_cal_funds[i:batch_end];

          fund_info = SELECT entity_id, price_date, inception_date, benchmark_id, ini_value
                      FROM ej(funds, get_fund_info(funds.entity_id), 'entity_id', 'fund_id');

          // Monthly returns
          rets = mix_monthly_returns(entityType, fund_info);

          // Monthly indicators; generate_fund_performance reads the NAV column as `nav`
          rets.rename!('cumulative_nav', 'nav');

          indicators = cal_monthly_indicators(entityType, 'PBI', rets);

          // TODO: store the most recently updated indicators in the database
          generate_fund_performance(fund_info, indicators, true, tb_fund_performance);

          i = batch_end;

      } while (i < total);

      tb_fund_performance.rename!('entity_id', 'fund_id');

      // Convert DolphinDB MONTH values to the YYYY-MM text format used by MySQL
      v_end_date = EXEC end_date.temporalFormat('yyyy-MM') FROM tb_fund_performance;
      tb_fund_performance.replaceColumn!('end_date', v_end_date);

      try {
          save_table(tb_fund_performance, 'mfdb.fund_performance', true);
      } catch(ex) {
          //TODO: Log errors
          rt = ex;
      }

      return rt;
  }
  103. }