streamsx.eventstore package

IBM Streams Event Store integration

For details of implementing applications in Python for IBM Streams including IBM Cloud Private for Data and the Streaming Analytics service running on IBM Cloud see:

This package exposes SPL operators in the com.ibm.streamsx.eventstore toolkit as Python methods.

Overview

Provides a function to insert IBM Streams tuple data into a table in IBM Db2 Event Store.

IBM Db2 Event Store is an in-memory database designed to rapidly ingest and analyze streamed data in event-driven applications. It can process massive volumes of events in real time and is optimized for streamed data, supporting advanced analytics and actionable insights.

Sample

A simple example of a Streams application inserting rows to a table in a Db2 Event Store database:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology.context import submit
from streamsx.topology.context import ContextTypes
import streamsx.spl.toolkit
import streamsx.eventstore as es

topo = Topology('EventStoreSample')

# provide connection endpoint information in format <HostIP:Port from JDBC URL>;<SCALA connection URL>
es_connection = 'HostIP:Port1;HostIP:Port2'

# generate sample tuples with the schema of the target table
s = topo.source([1,2,3,4,5,6,7,8,9])
schema=StreamSchema('tuple<int32 id, rstring name>').as_tuple()
s = s.map(lambda x : (x,'X'+str(x*2)), schema=schema)

# insert tuple data into table as rows
res = es.insert(s, connection=es_connection, database='TESTDB', table='SampleTable', schema_name='sample', primary_key='id')

submit(ContextTypes.DISTRIBUTED, topo)
# The Streams job is kept running.
streamsx.eventstore.insert(stream, table, schema_name=None, database=None, connection=None, user=None, password=None, config=None, batch_size=None, front_end_connection_flag=None, max_num_active_batches=None, partitioning_key=None, primary_key=None, truststore=None, truststore_password=None, keystore=None, keystore_password=None, plugin_name=None, schema=None, name=None)

Inserts tuples into a table using the Db2 Event Store Scala API.

Important: The tuple field types and positions in the IBM Streams schema must match the field names in your IBM Db2 Event Store table schema exactly.

Creates the table if the table does not exist. Set the primary_key and/or partitioning_key in case the table needs to be created.
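
Because the key parameters are only consulted when the table is created, it can help to build them from the same attribute names that the stream schema uses. The following is a minimal sketch, not part of the package: key_spec is a hypothetical helper, and joining with a bare comma (no spaces) is an assumption based on the description "a string of attribute names separated by commas".

```python
def key_spec(attribute_names, schema_attributes):
    """Build a comma-separated key string (hypothetical helper).

    attribute_names: ordered attribute names that form the key.
    schema_attributes: attribute names present in the stream schema.
    """
    if not attribute_names:
        raise ValueError('a key needs at least one attribute')
    missing = [a for a in attribute_names if a not in schema_attributes]
    if missing:
        raise ValueError('not in stream schema: ' + ', '.join(missing))
    if len(set(attribute_names)) != len(attribute_names):
        raise ValueError('duplicate attribute in key')
    # The order of the names is preserved in the resulting string.
    return ','.join(attribute_names)


# Attribute names of the sample schema 'tuple<int32 id, rstring name>'
schema_attributes = ['id', 'name']
primary_key = key_spec(['id'], schema_attributes)
print(primary_key)
```

The resulting string could then be passed as the primary_key or partitioning_key argument of insert().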

Example of a Streams application inserting rows to a table in a Db2 Event Store database:

    # provide connection endpoint information in format <HostIP:Port from JDBC URL>;<SCALA connection URL>
    es_connection = 'HostIP:Port1;HostIP:Port2'

    # generate sample tuples with the schema of the target table
    s = topo.source([1,2,3,4,5,6,7,8,9])
    schema=StreamSchema('tuple<int32 id, rstring name>').as_tuple()
    s = s.map(lambda x : (x,'X'+str(x*2)), schema=schema)

    # insert tuple data into table as rows
    res = es.insert(s, connection=es_connection, database='TESTDB', table='SampleTable', schema_name='sample', primary_key='id')

Parameters:
  • stream (Stream) – Stream of tuples containing the fields to be inserted as a row. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) as input. The tuple attribute types and positions in the IBM Streams schema must match the field names in your IBM Db2 Event Store table schema exactly.
  • table (str) – The name of the table into which you want to insert rows.
  • schema_name (str) – The name of the schema of the table into which to insert data.
  • database (str) – The name of the database, as defined in IBM Db2 Event Store. Alternatively, this parameter can be set with the function configure_connection().
  • connection (str) – The set of IP addresses and port numbers needed to connect to IBM Db2 Event Store. Alternatively, this parameter can be set with the function configure_connection().
  • user (str) – Name of the IBM Db2 Event Store user used to connect. Alternatively, this parameter can be set with the function configure_connection().
  • password (str) – Password of the IBM Db2 Event Store user used to connect. Alternatively, this parameter can be set with the function configure_connection().
  • config (str) – The name of the application configuration. Value returned by the function configure_connection().
  • batch_size (int) – The number of rows that will be batched in the operator before the batch is inserted into IBM Db2 Event Store by using the batchInsertAsync method. If you do not specify this parameter, the batchSize defaults to the estimated number of rows that could fit into an 8K memory page.
  • front_end_connection_flag (bool) – Set to True to connect through a Secure Gateway (for Event Store Enterprise Edition version >= 1.1.2 and Developer Edition version > 1.1.4)
  • max_num_active_batches (int) – The number of batches that can be filled and inserted asynchronously. The default is 1.
  • partitioning_key (str) – Partitioning key for the table. A string of attribute names separated by commas. The partitioning_key parameter is used only if the table does not yet exist in the IBM Db2 Event Store database.
  • primary_key (str) – Primary key for the table. A string of attribute names separated by commas. The order of the attribute names defines the order of entries in the primary key for the IBM Db2 Event Store table. The primary_key parameter is used only if the table does not yet exist in the IBM Db2 Event Store database.
  • truststore (str) – Path to the trust store file for the SSL connection.
  • truststore_password (str) – Password for the trust store file given by the truststore parameter. Alternatively, this parameter can be set with the function configure_connection().
  • keystore (str) – Path to the key store file for the SSL connection.
  • keystore_password (str) – Password for the key store file given by the keystore parameter. Alternatively, this parameter can be set with the function configure_connection().
  • plugin_name (str) – The plug-in name for the SSL connection. The default value is IBMPrivateCloudAuth.
  • schema (StreamSchema) – Schema for returned stream. Expects a Boolean attribute called “_Inserted_” in the output stream. This attribute is set to true if the data was successfully inserted and false if the insert failed. Input stream attributes are forwarded to the output stream if present in schema.
  • name (str) – Sink name in the Streams context, defaults to a generated name.
Returns:

Stream termination, or an output stream if the schema parameter is specified. The output stream reports, for each tuple, whether it was inserted into the database successfully.

Return type:

streamsx.topology.topology.Sink
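
To obtain the output stream described above, the schema argument needs the Boolean attribute _Inserted_ in addition to any input attributes that should be forwarded. The following is a minimal sketch of deriving such a schema string from the input schema string. The helper with_inserted_flag is hypothetical and not part of the package; it only manipulates the SPL schema text.

```python
import re


def with_inserted_flag(input_schema):
    """Return an SPL tuple schema string that keeps the input attributes
    and appends 'boolean _Inserted_' (hypothetical helper)."""
    match = re.fullmatch(r'\s*tuple<(.*)>\s*', input_schema, flags=re.DOTALL)
    if match is None:
        raise ValueError('expected a schema of the form tuple<...>')
    attributes = match.group(1).strip()
    if not attributes:
        # No input attributes to forward, only the result flag.
        return 'tuple<boolean _Inserted_>'
    return 'tuple<' + attributes + ', boolean _Inserted_>'


out_schema = with_inserted_flag('tuple<int32 id, rstring name>')
print(out_schema)  # tuple<int32 id, rstring name, boolean _Inserted_>
```

The resulting string could then be wrapped as StreamSchema(out_schema) and passed as the schema argument of insert(), in the same way the sample builds its input schema.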

streamsx.eventstore.configure_connection(instance, name='eventstore', database=None, connection=None, user=None, password=None, keystore_password=None, truststore_password=None)

Configures IBM Streams for a connection to an IBM Db2 Event Store database.

Creates an application configuration object containing the required properties with connection information.

Example for creating a configuration for a Streams instance with connection details:

    from streamsx.rest import Instance
    import streamsx.topology.context
    from streamsx.eventstore import configure_connection
    from icpd_core import icpd_util

    cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
    cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
    instance = Instance.of_service(cfg)
    app_cfg = configure_connection(instance, database='TESTDB', connection='HostIP:Port1;HostIP:Port2', user='db2-user', password='db2-password')

Parameters:
  • instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.
  • name (str) – Name of the application configuration
  • database (str) – The name of the database, as defined in IBM Db2 Event Store.
  • connection (str) – The set of IP addresses and port numbers needed to connect to IBM Db2 Event Store, format: <HostIP:Port from JDBC URL>;<SCALA connection URL>
  • user (str) – Name of the IBM Db2 Event Store user used to connect.
  • password (str) – Password of the IBM Db2 Event Store user used to connect.
  • keystore_password (str) – Password for key store file.
  • truststore_password (str) – Password for trust store file.
Returns:

Name of the application configuration.
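
The connection value combines two endpoints in the format <HostIP:Port from JDBC URL>;<SCALA connection URL>. The following is a minimal sketch of assembling that string. The helper connection_string is hypothetical and not part of the package, and the addresses and port numbers are placeholders rather than known defaults.

```python
def connection_string(jdbc_endpoint, scala_endpoint):
    """Join the two endpoints with ';' (hypothetical helper)."""
    parts = [jdbc_endpoint.strip(), scala_endpoint.strip()]
    for part in parts:
        # ';' is the separator, so it must not appear inside an endpoint.
        if not part or ';' in part:
            raise ValueError('each endpoint must be non-empty and must not contain ";"')
    return ';'.join(parts)


# Placeholder address and ports, for illustration only
es_connection = connection_string('192.0.2.10:18730', '192.0.2.10:1101')
print(es_connection)  # 192.0.2.10:18730;192.0.2.10:1101
```

The result could be passed as the connection argument of configure_connection() or insert().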
