Loader.c 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015
  1. /*
  2. * Licensed Materials - Property of HCL
  3. *
  4. * IBM Informix DataBlade Module
  5. * (C) Copyright International Business Machines Corporation 2002.
  6. * (c) Copyright HCL Technologies Ltd. 2017. All Rights Reserved.
  7. *
  8. * COPYRIGHT LICENSE:
  9. * This information contains sample application programs in source language,
  10. * which illustrate programming techniques on various operating platforms.
  11. * You may copy, modify, and distribute these sample programs in any form
  12. * without payment to IBM, for the purposes of developing, using, marketing
  13. * or distributing application programs conforming to the application
  14. * programming interface for the operating platform for which the sample
  15. * programs are written. These examples have not been thoroughly tested under
  16. * all conditions. IBM, therefore, cannot guarantee or imply reliability,
  17. * serviceability, or function of these programs. You may copy, modify, and
  18. * distribute these sample programs in any form without payment to IBM for
  19. * the purposes of developing, using, marketing, or distributing application
  20. * programs conforming to IBM's application programming interfaces.
  21. * Each copy or any portion of these sample programs or any derivative work,
  22. * must include a copyright notice as follows:
  23. * © (your company name) (year). Portions of this code are derived from
  24. * IBM Corp. Sample Programs. © Copyright IBM Corp. (enter the year or
  25. * years). All rights reserved.
  26. *
  27. */
  28. /* The loader procedure in this file loads time variant data from a
  29. * file into a table containing time series. It assumes that the
  30. * table has already been populated with the time invariant data. If
  31. * the table already has time series data the new data will overwrite
  32. * the old data or be appended to the existing time series depending
  33. * on the time stamps.
  34. *
  35. * SETUP:
  36. *
  37. * create procedure TSIncLoad( table_name lvarchar,
  38. * file_name lvarchar,
  39. * calendar_name lvarchar,
  40. * origin datetime year to day,
  41. * threshold integer,
  42. * regular boolean,
  43. * container_name lvarchar,
  44. * nelems integer)
  45. * external name '$INFORMIXDIR/extend/timeseries/Loader.bld(TSIncLoad)'
  46. * language C;
  47. *
  48. * create row type day_info (
  49. * ValueDate DATETIME year to day,
  50. * carryover char(1),
  51. * spread integer,
  52. * pricing_bmk_id integer,
  53. * price float,
  54. * yield float,
  55. * priority char(1) );
  56. *
  57. * create table corporates (
  58. * Secid integer UNIQUE,
  59. * .
  60. * .
  61. * .
  62. * series TimeSeries(day_info));
  63. *
  64. * create index corporatesIdx on corporates( Secid);
  65. *
  66. * Any name may be used for the "corporates" table. The corporates table
  67. * may have any number of columns in addition to the Secid and series columns.
  68. *
  69. * Each line of the data file holds comma-separated fields in this order:
  70. * Secid,year-mon-day,carryover,spread,pricing_bmk_id,price,yield,priority
  71. *
  72. * For instance:
  73. * 25000006,1986-1-7,m,2,12,2.2000000000,22.2,6
  74. *
  75. * The loader is invoked with a SQL statement like:
  76. * execute procedure TSIncLoad( 'corporates',
  77. * 'data_file_name',
  78. * 'cal_name',
  79. * '1980-1-1',
  80. * 20,
  81. * 't',
  82. * 'container-name');
  83. *
  84. */
  85. #include <ctype.h>
  86. #include <fcntl.h>
  87. #include <stdio.h>
  88. #include <stdlib.h>
  89. #include <string.h>
  90. #include "datetime.h"
  91. #include "mi.h"
  92. #include "tseries.h"
  93. #define DAY_INFO_TYPE_NAME "day_info"
  94. #define DAILY_COL_COUNT 7
  95. typedef struct
  96. {
  97. mi_integer fd;
  98. mi_unsigned_integer flags;
  99. #define LDBUF_LAST_CHAR_EOL 0x1
  100. mi_integer buf_index;
  101. mi_integer buf_len;
  102. mi_integer line_no;
  103. mi_lvarchar *file_name;
  104. mi_string data[2048];
  105. }
  106. FILE_BUF;
  107. #define STREAM_EOF (-1)
  108. typedef struct sec_entry_s
  109. {
  110. mi_integer sec_id;
  111. ts_tsdesc *tsdesc;
  112. mi_boolean in_row; /* Indicates whether the time series is stored in row. */
  113. mi_boolean created; /* Indicates whether we just created this timeseries */
  114. struct sec_entry_s *next;
  115. }
  116. sec_entry_t;
  117. typedef struct
  118. {
  119. mi_lvarchar *table_name;
  120. MI_TYPEID ts_typeid; /* The type id of timeseries(day_info) */
  121. mi_string *calendar_name;
  122. mi_datetime *origin;
  123. mi_integer threshold;
  124. mi_boolean regular;
  125. mi_string *container_name;
  126. mi_integer nelems; /* For created time series. */
  127. mi_integer hash_size;
  128. MI_CONNECTION *conn;
  129. sec_entry_t **hash;
  130. /* Value buffers -- only allocated once. */
  131. MI_DATUM col_data[ DAILY_COL_COUNT];
  132. mi_boolean col_is_null[ DAILY_COL_COUNT];
  133. char *carryover;
  134. char *priority;
  135. mi_double_precision price, yield;
  136. mi_integer instances_created;
  137. /* A count of the number of tsinstancetable entries added. Used to decide
  138. * when to update statistics on this table.
  139. */
  140. MI_SAVE_SET *save_set;
  141. }
  142. loader_context_t;
  143. /*
  144. ***************************************************************************
  145. * name: init_context
  146. *
  147. * purpose: Initialize the loader context structure.
  148. *
  149. * notes:
  150. ***************************************************************************
  151. */
  152. static void
  153. init_context( mi_lvarchar *table_name,
  154. mi_lvarchar *calendar_name,
  155. mi_datetime *origin,
  156. mi_integer threshold,
  157. mi_boolean regular,
  158. mi_lvarchar *container_name,
  159. mi_integer nelems,
  160. loader_context_t *context_ptr)
  161. {
  162. mi_string buf[256];
  163. mi_integer table_name_len = mi_get_varlen( table_name);
  164. MI_ROW *row = NULL;
  165. MI_DATUM retbuf = 0;
  166. mi_integer retlen = 0;
  167. mi_lvarchar *typename = NULL;
  168. MI_TYPEID *typeid = NULL;
  169. mi_integer err = 0;
  170. if( table_name_len > IDENTSIZE)
  171. mi_db_error_raise( NULL, MI_EXCEPTION, "The table name is too long");
  172. memset( context_ptr, 0, sizeof( *context_ptr));
  173. context_ptr->conn = mi_open( NULL, NULL, NULL);
  174. typename = mi_string_to_lvarchar( "timeseries(" DAY_INFO_TYPE_NAME ")");
  175. typeid = mi_typename_to_id( context_ptr->conn, typename);
  176. mi_var_free( typename);
  177. if( NULL == typeid)
  178. mi_db_error_raise( NULL, MI_EXCEPTION,
  179. "Type timeseries(" DAY_INFO_TYPE_NAME ") not defined.");
  180. context_ptr->ts_typeid = *typeid;
  181. context_ptr->table_name = table_name;
  182. context_ptr->calendar_name = mi_lvarchar_to_string( calendar_name);
  183. context_ptr->origin = origin;
  184. context_ptr->threshold = threshold;
  185. context_ptr->regular = regular;
  186. context_ptr->container_name = mi_lvarchar_to_string( container_name);
  187. context_ptr->nelems = nelems;
  188. /* Use the size (count) of the table as the hash table size. */
  189. sprintf( buf, "select count(*) from %.*s;",
  190. table_name_len,
  191. mi_get_vardata( table_name));
  192. if( MI_OK != mi_exec( context_ptr->conn, buf, MI_QUERY_BINARY))
  193. mi_db_error_raise( NULL, MI_EXCEPTION, "mi_exec failed");
  194. if( MI_ROWS != mi_get_result( context_ptr->conn))
  195. {
  196. sprintf( buf, "Could not get size of %.*s table.",
  197. table_name_len,
  198. mi_get_vardata( table_name));
  199. mi_db_error_raise( NULL, MI_EXCEPTION, buf);
  200. }
  201. if( NULL == (row = mi_next_row( context_ptr->conn, &err)))
  202. mi_db_error_raise( NULL, MI_EXCEPTION, "mi_next_row failed");
  203. if( MI_NORMAL_VALUE != mi_value( row, 0, &retbuf, &retlen)
  204. || 0 != dectoint( (mi_decimal *) retbuf, (int *)&context_ptr->hash_size)
  205. || context_ptr->hash_size == 0)
  206. context_ptr->hash_size = 256;
  207. (void) mi_query_finish( context_ptr->conn);
  208. context_ptr->hash
  209. = mi_zalloc( context_ptr->hash_size*sizeof( *context_ptr->hash));
  210. context_ptr->col_data[1] = (MI_DATUM) mi_new_var(1); /* carryover */
  211. context_ptr->col_data[6] = (MI_DATUM) mi_new_var(1); /* priority */
  212. if( NULL == context_ptr->hash
  213. || NULL == context_ptr->col_data[1]
  214. || NULL == context_ptr->col_data[6])
  215. mi_db_error_raise( NULL, MI_EXCEPTION, "Not enough memory.");
  216. context_ptr->carryover
  217. = mi_get_vardata( (mi_lvarchar *) context_ptr->col_data[1]);
  218. context_ptr->col_data[4] = (MI_DATUM) &context_ptr->price;
  219. context_ptr->col_data[5] = (MI_DATUM) &context_ptr->yield;
  220. context_ptr->priority
  221. = mi_get_vardata( (mi_lvarchar *) context_ptr->col_data[6]);
  222. context_ptr->save_set = mi_save_set_create( context_ptr->conn);
  223. } /* End of init_context. */
  224. /*
  225. ***************************************************************************
  226. * name: close_context
  227. *
  228. * purpose: Close the context structure. Free up all allocated memory.
  229. *
  230. ***************************************************************************
  231. */
  232. static void
  233. close_context( loader_context_t *context_ptr)
  234. {
  235. mi_free( context_ptr->hash);
  236. context_ptr->hash = NULL;
  237. context_ptr->hash_size = 0;
  238. mi_var_free( (mi_lvarchar *) context_ptr->col_data[1]);
  239. mi_var_free( (mi_lvarchar *) context_ptr->col_data[6]);
  240. context_ptr->col_data[1] = context_ptr->col_data[6] = 0;
  241. context_ptr->carryover = context_ptr->priority = NULL;
  242. (void) mi_save_set_destroy( context_ptr->save_set);
  243. context_ptr->save_set = NULL;
  244. (void) mi_close( context_ptr->conn);
  245. mi_free( context_ptr->calendar_name);
  246. context_ptr->calendar_name = NULL;
  247. mi_free( context_ptr->container_name);
  248. context_ptr->container_name = NULL;
  249. context_ptr->conn = NULL;
  250. } /* End of close_context. */
  251. /*
  252. ***************************************************************************
  253. * name: update_series
  254. *
  255. * purpose: Update all the time series back into the table.
  256. *
  257. * returns:
  258. *
  259. * notes:
  260. ***************************************************************************
  261. */
  262. static void
  263. update_series( loader_context_t *context_ptr)
  264. {
  265. mi_integer i = 0;
  266. register struct sec_entry_s *entry_ptr = NULL;
  267. struct sec_entry_s *next_entry_ptr = NULL;
  268. MI_STATEMENT *statement = NULL;
  269. char buf[256];
  270. mi_integer rc = 0;
  271. MI_DATUM values[2] = {0, 0};
  272. mi_integer lengths[2] = {-1, sizeof( mi_integer)};
  273. static const mi_integer nulls[2] = {0, 0};
  274. static const mi_string * const types[2]
  275. = {"timeseries(day_info)", "integer"};
  276. mi_unsigned_integer yield_count = 0;
  277. sprintf( buf, "update %.*s set series = ? where Secid = ?;",
  278. mi_get_varlen( context_ptr->table_name),
  279. mi_get_vardata( context_ptr->table_name));
  280. statement = mi_prepare( context_ptr->conn, buf, NULL);
  281. if( NULL == statement)
  282. mi_db_error_raise( NULL, MI_EXCEPTION, "mi_prepare failed");
  283. /* Look at all the entries in the hash table. */
  284. for( i = context_ptr->hash_size - 1; 0 <= i; i--)
  285. {
  286. for( entry_ptr = context_ptr->hash[i];
  287. NULL != entry_ptr;
  288. entry_ptr = next_entry_ptr)
  289. {
  290. if( NULL != entry_ptr->tsdesc && MI_TRUE == entry_ptr->in_row)
  291. {
  292. yield_count++;
  293. if( 0 == (yield_count & 0x3f))
  294. {
  295. if( mi_interrupt_check())
  296. mi_db_error_raise( NULL, MI_EXCEPTION, "Load aborted.");
  297. mi_yield();
  298. }
  299. values[0] = ts_get_ts( entry_ptr->tsdesc);
  300. values[1] = (MI_DATUM) entry_ptr->sec_id;
  301. lengths[0] = mi_get_varlen( ts_get_ts( entry_ptr->tsdesc));
  302. if( mi_exec_prepared_statement( statement,
  303. MI_BINARY,
  304. 1,
  305. 2,
  306. values,
  307. lengths,
  308. (mi_integer *) nulls,
  309. (mi_char1 **) types,
  310. 0,
  311. NULL)
  312. != MI_OK)
  313. mi_db_error_raise( NULL, MI_EXCEPTION,
  314. "mi_exec_prepared_statement(update) failed");
  315. }
  316. next_entry_ptr = entry_ptr->next;
  317. }
  318. }
  319. } /* End of update_series. */
  320. /*
  321. ***************************************************************************
  322. * name: insert_series
  323. *
  324. * purpose: Update all the time series back into the table.
  325. *
  326. * returns:
  327. *
  328. * notes:
  329. ***************************************************************************
  330. */
  331. static void
  332. insert_series( loader_context_t *context_ptr)
  333. {
  334. mi_integer i = 0;
  335. register struct sec_entry_s *entry_ptr = NULL;
  336. struct sec_entry_s *next_entry_ptr = NULL;
  337. MI_STATEMENT *statement = NULL;
  338. char buf[256];
  339. mi_integer rc = 0;
  340. MI_DATUM values[2] = {0, 0};
  341. mi_integer lengths[2] = {-1, sizeof( mi_integer)};
  342. static const mi_integer nulls[2] = {0, 0};
  343. static const mi_string *types[2]
  344. = {"timeseries(day_info)", "integer"};
  345. mi_unsigned_integer yield_count = 0;
  346. sprintf( buf, "insert into %.*s (series, Secid) values (?, ?);",
  347. mi_get_varlen( context_ptr->table_name),
  348. mi_get_vardata( context_ptr->table_name));
  349. statement = mi_prepare( context_ptr->conn, buf, NULL);
  350. if( NULL == statement)
  351. mi_db_error_raise( NULL, MI_EXCEPTION, "mi_prepare failed");
  352. /* Look at all the entries in the hash table. */
  353. for( i = context_ptr->hash_size - 1; 0 <= i; i--)
  354. {
  355. for( entry_ptr = context_ptr->hash[i];
  356. NULL != entry_ptr;
  357. entry_ptr = next_entry_ptr)
  358. {
  359. if( NULL != entry_ptr->tsdesc && MI_TRUE == entry_ptr->created)
  360. {
  361. yield_count++;
  362. if( 0 == (yield_count & 0x3f))
  363. {
  364. if( mi_interrupt_check())
  365. mi_db_error_raise( NULL, MI_EXCEPTION, "Load aborted.");
  366. mi_yield();
  367. }
  368. values[0] = ts_get_ts( entry_ptr->tsdesc);
  369. values[1] = (MI_DATUM) entry_ptr->sec_id;
  370. lengths[0] = mi_get_varlen( ts_get_ts( entry_ptr->tsdesc));
  371. if( mi_exec_prepared_statement( statement,
  372. MI_BINARY,
  373. 1,
  374. 2,
  375. values,
  376. lengths,
  377. (mi_integer *) nulls,
  378. (mi_char1 **) types,
  379. 0,
  380. NULL)
  381. != MI_OK)
  382. mi_db_error_raise( NULL, MI_EXCEPTION,
  383. "mi_exec_prepared_statement(update) failed");
  384. ts_close( entry_ptr->tsdesc);
  385. }
  386. next_entry_ptr = entry_ptr->next;
  387. mi_free( entry_ptr);
  388. }
  389. context_ptr->hash[i] = NULL;
  390. }
  391. } /* End of insert_series. */
  392. /*
  393. ***************************************************************************
  394. * name: open_buf
  395. *
  396. * purpose: Open a file for reading and attach it to a buffer.
  397. *
  398. ***************************************************************************
  399. */
  400. static void
  401. open_buf( mi_lvarchar *file_name,
  402. FILE_BUF *buf_ptr)
  403. {
  404. mi_string *file_name_str = mi_lvarchar_to_string( file_name);
  405. memset( buf_ptr, 0, sizeof( *buf_ptr));
  406. buf_ptr->fd = mi_file_open( file_name_str, O_RDONLY, 0);
  407. mi_free( file_name_str);
  408. buf_ptr->file_name = file_name;
  409. if( MI_ERROR == buf_ptr->fd)
  410. {
  411. char buf[356];
  412. mi_integer name_len = (256 < mi_get_varlen( file_name))
  413. ? 256 : mi_get_varlen( file_name);
  414. sprintf( buf, "mi_file_open(%.*s) failed",
  415. name_len, mi_get_vardata( file_name));
  416. mi_db_error_raise( NULL, MI_EXCEPTION, buf);
  417. }
  418. buf_ptr->buf_index = 0;
  419. buf_ptr->buf_len = 0;
  420. buf_ptr->line_no = 1;
  421. } /* End of open_buf. */
  422. /*
  423. ***************************************************************************
  424. * name: get_char
  425. *
  426. * purpose: Get the next character from a buffered file
  427. *
  428. * returns: The character or STREAM_EOF
  429. *
  430. ***************************************************************************
  431. */
  432. static mi_integer
  433. get_char( FILE_BUF *buf_ptr)
  434. {
  435. register mi_integer c = STREAM_EOF;
  436. if( buf_ptr->buf_index >= buf_ptr->buf_len)
  437. {
  438. buf_ptr->buf_index = 0;
  439. buf_ptr->buf_len = mi_file_read( buf_ptr->fd,
  440. buf_ptr->data,
  441. sizeof( buf_ptr->data));
  442. if( MI_ERROR == buf_ptr->buf_len)
  443. {
  444. char buf[356];
  445. mi_integer name_len = (256 < mi_get_varlen( buf_ptr->file_name))
  446. ? 256 : mi_get_varlen( buf_ptr->file_name);
  447. sprintf( buf, "mi_file_read(%.*s) failed",
  448. name_len, mi_get_vardata(buf_ptr->file_name));
  449. mi_db_error_raise( NULL, MI_EXCEPTION, buf);
  450. }
  451. if( 0 == buf_ptr->buf_len)
  452. return( STREAM_EOF);
  453. }
  454. /* Increment buf_ptr->line_no until we have started on the next line, not
  455. * when the newline character is seen.
  456. */
  457. if( buf_ptr->flags & LDBUF_LAST_CHAR_EOL)
  458. {
  459. buf_ptr->line_no++;
  460. buf_ptr->flags &= ~LDBUF_LAST_CHAR_EOL;
  461. }
  462. c = buf_ptr->data[ buf_ptr->buf_index++];
  463. if( '\n' == c)
  464. buf_ptr->flags |= LDBUF_LAST_CHAR_EOL;
  465. return( c);
  466. } /* End of get_char. */
  467. /*
  468. ***************************************************************************
  469. * name: close_buf
  470. *
  471. * purpose: Close a file attached to a buffer.
  472. *
  473. * notes:
  474. ***************************************************************************
  475. */
  476. static void
  477. close_buf( FILE_BUF *buf_ptr)
  478. {
  479. mi_file_close( buf_ptr->fd);
  480. buf_ptr->fd = MI_ERROR;
  481. buf_ptr->buf_index = 0;
  482. buf_ptr->buf_len = 0;
  483. buf_ptr->file_name = NULL;
  484. } /* End of close_buf. */
  485. /*
  486. ***************************************************************************
  487. * name: get_token
  488. *
  489. * purpose: Get the next token from an input stream.
  490. *
  491. * returns: The token in a buffer and the next character after the buffer
  492. *
  493. * notes: Assumes that the tokens are separated by white space.
  494. ***************************************************************************
  495. */
  496. static mi_integer
  497. get_token( FILE_BUF *buf_ptr,
  498. mi_string *token,
  499. size_t token_buf_len)
  500. {
  501. register mi_integer c = get_char( buf_ptr);
  502. register mi_integer i = 0;
  503. while( STREAM_EOF != c && isspace( c))
  504. c = get_char( buf_ptr);
  505. for( ;STREAM_EOF != c && c != ',' && c != '\n'; c = get_char( buf_ptr))
  506. {
  507. if( i >= token_buf_len - 1)
  508. {
  509. char err_buf[128];
  510. sprintf( err_buf, "Word is too long on line %d.", buf_ptr->line_no);
  511. mi_db_error_raise( NULL, MI_EXCEPTION, err_buf);
  512. }
  513. token[i++] = c;
  514. }
  515. token[i] = 0;
  516. return( c);
  517. } /* End of get_token. */
  518. /*
  519. ***************************************************************************
  520. * name: increment_instances_created
  521. *
  522. * purpose: Increment the instances_created field of the and update statistics
  523. * when it crosses a threshold. If the statistics for the
  524. * time series instance table were never updated then the server
  525. * would not use the index on the instance table, and time series
  526. * opens would be very slow.
  527. *
  528. * returns: nothing
  529. *
  530. * notes:
  531. ***************************************************************************
  532. */
  533. static void
  534. increment_instances_created( loader_context_t *context_ptr)
  535. {
  536. context_ptr->instances_created++;
  537. if( 50 != context_ptr->instances_created)
  538. return;
  539. (void) mi_exec( context_ptr->conn,
  540. "update statistics high for table tsinstancetable( id);",
  541. MI_QUERY_BINARY);
  542. } /* End of increment_instances_created. */
  543. /*
  544. ***************************************************************************
  545. * name: get_sec_entry
  546. *
  547. * purpose: Get the security entry for a security ID
  548. *
  549. * returns: A pointer to security entry
  550. *
  551. * notes: If the entry is not found in the hash table then the security is
  552. * looked up in the table and a new entry made in the hash table.
  553. * A warning message will be emitted if the security ID cannot be
  554. * found. In this case the security entry will have a NULL tsdesc.
  555. ***************************************************************************
  556. */
  557. static sec_entry_t *
  558. get_sec_entry( loader_context_t *context_ptr,
  559. mi_integer sec_id,
  560. mi_integer line_no)
  561. {
  562. mi_unsigned_integer i
  563. = ((mi_unsigned_integer) sec_id) % context_ptr->hash_size;
  564. sec_entry_t *entry_ptr = context_ptr->hash[i];
  565. mi_string buf[256];
  566. mi_integer rc = 0;
  567. /* Look the security ID up in the hash table. */
  568. for( ; NULL != entry_ptr; entry_ptr = entry_ptr->next)
  569. {
  570. if( sec_id == entry_ptr->sec_id)
  571. return( entry_ptr);
  572. }
  573. /* This is the first time this security ID has been seen. */
  574. entry_ptr = mi_zalloc( sizeof( *entry_ptr));
  575. entry_ptr->sec_id = sec_id;
  576. entry_ptr->next = context_ptr->hash[i];
  577. context_ptr->hash[i] = entry_ptr;
  578. /* Look up the security ID in the database table. */
  579. sprintf( buf,
  580. "select series from %.*s where Secid = %d;",
  581. mi_get_varlen( context_ptr->table_name),
  582. mi_get_vardata( context_ptr->table_name),
  583. sec_id);
  584. if( MI_OK != mi_exec( context_ptr->conn, buf, MI_QUERY_BINARY))
  585. mi_db_error_raise( NULL, MI_EXCEPTION, "mi_exec failed.");
  586. rc = mi_get_result( context_ptr->conn);
  587. if( MI_NO_MORE_RESULTS == rc)
  588. {
  589. sprintf( buf, "Security %d (line %d) not in %.*s.",
  590. sec_id, line_no,
  591. mi_get_varlen( context_ptr->table_name),
  592. mi_get_vardata( context_ptr->table_name));
  593. mi_db_error_raise( NULL, MI_MESSAGE, buf);
  594. /* Mi_db_error_raise returns after raising messages of type MI_MESSAGE.
  595. */
  596. }
  597. else if( MI_ROWS != rc)
  598. mi_db_error_raise( NULL, MI_EXCEPTION, "mi_get_result failed.");
  599. else
  600. {
  601. mi_integer err = 0;
  602. MI_ROW *row = mi_next_row( context_ptr->conn, &err);
  603. MI_DATUM ts_datum = 0;
  604. mi_integer retlen = 0;
  605. /* Save the row so that the time series column will not be erased when
  606. * the query is finished.
  607. */
  608. if( NULL != row
  609. && MI_NORMAL_VALUE == mi_value( row, 0, &ts_datum, &retlen))
  610. {
  611. if( NULL == (row = mi_save_set_insert( context_ptr->save_set,
  612. row)))
  613. mi_db_error_raise( NULL, MI_EXCEPTION,
  614. "mi_save_set_insert failed");
  615. }
  616. if( NULL != row)
  617. rc = mi_value( row, 0, &ts_datum, &retlen);
  618. if( MI_NORMAL_VALUE != rc && MI_NULL_VALUE != rc)
  619. {
  620. if( 0 != err)
  621. {
  622. sprintf( buf, "Look up of security ID %d in %.*s failed.",
  623. sec_id,
  624. mi_get_varlen( context_ptr->table_name),
  625. mi_get_vardata( context_ptr->table_name));
  626. mi_db_error_raise( NULL, MI_EXCEPTION, buf);
  627. }
  628. else
  629. {
  630. sprintf( buf, "Security %d (line %d) not in %.*s.",
  631. sec_id, line_no,
  632. mi_get_varlen( context_ptr->table_name),
  633. mi_get_vardata( context_ptr->table_name));
  634. mi_db_error_raise( NULL, MI_MESSAGE, buf);
  635. return( entry_ptr);
  636. }
  637. }
  638. if( MI_NULL_VALUE != rc)
  639. entry_ptr->in_row = (TS_IS_INCONTAINER( (ts_timeseries *) ts_datum)
  640. != 0);
  641. else
  642. {
  643. /* No time series has been created for this security yet.
  644. * Start one.
  645. */
  646. ts_datum = ts_create( context_ptr->conn,
  647. context_ptr->calendar_name,
  648. context_ptr->origin,
  649. context_ptr->threshold,
  650. context_ptr->regular ? 0 : TS_CREATE_IRR,
  651. &context_ptr->ts_typeid,
  652. context_ptr->nelems,
  653. context_ptr->container_name);
  654. entry_ptr->in_row = MI_TRUE;
  655. entry_ptr->created = MI_TRUE;
  656. if( entry_ptr->in_row)
  657. increment_instances_created( context_ptr);
  658. }
  659. entry_ptr->tsdesc = ts_open( context_ptr->conn,
  660. ts_datum,
  661. &context_ptr->ts_typeid,
  662. 0);
  663. }
  664. return( entry_ptr);
  665. } /* End of get_sec_entry. */
  666. /*
  667. ***************************************************************************
  668. * name: is_null
  669. *
  670. * purpose: Determine whether a token represents a NULL
  671. *
  672. * returns: 1 if so, 0 if not
  673. *
  674. ***************************************************************************
  675. */
  676. static int
  677. is_null( register mi_string *token)
  678. {
  679. return( ('N' == token[0] || 'n' == token[0])
  680. && ('U' == token[1] || 'u' == token[1])
  681. && ('L' == token[2] || 'l' == token[2])
  682. && ('L' == token[3] || 'l' == token[3])
  683. && 0 == token[4]);
  684. } /* End of is_null. */
  685. /*
  686. ***************************************************************************
  687. * name: read_day_data
  688. *
  689. * purpose: Read in the daily data for one security
  690. *
  691. * returns: Fills in the timestamp structure, the col_data and col_is_null
  692. * arrays.
  693. *
  694. * notes: Assumes that the col_is_null array is initialized to all TRUE.
  695. ***************************************************************************
  696. */
  697. static void
  698. read_day_data( loader_context_t *context_ptr,
  699. FILE_BUF *buf_ptr,
  700. mi_string *token,
  701. size_t token_buf_len,
  702. mi_datetime *tstamp_ptr)
  703. {
  704. register mi_integer i = 0;
  705. register mi_integer c;
  706. /* ValueDate DATETIME year to day*/
  707. c = get_token( buf_ptr, token, token_buf_len);
  708. if( STREAM_EOF== c && 0 == strlen( token)
  709. || '\n' == c)
  710. return;
  711. tstamp_ptr->dt_qual = TU_DTENCODE( TU_YEAR, TU_F5);
  712. if( is_null( token))
  713. tstamp_ptr->dt_dec.dec_pos = DECPOSNULL;
  714. else
  715. {
  716. if( 0 == dtcvasc( token, tstamp_ptr))
  717. {
  718. context_ptr->col_is_null[0] = MI_FALSE;
  719. context_ptr->col_data[0] = (MI_DATUM) tstamp_ptr;
  720. }
  721. else
  722. {
  723. mi_string err_buf[128];
  724. sprintf( err_buf, "Illegal date on line %d", buf_ptr->line_no);
  725. mi_db_error_raise( NULL, MI_MESSAGE, err_buf);
  726. }
  727. }
  728. /* carryover char(1) */
  729. c = get_token( buf_ptr, token, token_buf_len);
  730. if( STREAM_EOF== c && 0 == strlen( token) || '\n' == c)
  731. return;
  732. if( ! is_null( token))
  733. {
  734. *(context_ptr->carryover) = token[0];
  735. context_ptr->col_is_null[1] = MI_FALSE;
  736. }
  737. /* spread integer,
  738. * pricing_bmk_id integer
  739. */
  740. for( i = 2; i < 4; i++)
  741. {
  742. c = get_token( buf_ptr, token, token_buf_len);
  743. if( STREAM_EOF== c && 0 == strlen( token)
  744. || '\n' == c)
  745. return;
  746. if( ! is_null( token))
  747. {
  748. context_ptr->col_data[i] = (MI_DATUM) atoi( token);
  749. context_ptr->col_is_null[i] = MI_FALSE;
  750. }
  751. }
  752. /* price float,
  753. * yield float
  754. */
  755. for( i = 4; i < 6; i++)
  756. {
  757. c = get_token( buf_ptr, token, token_buf_len);
  758. if( STREAM_EOF== c && 0 == strlen( token)
  759. || '\n' == c)
  760. return;
  761. if( ! is_null( token))
  762. {
  763. *((double *) context_ptr->col_data[i]) = atof( token);
  764. context_ptr->col_is_null[i] = MI_FALSE;
  765. }
  766. }
  767. /* priority char(1) */
  768. c = get_token( buf_ptr, token, token_buf_len);
  769. if( (STREAM_EOF == c || '\n' == c) && 0 == strlen( token))
  770. return;
  771. if( ! is_null( token))
  772. {
  773. *(context_ptr->priority) = token[0];
  774. context_ptr->col_is_null[6] = MI_FALSE;
  775. }
  776. } /* End of read_day_data. */
  777. /*
  778. ***************************************************************************
  779. * name: read_line
  780. *
  781. * purpose: Read a line from the file, fetch the time series descriptor
  782. * corresponding to the Secid, create a time series element for
  783. * the line, and convert the date into an mi_datetime structure
  784. *
  785. * returns: 1 if there was more data in the file,
  786. * 0 if the end of the file was found
  787. *
  788. * notes: Creates a new time series if the series column for the Secid is
  789. * NULL.
  790. ***************************************************************************
  791. */
  792. int
  793. read_line( loader_context_t *context_ptr,
  794. FILE_BUF *buf_ptr,
  795. ts_tsdesc **tsdesc_ptr,
  796. ts_tselem *day_elem_ptr,
  797. int *null_line,
  798. mi_datetime *tstamp_ptr,
  799. sec_entry_t **sec_entry_ptr_ptr)
  800. {
  801. mi_integer sec_id = -1;
  802. sec_entry_t *sec_entry_ptr = NULL;
  803. mi_string token[256];
  804. mi_integer c = 0; /* Next character from file. */
  805. mi_integer i = 0;
  806. *sec_entry_ptr_ptr = NULL;
  807. *null_line = 1;
  808. for( i = 0; i < DAILY_COL_COUNT; i++)
  809. context_ptr->col_is_null[ i] = MI_TRUE;
  810. c = get_token( buf_ptr, token, sizeof( token));
  811. if( STREAM_EOF== c && 0 == strlen( token))
  812. return( 0);
  813. sec_id = atoi( token);
  814. *sec_entry_ptr_ptr = sec_entry_ptr
  815. = get_sec_entry( context_ptr, sec_id, buf_ptr->line_no);
  816. read_day_data( context_ptr,
  817. buf_ptr,
  818. token,
  819. sizeof( token),
  820. tstamp_ptr);
  821. *tsdesc_ptr = sec_entry_ptr->tsdesc;
  822. if( NULL == sec_entry_ptr->tsdesc)
  823. /* An invalid security ID. */
  824. return( 1);
  825. if( context_ptr->col_is_null[0]
  826. && TS_IS_IRREGULAR( ts_get_ts( sec_entry_ptr->tsdesc)))
  827. {
  828. mi_string err_buf[128];
  829. sprintf( err_buf, "Missing date on line %d.", buf_ptr->line_no);
  830. mi_db_error_raise( NULL, MI_MESSAGE, err_buf);
  831. return(1);
  832. }
  833. *null_line = 0;
  834. *day_elem_ptr = ts_make_elem_with_buf( sec_entry_ptr->tsdesc,
  835. context_ptr->col_data,
  836. context_ptr->col_is_null,
  837. NULL,
  838. *day_elem_ptr);
  839. return(1);
  840. } /* End of read_line. */
  841. /*
  842. ***************************************************************************
  843. * name: TSIncLoad
  844. *
  845. * purpose: UDR for incremental loading of timeseries from a file.
  846. *
  847. ***************************************************************************
  848. */
  849. #ifdef NT
  850. __declspec(dllexport)
  851. #endif
  852. void
  853. TSIncLoad( mi_lvarchar *table_name, /* the table that holds the time series. */
  854. mi_lvarchar *file_name,
  855. /* The name of the file containing the data. It must be accessible
  856. * on the server machine.
  857. */
  858. /*
  859. * The following parameters are only used to create new time
  860. * series.
  861. */
  862. mi_lvarchar *calendar_name,
  863. mi_datetime *origin,
  864. mi_integer threshold,
  865. mi_boolean regular,
  866. mi_lvarchar *container_name,
  867. mi_integer nelems,
  868. MI_FPARAM *fParamPtr)
  869. {
  870. FILE_BUF buf = {0};
  871. ts_tselem day_elem = NULL;
  872. ts_tsdesc *tsdesc = NULL;
  873. ts_timeseries *ts = NULL;
  874. mi_datetime tstamp = {0};
  875. loader_context_t context = {0};
  876. mi_unsigned_integer yield_count = 0;
  877. sec_entry_t *sec_entry_ptr = NULL;
  878. int null_line = 0;
  879. init_context( table_name,
  880. calendar_name,
  881. origin,
  882. threshold,
  883. regular,
  884. container_name,
  885. nelems,
  886. &context);
  887. open_buf( file_name, &buf);
  888. while( read_line( &context,
  889. &buf,
  890. &tsdesc,
  891. &day_elem,
  892. &null_line,
  893. &tstamp,
  894. &sec_entry_ptr))
  895. {
  896. yield_count++;
  897. /* Periodically (once every 64 input lines) check for interrupts and
  898. * yield the processor to other threads.
  899. */
  900. if( 0 == (yield_count & 0x3f))
  901. {
  902. if( mi_interrupt_check())
  903. mi_db_error_raise( NULL, MI_EXCEPTION, "Load aborted.");
  904. mi_yield();
  905. }
  906. if( null_line)
  907. continue;
  908. ts = ts_put_elem_no_dups( tsdesc, day_elem, &tstamp);
  909. if( sec_entry_ptr->in_row && TS_IS_INCONTAINER( ts))
  910. {
  911. sec_entry_ptr->in_row = 0;
  912. increment_instances_created( &context);
  913. }
  914. }
  915. if( NULL != day_elem)
  916. ts_free_elem( tsdesc, day_elem);
  917. close_buf( &buf);
  918. update_series( &context);
  919. insert_series( &context);
  920. close_context( &context);
  921. } /* End of TSIncLoad. */