streamsx.eventstore package

IBM Streams Event Store integration

For details of implementing applications in Python for IBM Streams, including IBM Cloud Private for Data and the Streaming Analytics service running on IBM Cloud, see the streamsx.topology package documentation.

This package exposes SPL operators in the com.ibm.streamsx.eventstore toolkit as Python methods.

Overview

Provides a function to insert IBM Streams tuple data into a table in IBM Db2 Event Store.

IBM Db2 Event Store is an in-memory database designed to rapidly ingest and analyze streamed data in event-driven applications. It provides the fabric for fast data with its ability to process massive volumes of events in real time, coupled with optimization of streamed-data performance for advanced analytics and actionable insights.

Sample

A simple example of a Streams application inserting rows to a table in a Db2 Event Store database:

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology.context import submit
from streamsx.topology.context import ContextTypes
import streamsx.spl.toolkit
import streamsx.eventstore as es

topo = Topology('EventStoreSample')

# add event store toolkit corresponding to your Event Store installation (developer/enterprise edition) and version
streamsx.spl.toolkit.add_toolkit(topo, 'PATH_TO_STREAMSX_EVENTSTORE_TOOLKIT_LOCATION')

# provide connection endpoint information
es_connection = 'HostIP:Port'

# generate sample tuples with the schema of the target table
s = topo.source([1,2,3,4,5,6,7,8,9])
schema = StreamSchema('tuple<int32 id, rstring name>').as_tuple()
s = s.map(lambda x: (x, 'X' + str(x * 2)), schema=schema)

# insert tuple data into table as rows
res = es.insert(s, connection=es_connection, database='TESTDB', table='SampleTable', primary_key='id')

submit(ContextTypes.DISTRIBUTED, topo)
# The submitted Streams job keeps running until it is cancelled.

streamsx.eventstore.insert(stream, connection, database, table, user=None, password=None, config=None, batch_size=None, front_end_connection_flag=None, max_num_active_batches=None, partitioning_key=None, primary_key=None, schema=None, name=None)

Inserts tuples into a table using the Db2 Event Store Scala API.

Important: The tuple field names, types, and positions in the IBM Streams schema must exactly match the columns in your IBM Db2 Event Store table schema.

The table is created if it does not exist. Set primary_key and/or partitioning_key if the table needs to be created.

Parameters:
  • stream (Stream) – Stream of tuples containing the fields to be inserted as a row. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) as input. The tuple field names, types, and positions in the IBM Streams schema must exactly match the columns in your IBM Db2 Event Store table schema.
  • connection (str) – The set of IP addresses and port numbers needed to connect to IBM Db2 Event Store.
  • database (str) – The name of the database, as defined in IBM Db2 Event Store.
  • table (str) – The name of the table into which you want to insert rows.
  • user (str) – User name for connecting to IBM Db2 Event Store.
  • password (str) – Password of the IBM Db2 Event Store user.
  • config (str) – The name of the application configuration. If you specify parameter values in the configuration object, they override the values of user and password parameters. Supported properties in the application configuration are: “eventStoreUser” and “eventStorePassword”.
  • batch_size (int) – The number of rows that will be batched in the operator before the batch is inserted into IBM Db2 Event Store by using the batchInsertAsync method. If you do not specify this parameter, the batchSize defaults to the estimated number of rows that could fit into an 8K memory page.
  • front_end_connection_flag (bool) – Set to True to connect through a Secure Gateway (for Event Store Enterprise Edition version >= 1.1.2 and Developer Edition version > 1.1.4).
  • max_num_active_batches (int) – The number of batches that can be filled and inserted asynchronously. The default is 1.
  • partitioning_key (str) – Partitioning key for the table: a string of attribute names separated by commas. The partitioning_key parameter is used only if the table does not yet exist in the IBM Db2 Event Store database.
  • primary_key (str) – Primary key for the table: a string of attribute names separated by commas. The order of the attribute names defines the order of entries in the primary key for the IBM Db2 Event Store table. The primary_key parameter is used only if the table does not yet exist in the IBM Db2 Event Store database.
  • schema (StreamSchema) – Schema for returned stream. Expects a Boolean attribute called “_Inserted_” in the output stream. This attribute is set to true if the data was successfully inserted and false if the insert failed. Input stream attributes are forwarded to the output stream if present in schema.
  • name (str) – Sink name in the Streams context, defaults to a generated name.
Returns:

A Sink that terminates the stream, or an output Stream if the schema parameter is specified. The output stream reports, per tuple, whether the insert into the database succeeded.

Return type:

streamsx.topology.topology.Sink
