Bläddra i källkod

组合净值计算重写

Joey 5 månader sedan
förälder
incheckning
6cad751fe0
5 ändrade filer med 377 tillägg och 40 borttagningar
  1. 102 3
      modules/dataPuller.dos
  2. 10 0
      modules/dataSaver.dos
  3. 97 37
      modules/navCalculator.dos
  4. 91 0
      modules/sqlUtilities.dos
  5. 77 0
      modules/task_portfolioPerformance.dos

+ 102 - 3
modules/dataPuller.dos

@@ -749,14 +749,113 @@ def get_benchmark_return(benchmarks, end_day) {
 
 
 /*
+ *  取某时间区间内的 XXX_indicator 表数据
+ * 
+ * 
+ */
+def get_entity_indicaor(entity_type, entity_ids, start_date, end_date, isFromMySQL) {
+
+    t = null;
+
+    s_entity_ids = ids_to_string(entity_ids);
+
+    if(s_entity_ids == null || s_entity_ids == '') return null;
+
+    tmp = get_indicator_table_description(entity_type);
+
+    yyyymm_start = start_date.temporalFormat("yyyy-MM")
+    yyyymm_end = end_date.temporalFormat("yyyy-MM")
+
+    if(isFromMySQL == true) {
+
+        s_query = "SELECT *
+                   FROM " + tmp.table_name[0] + "
+                   WHERE " + tmp.sec_id_col[0] + " IN (" + s_entity_ids + ")
+                      AND isvalid = 1
+                      AND end_date BETWEEN '" + yyyymm_start + "' AND '" + yyyymm_end + "'
+                   ORDER BY " + tmp.sec_id_col[0] + ", end_date";
+     
+        conn = connect_mysql()
+     
+        t = odbc::query(conn, s_query)
+     
+        conn.close()
+
+    }
+
+    return t
+}
+
+/*
+ *  取持有基金净值更新的组合列表
+ *  
+ *  TODO: 需要跑2分钟,待优化
+ * 
+ *  Example: get_portfolio_list_by_fund_nav_updatetime([166002,166114], 2024.10.28, true);
+ */
+def get_portfolio_list_by_fund_nav_updatetime(portfolio_ids, updatetime, isFromMySQL) {
+
+    t = null;
+
+    s_entity_ids = ids_to_string(portfolio_ids);
+
+    if(isFromMySQL == true) {
+
+        s_query = "CALL pfdb.sp_get_portfolios_to_cal_nav(" + iif(s_entity_ids.isNull(), 'NULL', "'" + s_entity_ids + "'") + ",'" + updatetime + "')";
+
+    conn = connect_mysql();
+
+    t = odbc::query(conn, s_query);
+
+    conn.close();
+
+    }
+
+    return t
+}
+
+
+/*
+ *  取Json中指定的组合当日净值
+ * 
+ *  @param s_json <JSON>
+ *  
+ *  Example: get_portfolio_nav_by_date([{"portfolio_id": 166002,"price_date": "2024.10.25"},{"portfolio_id": 166114,"price_date": "2024.03.13"}], true);
+ */
+def get_portfolio_nav_by_date(s_json, isFromMySQL) {
+
+    t = null;
+
+    if(isFromMySQL == true) {
+
+        s_query = "SELECT t.portfolio_id, t.price_date, nav.cumulative_nav
+                   FROM JSON_TABLE ( '" + s_json + "', '$[*]'
+                           COLUMNS ( portfolio_id INT PATH '$.portfolio_id',
+                                     price_date DATE PATH '$.price_date' ) ) t
+                   LEFT JOIN pfdb.pf_portfolio_nav nav ON t.portfolio_id = nav.portfolio_id AND t.price_date = nav.price_date;";
+
+        conn = connect_mysql();
+     
+        t = odbc::query(conn, s_query);
+     
+        conn.close();
+    }
+
+    return t;
+}
+
+
+/*
  *  【Morningstar Integration】取某时间后净值更新的公募基金列表
  *
  *  @param entity_ids <STRING|VECTOR>:
  *  @param update_time <DATETIME>: all updates after this time
+ *  
+ *  TODO: 将 public_nav2 换成 mfdb.public_nav 后,要把 createtime 改成 updatetime
  *
- *  Example: ms_get_fund_list_by_nav_createtime(['MF00003PW1','MF00003PWC'], 2024.10.26);
+ *  Example: ms_get_fund_list_by_nav_updatetime(['MF00003PW1','MF00003PWC'], 2024.10.26);
  */
-def ms_get_fund_list_by_nav_createtime(entity_ids, createtime) {
+def ms_get_fund_list_by_nav_updatetime(entity_ids, updatetime) {
 
     s_entity_ids = ids_to_string(entity_ids);
 
@@ -771,7 +870,7 @@ def ms_get_fund_list_by_nav_createtime(entity_ids, createtime) {
                WHERE isvalid = 1 " +
                  sql_entity_id + "
                  AND cumulative_nav > 0
-                 AND createtime >= '" + createtime$STRING + "'
+                 AND createtime >= '" + updatetime$STRING + "'
                GROUP BY fund_id
                ORDER BY fund_id, price_date";
     

+ 10 - 0
modules/dataSaver.dos

@@ -79,6 +79,16 @@ def save_and_sync(table, source_table_name, target_table_name) {
 
 
 /*
+ *   建表 XXXX_nav
+ */
+def create_entity_nav() {
+
+    return table(1000:0, 
+                ['entity_id', 'price_date', 'cumulative_nav'],
+                [SYMBOL, DATE, DOUBLE]);
+}
+
+/*
  *   建表 XXXX_performance
  */
 def create_entity_performance() {

+ 97 - 37
modules/navCalculator.dos

@@ -2,16 +2,20 @@ module fundit::navCalculator
 
 use fundit::dataPuller
 
+
 /*
  *  转交易表为交易日的持仓截面表
  *  NOTE: 假定所有基金证券都是T+1买入,也就是第一天没有收益
- * 
- * 
+ *        返回每个有交易的日期,以及当天会被纳入净值收益计算的各持仓份额(比如买入基金当天的份额数为0,卖出基金当天的份额是卖前份额)
+ *
+ *  Example: convert_transaction_to_snapshot("166002,166114", 2024.10.31);
  */
 def convert_transaction_to_snapshot(portfolio_ids, end_day) {
 
+    s_portfolio_ids = ids_to_string(portfolio_ids);
+
     // 取数据库中的持仓交易表
-    tb_transaction = get_portfolio_holding_history(portfolio_ids);
+    tb_transaction = get_portfolio_holding_history(s_portfolio_ids);
     
     // 所有交易日期
     tb_date = SELECT DISTINCT portfolio_id, holding_date FROM tb_transaction;
@@ -40,7 +44,7 @@ def convert_transaction_to_snapshot(portfolio_ids, end_day) {
     GROUP BY portfolio_id, fund_id
     HAVING fund_share.sum().round(0) > 0;
 
-    return tb;
+    return tb.sortBy!(['portfolio_id', 'holding_date', 'fund_id'], [1, 1, 1]);
 
 }
 
@@ -50,42 +54,64 @@ def convert_transaction_to_snapshot(portfolio_ids, end_day) {
  *        忽略手工净值会导致收益不精确或无法计算的问题,但可能错误的净值将导致错误的结果,两害取其轻。
  *  
  *  
- *  Create:  20240908 用于代替 sp_cal_portfolio_nav            Joey
+ *  Create:  20241101 用于代替 sp_cal_portfolio_nav                                               Joey
  * 
- *  @param portfolio_ids <STRING>: 逗号分隔的组合ID
- *  @param start_date <DATE>: 持仓证券净值更新的起始日期
- *  @param cal_method <INT>: 净值使用方法:1-依赖累计净值,但份额数不是真实的、2:依赖单位净值,份额数是真实的
+ *  @param portfolio_info <TABLE>: NEED COLUMNS portfolio_id, sec_id, start_cal_date, end_cal_date, org_id
  *  
+ *  Example:cal_portfolio_nav(get_portfolio_list_by_fund_nav_updatetime([166002,166114], 2024.10.28, true));
  *  
  */
- def cal_portfolio_nav(portfolio_ids, start_date, cal_method) {
+ def cal_portfolio_nav(portfolio_info) {
+
+    if(portfolio_info.isVoid() || portfolio_info.size() == 0) return NULL;
 
     // 取持仓截面get_nav_for_return_calculation
-    tb_snapshot = convert_transaction_to_snapshot(portfolio_ids, today()).rename!('fund_id', 'sec_id');
+    tb_snapshot = convert_transaction_to_snapshot(portfolio_info.portfolio_id, today()).rename!('fund_id', 'sec_id');
+
+    if(tb_snapshot.isVoid() || tb_snapshot.size() == 0) return NULL;
 
-    // 取涉及到的所有基金证券最早持仓日期
-    s_json = (SELECT sec_id, holding_date.min() AS price_date FROM tb_snapshot GROUP BY sec_id).toStdJson();
+    // 分别对应:私募,公募,私有基金,股票,市场指数,图译指数,私有指数,图译因子
+    v_universe = ['HF', 'MF', 'CF', 'EQ', 'MI', 'FI', 'CI', 'FA'];
+    v_prefix = ['HF%', 'MF%', 'CF%', 'EQ%', 'IN%', 'IN%', 'CI%', 'FA%'];
+    d_universe = dict(v_universe, v_prefix);
+    
+    tb_nav = table(100:0, ['sec_id', 'price_date', 'cumulative_nav', 'nav'], [SYMBOL, DATE, DOUBLE, DOUBLE]);
+    // 取计算所需的所有持仓净值数据
+    for(u in d_universe.keys()) {
+
+        // 取涉及到的所有基金证券最早持仓日期
+    	s_json = (SELECT sec_id, start_cal_date.min() AS price_date FROM portfolio_info WHERE sec_id LIKE d_universe[u] GROUP BY sec_id).toStdJson();
+
+        // 取涉及到的所有基金证券有用净值
+        // TODO: need consider inception date nav
+        tmp_nav = get_nav_for_return_calculation(u, 'd', s_json);
+
+        if(tmp_nav.isVoid() || tmp_nav.size() == 0) continue;
+        
+        INSERT INTO tb_nav SELECT * FROM tmp_nav;
+    }
 
-    // 取涉及到的所有基金证券有用净值
-    // TODO: need consider inception date nav
-    tb_nav = get_nav_for_return_calculation('PF', 'd', s_json);
 
-    // 补一下最新界面
+    // 补一下最新截面(虽然是个”假的”截面)
     tb_latest_snapshot = SELECT sec_id, holding_date, nav.mean().round(6) AS nav
                          FROM tb_snapshot
-                         WHERE NOT EXISTS ( SELECT 1 FROM tb_nav WHERE sec_id = tb_snapshot.sec_id AND price_date = tb_snapshot.holding_date )
+                         WHERE holding_date = today()
+                           AND NOT EXISTS ( SELECT 1 FROM tb_nav WHERE sec_id = tb_snapshot.sec_id AND price_date = tb_snapshot.holding_date )
                          GROUP BY sec_id, holding_date;
 
-    // Buggy DolphinDB, INSERT INTO Table1 (Columns) SELECT Columns FROM Table2 会报列数不匹配的奇葩错误
+    // Funky DolphinDB, INSERT INTO Table1 (Columns) SELECT Columns FROM Table2 会报列数不匹配的奇葩错误
     // this is the way to get around it
     INSERT INTO tb_nav (sec_id, price_date, cumulative_nav) VALUES (tb_latest_snapshot.sec_id, tb_latest_snapshot.holding_date, tb_latest_snapshot.nav);
 
     // 在各证券持仓时段中,填充所有无净值的但其它证券有净值的合理日期
     // 比如 2024-01-10 ~ 2024-01-20区间,组合持有基金A和基金B,基金A有每日净值
     // 而基金B只有01-12和01-19两期周五净值,那么基金B需要填充除这两天以外的所有日期
-    tb_holding_date_range = SELECT portfolio_id, sec_id, holding_date.min() AS oldest_date, holding_date.max() AS latest_date
-                            FROM tb_snapshot GROUP BY portfolio_id, sec_id;
-    // 所有净值日期
+    // TODO: 待查,如果有共享的子基金,但日期不同,会不会出BUG
+    tb_holding_date_range = SELECT p.portfolio_id, p.sec_id, n.price_date.min() AS oldest_date, n.price_date.max() AS latest_date
+                            FROM portfolio_info p 
+                            INNER JOIN tb_nav n ON n.sec_id = p.sec_id
+                            GROUP BY p.portfolio_id, p.sec_id;
+    // 所有净值日期+前值日期
     tb_date = SELECT DISTINCT dr.portfolio_id, n.price_date
               FROM tb_holding_date_range dr
               INNER JOIN tb_nav n ON dr.sec_id = n.sec_id
@@ -95,6 +121,8 @@ def convert_transaction_to_snapshot(portfolio_ids, end_day) {
     // 所有基金证券id
     tb_id = SELECT DISTINCT portfolio_id, sec_id FROM tb_snapshot;
 
+    // NOTE: 因为同一个组合下的持仓私募基金的净值前值日期会不一样, 所以在 tb_date里会混入多余的脏数据,导致某些私募的净值前值及日期被赋予错误的数据
+    // 好消息是最后返回的收益及净值会把这些错误的前值筛掉,但最好想个办法在这里清除掉
     tb_holdings = SELECT id.portfolio_id, dt.price_date, id.sec_id, n.cumulative_nav, n.nav
                   FROM tb_id id
                   INNER JOIN tb_date dt ON id.portfolio_id = dt.portfolio_id
@@ -117,28 +145,37 @@ def convert_transaction_to_snapshot(portfolio_ids, end_day) {
 
     // 把交易日截面的份额数用于组合收益表
     UPDATE tb_holdings
-      SET shares = ss.shares
+        SET shares = ss.shares
     FROM ej(tb_holdings AS pr, tb_snapshot AS ss, ['portfolio_id', 'price_date', 'sec_id'], ['portfolio_id', 'holding_date', 'sec_id']);
 
     // 填充份额数为空的无交易日期,这段时间所有证券基金处于 buy-n-hold
     UPDATE tb_holdings
-      SET shares = shares.bfill()
+        SET shares = shares.bfill()
     CONTEXT BY portfolio_id, sec_id;
 
+    // 记录每个组合最早的净值计算日期
+    tb_port_first_cal_date = SELECT portfolio_id, start_cal_date.min() AS first_cal_date, end_cal_date.max() AS latest_cal_date, org_id[0] AS org_id 
+                             FROM portfolio_info GROUP BY portfolio_id;
+
     // 计算各日期的持仓资产及总资产
-    if(cal_method == 1) {
-        UPDATE tb_holdings SET market_value = (cumulative_nav * shares).round(6);
-    } else {
-        UPDATE tb_holdings SET market_value = (nav * shares).round(6);
-    }
+    UPDATE tb_holdings
+        SET market_value = (cumulative_nav * shares).round(6)
+    FROM ej(tb_holdings, tb_port_first_cal_date, 'portfolio_id')
+    WHERE org_id = '1';
+
+    UPDATE tb_holdings
+        SET market_value = (nav * shares).round(6)
+    FROM ej(tb_holdings, tb_port_first_cal_date, 'portfolio_id')
+    WHERE org_id = '2';
 
     UPDATE tb_holdings
-      SET total_mkt_value = market_value.sum()
+        SET total_mkt_value = market_value.sum()
     CONTEXT BY portfolio_id, price_date;
 
+
     // 计算各持仓的权重
     UPDATE tb_holdings
-      SET weight = (market_value \ total_mkt_value).round(6)
+        SET weight = (market_value \ total_mkt_value).round(6)
     WHERE total_mkt_value <> 0;
 
     // 组合收益计算: RET = ∑( weight_i * ret_i )
@@ -146,15 +183,38 @@ def convert_transaction_to_snapshot(portfolio_ids, end_day) {
                        FROM tb_holdings
                        GROUP BY portfolio_id, price_date;
 
-    // 初始化净值
+    // 取组合净值前值
+    s_json = (SELECT portfolio_id, price_date.min() AS price_date
+              FROM ej(tb_portfolio_ret, tb_port_first_cal_date, 'portfolio_id')
+              WHERE tb_portfolio_ret.price_date < tb_port_first_cal_date.first_cal_date
+              GROUP BY portfolio_id).toStdJson();
+    tb_pre_nav = get_portfolio_nav_by_date(s_json, true);
+    
     tb_portfolio_ret.addColumn('nav', DOUBLE);
-    UPDATE tb_portfolio_ret SET nav = 1 WHERE ret IS NULL;
 
-    // 通过收益反算净值: nav_i = nav_0 * ∏(1 + ret_i)
-    UPDATE tb_portfolio_ret SET nav = (1+ret).cumprod().round(6) WHERE nav IS NULL CONTEXT BY portfolio_id;
+    // start_cal_date 是最早净值日期
+    UPDATE tb_portfolio_ret
+        SET nav = 1, ret = 0
+    FROM ej(tb_portfolio_ret, ej(tb_port_first_cal_date, tb_pre_nav, 'portfolio_id'), ['portfolio_id', 'price_date'], ['portfolio_id', 'first_cal_date'])
+    WHERE tb_pre_nav.cumulative_nav IS NULL;
 
-    // 删掉没有用的数据
-    DELETE FROM tb_portfolio_ret WHERE price_date >= today();
+    // start_cal_date 是最早净值日期,用它作为初始净值日期
+    UPDATE tb_pre_nav
+        SET price_date = first_cal_date, cumulative_nav = 1
+    FROM ej(tb_pre_nav, tb_port_first_cal_date, 'portfolio_id')
+    WHERE cumulative_nav IS NULL;
 
-    return tb_portfolio_ret;
+    tb_portfolio_ret.sortBy!(['portfolio_id', 'price_date'], [1, 1]);
+
+    // 通过收益反算净值: nav_i = nav_0 * ∏(1 + ret_i)
+    UPDATE tb_portfolio_ret 
+        SET nav = (tb_pre_nav.cumulative_nav * (1+ret).cumprod()).round(6) 
+    FROM ej(tb_portfolio_ret, tb_pre_nav, 'portfolio_id')
+    CONTEXT BY portfolio_id;
+
+    // 返回有用的数据 
+    return (SELECT DISTINCT tb_portfolio_ret.* 
+            FROM ej(tb_portfolio_ret, tb_port_first_cal_date, 'portfolio_id')
+            WHERE price_date >= first_cal_date AND price_date <= latest_cal_date
+            ORDER BY portfolio_id, price_date);
  }

+ 91 - 0
modules/sqlUtilities.dos

@@ -54,6 +54,7 @@ def load_table_from_local(server_name, table_name) {
  * 
  *   Example: ids_to_string("'a','b','c'");
  *            ids_to_string(['a', NULL, 'c']);
+ *            ids_to_string([1,2,3]);
  */
 def ids_to_string(ids) {
 
@@ -66,6 +67,9 @@ def ids_to_string(ids) {
         s_ids = ids.trim();
     // 输入的 ids 是字符串向量
     } else if(ids.form() == 1) {
+      if(ids.type() == 4) // INTEGER
+        s_ids = ids.concat(",").trim();
+      else // STRING
         s_ids = "'" + ids.concat("','").trim() + "'";
     // 缺省返回空
     } else {
@@ -116,3 +120,90 @@ def get_performance_table_description(entity_type) {
     return (SELECT * FROM tmp_universe u WHERE u.type = entity_type);
     
 }
+
+/*
+ *  根据不同类型的主体返回其杂项指标的表名、字段名
+ * 
+ *  Example: get_indicator_table_description('HF');
+ */
+def get_indicator_table_description(entity_type) {
+
+    tmp_universe = table(100:0, 
+                         ['type', 'table_name', 'sec_id_col'],
+                         [STRING, STRING, STRING]);
+
+    // 分别对应:私募,公募,私有基金,市场指数,图译指数,私有指数,图译因子,组合
+    INSERT INTO tmp_universe VALUES ( 
+        ['HF', 'MF', 'CF', 'MI', 'FI', 'CI', 'FA', 'PF'],
+        ['mfdb.fund_indicator', 'mfdb.fund_indicator', 'pfdb.pf_cus_fund_indicator', 'mfdb.fund_indicator', 'mfdb.fund_indicator', 'pfdb.cm_udf_index_indicator', 'pfdb.cm_factor_indicator', 'pfdb.pf_portfolio_indicator'],
+        ['fund_id', 'fund_id', 'fund_id', 'fund_id', 'fund_id', 'index_id', 'factor_id', 'portfolio_id'] );
+
+    return (SELECT * FROM tmp_universe u WHERE u.type = entity_type);
+
+}
+
+/*
+ *  根据不同类型的主体返回其风险指标的表名、字段名
+ * 
+ *  Example: get_risk_stats_table_description('HF');
+ */
+def get_risk_stats_table_description(entity_type) {
+
+    tmp_universe = table(100:0, 
+                         ['type', 'table_name', 'sec_id_col'],
+                         [STRING, STRING, STRING]);
+
+    // 分别对应:私募,公募,私有基金,市场指数,图译指数,私有指数,图译因子,组合
+    INSERT INTO tmp_universe VALUES ( 
+        ['HF', 'MF', 'CF', 'MI', 'FI', 'CI', 'FA', 'PF'],
+        ['mfdb.fund_risk_stats', 'mfdb.fund_risk_stats', 'pfdb.pf_cus_fund_risk_stats', 'mfdb.fund_risk_stats', 'mfdb.fund_risk_stats', 'pfdb.cm_udf_index_risk_stats', 'pfdb.cm_factor_risk_stats', 'pfdb.pf_portfolio_risk_stats'],
+        ['fund_id', 'fund_id', 'fund_id', 'fund_id', 'fund_id', 'index_id', 'factor_id', 'portfolio_id'] );
+
+    return (SELECT * FROM tmp_universe u WHERE u.type = entity_type);
+
+}
+
+/*
+ *  根据不同类型的主体返回其风险调整收益指标的表名、字段名
+ * 
+ *  Example: get_riskadjret_stats_table_description('HF');
+ */
+def get_riskadjret_stats_table_description(entity_type) {
+
+    tmp_universe = table(100:0, 
+                         ['type', 'table_name', 'sec_id_col'],
+                         [STRING, STRING, STRING]);
+
+    // 分别对应:私募,公募,私有基金,市场指数,图译指数,私有指数,图译因子,组合
+    INSERT INTO tmp_universe VALUES ( 
+        ['HF', 'MF', 'CF', 'MI', 'FI', 'CI', 'FA', 'PF'],
+        ['mfdb.fund_riskadjret_stats', 'mfdb.fund_riskadjret_stats', 'pfdb.pf_cus_fund_riskadjret_stats', 'mfdb.fund_riskadjret_stats', 'mfdb.fund_riskadjret_stats', 'pfdb.cm_udf_index_riskadjret_stats', 'pfdb.cm_factor_riskadjret_stats', 'pfdb.pf_portfolio_riskadjret_stats'],
+        ['fund_id', 'fund_id', 'fund_id', 'fund_id', 'fund_id', 'index_id', 'factor_id', 'portfolio_id'] );
+
+    return (SELECT * FROM tmp_universe u WHERE u.type = entity_type);
+
+}
+
+/*
+ *  Annulized multiple
+ */
+def get_annulization_multiple(freq) {
+
+  ret = 1;
+  
+  if (freq == 'd') {
+    ret = 252; // We have differences here between Java and DolphinDB, Java uses 365.25 days
+  } else if (freq == 'w') {
+    ret = 52;
+  } else if (freq == 'm') {
+    ret = 12;
+  } else if (freq == 'q') {
+    ret = 4;
+  } else if (freq == 's') {
+    ret = 2;
+  } else if (freq == 'a') {
+    ret = 1;
+  }
+  
+  return ret;
+}

+ 77 - 0
modules/task_portfolioPerformance.dos

@@ -0,0 +1,77 @@
+module fundit::task_portfolioPerformance
+
+use fundit::dataPuller;
+use fundit::dataSaver;
+use fundit::navCalculator;
+
+
+/*
+ *   多个组合计算全历史净值
+ * 
+ *   Example:calPortfolioNAV([143109, 145041]);
+ */
+def calPortfolioNAV(portfolio_ids) {
+
+    very_old_date = '1900-01-01';
+
+    port_info = get_portfolio_list_by_fund_nav_updatetime(portfolio_ids, very_old_date, true);
+    
+    tb_ret = fundit::navCalculator::cal_portfolio_nav(port_info);
+
+	
+}
+
+/*
+ *   [定时任务]批量计算组合净值
+ * 
+ * 
+ *   Example: calPortfolioPerformance(2024.10.28);
+ */
+def calPortfolioPerformance(date) {
+
+    rt = '';
+    // 2.5 min
+    tb_cal_ports = get_portfolio_list_by_fund_nav_updatetime(NULL, date, true);
+
+    if(tb_cal_ports.isVoid() || tb_cal_ports.size() == 0) return;
+
+    // 分批跑
+    i = 0;
+    batch_size = 1000;
+    tb_portfolio_nav = fundit::dataSaver::create_entity_nav();
+
+    all_portfolio_id = EXEC DISTINCT portfolio_id FROM tb_cal_ports;
+
+    do {
+
+        portfolio_info = SELECT * FROM tb_cal_ports
+                         WHERE portfolio_id IN all_portfolio_id[i : min(all_portfolio_id.size(), i+batch_size)];
+
+        if(portfolio_info.isVoid() || portfolio_info.size() == 0) break;
+portfolio_info = select * from tb_cal_ports where portfolio_id in (143109, 145041)
+        // 16 sec
+        tb_ret = fundit::navCalculator::cal_portfolio_nav(portfolio_info);
+
+        INSERT INTO tb_portfolio_nav SELECT portfolio_id$STRING, price_date, nav FROM tb_ret;
+
+        i += batch_size;
+
+    } while (i <= tb_cal_ports.size());
+
+
+    if(! tb_portfolio_nav.isVoid() && tb_portfolio_nav.size() > 0) {
+
+        // save data to MySQL  (12 sec)
+        try {
+
+            tb_portfolio_nav.rename!('entity_id', 'portfolio_id');
+            save_and_sync(tb_portfolio_nav, 'raw_db.pf_portfolio_nav', 'raw_db.pf_portfolio_nav');
+
+        } catch(ex) {
+
+            //TODO: Log errors
+            rt = ex;
+        }
+    }
+}
+