streamsx.eventstore package

Event Store integration for IBM Streams

For details of implementing applications in Python for IBM Streams, including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud, see:

Overview

Provides classes and functions to insert IBM Streams tuple data into a table in IBM Db2 Event Store.

IBM Db2 Event Store is an in-memory database designed to rapidly ingest and analyze streamed data in event-driven applications. It provides the fabric for fast data with its ability to process massive volumes of events in real time, coupled with optimization for streamed data performance for advanced analytics and actionable insights.

Sample

A simple example of a Streams application inserting rows into a table in a Db2 Event Store database:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology.context import submit
from streamsx.topology.context import ContextTypes
import streamsx.spl.toolkit
import streamsx.eventstore as es

topo = Topology('EventStoreSample')

# provide connection endpoint information in format <HostIP:Port from JDBC URL>;<SCALA connection URL>
es_connection = 'HostIP:Port1;HostIP:Port2'
# paths to the SSL trust store and key store files (placeholders, replace with your own)
es_truststore = '/path/to/truststore.p12'
es_keystore = '/path/to/keystore.p12'

# generate sample tuples with the schema of the target table
s = topo.source([1,2,3,4,5,6,7,8,9])
schema = StreamSchema('tuple<int32 id, rstring name>').as_tuple()
s = s.map(lambda x : (x,'X'+str(x*2)), schema=schema)

# insert tuple data into table as rows
s.for_each(es.Insert(connection=es_connection, database='TESTDB', table='SampleTable', schema_name='sample', primary_key='id', partitioning_key='id', truststore=es_truststore, keystore=es_keystore))

submit(ContextTypes.DISTRIBUTED, topo)
# The Streams job is kept running.

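The connection string in the sample combines two endpoints separated by a semicolon, in the format <HostIP:Port from JDBC URL>;<SCALA connection URL>. As a minimal sketch, assuming exactly that format, a hypothetical helper split_connection (not part of streamsx.eventstore) could sanity-check a value before it is passed to Insert. The addresses below are documentation placeholders.

```python
def split_connection(connection: str) -> dict:
    """Split '<HostIP:Port from JDBC URL>;<SCALA connection URL>' into its parts.

    Hypothetical helper for illustration only; not part of streamsx.eventstore.
    """
    parts = connection.split(';')
    if len(parts) != 2 or not all(p.strip() for p in parts):
        raise ValueError("expected '<jdbc endpoint>;<scala endpoint>'")
    jdbc_endpoint, scala_endpoint = (p.strip() for p in parts)
    # The JDBC part must look like HostIP:Port with a numeric port.
    host, sep, port = jdbc_endpoint.rpartition(':')
    if not sep or not host or not port.isdigit():
        raise ValueError('JDBC endpoint must be HostIP:Port')
    return {'jdbc_host': host, 'jdbc_port': int(port), 'scala': scala_endpoint}


print(split_connection('192.0.2.10:5555;192.0.2.10:1101'))
```

Failing early on a malformed string is cheaper than discovering the problem after the job has been submitted to the Streams instance.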
class streamsx.eventstore.Insert(table, schema_name=None, database=None, connection=None, user=None, password=None, config=None, batch_size=None, front_end_connection_flag=None, max_num_active_batches=None, partitioning_key=None, primary_key=None, truststore=None, truststore_password=None, keystore=None, keystore_password=None, plugin_name=None, plugin_flag=None, ssl_connection=None)

Bases: streamsx.topology.composite.ForEach

Inserts tuples into a table using the Db2 Event Store Scala API.

Important: The tuple attribute names, types, and positions in the IBM Streams schema must exactly match the columns of your IBM Db2 Event Store table schema.

Creates the table if it does not exist. Set primary_key and partitioning_key in case the table needs to be created.
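Because a mismatch between the stream schema and the table schema is a likely cause of failed inserts, a check before submission can help. The sketch below is hypothetical: check_schema and the SPL_TO_SQL mapping are illustrative assumptions, not the toolkit's actual type conversion rules, and the sketch assumes that names compare case-insensitively.

```python
# Illustrative SPL-to-SQL type mapping; an assumption, not the toolkit's own table.
SPL_TO_SQL = {
    'int32': 'INTEGER',
    'int64': 'BIGINT',
    'float64': 'DOUBLE',
    'boolean': 'BOOLEAN',
    'rstring': 'VARCHAR',
}


def check_schema(stream_attrs, table_columns):
    """Return a list of mismatches between stream attributes and table columns.

    Both arguments are lists of (name, type) pairs in positional order.
    """
    problems = []
    if len(stream_attrs) != len(table_columns):
        problems.append(f'attribute count {len(stream_attrs)} != column count {len(table_columns)}')
    for pos, ((attr, spl_type), (col, sql_type)) in enumerate(zip(stream_attrs, table_columns)):
        # Compare names case-insensitively (assumption of this sketch).
        if attr.upper() != col.upper():
            problems.append(f'position {pos}: attribute {attr!r} vs column {col!r}')
        if SPL_TO_SQL.get(spl_type) != sql_type.upper():
            problems.append(f'position {pos}: type {spl_type} vs {sql_type}')
    return problems


print(check_schema([('id', 'int32'), ('name', 'rstring')],
                   [('ID', 'INTEGER'), ('NAME', 'VARCHAR')]))
```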

Example of a Streams application inserting rows into a table in a Db2 Event Store database:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema
import streamsx.eventstore as es

topo = Topology()

# generate sample tuples with the schema of the target table
s = topo.source([1,2,3,4,5,6,7,8,9])
schema = StreamSchema('tuple<int32 id, rstring name>').as_tuple()
s = s.map(lambda x : (x,'X'+str(x*2)), schema=schema)

# insert tuple data into table as rows, using the application configuration 'eventstore'
s.for_each(es.Insert(config='eventstore', table='SampleTable', schema_name='sample', primary_key='id', partitioning_key='id'))

Parameters
  • table (str) – The name of the table into which you want to insert rows.

  • schema_name (str) – The name of the schema of the table into which to insert data.

  • database (str) – The name of the database, as defined in IBM Db2 Event Store. Alternatively, this parameter can be set with the function configure_connection().

  • connection (str) – The set of IP addresses and port numbers needed to connect to IBM Db2 Event Store, in the format <HostIP:Port from JDBC URL>;<SCALA connection URL>. Alternatively, this parameter can be set with the function configure_connection().

  • user (str) – Name of the IBM Db2 Event Store user to connect as. Alternatively, this parameter can be set with the function configure_connection().

  • password (str) – Password of the IBM Db2 Event Store user. Alternatively, this parameter can be set with the function configure_connection().

  • config (str) – The name of the application configuration, as returned by the function configure_connection().

  • batch_size (int) – The number of rows that are batched in the operator before the batch is inserted into IBM Db2 Event Store with the batchInsertAsync method. If you do not specify this parameter, the batch size defaults to the estimated number of rows that fit into an 8K memory page.

  • front_end_connection_flag (bool) – Set to True to connect through a Secure Gateway (for Event Store Enterprise Edition version >= 1.1.2 and Developer Edition version > 1.1.4).

  • max_num_active_batches (int) – The number of batches that can be filled and inserted asynchronously. The default is 1.

  • partitioning_key (str) – Partitioning key for the table, given as a string of comma-separated attribute names. The partitioning_key parameter is used only if the table does not yet exist in the IBM Db2 Event Store database.

  • primary_key (str) – Primary key for the table, given as a string of comma-separated attribute names. The order of the attribute names defines the order of entries in the primary key of the IBM Db2 Event Store table. The primary_key parameter is used only if the table does not yet exist in the IBM Db2 Event Store database.

  • truststore (str) – Path to the trust store file for the SSL connection.

  • truststore_password (str) – Password for the trust store file given by the truststore parameter. Alternatively, this parameter can be set with the function configure_connection().

  • keystore (str) – Path to the key store file for the SSL connection.

  • keystore_password (str) – Password for the key store file given by the keystore parameter. Alternatively, this parameter can be set with the function configure_connection().

  • plugin_name (str) – The plug-in name for the SSL connection. The default value is IBMIAMauth.

  • plugin_flag (str|bool) – Set “false” or False to disable the SSL plugin. If not specified, the plugin is used by default.

  • ssl_connection (str|bool) – Set “false” or False to disable the SSL connection. If not specified, SSL is enabled by default.

Returns

Stream termination

Return type

streamsx.topology.topology.Sink

New in version 2.8.
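The default for batch_size is described as the estimated number of rows that fit into an 8K memory page. The arithmetic can be sketched as follows. The per-attribute byte sizes and the 32-byte average rstring length are illustrative guesses, not the operator's real estimate, and estimate_batch_size is a hypothetical name.

```python
PAGE_BYTES = 8 * 1024  # the 8K memory page mentioned in the batch_size description

# Assumed per-attribute sizes in bytes; the toolkit's real estimate may differ.
ATTR_BYTES = {'int32': 4, 'int64': 8, 'float64': 8, 'boolean': 1}


def estimate_batch_size(attr_types, rstring_bytes=32):
    """Estimate how many rows of the given SPL types fit into one 8K page.

    Types not listed in ATTR_BYTES (for example rstring) are charged
    rstring_bytes, an assumed average length.
    """
    row_bytes = sum(ATTR_BYTES.get(t, rstring_bytes) for t in attr_types)
    return max(1, PAGE_BYTES // row_bytes)


# tuple<int32 id, rstring name> from the examples: 4 + 32 = 36 bytes per row
print(estimate_batch_size(['int32', 'rstring']))
```

With these assumptions a row of the sample schema takes 36 bytes, so about 227 rows fit into a page; wider rows lead to smaller default batches, which is why setting batch_size explicitly may be worthwhile for very narrow or very wide tables.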

populate(topology, stream, name, **options) → streamsx.topology.topology.Sink

Populates the topology with this for-each composite transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

sink = input_stream.for_each(myForEachComposite)
Parameters
  • topology – Topology containing the composite.

  • stream – Stream to be transformed.

  • name – Name passed into for_each.

  • **options – Future options passed to for_each.

Returns

Termination for this composite transformation of stream.

Return type

Sink
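The populate contract can be illustrated with a self-contained toy. ForEach, ToyStream, and CollectingSink below are hypothetical stand-ins written for this sketch, not the streamsx classes. They only show that for_each hands the topology, the stream, and the name to populate and returns its result as the termination of the stream.

```python
class ForEach:
    """Toy stand-in for a for-each composite base class (not streamsx)."""

    def populate(self, topology, stream, name, **options):
        raise NotImplementedError('subclasses must implement populate')


class ToyStream:
    """Toy stream holding a topology reference and some items."""

    def __init__(self, topology, items):
        self.topology = topology
        self.items = items

    def for_each(self, composite, name=None, **options):
        # Mirrors the documented contract: populate receives the topology,
        # this stream, and the name, and its return value is the termination.
        return composite.populate(self.topology, self, name, **options)


class CollectingSink(ForEach):
    """Hypothetical composite that terminates a stream by collecting its items."""

    def __init__(self):
        self.rows = []

    def populate(self, topology, stream, name, **options):
        self.rows.extend(stream.items)
        return f'sink:{name or "CollectingSink"}'


sink = ToyStream('topo', [1, 2, 3]).for_each(CollectingSink(), name='rows')
print(sink)
```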

class streamsx.eventstore.SQLStatement(credentials, **options)

Bases: streamsx.database._database.JDBCStatement

Runs a SQL statement using the Db2 Event Store client driver and the JDBC database interface.

The statement is called once for each input tuple received. Result sets that are produced by the statement are emitted as output stream tuples.

This class is based on streamsx.database.JDBCStatement. It includes the JDBC driver (‘ibm-event_2.11-1.0.jar’) and sets the following defaults for Db2 Event Store databases:

  • jdbc_driver_class = ‘COM.ibm.db2os390.sqlj.jdbc.DB2SQLJDriver’

  • ssl_connection = True

  • keystore_type = ‘PKCS12’

  • truststore_type = ‘PKCS12’

  • plugin_name = ‘IBMIAMauth’

  • security_mechanism = 15
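One way to picture the defaults listed above is as a base dictionary that explicitly passed keyword options override. This is a sketch under that assumption. EVENTSTORE_DEFAULTS and effective_options are hypothetical names, so check the SQLStatement source for how options are actually applied.

```python
# Defaults listed above for SQLStatement, collected into one dictionary.
EVENTSTORE_DEFAULTS = {
    'jdbc_driver_class': 'COM.ibm.db2os390.sqlj.jdbc.DB2SQLJDriver',
    'ssl_connection': True,
    'keystore_type': 'PKCS12',
    'truststore_type': 'PKCS12',
    'plugin_name': 'IBMIAMauth',
    'security_mechanism': 15,
}


def effective_options(**options):
    """Merge options over the defaults (assumed precedence: explicit options win)."""
    return {**EVENTSTORE_DEFAULTS, **options}


opts = effective_options(ssl_connection=False)
print(opts['ssl_connection'], opts['keystore_type'])
```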

Example with a “select count” statement and an output schema whose attribute TOTAL holds the result of the query:

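from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema
import streamsx.eventstore as es

topo = Topology()
sample_schema = StreamSchema('tuple<int32 TOTAL, rstring string>')
sql_query = 'SELECT COUNT(*) AS TOTAL FROM SAMPLE.TAB1'
query = topo.source([sql_query]).as_string()
# run the query once per input tuple; 'eventstore' is the application configuration name
res = query.map(es.SQLStatement(credentials='eventstore'), schema=sample_schema)
res.print()
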
Parameters
  • credentials (dict|str) – The credentials of the IBM Db2 Event Store database in JSON, or the name of the application configuration.

  • options (kwargs) – The additional optional parameters as variable keyword arguments.

Returns

Result stream.

Return type

streamsx.topology.topology.Stream

Note

This class requires an outgoing Internet connection to download the driver if jdbc_driver_lib is not specified.

New in version 2.7.
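The per-tuple behaviour (one statement execution per input tuple, one output tuple per result row) can be mimicked locally with Python's built-in sqlite3 module standing in for the Db2 Event Store JDBC driver. The sketch assumes, as the sample schema tuple<int32 TOTAL, rstring string> suggests, that the input attribute string is forwarded to the output. SQLite has no SAMPLE schema, so the table is just TAB1, and run_per_tuple is a hypothetical name.

```python
import sqlite3


def run_per_tuple(conn, statements):
    """Run each input statement and yield one output tuple per result row.

    Each output tuple also forwards the input 'string' attribute (the SQL text).
    """
    for sql in statements:
        cursor = conn.execute(sql)
        # description is None for statements that return no result set
        columns = [d[0] for d in cursor.description or []]
        for row in cursor.fetchall():
            out = dict(zip(columns, row))
            out['string'] = sql
            yield out


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE TAB1 (id INTEGER)')
conn.executemany('INSERT INTO TAB1 VALUES (?)', [(1,), (2,), (3,)])
results = list(run_per_tuple(conn, ['SELECT COUNT(*) AS TOTAL FROM TAB1']))
print(results)
```

Statements without a result set, such as an INSERT, produce no output tuples in this sketch.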

streamsx.eventstore.configure_connection(instance, name='eventstore', database=None, connection=None, user=None, password=None, keystore_password=None, truststore_password=None, plugin_name=None, plugin_flag=None, ssl_connection=None)

Configures IBM Streams for a connection to an IBM Db2 Event Store database.

Creates an application configuration object containing the required properties with connection information.

Example for creating a configuration for a Streams instance with connection details:

from icpd_core import icpd_util
import streamsx.topology.context
from streamsx.rest_primitives import Instance
import streamsx.eventstore as es

cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
app_cfg = es.configure_connection(instance, database='TESTDB', connection='HostIP:Port1;HostIP:Port2', user='db2-user', password='db2-password')

Parameters
  • instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.

  • name (str) – Name of the application configuration.

  • database (str) – The name of the database, as defined in IBM Db2 Event Store.

  • connection (str) – The set of IP addresses and port numbers needed to connect to IBM Db2 Event Store, format: <HostIP:Port from JDBC URL>;<SCALA connection URL>

  • user (str) – Name of the IBM Db2 Event Store User in order to connect.

  • password (str) – Password for the IBM Db2 Event Store User in order to connect.

  • keystore_password (str) – Password for key store file.

  • truststore_password (str) – Password for trust store file.

  • plugin_name (str) – The plug-in name for the SSL connection.

  • plugin_flag (str) – Set “false” to disable the SSL plugin. If not specified, the plugin is used by default.

  • ssl_connection (str) – Set “false” to disable the SSL connection. If not specified, SSL is enabled by default.

Returns

Name of the application configuration.
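A plausible reading of configure_connection is that it stores only the properties you pass, so unset parameters fall back to the toolkit defaults. The sketch below assumes that behaviour. build_properties and its property names are hypothetical and may not match the keys the real function writes into the application configuration. It also normalizes the flag values to the lowercase strings "true" and "false".

```python
def build_properties(database=None, connection=None, user=None, password=None,
                     plugin_flag=None, ssl_connection=None):
    """Collect only the given parameters; property names are hypothetical."""
    props = {
        'databaseName': database,
        'connectionString': connection,
        'eventStoreUser': user,
        'eventStorePassword': password,
        'pluginFlag': plugin_flag,
        'sslConnection': ssl_connection,
    }
    # Drop unset parameters so that the toolkit defaults apply.
    props = {k: v for k, v in props.items() if v is not None}
    # Normalize flags (False, "False", "false") to lowercase strings.
    for key in ('pluginFlag', 'sslConnection'):
        if key in props:
            props[key] = str(props[key]).lower()
    return props


print(build_properties(database='TESTDB', ssl_connection=False))
```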

streamsx.eventstore.download_toolkit(url=None, target_dir=None)

Downloads the latest Eventstore toolkit from GitHub.

Example for updating the Eventstore toolkit for your topology with the latest toolkit from GitHub:

from streamsx.topology.topology import Topology
import streamsx.spl.toolkit
import streamsx.eventstore as es

topology = Topology()
# download Eventstore toolkit from GitHub
eventstore_toolkit_location = es.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, eventstore_toolkit_location)

Example for updating the topology with a specific version of the Eventstore toolkit using a URL:

from streamsx.topology.topology import Topology
import streamsx.spl.toolkit
import streamsx.eventstore as es
topology = Topology()
url220 = 'https://github.com/IBMStreams/streamsx.eventstore/releases/download/v2.2.0/streamsx.eventstore.toolkits-2.2.0-20190731-0640.tgz'
eventstore_toolkit_location = es.download_toolkit(url=url220)
streamsx.spl.toolkit.add_toolkit(topology, eventstore_toolkit_location)

Parameters
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.

  • target_dir (str) – The directory into which the toolkit is unpacked. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.

Returns

The location of the downloaded Eventstore toolkit.

Return type

str

Note

This function requires an outgoing Internet connection.

New in version 2.5.
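The target_dir rule described above can be sketched with os.path and tempfile. resolve_target_dir is a hypothetical helper, and the subdirectory name used when target_dir is None is an assumption; the real function may choose a different location.

```python
import os
import tempfile


def resolve_target_dir(target_dir=None, default_name='eventstore_toolkit'):
    """Sketch of the documented target_dir rule for download_toolkit."""
    tmp = tempfile.gettempdir()
    if target_dir is None:
        # Assumption: a fixed subdirectory of the temporary directory.
        return os.path.join(tmp, default_name)
    if os.path.isabs(target_dir):
        return target_dir
    # Relative paths are appended to the system temporary directory.
    return os.path.join(tmp, target_dir)


print(resolve_target_dir('my_toolkits'))
```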

streamsx.eventstore.insert(stream, table, schema_name=None, database=None, connection=None, user=None, password=None, config=None, batch_size=None, front_end_connection_flag=None, max_num_active_batches=None, partitioning_key=None, primary_key=None, truststore=None, truststore_password=None, keystore=None, keystore_password=None, plugin_name=None, plugin_flag=None, ssl_connection=None, schema=None, name=None)

Inserts tuples into a table using the Db2 Event Store Scala API.

Important: The tuple attribute names, types, and positions in the IBM Streams schema must exactly match the columns of your IBM Db2 Event Store table schema.

Creates the table if it does not exist. Set primary_key and partitioning_key in case the table needs to be created.

Example of a Streams application inserting rows into a table in a Db2 Event Store database:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema
import streamsx.eventstore as es

topo = Topology()
# provide connection endpoint information in format <HostIP:Port from JDBC URL>;<SCALA connection URL>
es_connection = 'HostIP:Port1;HostIP:Port2'
# generate sample tuples with the schema of the target table
s = topo.source([1,2,3,4,5,6,7,8,9])
schema = StreamSchema('tuple<int32 id, rstring name>').as_tuple()
s = s.map(lambda x : (x,'X'+str(x*2)), schema=schema)
# insert tuple data into table as rows
res = es.insert(s, connection=es_connection, database='TESTDB', table='SampleTable', schema_name='sample', primary_key='id', partitioning_key='id')

Parameters
  • stream (streamsx.topology.topology.Stream) – Stream of tuples containing the fields to be inserted as a row. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) as input. The tuple attribute names, types, and positions in the IBM Streams schema must exactly match the columns of your IBM Db2 Event Store table schema.

  • table (str) – The name of the table into which you want to insert rows.

  • schema_name (str) – The name of the schema of the table into which to insert data.

  • database (str) – The name of the database, as defined in IBM Db2 Event Store. Alternatively, this parameter can be set with the function configure_connection().

  • connection (str) – The set of IP addresses and port numbers needed to connect to IBM Db2 Event Store, in the format <HostIP:Port from JDBC URL>;<SCALA connection URL>. Alternatively, this parameter can be set with the function configure_connection().

  • user (str) – Name of the IBM Db2 Event Store user to connect as. Alternatively, this parameter can be set with the function configure_connection().

  • password (str) – Password of the IBM Db2 Event Store user. Alternatively, this parameter can be set with the function configure_connection().

  • config (str) – The name of the application configuration, as returned by the function configure_connection().

  • batch_size (int) – The number of rows that are batched in the operator before the batch is inserted into IBM Db2 Event Store with the batchInsertAsync method. If you do not specify this parameter, the batch size defaults to the estimated number of rows that fit into an 8K memory page.

  • front_end_connection_flag (bool) – Set to True to connect through a Secure Gateway (for Event Store Enterprise Edition version >= 1.1.2 and Developer Edition version > 1.1.4).

  • max_num_active_batches (int) – The number of batches that can be filled and inserted asynchronously. The default is 1.

  • partitioning_key (str) – Partitioning key for the table, given as a string of comma-separated attribute names. The partitioning_key parameter is used only if the table does not yet exist in the IBM Db2 Event Store database.

  • primary_key (str) – Primary key for the table, given as a string of comma-separated attribute names. The order of the attribute names defines the order of entries in the primary key of the IBM Db2 Event Store table. The primary_key parameter is used only if the table does not yet exist in the IBM Db2 Event Store database.

  • truststore (str) – Path to the trust store file for the SSL connection.

  • truststore_password (str) – Password for the trust store file given by the truststore parameter. Alternatively, this parameter can be set with the function configure_connection().

  • keystore (str) – Path to the key store file for the SSL connection.

  • keystore_password (str) – Password for the key store file given by the keystore parameter. Alternatively, this parameter can be set with the function configure_connection().

  • plugin_name (str) – The plug-in name for the SSL connection. The default value is IBMIAMauth.

  • plugin_flag (str|bool) – Set “false” or False to disable the SSL plugin. If not specified, the plugin is used by default.

  • ssl_connection (str|bool) – Set “false” or False to disable the SSL connection. If not specified, SSL is enabled by default.

  • schema (streamsx.topology.schema.StreamSchema) – Schema for returned stream. Expects a Boolean attribute called _Inserted_ in the output stream. This attribute is set to true if the data was successfully inserted and false if the insert failed. Input stream attributes are forwarded to the output stream if present in schema.

  • name (str) – Sink name in the Streams context, defaults to a generated name.

Returns

Stream termination, or an output stream if the schema parameter is specified. The output stream indicates, for each tuple, whether it was successfully inserted into the database.

Return type

streamsx.topology.topology.Sink, or streamsx.topology.topology.Stream if schema is specified

Deprecated since version 2.8.0: Use Insert instead.
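When schema is given, each output tuple forwards the input attributes that the output schema names and carries the Boolean _Inserted_ flag. A minimal sketch of that mapping with tuples modeled as plain dictionaries; output_tuple is a hypothetical helper, not part of the package.

```python
def output_tuple(input_tuple, output_attrs, inserted):
    """Build an output tuple as described for the schema parameter.

    Input attributes are forwarded only if the output schema names them,
    and the Boolean _Inserted_ attribute carries the insert result.
    """
    if '_Inserted_' not in output_attrs:
        raise ValueError('output schema needs a boolean _Inserted_ attribute')
    out = {name: input_tuple[name] for name in output_attrs if name in input_tuple}
    out['_Inserted_'] = bool(inserted)
    return out


print(output_tuple({'id': 1, 'name': 'X2'}, ['id', '_Inserted_'], True))
```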

streamsx.eventstore.run_statement(stream, credentials, truststore, keystore, truststore_password=None, keystore_password=None, schema=None, sql=None, sql_attribute=None, sql_params=None, transaction_size=1, jdbc_driver_class='COM.ibm.db2os390.sqlj.jdbc.DB2SQLJDriver', jdbc_driver_lib=None, ssl_connection=True, keystore_type='PKCS12', truststore_type='PKCS12', plugin_name='IBMIAMauth', security_mechanism=15, vm_arg=None, name=None)

Runs a SQL statement using the Db2 Event Store client driver and the JDBC database interface.

The statement is called once for each input tuple received. Result sets that are produced by the statement are emitted as output stream tuples.

By default, this function includes the JDBC driver (‘ibm-event_2.11-1.0.jar’) for Db2 Event Store databases (‘COM.ibm.db2os390.sqlj.jdbc.DB2SQLJDriver’) in the application bundle.

Supports two ways to specify the statement:

  • Statement is part of the input stream. You can specify which input stream attribute contains the statement with the sql_attribute argument. If the input stream is of type CommonSchema.String, you don’t need to specify the sql_attribute argument.

  • Statement is given with the sql argument. The statement can contain parameter markers that are set with input stream attributes specified by sql_params argument.
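The two ways of specifying the statement can be sketched as a small selection function. select_statement is hypothetical, tuples are modeled as dicts, a CommonSchema.String tuple is modeled as a dict with the single key 'string', and giving sql precedence when both arguments are set is an assumption of this sketch.

```python
def select_statement(tup, sql=None, sql_attribute=None):
    """Pick the SQL text for one input tuple under the two documented modes."""
    if sql is not None:
        return sql                   # fixed statement, possibly with ? markers
    if sql_attribute is not None:
        return tup[sql_attribute]    # statement carried in a named attribute
    if set(tup) == {'string'}:
        return tup['string']         # CommonSchema.String: single 'string' attribute
    raise ValueError('specify sql or sql_attribute for structured input streams')


print(select_statement({'string': 'SELECT COUNT(*) AS TOTAL FROM SAMPLE.TAB1'}))
```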

Example with a “select count” statement and an output schema whose attribute TOTAL holds the result of the query:

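from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema
import streamsx.eventstore as es

topo = Topology()
credentials = 'eventstore'  # name of the application configuration
sample_schema = StreamSchema('tuple<int32 TOTAL, rstring string>')
sql_query = 'SELECT COUNT(*) AS TOTAL FROM SAMPLE.TAB1'
query = topo.source([sql_query]).as_string()
# truststore and keystore are required arguments; the paths are placeholders
res = es.run_statement(query, credentials=credentials, truststore='/path/to/truststore.p12', keystore='/path/to/keystore.p12', schema=sample_schema)
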
Parameters
  • stream (streamsx.topology.topology.Stream) – Stream of tuples containing the SQL statements or SQL statement parameter values. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) or CommonSchema.String as input.

  • credentials (dict|str) – The credentials of the IBM Db2 Event Store database in JSON, or the name of the application configuration.

  • truststore (str) – Path to the trust store file for the SSL connection.

  • keystore (str) – Path to the key store file for the SSL connection.

  • truststore_password (str) – Password for the trust store file given by the truststore parameter.

  • keystore_password (str) – Password for the key store file given by the keystore parameter.

  • schema (StreamSchema) – Schema for returned stream. Defaults to input stream schema if not set.

  • sql (str) – String containing the SQL statement. Use this as an alternative to the sql_attribute parameter.

  • sql_attribute (str) – Name of the input stream attribute containing the SQL statement. Use this as an alternative to the sql parameter.

  • sql_params (str) – The values of the SQL statement parameters. These values are associated with the SQL statement parameter markers in positional order. For example, the first parameter marker in the SQL statement is associated with the first sql_params value.

  • transaction_size (int) – The number of tuples to commit per transaction. The default value is 1.

  • jdbc_driver_class (str) – The default driver is for Db2 Event Store database ‘COM.ibm.db2os390.sqlj.jdbc.DB2SQLJDriver’.

  • jdbc_driver_lib (str) – Path to the JDBC driver library file. Specify the jar file name with an absolute path; the jar must contain the class given with the jdbc_driver_class parameter. By default, ‘ibm-event_2.11-1.0.jar’ is added to the ‘opt’ directory in the application bundle.

  • ssl_connection (bool) – Use an SSL connection. The default is True.

  • keystore_type (str) – Type of the key store file, default is PKCS12.

  • truststore_type (str) – Type of the trust store file, default is PKCS12.

  • plugin_name (str) – Name of the security plugin, default is ‘IBMIAMauth’.

  • security_mechanism (int) – Value of the security mechanism, default is 15 (com.ibm.db2.jcc.DB2BaseDataSource.PLUGIN_SECURITY).

  • vm_arg (str) – Arbitrary JVM arguments can be passed to the Streams operator.

  • name (str) – Sink name in the Streams context, defaults to a generated name.

Returns

Result stream.

Return type

streamsx.topology.topology.Stream

Note

This function requires an outgoing Internet connection to download the driver if jdbc_driver_lib is not specified.

New in version 2.4.

Deprecated since version 2.7.0: Use SQLStatement instead.
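Positional binding of sql_params and the transaction_size commit interval can be tried locally with the sqlite3 module, whose ? parameter markers also bind by position. This is an analogy, not the Db2 Event Store driver. run_with_params is hypothetical, it assumes sql_params lists comma-separated input attribute names, and committing a final partial transaction is a choice made for this sketch.

```python
import sqlite3


def run_with_params(conn, sql, tuples, sql_params, transaction_size=1):
    """Bind markers positionally and commit every transaction_size tuples.

    Returns the number of commits issued.
    """
    names = [n.strip() for n in sql_params.split(',')]
    # Naive marker count; ignores '?' inside string literals.
    if sql.count('?') != len(names):
        raise ValueError('number of ? markers must match number of sql_params')
    commits = 0
    pending = 0
    for tup in tuples:
        # The first marker gets the first named attribute, and so on.
        conn.execute(sql, [tup[n] for n in names])
        pending += 1
        if pending == transaction_size:
            conn.commit()
            commits += 1
            pending = 0
    if pending:
        conn.commit()  # flush the final partial transaction
        commits += 1
    return commits


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE T (id INTEGER, name TEXT)')
rows = [{'id': i, 'name': 'X' + str(i * 2)} for i in range(1, 6)]
print(run_with_params(conn, 'INSERT INTO T VALUES (?, ?)', rows, 'id, name', transaction_size=2))
```

A larger transaction_size means fewer commits per tuple, at the cost of more uncommitted work if a failure occurs mid-transaction.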
