/* * Licensed Materials - Property of HCL * * IBM Informix DataBlade Module * (C) Copyright International Business Machines Corporation 2002. * (c) Copyright HCL Technologies Ltd. 2017. All Rights Reserved. * * COPYRIGHT LICENSE: * This information contains sample application programs in source language, * which illustrate programming techniques on various operating platforms. * You may copy, modify, and distribute these sample programs in any form * without payment to IBM, for the purposes of developing, using, marketing * or distributing application programs conforming to the application * programming interface for the operating platform for which the sample * programs are written. These examples have not been thoroughly tested under * all conditions. IBM, therefore, cannot guarantee or imply reliability, * serviceability, or function of these programs. You may copy, modify, and * distribute these sample programs in any form without payment to IBM for * the purposes of developing, using, marketing, or distributing application * programs conforming to IBM's application programming interfaces. * Each copy or any portion of these sample programs or any derivative work, * must include a copyright notice as follows: * © (your company name) (year). Portions of this code are derived from * IBM Corp. Sample Programs. © Copyright IBM Corp. (enter the year or * years). All rights reserved. * */ /* The loader procedure in this file loads time variant data from a * file into a table containing time series. It assumes that the * table has already been populated with the time invariant data. If * the table already has time series data the new data will overwrite * the old data or be appended to the existing time series depending * on the time stamps. 
* * SETUP: * * create procedure TSIncLoad( table_name lvarchar, * file_name lvarchar, * calendar_name lvarchar, * origin datetime year to day, * threshold integer, * regular boolean, * container_name lvarchar, * nelems integer) * external name '$INFORMIXDIR/extend/timeseries/Loader.bld(TSIncLoad)' * language C; * * create row type day_info ( * ValueDate DATETIME year to day, * carryover char(1), * spread integer, * pricing_bmk_id integer, * price float, * yield float, * priority char(1) ); * * create table corporates ( * Secid integer UNIQUE, * . * . * . * series TimeSeries(day_info)); * * create index corporatesIdx on corporates( Secid); * * Any name may be used for the "corporates" table. The corporates table * may have any number of columns in addition to the Secid and series columns. * * Each line of the data file has the following format: * Secid year-mon-day carryover spread pricing_bmk_id price yield priority * * For instance: * 25000006 1986-1-7 m 2 12 2.2000000000 22.2 6 * * The loader is invoked with a SQL statement like: * execute procedure TSIncLoad( 'corporates', * 'data_file_name', * 'cal_name', * '1980-1-1', * 20, * 't', * 'container-name'); * */ #include #include #include #include #include #include "datetime.h" #include "mi.h" #include "tseries.h" #define DAY_INFO_TYPE_NAME "day_info" #define DAILY_COL_COUNT 7 typedef struct { mi_integer fd; mi_unsigned_integer flags; #define LDBUF_LAST_CHAR_EOL 0x1 mi_integer buf_index; mi_integer buf_len; mi_integer line_no; mi_lvarchar *file_name; mi_string data[2048]; } FILE_BUF; #define STREAM_EOF (-1) typedef struct sec_entry_s { mi_integer sec_id; ts_tsdesc *tsdesc; mi_boolean in_row; /* Indicates whether the time series is stored in row. 
*/ mi_boolean created; /* Indicates whether we just created this timeseries */ struct sec_entry_s *next; } sec_entry_t; typedef struct { mi_lvarchar *table_name; MI_TYPEID ts_typeid; /* The type id of timeseries(day_info) */ mi_string *calendar_name; mi_datetime *origin; mi_integer threshold; mi_boolean regular; mi_string *container_name; mi_integer nelems; /* For created time series. */ mi_integer hash_size; MI_CONNECTION *conn; sec_entry_t **hash; /* Value buffers -- only allocated once. */ MI_DATUM col_data[ DAILY_COL_COUNT]; mi_boolean col_is_null[ DAILY_COL_COUNT]; char *carryover; char *priority; mi_double_precision price, yield; mi_integer instances_created; /* A count of the number of tsinstancetable entries added. Used to decide * when to update statistics on this table. */ MI_SAVE_SET *save_set; } loader_context_t; /* *************************************************************************** * name: init_context * * purpose: Initialize the loader context structure. * * notes: *************************************************************************** */ static void init_context( mi_lvarchar *table_name, mi_lvarchar *calendar_name, mi_datetime *origin, mi_integer threshold, mi_boolean regular, mi_lvarchar *container_name, mi_integer nelems, loader_context_t *context_ptr) { mi_string buf[256]; mi_integer table_name_len = mi_get_varlen( table_name); MI_ROW *row = NULL; MI_DATUM retbuf = 0; mi_integer retlen = 0; mi_lvarchar *typename = NULL; MI_TYPEID *typeid = NULL; mi_integer err = 0; if( table_name_len > IDENTSIZE) mi_db_error_raise( NULL, MI_EXCEPTION, "The table name is too long"); memset( context_ptr, 0, sizeof( *context_ptr)); context_ptr->conn = mi_open( NULL, NULL, NULL); typename = mi_string_to_lvarchar( "timeseries(" DAY_INFO_TYPE_NAME ")"); typeid = mi_typename_to_id( context_ptr->conn, typename); mi_var_free( typename); if( NULL == typeid) mi_db_error_raise( NULL, MI_EXCEPTION, "Type timeseries(" DAY_INFO_TYPE_NAME ") not defined."); 
context_ptr->ts_typeid = *typeid; context_ptr->table_name = table_name; context_ptr->calendar_name = mi_lvarchar_to_string( calendar_name); context_ptr->origin = origin; context_ptr->threshold = threshold; context_ptr->regular = regular; context_ptr->container_name = mi_lvarchar_to_string( container_name); context_ptr->nelems = nelems; /* Use the size (count) of the table as the hash table size. */ sprintf( buf, "select count(*) from %.*s;", table_name_len, mi_get_vardata( table_name)); if( MI_OK != mi_exec( context_ptr->conn, buf, MI_QUERY_BINARY)) mi_db_error_raise( NULL, MI_EXCEPTION, "mi_exec failed"); if( MI_ROWS != mi_get_result( context_ptr->conn)) { sprintf( buf, "Could not get size of %.*s table.", table_name_len, mi_get_vardata( table_name)); mi_db_error_raise( NULL, MI_EXCEPTION, buf); } if( NULL == (row = mi_next_row( context_ptr->conn, &err))) mi_db_error_raise( NULL, MI_EXCEPTION, "mi_next_row failed"); if( MI_NORMAL_VALUE != mi_value( row, 0, &retbuf, &retlen) || 0 != dectoint( (mi_decimal *) retbuf, (int *)&context_ptr->hash_size) || context_ptr->hash_size == 0) context_ptr->hash_size = 256; (void) mi_query_finish( context_ptr->conn); context_ptr->hash = mi_zalloc( context_ptr->hash_size*sizeof( *context_ptr->hash)); context_ptr->col_data[1] = (MI_DATUM) mi_new_var(1); /* carryover */ context_ptr->col_data[6] = (MI_DATUM) mi_new_var(1); /* priority */ if( NULL == context_ptr->hash || NULL == context_ptr->col_data[1] || NULL == context_ptr->col_data[6]) mi_db_error_raise( NULL, MI_EXCEPTION, "Not enough memory."); context_ptr->carryover = mi_get_vardata( (mi_lvarchar *) context_ptr->col_data[1]); context_ptr->col_data[4] = (MI_DATUM) &context_ptr->price; context_ptr->col_data[5] = (MI_DATUM) &context_ptr->yield; context_ptr->priority = mi_get_vardata( (mi_lvarchar *) context_ptr->col_data[6]); context_ptr->save_set = mi_save_set_create( context_ptr->conn); } /* End of init_context. 
*/ /* *************************************************************************** * name: close_context * * purpose: Close the context structure. Free up all allocated memory. * *************************************************************************** */ static void close_context( loader_context_t *context_ptr) { mi_free( context_ptr->hash); context_ptr->hash = NULL; context_ptr->hash_size = 0; mi_var_free( (mi_lvarchar *) context_ptr->col_data[1]); mi_var_free( (mi_lvarchar *) context_ptr->col_data[6]); context_ptr->col_data[1] = context_ptr->col_data[6] = 0; context_ptr->carryover = context_ptr->priority = NULL; (void) mi_save_set_destroy( context_ptr->save_set); context_ptr->save_set = NULL; (void) mi_close( context_ptr->conn); mi_free( context_ptr->calendar_name); context_ptr->calendar_name = NULL; mi_free( context_ptr->container_name); context_ptr->container_name = NULL; context_ptr->conn = NULL; } /* End of close_context. */ /* *************************************************************************** * name: update_series * * purpose: Update all the time series back into the table. * * returns: * * notes: *************************************************************************** */ static void update_series( loader_context_t *context_ptr) { mi_integer i = 0; register struct sec_entry_s *entry_ptr = NULL; struct sec_entry_s *next_entry_ptr = NULL; MI_STATEMENT *statement = NULL; char buf[256]; mi_integer rc = 0; MI_DATUM values[2] = {0, 0}; mi_integer lengths[2] = {-1, sizeof( mi_integer)}; static const mi_integer nulls[2] = {0, 0}; static const mi_string * const types[2] = {"timeseries(day_info)", "integer"}; mi_unsigned_integer yield_count = 0; sprintf( buf, "update %.*s set series = ? 
where Secid = ?;", mi_get_varlen( context_ptr->table_name), mi_get_vardata( context_ptr->table_name)); statement = mi_prepare( context_ptr->conn, buf, NULL); if( NULL == statement) mi_db_error_raise( NULL, MI_EXCEPTION, "mi_prepare failed"); /* Look at all the entries in the hash table. */ for( i = context_ptr->hash_size - 1; 0 <= i; i--) { for( entry_ptr = context_ptr->hash[i]; NULL != entry_ptr; entry_ptr = next_entry_ptr) { if( NULL != entry_ptr->tsdesc && MI_TRUE == entry_ptr->in_row) { yield_count++; if( 0 == (yield_count & 0x3f)) { if( mi_interrupt_check()) mi_db_error_raise( NULL, MI_EXCEPTION, "Load aborted."); mi_yield(); } values[0] = ts_get_ts( entry_ptr->tsdesc); values[1] = (MI_DATUM) entry_ptr->sec_id; lengths[0] = mi_get_varlen( ts_get_ts( entry_ptr->tsdesc)); if( mi_exec_prepared_statement( statement, MI_BINARY, 1, 2, values, lengths, (mi_integer *) nulls, (mi_char1 **) types, 0, NULL) != MI_OK) mi_db_error_raise( NULL, MI_EXCEPTION, "mi_exec_prepared_statement(update) failed"); } next_entry_ptr = entry_ptr->next; } } } /* End of update_series. */ /* *************************************************************************** * name: insert_series * * purpose: Update all the time series back into the table. 
*
 * returns: nothing
 *
 * notes:   Executes "insert into <table> (series, Secid) values (?, ?)"
 *          for every hash entry whose created flag is MI_TRUE. Every open
 *          time series descriptor is closed and every hash entry is
 *          freed, so the hash table is empty afterwards and this must be
 *          the last routine that walks it.
 *          NOTE(review): get_sec_entry only sets created for a Secid whose
 *          row already exists with a NULL series column, and such entries
 *          are also written by update_series. Confirm that an INSERT is
 *          really wanted for them, in particular when Secid is UNIQUE.
 ***************************************************************************
 */
static void
insert_series( loader_context_t *context_ptr)
{
    mi_integer i = 0;
    struct sec_entry_s *entry_ptr = NULL;
    struct sec_entry_s *next_entry_ptr = NULL;
    MI_STATEMENT *statement = NULL;
    char buf[256];
    MI_DATUM values[2] = {0, 0};
    mi_integer lengths[2] = {-1, sizeof( mi_integer)};
    static const mi_integer nulls[2] = {0, 0};
    static const mi_string * const types[2]
        = {"timeseries(day_info)", "integer"};
    mi_unsigned_integer yield_count = 0;

    sprintf( buf, "insert into %.*s (series, Secid) values (?, ?);",
             mi_get_varlen( context_ptr->table_name),
             mi_get_vardata( context_ptr->table_name));
    statement = mi_prepare( context_ptr->conn, buf, NULL);
    if( NULL == statement)
        mi_db_error_raise( NULL, MI_EXCEPTION, "mi_prepare failed");

    /* Look at all the entries in the hash table. */
    for( i = context_ptr->hash_size - 1; 0 <= i; i--)
    {
        for( entry_ptr = context_ptr->hash[i];
             NULL != entry_ptr;
             entry_ptr = next_entry_ptr)
        {
            /* Pick up the link first: the entry is freed below. */
            next_entry_ptr = entry_ptr->next;

            if( NULL != entry_ptr->tsdesc)
            {
                if( MI_TRUE == entry_ptr->created)
                {
                    /* Once every 64 inserts check for interrupts and
                     * yield the processor.
                     */
                    yield_count++;
                    if( 0 == (yield_count & 0x3f))
                    {
                        if( mi_interrupt_check())
                            mi_db_error_raise( NULL, MI_EXCEPTION,
                                               "Load aborted.");
                        mi_yield();
                    }

                    values[0] = ts_get_ts( entry_ptr->tsdesc);
                    values[1] = (MI_DATUM) entry_ptr->sec_id;
                    lengths[0]
                        = mi_get_varlen( ts_get_ts( entry_ptr->tsdesc));
                    if( mi_exec_prepared_statement( statement, MI_BINARY, 1,
                                                    2, values, lengths,
                                                    (mi_integer *) nulls,
                                                    (mi_char1 **) types,
                                                    0, NULL)
                        != MI_OK)
                        mi_db_error_raise( NULL, MI_EXCEPTION,
                            "mi_exec_prepared_statement(insert) failed");
                }
                /* Close every descriptor, not only the created ones: the
                 * entry that holds the only reference is freed next.
                 */
                ts_close( entry_ptr->tsdesc);
                entry_ptr->tsdesc = NULL;
            }
            mi_free( entry_ptr);
        }
        context_ptr->hash[i] = NULL;
    }
} /* End of insert_series. */

/*
 ***************************************************************************
 * name:    open_buf
 *
 * purpose: Open a file for reading and attach it to a buffer.
* *************************************************************************** */ static void open_buf( mi_lvarchar *file_name, FILE_BUF *buf_ptr) { mi_string *file_name_str = mi_lvarchar_to_string( file_name); memset( buf_ptr, 0, sizeof( *buf_ptr)); buf_ptr->fd = mi_file_open( file_name_str, O_RDONLY, 0); mi_free( file_name_str); buf_ptr->file_name = file_name; if( MI_ERROR == buf_ptr->fd) { char buf[356]; mi_integer name_len = (256 < mi_get_varlen( file_name)) ? 256 : mi_get_varlen( file_name); sprintf( buf, "mi_file_open(%.*s) failed", name_len, mi_get_vardata( file_name)); mi_db_error_raise( NULL, MI_EXCEPTION, buf); } buf_ptr->buf_index = 0; buf_ptr->buf_len = 0; buf_ptr->line_no = 1; } /* End of open_buf. */ /* *************************************************************************** * name: get_char * * purpose: Get the next character from a buffered file * * returns: The character or STREAM_EOF * *************************************************************************** */ static mi_integer get_char( FILE_BUF *buf_ptr) { register mi_integer c = STREAM_EOF; if( buf_ptr->buf_index >= buf_ptr->buf_len) { buf_ptr->buf_index = 0; buf_ptr->buf_len = mi_file_read( buf_ptr->fd, buf_ptr->data, sizeof( buf_ptr->data)); if( MI_ERROR == buf_ptr->buf_len) { char buf[356]; mi_integer name_len = (256 < mi_get_varlen( buf_ptr->file_name)) ? 256 : mi_get_varlen( buf_ptr->file_name); sprintf( buf, "mi_file_read(%.*s) failed", name_len, mi_get_vardata(buf_ptr->file_name)); mi_db_error_raise( NULL, MI_EXCEPTION, buf); } if( 0 == buf_ptr->buf_len) return( STREAM_EOF); } /* Increment buf_ptr->line_no until we have started on the next line, not * when the newline character is seen. */ if( buf_ptr->flags & LDBUF_LAST_CHAR_EOL) { buf_ptr->line_no++; buf_ptr->flags &= ~LDBUF_LAST_CHAR_EOL; } c = buf_ptr->data[ buf_ptr->buf_index++]; if( '\n' == c) buf_ptr->flags |= LDBUF_LAST_CHAR_EOL; return( c); } /* End of get_char. 
*/ /* *************************************************************************** * name: close_buf * * purpose: Close a file attached to a buffer. * * notes: *************************************************************************** */ static void close_buf( FILE_BUF *buf_ptr) { mi_file_close( buf_ptr->fd); buf_ptr->fd = MI_ERROR; buf_ptr->buf_index = 0; buf_ptr->buf_len = 0; buf_ptr->file_name = NULL; } /* End of close_buf. */ /* *************************************************************************** * name: get_token * * purpose: Get the next token from an input stream. * * returns: The token in a buffer and the next character after the buffer * * notes: Assumes that the tokens are separated by white space. *************************************************************************** */ static mi_integer get_token( FILE_BUF *buf_ptr, mi_string *token, size_t token_buf_len) { register mi_integer c = get_char( buf_ptr); register mi_integer i = 0; while( STREAM_EOF != c && isspace( c)) c = get_char( buf_ptr); for( ;STREAM_EOF != c && c != ',' && c != '\n'; c = get_char( buf_ptr)) { if( i >= token_buf_len - 1) { char err_buf[128]; sprintf( err_buf, "Word is too long on line %d.", buf_ptr->line_no); mi_db_error_raise( NULL, MI_EXCEPTION, err_buf); } token[i++] = c; } token[i] = 0; return( c); } /* End of get_token. */ /* *************************************************************************** * name: increment_instances_created * * purpose: Increment the instances_created field of the and update statistics * when it crosses a threshold. If the statistics for the * time series instance table were never updated then the server * would not use the index on the instance table, and time series * opens would be very slow. 
* * returns: nothing * * notes: *************************************************************************** */ static void increment_instances_created( loader_context_t *context_ptr) { context_ptr->instances_created++; if( 50 != context_ptr->instances_created) return; (void) mi_exec( context_ptr->conn, "update statistics high for table tsinstancetable( id);", MI_QUERY_BINARY); } /* End of increment_instances_created. */ /* *************************************************************************** * name: get_sec_entry * * purpose: Get the security entry for a security ID * * returns: A pointer to security entry * * notes: If the entry is not found in the hash table then the security is * looked up in the table and a new entry made in the hash table. * A warning message will be emitted if the security ID cannot be * found. In this case the security entry will have a NULL tsdesc. *************************************************************************** */ static sec_entry_t * get_sec_entry( loader_context_t *context_ptr, mi_integer sec_id, mi_integer line_no) { mi_unsigned_integer i = ((mi_unsigned_integer) sec_id) % context_ptr->hash_size; sec_entry_t *entry_ptr = context_ptr->hash[i]; mi_string buf[256]; mi_integer rc = 0; /* Look the security ID up in the hash table. */ for( ; NULL != entry_ptr; entry_ptr = entry_ptr->next) { if( sec_id == entry_ptr->sec_id) return( entry_ptr); } /* This is the first time this security ID has been seen. */ entry_ptr = mi_zalloc( sizeof( *entry_ptr)); entry_ptr->sec_id = sec_id; entry_ptr->next = context_ptr->hash[i]; context_ptr->hash[i] = entry_ptr; /* Look up the security ID in the database table. 
*/ sprintf( buf, "select series from %.*s where Secid = %d;", mi_get_varlen( context_ptr->table_name), mi_get_vardata( context_ptr->table_name), sec_id); if( MI_OK != mi_exec( context_ptr->conn, buf, MI_QUERY_BINARY)) mi_db_error_raise( NULL, MI_EXCEPTION, "mi_exec failed."); rc = mi_get_result( context_ptr->conn); if( MI_NO_MORE_RESULTS == rc) { sprintf( buf, "Security %d (line %d) not in %.*s.", sec_id, line_no, mi_get_varlen( context_ptr->table_name), mi_get_vardata( context_ptr->table_name)); mi_db_error_raise( NULL, MI_MESSAGE, buf); /* Mi_db_error_raise returns after raising messages of type MI_MESSAGE. */ } else if( MI_ROWS != rc) mi_db_error_raise( NULL, MI_EXCEPTION, "mi_get_result failed."); else { mi_integer err = 0; MI_ROW *row = mi_next_row( context_ptr->conn, &err); MI_DATUM ts_datum = 0; mi_integer retlen = 0; /* Save the row so that the time series column will not be erased when * the query is finished. */ if( NULL != row && MI_NORMAL_VALUE == mi_value( row, 0, &ts_datum, &retlen)) { if( NULL == (row = mi_save_set_insert( context_ptr->save_set, row))) mi_db_error_raise( NULL, MI_EXCEPTION, "mi_save_set_insert failed"); } if( NULL != row) rc = mi_value( row, 0, &ts_datum, &retlen); if( MI_NORMAL_VALUE != rc && MI_NULL_VALUE != rc) { if( 0 != err) { sprintf( buf, "Look up of security ID %d in %.*s failed.", sec_id, mi_get_varlen( context_ptr->table_name), mi_get_vardata( context_ptr->table_name)); mi_db_error_raise( NULL, MI_EXCEPTION, buf); } else { sprintf( buf, "Security %d (line %d) not in %.*s.", sec_id, line_no, mi_get_varlen( context_ptr->table_name), mi_get_vardata( context_ptr->table_name)); mi_db_error_raise( NULL, MI_MESSAGE, buf); return( entry_ptr); } } if( MI_NULL_VALUE != rc) entry_ptr->in_row = (TS_IS_INCONTAINER( (ts_timeseries *) ts_datum) != 0); else { /* No time series has been created for this security yet. * Start one. 
*/ ts_datum = ts_create( context_ptr->conn, context_ptr->calendar_name, context_ptr->origin, context_ptr->threshold, context_ptr->regular ? 0 : TS_CREATE_IRR, &context_ptr->ts_typeid, context_ptr->nelems, context_ptr->container_name); entry_ptr->in_row = MI_TRUE; entry_ptr->created = MI_TRUE; if( entry_ptr->in_row) increment_instances_created( context_ptr); } entry_ptr->tsdesc = ts_open( context_ptr->conn, ts_datum, &context_ptr->ts_typeid, 0); } return( entry_ptr); } /* End of get_sec_entry. */ /* *************************************************************************** * name: is_null * * purpose: Determine whether a token represents a NULL * * returns: 1 if so, 0 if not * *************************************************************************** */ static int is_null( register mi_string *token) { return( ('N' == token[0] || 'n' == token[0]) && ('U' == token[1] || 'u' == token[1]) && ('L' == token[2] || 'l' == token[2]) && ('L' == token[3] || 'l' == token[3]) && 0 == token[4]); } /* End of is_null. */ /* *************************************************************************** * name: read_day_data * * purpose: Read in the daily data for one security * * returns: Fills in the timestamp structure, the col_data and col_is_null * arrays. * * notes: Assumes that the col_is_null array is initialized to all TRUE. 
***************************************************************************
 */
static void
read_day_data( loader_context_t *context_ptr,
               FILE_BUF *buf_ptr,
               mi_string *token,
               size_t token_buf_len,
               mi_datetime *tstamp_ptr)
{
    /* Positions of the day_info columns in col_data / col_is_null. */
    enum
    {
        COL_VALUE_DATE = 0,     /* ValueDate DATETIME year to day */
        COL_CARRYOVER = 1,      /* carryover char(1) */
        COL_SPREAD = 2,         /* spread integer */
        COL_PRICING_BMK_ID = 3, /* pricing_bmk_id integer */
        COL_PRICE = 4,          /* price float */
        COL_YIELD = 5,          /* yield float */
        COL_PRIORITY = 6        /* priority char(1) */
    };
    mi_integer col = 0;

    for( col = COL_VALUE_DATE; col <= COL_PRIORITY; col++)
    {
        mi_integer delim = get_token( buf_ptr, token, token_buf_len);
        int token_is_empty = ('\0' == token[0]);

        if( COL_PRIORITY == col)
        {
            /* The last field is allowed to end the line; only give up
             * when there is nothing in it.
             */
            if( (STREAM_EOF == delim || '\n' == delim) && token_is_empty)
                return;
        }
        else if( (STREAM_EOF == delim && token_is_empty) || '\n' == delim)
        {
            /* For every other field a newline ends the processing of the
             * line, and the token that the newline ended is not stored.
             */
            return;
        }

        switch( col)
        {
        case COL_VALUE_DATE:
            tstamp_ptr->dt_qual = TU_DTENCODE( TU_YEAR, TU_F5);
            if( is_null( token))
            {
                tstamp_ptr->dt_dec.dec_pos = DECPOSNULL;
            }
            else if( 0 == dtcvasc( token, tstamp_ptr))
            {
                context_ptr->col_is_null[0] = MI_FALSE;
                context_ptr->col_data[0] = (MI_DATUM) tstamp_ptr;
            }
            else
            {
                mi_string err_buf[128];

                /* Only a message: the rest of the line is still read. */
                sprintf( err_buf, "Illegal date on line %d",
                         buf_ptr->line_no);
                mi_db_error_raise( NULL, MI_MESSAGE, err_buf);
            }
            break;

        case COL_CARRYOVER:
            if( ! is_null( token))
            {
                *(context_ptr->carryover) = token[0];
                context_ptr->col_is_null[1] = MI_FALSE;
            }
            break;

        case COL_SPREAD:
        case COL_PRICING_BMK_ID:
            /* Integers are carried by value in the datum itself. */
            if( ! is_null( token))
            {
                context_ptr->col_data[col] = (MI_DATUM) atoi( token);
                context_ptr->col_is_null[col] = MI_FALSE;
            }
            break;

        case COL_PRICE:
        case COL_YIELD:
            /* col_data[4] and col_data[5] point at the double precision
             * buffers in the context.
             */
            if( ! is_null( token))
            {
                *((double *) context_ptr->col_data[col]) = atof( token);
                context_ptr->col_is_null[col] = MI_FALSE;
            }
            break;

        case COL_PRIORITY:
            if( ! is_null( token))
            {
                *(context_ptr->priority) = token[0];
                context_ptr->col_is_null[6] = MI_FALSE;
            }
            break;

        default:
            break;
        }
    }
} /* End of read_day_data.
*/

/*
 ***************************************************************************
 * name:    read_line
 *
 * purpose: Read a line from the file, fetch the time series descriptor
 *          corresponding to the Secid, create a time series element for
 *          the line, and convert the date into an mi_datetime structure
 *
 * params:  context_ptr        loader context; col_data and col_is_null are
 *                             refilled for every line
 *          buf_ptr            input file buffer
 *          tsdesc_ptr         out: descriptor of the security on this line,
 *                             NULL for an unknown security
 *          day_elem_ptr       in/out: element buffer, handed to
 *                             ts_make_elem_with_buf and replaced by its
 *                             result
 *          null_line          out: 1 when no element was built for the line
 *          tstamp_ptr         out: time stamp read from the line
 *          sec_entry_ptr_ptr  out: hash entry of the security, NULL at the
 *                             end of the file
 *
 * returns: 1 if there was more data in the file,
 *          0 if the end of the file was found
 *
 * notes:   Creates a new time series if the series column for the Secid is
 *          NULL.
 ***************************************************************************
 */
int
read_line( loader_context_t *context_ptr,
           FILE_BUF *buf_ptr,
           ts_tsdesc **tsdesc_ptr,
           ts_tselem *day_elem_ptr,
           int *null_line,
           mi_datetime *tstamp_ptr,
           sec_entry_t **sec_entry_ptr_ptr)
{
    mi_integer sec_id = -1;
    sec_entry_t *sec_entry_ptr = NULL;
    mi_string token[256];
    mi_integer c = 0; /* Character that ended the last token. */
    mi_integer i = 0;

    *sec_entry_ptr_ptr = NULL;
    *null_line = 1;
    for( i = 0; i < DAILY_COL_COUNT; i++)
        context_ptr->col_is_null[ i] = MI_TRUE;

    c = get_token( buf_ptr, token, sizeof( token));
    if( STREAM_EOF == c && 0 == strlen( token))
        return( 0);

    sec_id = atoi( token);
    *sec_entry_ptr_ptr = sec_entry_ptr
        = get_sec_entry( context_ptr, sec_id, buf_ptr->line_no);

    /* When the Secid token already ended the line (or the file) there are
     * no more fields on it. Reading on would take the fields from the next
     * line, so leave every column NULL instead.
     */
    if( '\n' != c && STREAM_EOF != c)
        read_day_data( context_ptr, buf_ptr, token, sizeof( token),
                       tstamp_ptr);

    *tsdesc_ptr = sec_entry_ptr->tsdesc;
    if( NULL == sec_entry_ptr->tsdesc) /* An invalid security ID. */
        return( 1);

    if( context_ptr->col_is_null[0]
        && TS_IS_IRREGULAR( ts_get_ts( sec_entry_ptr->tsdesc)))
    {
        mi_string err_buf[128];

        /* Only a message: the line is skipped and loading continues. */
        sprintf( err_buf, "Missing date on line %d.", buf_ptr->line_no);
        mi_db_error_raise( NULL, MI_MESSAGE, err_buf);
        return( 1);
    }

    *null_line = 0;
    *day_elem_ptr = ts_make_elem_with_buf( sec_entry_ptr->tsdesc,
                                           context_ptr->col_data,
                                           context_ptr->col_is_null,
                                           NULL,
                                           *day_elem_ptr);
    return( 1);
} /* End of read_line.
*/

/*
 ***************************************************************************
 * name:    TSIncLoad
 *
 * purpose: UDR for incremental loading of timeseries from a file.
 *
 * notes:   Reads the file line by line, puts one element per usable line
 *          into the time series of the named security, and finally runs
 *          update_series and insert_series before releasing the context.
 *          Raises an MI_EXCEPTION error when an interrupt is detected.
 *          fParamPtr is not used.
 ***************************************************************************
 */
#ifdef NT
__declspec(dllexport)
#endif
void
TSIncLoad( mi_lvarchar *table_name, /* the table that holds the time series. */
           mi_lvarchar *file_name,
           /* The name of the file containing the data. It must be accessible
            * on the server machine.
            */
           /*
            * The following parameters are only used to create new time
            * series.
            */
           mi_lvarchar *calendar_name,
           mi_datetime *origin,
           mi_integer threshold,
           mi_boolean regular,
           mi_lvarchar *container_name,
           mi_integer nelems,
           MI_FPARAM *fParamPtr)
{
    FILE_BUF buf = {0};
    ts_tselem day_elem = NULL;
    ts_tsdesc *tsdesc = NULL;
    ts_tsdesc *elem_tsdesc = NULL; /* Descriptor last used to build day_elem. */
    ts_timeseries *ts = NULL;
    mi_datetime tstamp = {0};
    loader_context_t context = {0};
    mi_unsigned_integer yield_count = 0;
    sec_entry_t *sec_entry_ptr = NULL;
    int null_line = 0;

    init_context( table_name, calendar_name, origin, threshold, regular,
                  container_name, nelems, &context);
    open_buf( file_name, &buf);

    while( read_line( &context, &buf, &tsdesc, &day_elem, &null_line,
                      &tstamp, &sec_entry_ptr))
    {
        yield_count++;
        /* Periodically (once every 64 input lines) check for interrupts and
         * yield the processor to other threads.
         */
        if( 0 == (yield_count & 0x3f))
        {
            if( mi_interrupt_check())
                mi_db_error_raise( NULL, MI_EXCEPTION, "Load aborted.");
            mi_yield();
        }

        if( null_line)
            continue;

        /* read_line built day_elem with this descriptor. Remember it: a
         * later line may leave tsdesc NULL (unknown security), and the
         * element must not be freed with a NULL descriptor.
         */
        elem_tsdesc = tsdesc;

        ts = ts_put_elem_no_dups( tsdesc, day_elem, &tstamp);
        if( sec_entry_ptr->in_row && TS_IS_INCONTAINER( ts))
        {
            /* The entry was flagged in_row and the series is now reported
             * as being in a container: clear the flag and count the new
             * instance.
             */
            sec_entry_ptr->in_row = 0;
            increment_instances_created( &context);
        }
    }

    if( NULL != day_elem && NULL != elem_tsdesc)
        ts_free_elem( elem_tsdesc, day_elem);
    close_buf( &buf);

    /* insert_series closes the descriptors and frees the hash entries, so
     * it has to run after update_series.
     */
    update_series( &context);
    insert_series( &context);
    close_context( &context);
} /* End of TSIncLoad. */