Browse Source

完成dolphinDB写回的表 _dolphinDB 同步至正式表的逻辑(暂时用raw_db)

Joey 6 months ago
parent
commit
599934b20c
4 changed files with 34 additions and 52 deletions
  1. 15 10
      modules/dataSaver.dos
  2. 3 11
      modules/fund_risk_stats_dolphin.sql
  3. 5 21
      modules/sqlUtilities.dos
  4. 11 10
      modules/task_fundPerformance.dos

+ 15 - 10
modules/dataSaver.dos

@@ -7,7 +7,7 @@ use fundit::sqlUtilities
 /*
  * 存数据表到mySQL或本地dolphindb,原数据会被替代!
  *
- * save_table(tb_fund_performance, "mfdb.fund_performance", false)
+ * save_table(tb_fund_performance, "raw_db.fund_performance", false)
  */
 
 def save_table(tb, table_name, isToMySQL) {
@@ -15,11 +15,7 @@ def save_table(tb, table_name, isToMySQL) {
 
     if(isToMySQL == true) {
     
-        tb.addColumn("creatorid" "createtime" "updaterid" "updatetime" "isvalid", [INT, DATETIME, INT, DATETIME, INT])
-        
-        UPDATE tb SET creatorid = 888888, createtime = now(), updaterid = null, updatetime = null, isvalid = 1
-    
-        conn = connect_mysql()
+        conn = connect_mysql('raw_db');
         
         odbc::execute(conn, "TRUNCATE TABLE " + table_name + "_dolphin")
         
@@ -51,11 +47,18 @@ def save_hedge_fund_nav_to_local(tb_nav) {
  *  将数据存回MySQL并同步至正式表
  * 
  */
-def save_and_sync(table, table_name, sync_sp_name) {
+def save_and_sync(table, source_table_name, target_table_name) {
+
+    save_table(table, source_table_name, true);
+
+    s_query = "CALL raw_db.sp_sync_table_from_dolphin('" + source_table_name + "_dolphin', '" + target_table_name + "');"
+
+    conn = connect_mysql('raw_db');
+
+    odbc::execute(conn, s_query);
+
+    conn.close();
 
-    save_table(table, table_name, true);
-    
-    call_mysql_sp(sync_sp_name);
 }
 
 
@@ -105,6 +108,8 @@ def create_entity_indicator() {
 
 /*
  *   建表 XXX_risk_stats
+ *   
+ *   NOTE: mfdb.fund_risk_stats 中 maxdrawdown_6m 和 maxdrawdown_ytd 因不明原因分别是 6m_maxdrawdown 和 ytd_maxdrawdown 的虚拟列!
  */
 def create_entity_risk_stats() {
 

+ 3 - 11
modules/fund_risk_stats_dolphin.sql

@@ -11,7 +11,7 @@ CREATE TABLE `fund_risk_stats_dolphin` (
   `skewness_6m` decimal(22,6) DEFAULT NULL,
   `kurtosis_6m` decimal(22,6) DEFAULT NULL,
   `worstmonth_6m` decimal(22,6) DEFAULT NULL,
-  `maxdrawdown_6m` decimal(22,6) DEFAULT NULL,
+  `6m_maxdrawdown` decimal(22,6) DEFAULT NULL, -- mfdb正式表中 maxdrawdown_6m 是 6m_maxdrawdown 的虚拟列
   `stddev_1y` decimal(22,6) DEFAULT NULL,
   `downsidedev_1y` decimal(22,6) DEFAULT NULL,
   `alpha_1y` decimal(22,6) DEFAULT NULL,
@@ -74,7 +74,7 @@ CREATE TABLE `fund_risk_stats_dolphin` (
   `skewness_ytd` decimal(22,6) DEFAULT NULL,
   `kurtosis_ytd` decimal(22,6) DEFAULT NULL,
   `worstmonth_ytd` decimal(22,6) DEFAULT NULL,
-  `maxdrawdown_ytd` decimal(22,6) DEFAULT NULL,
+  `ytd_maxdrawdown` decimal(22,6) DEFAULT NULL, -- mfdb正式表中 maxdrawdown_ytd 是 ytd_maxdrawdown的虚拟列
   `stddev_incep` decimal(22,6) DEFAULT NULL,
   `downsidedev_incep` decimal(22,6) DEFAULT NULL,
   `alpha_incep` decimal(22,6) DEFAULT NULL,
@@ -84,13 +84,5 @@ CREATE TABLE `fund_risk_stats_dolphin` (
   `kurtosis_incep` decimal(22,6) DEFAULT NULL,
   `worstmonth_incep` decimal(22,6) DEFAULT NULL,
   `maxdrawdown_incep` decimal(22,6) DEFAULT NULL,
-  `creatorid` int DEFAULT NULL COMMENT '创建者Id,默认第一次创建者名称,创建后不变更',
-  `createtime` datetime DEFAULT NULL COMMENT '创建时间,默认第一次创建的getdate()时间',
-  `updaterid` int DEFAULT NULL COMMENT '修改者Id;第一次创建时与Creator值相同,修改时与修改人值相同',
-  `updatetime` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间;第一次创建时与CreatTime值相同,修改时与修改时间相同',
-  `isvalid` tinyint NOT NULL DEFAULT '1' COMMENT '记录的有效性;1-有效;0-无效;',
-
-  PRIMARY KEY (`fund_id`,`end_date`),
-  KEY (`end_date`),
-  KEY (`updatetime`)
+  PRIMARY KEY (`fund_id`,`end_date`)
 ) COMMENT='DolphinDB写回的基金历史风险指标表'

+ 5 - 21
modules/sqlUtilities.dos

@@ -2,12 +2,12 @@ module fundit::sqlUtilities
 
  
 /*
- *  MySQL 连接,使用前应确保 loadPlugin("ODBC")已经被运行过
+ *  MySQL dev server 连接,使用前应确保 loadPlugin("ODBC")已经被运行过
  *
  *  Create  20240711  使用ODBC连接MySQL数据库                                     Joey
  *  
  */
-def connect_mysql() {
+def connect_mysql(user='pf_user') {
 
     // 阿里云的mysql被魔改过,当前DolphinDB无法支持MySQL插件,只能用ODBC
     // loadPlugin("ODBC")
@@ -16,11 +16,12 @@ def connect_mysql() {
 
     // 使用Windows的ODBC数据源事先设置号的连接
     // conn = odbc::connect("Dsn=FunditDB-mfdb")
-    conn = odbc::connect("Dsn=FunditDB-dev-mfdb")
+    s = "Dsn=FunditDB-dev-" + user;
+    conn = odbc::connect(s);
 
 //    t = odbc::query(conn, "SELECT * FROM pfdb.pf_portfolio_nav LIMIT 100")
 
-    return conn
+    return conn;
 }
 
 /*
@@ -49,23 +50,6 @@ def load_table_from_local(server_name, table_name) {
 }
 
 /*
- *  调用mysql中无参数存储过程
- * 
- */
-def call_mysql_sp(store_procedure) {
-
-    s_query = "CALL " + store_procedure + "();"
-
-    conn = connect_mysql()
-
-    t = odbc::query(conn, s_query)
-
-    conn.close()
-
-    return t
-}
-
-/*
  *   未知形态的id转为MySQL需要的的逗号分隔字符串
  * 
  *   Example: ids_to_string("'a','b','c'");

+ 11 - 10
modules/task_fundPerformance.dos

@@ -80,7 +80,7 @@ def generate_entity_risk_stats(entity_info, indicators, isToMySQL, mutable entit
     if(isToMySQL) {
 
         t = SELECT entity_id, end_date, std_dev_a AS stddev_6m, ds_dev_a AS downsidedev_6m, alpha_a AS alpha_6m, winrate AS winrate_6m, beta AS beta_6m,
-                                        skewness AS skewness_6m, kurtosis AS kurtosis_6m, wrst_month AS worstmonth_6m, drawdown AS maxdrawdown_6m
+                                        skewness AS skewness_6m, kurtosis AS kurtosis_6m, wrst_month AS worstmonth_6m, drawdown AS maxdrawdown_6m // mfdb中的真实字段名是 6m_maxdrawdown
             FROM indicators['PBI-6M'] AS ind
             INNER JOIN entity_info fi ON ind.entity_id = fi.entity_id
             WHERE ind.end_date >= fi.price_date.month(); // 过滤掉不必更新的旧记录
@@ -117,7 +117,7 @@ def generate_entity_risk_stats(entity_info, indicators, isToMySQL, mutable entit
         
         UPDATE t
         SET stddev_ytd = std_dev_a, downsidedev_ytd = ds_dev_a, alpha_ytd = alpha_a, winrate_ytd = winrate, beta_ytd = beta,
-            skewness_ytd = skewness, kurtosis_ytd = kurtosis, worstmonth_ytd = wrst_month, maxdrawdown_ytd = drawdown
+            skewness_ytd = skewness, kurtosis_ytd = kurtosis, worstmonth_ytd = wrst_month, maxdrawdown_ytd = drawdown // mfdb中的真实字段名是 ytd_maxdrawdown
         FROM ej(t, indicators['PBI-YTD'], ['entity_id', 'end_date']);
         
         UPDATE t
@@ -371,7 +371,7 @@ def generate_entity_latest_performance(entity_info, perf_latest, isToMySQL, muta
  *   @param entityType <STRING>: 'MF', 'HF'...
  *   @param date <DATETIME>: 净值更新时间
  * 
- *   TODO: 目前收益表在MySQL中,所以需要将计算的最新收益与MySQL中的历史数据合并
+ *   TODO: 当月indicator每日计算是不是没有意义?
  */
 def calFundPerformance(entityType, date) {
 
@@ -456,23 +456,24 @@ def calFundPerformance(entityType, date) {
         try {
 
             chg_columns_for_mysql(tb_fund_performance, 'fund_id');
-            save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.sync_fund_performance_from_dolphin');
+            save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance');
 
             chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
-            save_table(tb_fund_indicator, 'raw_db.fund_indicator', true);
+            save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator');
 
             chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
-            save_table(tb_fund_risk_stats, 'raw_db.fund_risk_stats', true);
+            // mfdb.fund_performance 表中 maxdrawdown_6m & maxdrawdown_ytd 是虚拟列,这里用数据列顺序强行写入真实列 6m_maxdrawdown & ytd_maxdrawdown (DolphinDB 不允许字段名以数字开头)
+            save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats');
 
             chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
-            save_table(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', true);
+            save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats');
 
             chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
-            save_table(tb_fund_style_stats, 'raw_db.fund_style_stats', true);
+            save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats');
 
-            save_table(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', true);
+            save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly');
 
-            save_table(tb_fund_latest_performance, 'raw_db.fund_latest_performance', true);
+            save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance');
 
         } catch(ex) {