Data Marts

This page documents a feature in SanteDB version 3.0.

Introduction

SanteDB's Business Intelligence services allow developers of SanteDB applets to define custom reports, queries, views and also (starting with Version 3.0) custom data marts.

A data mart is a replica of the primary SanteDB database in which selected data is pre-pivoted, calculated, and stored in a schema better suited to running reports. This makes writing reports easier and makes running reports faster. It also allows system administrators to host data for reporting purposes on another physical server instance.

The SanteDB BI data mart infrastructure allows data marts to be extended and derived from one another. A data mart definition is defined in a new BI asset file with a BiDatamartDefinition root element and contains:

  • Metadata about the data mart definition, including authors and the permissions required to access the data mart.

  • A data source registration (the <produces> element) which identifies the data source that can be referenced by reports, queries, and views.

  • A <schema> which identifies the tables, views, and relationships between these objects within the datamart.

  • A <dataFlows> element which defines one or more data flows and data pipelines for extracting data from one or more SanteDB data sources (including other data marts) to populate the new data mart.

Previous versions of SanteDB (namely OpenIZ and SanteDB 1.0) shipped with a series of ETL jobs written in Talend; however, this solution was not portable, did not allow data marts to be defined in an end-user configurable manner, and did not work in the mobile application environment. The new SanteDB 3.0 data mart infrastructure seeks to resolve these limitations.

Defining a Datamart

Datamarts are defined by creating a new asset file in the bi/ folder of your applet. The XML file has the structure illustrated below:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-model href="~/.ref/schema/BusinessIntelligence.xsd" 
     type="application/xml" schematypens="http://www.w3.org/2001/XMLSchema"?>
<BiDatamartDefinition xmlns="http://santedb.org/bi" 
    id="org.example.bi.datamart.example"
    name="my-datamart" label="my-datamart">

  <meta status="active" version="1.0">
    <authors>
      <add>YourCo Inc.</add>
    </authors>
    <annotation>
      <div xmlns="http://www.w3.org/1999/xhtml">
        <p>
          This is an example!
        </p>
      </div>
    </annotation>
  </meta>

  <!-- If your data-mart is a copy and extension of an existing mart -->
  <basedOn ... >
  
  <!-- If your data-mart definition adds to another data-mart that already exists -->
  <extends ...>
  
  <!-- The data source that this data mart produces -->
  <produces ... >
  
  <!-- The schema of your data mart -->
  <schema ...>
  
  <!-- The name of the data flow where the ETL process should start 
       Note: The default is a data flow named "main" -->
  <entry ...>
  
  <!-- Data Flows in the Data-Mart -->
  <dataFlows ...> 
  
</BiDatamartDefinition>

Deriving / Extending Data Marts

SanteDB 3.0 provides a default datamart named org.santedb.bi.dataSource.warehouse, and other projects developed by SanteSuite (such as SanteEMR, SanteIMS) may have their own. Implementers can either extend these data marts or base entirely new marts on them.

Extending an Existing Mart

Extending an existing data mart is useful when:

  • You wish to add new views to an already existing mart

  • You wish to add new tables to an existing mart

  • You wish to change or modify data in an existing mart after it is populated (post processing tasks)

To extend a data-mart, the <extends> element is used with a reference to the other data mart:

<BiDatamartDefinition xmlns="http://santedb.org/bi" 
    id="org.example.bi.datamart.extension"
    name="extends-core" label="extends-core">
   
   <meta status="active" version="1.0">
    <authors>
      <add>YourCo Inc.</add>
    </authors>
    <annotation>
      <div xmlns="http://www.w3.org/1999/xhtml">
        <p>Adds FOO_TBL to the core data mart</p>
      </div>
    </annotation>
  </meta>
  <extends ref="#org.santedb.bi.dataSource.warehouse" />
  <schema>
    <table name="FOO_TBL">
      <column type="string" name="BAR" />
      <parent ref="ENT_TBL" />
    </table>
  </schema>
</BiDatamartDefinition>    

Deriving a New Mart

It is also possible for developers to base an entirely new data mart on an existing definition. When using a derivation, the underlying mart definition is "copied"; this saves developers from re-defining common assets.

Deriving a data-mart is done with the <basedOn> element, as sketched below.
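A minimal sketch, assuming the same reference syntax as the <extends> example above (the id, name, and label here are illustrative):

<BiDatamartDefinition xmlns="http://santedb.org/bi" 
    id="org.example.bi.datamart.derived"
    name="derived-mart" label="derived-mart">
  <!-- Copy the core warehouse definition as the starting point -->
  <basedOn ref="#org.santedb.bi.dataSource.warehouse" />
  <!-- Additional <produces>, <schema> and <dataFlows> for the new mart -->
</BiDatamartDefinition>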

Produced Data Source

The primary purpose of the data mart is to provide a BI data source which can be used for reports and queries like any other connection in SanteDB. The data source which is created or used as the output of the data mart is identified in the <produces> element of the datamart definition.

Existing Data Source

If you would like to output your data mart to an existing BI data source, use the ref attribute:

<produces ref="#org.santedb.bi.dataSource.main" />

Configured Database

If you have configured a connection string in the SanteDB server configuration, or would like to place the output of your datamart in an existing location, you can specify the connectionString attribute on the <produces> element of your datamart definition:

<produces id="org.example.bi.mart.example" name="example" 
     connectionString="nameOfConnectionStringInFile" />

New Database

If you would like the BI layer to create a new database on your database server, you can simply specify the name, permissions, and other metadata for your new data source.

<produces id="org.example.bi.mart.example" name="example">
  <meta status="active">
    <policies>
      <!-- Requires the "Query Clinical Data" policy -->
      <demand>1.3.6.1.4.1.33349.3.1.5.9.2.2.0</demand>
    </policies>
  </meta>
</produces>

You can specify a different database server and technology for all data marts by creating a new connection string in your server configuration file with the biSkel attribute.
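A hypothetical sketch of such a connection string entry; the section layout, attribute names, and values below are illustrative assumptions only, so consult your server's configuration schema for the exact form:

<!-- Illustrative only - check your server configuration schema -->
<connectionStrings>
  <add name="biSkeletonDb" biSkel="true" provider="Npgsql"
       value="server=reporting-db;user id=santedb;password=XXXXX" />
</connectionStrings>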

Schema Definition

The <schema> element of the data mart definition is used to define the tables and views for the data mart in a database technology agnostic manner.

For example, to define a data mart which has two tables, FACILITY_TBL and PATIENT_BY_DOB:

<BiDatamartDefinition xmlns="http://santedb.org/bi" 
    id="org.example.bi.datamart.extension"
    name="extends-core" label="extends-core">
    
    ...
    
    <schema>
        <table name="FACILITY_TBL">
            <column type="uuid" name="ID" key="true" notNull="true" />
            <column type="string" name="NAME" />
        </table>
        
        <table name="PATIENT_BY_DOB">
            <column type="date" name="DOB" key="true" notNull="true" />
            <column type="ref" name="FACILITY_ID">
                <otherTable ref="FACILITY_TBL" />
            </column>
            <column type="integer" name="MALES" />
            <column type="integer" name="FEMALES" />
        </table>
    </schema>
    
    ...
    
</BiDatamartDefinition>

Basic Table Definition

The basic table definition in a schema is expanded below:

<table name="{NAME_OF_TABLE}" temporary="true|false" tableSpace="{NAME_OF_TABLESPACE}">
    <column
        name="NAME_OF_COLUMN"
        type="integer|date|date-time|string|uuid|ref|decimal|bool"
        key="true|false"
        index="true|false"
        unique="true|false"
        notNull="true|false">
        <otherTable ref="{FOREIGN_KEY_TABLE}" />
    </column>
    <parent ref="{NAME_OF_PARENT_TABLE}" />
</table>

Foreign Keys

Links between tables can be established using a type="ref" on your column and including an <otherTable> reference. For example, to link an ADDRESS table to a PATIENTS table:

<table name="PATIENTS">
    <column type="uuid" name="PATIENT_ID" key="true" />
    <column type="date" name="DATE_OF_BIRTH" />
</table>
<table name="ADDRESS">
    <column type="uuid" name="ADDRESS_ID" key="true" />
    <column type="ref" name="PATIENT_ID">
        <otherTable ref="PATIENTS" />
    </column>
    <column type="string" name="CITY" />
</table>

This would result in a schema where each ADDRESS row carries a PATIENT_ID foreign key referencing the PATIENTS table.
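By analogy with the generated DDL shown in the Parent Tables section below, the SQL emitted for this definition would be similar to the following (the constraint names are illustrative):

CREATE TABLE PATIENTS (
    PATIENT_ID UUID NOT NULL,
    DATE_OF_BIRTH DATE,
    CONSTRAINT PK_PATIENTS PRIMARY KEY (PATIENT_ID)
);
CREATE TABLE ADDRESS (
    ADDRESS_ID UUID NOT NULL,
    PATIENT_ID UUID,
    CITY TEXT,
    CONSTRAINT PK_ADDRESS PRIMARY KEY (ADDRESS_ID),
    CONSTRAINT FK_ADDRESS_PATIENT FOREIGN KEY (PATIENT_ID) REFERENCES
        PATIENTS(PATIENT_ID)
);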

Parent Tables

SanteDB's primary CDR stores data as a series of objects in a hierarchy (see: Conceptual Information Model). In a relational database this is represented using a "table per class" pattern over the entity classes.

This means that every Place, Person, Patient, and Provider also has an entry in Entity carrying the core data elements; each subsequent table merely adds more data to its parent.

The BI schema definition allows the developer to shortcut this and automatically create views which link these tables up the hierarchy, using the <parent> element on the table. For example:

<table name="PERSON">
    <column name="PERSON_ID" type="uuid" key="true" />
    <column name="GENDER" type="string" index="true" />
    <column name="FIRST_NAME" type="string" />
    <column name="LAST_NAME" type="string" />
</table>
<table name="PATIENT">
    <parent ref="PERSON" />
    <column type="string" name="MRN" notNull="true" />
    <column type="ref" name="MOTHER">
        <otherTable ref="PERSON" />
    </column>
    <column type="ref" name="FATHER">
        <otherTable ref="PERSON" />
    </column>
</table>

This would allow a PATIENT to have a MOTHER which is either just a PERSON (having only a gender, first and last name) or a PATIENT with an MRN.

The BI layer also automatically establishes a view VW_PATIENT which can be used to get all applicable fields for the PATIENT. The SQL generated for this definition is similar to:

CREATE TABLE PERSON (
    PERSON_ID UUID NOT NULL,
    GENDER TEXT,
    FIRST_NAME TEXT,
    LAST_NAME TEXT,
    CONSTRAINT PK_PERSON PRIMARY KEY (PERSON_ID)
);
CREATE INDEX PERSON_GENDER_IDX ON PERSON(GENDER);
CREATE TABLE PATIENT (
    MRN TEXT NOT NULL,
    MOTHER UUID,
    FATHER UUID,
    PERSON_ID UUID NOT NULL,
    CONSTRAINT PK_PATIENT PRIMARY KEY (PERSON_ID),
    CONSTRAINT FK_PATIENT_PARENT FOREIGN KEY (PERSON_ID) REFERENCES
        PERSON(PERSON_ID),
    CONSTRAINT FK_PATIENT_MOTHER FOREIGN KEY (MOTHER) REFERENCES
        PERSON(PERSON_ID),
    CONSTRAINT FK_PATIENT_FATHER FOREIGN KEY (FATHER) REFERENCES
        PERSON(PERSON_ID)
);
CREATE VIEW VW_PATIENT AS
    SELECT MRN, MOTHER, FATHER, PERSON_ID, GENDER, FIRST_NAME, LAST_NAME
    FROM PATIENT
        INNER JOIN PERSON USING(PERSON_ID);

Data Flows

The schema of the data mart defines its structure; the data flows of the data mart definition describe how data is extracted from other sources to populate the data mart.

<dataFlows>
    <flow name="{DATA_FLOW_NAME}">
        <parameters>
            <int|string|ref|bool|uuid name="{NAME_OF_PARAMETER}" />
            <int|string|ref|bool|uuid name="{NAME_OF_PARAMETER}" />
        </parameters>
        <pipeline>
            <!-- Pipeline Steps here -->
        </pipeline>
        <return ref="{REFERENCE_TO_PIPELINE_OUTPUT_TO_RETURN}" />
    </flow>
    <flow name=....
</dataFlows>

Each data flow defines one or more parameters (which can be passed by a caller) and a single data pipeline.
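For example, the FOO_FLOW invoked in the Call Another Flow section below might declare the connection and flag it receives as parameters. A sketch, with the pipeline body elided:

<flow name="FOO_FLOW">
    <parameters>
        <ref name="connection" />
        <bool name="ignoreOld" />
    </parameters>
    <pipeline>
        <!-- Pipeline steps which use the "connection" and "ignoreOld" parameters -->
    </pipeline>
    <return ref="{TERMINAL_STEP}" />
</flow>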

Data Pipeline

The order of the elements in the <pipeline> element does not necessarily dictate the order in which the steps are executed; the execution of a data pipeline starts with the un-streamed (terminal) components and works backwards. This creates a streaming IEnumerable data flow for each component. For example:

<flow name="example">
    <pipeline>
        <connection name="inputConnection" mode="read-only">
            <dataSource ref="#org.santedb.bi.dataSource.main" />
        </connection>
        <connection name="outputConnection" mode="read-write">
            <dataSource ref="#org.example.bi.mart.example" />
        </connection>
        <reader name="read_addresses">
            <connection ref="inputConnection" />
            <sql>
                <add>
                    <![CDATA[  
                        SELECT ENT_ID, MNEMONIC, VAL
                        FROM ENT_ADDR_CMP_TBL
                           INNER JOIN CD_VRSN_TBL ON (TYP_CD_ID = CD_ID)
                    ]]>
                </add>
            </sql>
        </reader>
        <crosstab name="pivot_addresses">
            <input ref="read_addresses" />
            <pivot fn="first" key="ent_id" value="val" columnDef="mnemonic">
                <columns>
                    <add>Country</add>
                    <add>City</add>
                    <add>AddressLine</add>
                    <add>PostalCode</add>
                </columns>
            </pivot>
        </crosstab>
        <writer mode="insert" truncate="true" name="write_address">
            <input ref="pivot_addresses" />
            <connection ref="outputConnection" />
            <target ref="ADDRESSES" />
        </writer>
    </pipeline>
    <return ref="write_addresses" />
</flow>

The execution engine's analysis would conclude that the terminal node of this pipeline is the <writer> step, which relies on the <crosstab> step, which in turn relies on the <reader> step, so the order of execution would be:

  • Open connection inputConnection

  • Execute SQL and name the stream read_addresses

  • Pivot the output of read_addresses and name the stream pivot_addresses

  • Open the outputConnection connection

  • Write the stream pivot_addresses to the outputConnection

Pipeline Steps

The <pipeline> element of a flow represents the activities which are to be performed for the flow. Each pipeline step must carry a name="" attribute, and should reference other steps where possible. For example, referencing components is performed with the ref="" attribute as illustrated below:

<connection name="myConnection">
    ...
</connection>
<reader name="myReader">
    <connection ref="myConnection" />
    ...
</reader>
<writer name="myWriter">
    <input ref="myReader" />
    <connection ref="myConnection" />
    <target ref="myTable" />
</writer>

Connect to Database

Input: None. Output: DataIntegrationConnection

Connections to the database are made using the <connection> pipeline step. The connection pipeline step is illustrated below.

<connection name="{NAME_OF_CONNECTION}" mode="read-write|read-only">
    <dataSource ref="{REFERENCE_TO_OTHER_DATASOURCE}" connection="connectionStringName" />
</connection>

For example, to define a connection to the main SanteDB iCDR database:

<connection name="mainDatabase" mode="read-only">
   <dataSource ref="#org.santedb.bi.dataSource.main" />
</connection>

Execute in Transaction

Input: DataIntegrationConnection. Output: IEnumerable

When executing multiple operations, it is often desirable to execute them in a transaction. Transactions represent an atomic series of updates: either all of the operations succeed, or none do. Transactions create a new sub-scope within the data flow which contains them.

<transaction name="{NAME_OF_TRANSACTION}">
    <connection ref="{NAME_OF_CONNECTION}" />
    <pipeline>
        <!-- Pipeline Here -->
    </pipeline>
</transaction>

The transaction requires that the connection be opened in read-write mode.

For example:

<connection name="outputConnection" mode="read-write">
   <dataSource ref="#org.santedb.bi.dataSource.warehouse" />
</connection>
<connection name="inputConnection" mode="read-only">
  <dataSource ref="#org.santedb.bi.dataSource.main" />
</connection>
<reader name="read_addresses">
   <connection ref="inputConnection" />
   <sql>
       <add>
          <![CDATA[  
              SELECT ENT_ID, MNEMONIC, VAL
              FROM ENT_ADDR_CMP_TBL
               INNER JOIN CD_VRSN_TBL ON (TYP_CD_ID = CD_ID)
            ]]>
        </add>
    </sql>
</reader>
<transaction name="main_transaction">
    <connection ref="warehouseDatabase" />
    <writer name="write_addresses">
        <input ref="read_addresses" />
        <connection ref="outputConnection" />
        <target="ADDRESS_TABLE" />
    </writer>
    <filter name="reject_filter">
        <input ref="write_addresses" />
        <when column="$reject" op="eq" value="true" />
    </filter>
    <writer name="write_rejects">
        <input name="reject_filter" />
        <connection ref="outputConnection" />
        <target ref="ADDRESS_REJECTS" />
    </writer>
</transaction>        

Call Another Flow

Input: None. Output: IEnumerable

The <call> pipeline instruction can be used to call another flow and pass parameter arguments to the called flow.

<call name="{NAME_OF_CALL}">
    <dataFlow ref="{OTHER_FLOW_TO_CALL}" />
    <args>
        <int|string|bool|uuid|date|date-time|ref name="NAME_OF_PARAMETER">
            <value />
        </int|string|bool|uuid|date|date-time|ref>
    </args>
</call>

For example, to call FOO_FLOW passing the input connection and a flag, and then output the result to the log:

<call name="call_foo">
    <dataFlow ref="FOO_FLOW" />
    <args>
        <ref name="connection">
            <value ref="inputConnection" />
        </ref>
        <bool name="ignoreOld">
            <value>true</value>
        </bool>
    </args>
</call>
<log name="log_foo">
    <input ref="call_foo" />
</log>

Data Reader

Input: DataIntegrationConnection. Output: IEnumerable

A data reader is used to open a streaming result set based on a SQL query executed against a connection.

<reader name="{NAME_OF_RESULTING_READER_STREAM}">
    <connection ref="{CONNECTION_TO_READ_FROM}" />
    <sql>
        <add>
            <providers>
                <invariant>sqlite|FirebirdSQL|npgsql</invariant>
            </providers>
            <![CDATA[ SQL TO EXECUTE ]]>
        </add>
    </sql>
    <schema ref="{REFERENCE_TO_TABLE}">
        <columns ... />
    </schema>
</reader>

For example, to read all identifiers in the SanteDB primary database:

<reader name="source_ent_id">
  <connection ref="input" />
  <schema>
      <column name="ENT_ID_ID" type="uuid"/>
      <column name="ENT_ID" type="uuid"/>
      <column name="VALUE" type="string"/>
      <column name="ISSUER" type="string"/>
      <column name="CHECK_DIGIT" type="string"/>
      <column name="ISSUED" type="date"/>
      <column name="EXPIRY" type="date"/>
  </schema>
  <sql>
    <add>
      <providers>
        <invariant>npgsql</invariant>
        <invariant>FirebirdSQL</invariant>
        <invariant>sqlite</invariant>
      </providers>
      <![CDATA[ 
                SELECT 
                    ENT_ID_TBL.ENT_ID_ID,
                    ENT_ID_TBL.ENT_ID,
                    ID_VAL AS VALUE,
                    ID_DMN_TBL.NSID AS ISSUER,
                    CHK_DGT AS CHECK_DIGIT,
                    ISS_DT AS ISSUED,
                    EXP_DT AS EXPIRY
                FROM
                    ENT_ID_TBL
                    INNER JOIN ID_DMN_TBL USING (DMN_ID)
                WHERE
                    ENT_ID_TBL.OBSLT_VRSN_SEQ_ID IS NULL
                    AND ID_DMN_TBL.OBSLT_UTC IS NULL
                ]]>
    </add>
  </sql>
 </reader>

Data Writer

Input: IEnumerable. Output: IEnumerable

A data writer is used to write data to a connection. The output of the data writer is a stream of the records that were written.

<writer name="{NAME_OF_OUTPUT_STREAM}" 
    truncate="true|false"
    mode="insert|update|delete|insert-update"
    rejects="output|halt|log">
    <input ref="{NAME_OF_INPUT_STREAM}" />
    <connection ref="{NAME_OF_CONNECTION}" />
    <!-- If output to a schema table -->
    <target ref="{REF_OF_TABLE}" />
    <!-- If output to a temporary table -->
    <target temporary="true" name="name_of_temp_table">
        <column ... />
    </target>
</writer>

Records that could not be written to the output connection are streamed out of the data writer with additional $reject and $reject.reason fields added to the output object. Use a <filter> or <split> pipeline component to isolate these records (see the Filter section below for a worked example).

Column Transform

Input: IEnumerable. Output: IEnumerable

A column transform pipeline step is used to rename fields of the input data stream to other output stream field names, and to apply transforms.

<transform name="{NAME_OF_OUTPUT_STREAM}">
    <input ref="{NAME_OF_INPUT_STREAM}" />
    <map>
        <source name="{NAME_OF_INPUT_COLUMN}">
            <!-- If transforming according to an expression -->
            <transform>{C# EXPRESSION HERE}</transform>
            <!-- If mapping based on a lookup table or column -->
            <lookup joinColumn="{NAME_OF_COLUMN_IN_OTHER_TABLE}">
                <input ref="{INPUT_STREAM_NAME}" />
            </lookup>
        </source>
        <target name="{NAME_OF_TARGET_COLUMN}" />
    </map>
</transform>            

Using the lookup option requires loading the entire lookup source into memory, and is intended for very small reference lookups. If a larger join is desired, use a <join> pipeline component.
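For example, a sketch of a transform which renames a reader column and applies an expression; the stream name and column names are illustrative assumptions, and the expression placeholder is left as in the skeleton above:

<transform name="map_patients">
    <input ref="read_patients" />
    <map>
        <source name="GENDER_CODE">
            <!-- Illustrative - consult the BI schema for the expression binding -->
            <transform>{C# EXPRESSION MAPPING GENDER_CODE}</transform>
        </source>
        <target name="GENDER" />
    </map>
</transform>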

Crosstab

Input: IEnumerable. Output: IEnumerable

The crosstab pipeline step allows developers to pivot a dataset based on an accumulator.

<crosstab name="pivot_names">
    <input ref="{NAME_OF_INPUT_DATA_STREAM}" />
    <pivot fn="first|last|sum|avg|min|max|count" 
        key="{COLUMN_REPRESENTING_THE_KEY}"
        value="{COLUMN_REPRESENTING_THE_ACCUMULATOR}"
        columnDef="{COLUMN_FOR_PIVOTING}">
        <columns>
            <add>{NAME_OF_COLUMNS_IN_OUTPUT_STREAM}</add>
        </columns>
    </pivot>
</crosstab>

The <input> stream MUST be sorted by the @key as the most significant sorting column. Failure to sort the input result set may result in unpredictable crosstab output.
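For example, the read_addresses reader feeding the pivot_addresses crosstab shown earlier should order its results by the pivot key:

<reader name="read_addresses">
    <connection ref="inputConnection" />
    <sql>
        <add>
            <![CDATA[
                SELECT ENT_ID, MNEMONIC, VAL
                FROM ENT_ADDR_CMP_TBL
                    INNER JOIN CD_VRSN_TBL ON (TYP_CD_ID = CD_ID)
                ORDER BY ENT_ID -- the pivot key is the most significant sort
            ]]>
        </add>
    </sql>
</reader>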

Output to Log

Input: IEnumerable. Output: IEnumerable

For diagnostic purposes, it is often useful to log data and messages to the output. SanteDB data flows can log to one of the following places:

  • The application log (the trace log)

  • The registration for the particular execution of the datamart refresh

  • The console

Logs may be sourced from an input stream or may be standalone components.

<log priority="Informational|Verbose|Error|Warning"
    logTo="any|execution|console|trace">
    <input ref="{NAME_OF_STREAM_TO_USE_AS_LOG_INPUT}" />
    MESSAGE TO LOG
</log>

For example, to log a simple message to the console:

<log priority="Informational" logTo="console">{{ $principal }} is running me!</log>

To log the contents of an input stream to the execution log (which would require an administrator to log in to view):

<log priority="Verbose" logTo="execution">
    <input ref="patient_reader" />
    Mapped {{ ent_id }} successfully!
</log>

Filter

The filter object allows developers to filter an incoming result set and pass only those records which match the specified criteria to the next stage of the pipeline.

<filter name="{NAME_OF_FILTER_STREAM}">
    <input ref="{INPUT_STREAM_NAME}" />
    <all|any>
        <when field="{NAME_OF_FIELD}" op="eq|ne|gt|lt|gte|lte">
            <bool|string|int|uuid>value</bool|string|int|uuid>
        </when>
    </all|any>
</filter>

This logic can be used to filter out rejects and log them to a separate table.

<writer mode="insert" truncate="true" name="write_patients">
  <input ref="map_patients" />
  <connection ref="output" />
  <target ref="patients_tbl" />
</writer>
<!-- Filter for those rejected records -->
<filter name="filter_rejects">
  <input ref="write_patients" />
  <all>
    <when field="$reject" op="eq">
      <bool>true</bool>
    </when>
  </all>
</filter>
<!-- Transform the Rejects -->
<transform name="transform_rejects">
  <input ref="filter_rejects" />
  <map>
    <source name="patient_id" />
    <target name="patient_id" />
  </map>
  <map>
    <source name="$reject.reason" />
    <target name="REASON" />
  </map>
</transform>
<writer mode="insertUpdate" truncate="false" name="write_rejects">
  <input ref="transform_rejects" />
  <connection ref="output" />
  <target ref="rejected_patients_tbl" />
</writer>
<!-- OPTIONAL: HALT the entire pipeline if writing the rejects fails -->
<filter name="filter_rejects_from_write">
  <input ref="write_rejects" />
  <all>
    <when field="$reject" op="eq">
      <bool>true</bool>
    </when>
  </all>
</filter>
<!-- Halt on first failure to even persist a reject -->
<halt name="halt_rejects">
  <input ref="filter_rejects_from_write" />
  Committing patient reject {{patient_id}} failed.
</halt>

Union Result Sets

Two or more result sets can be unioned together. The result is a new enumerable containing the unioned result sets in the order they were included.

<union name="{NAME_OF_UNION}">
    <input ref="{NAME_OF_FIRST_RESULT_SET}" />
    <with ref="{NAME_OF_OTHER_STREAM}" />
    <with...
</union>
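For example, to append the reject stream to the successfully written patient stream (the stream names reference the Filter example above):

<union name="all_patients_written">
    <input ref="write_patients" />
    <with ref="write_rejects" />
</union>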

Halt Execution

Pipeline execution can be halted at any time with the <halt> pipeline step. The halt operation currently stops execution as soon as the first record reaches the <halt> step.

<halt name="{NAME_OF_HALT}">
    <input ref="{INPUT_STREAM}" />
    MESSAGE FOR YOUR HALT MESSAGE WITH {{ROW_NUMBER}} IN BRACES
</halt>
