Question

Using the Python SDK to edit a JDBC stop event query

  • 28 March 2023

A JDBC connection has changed, and I want the JDBC Query stop event in my existing pipelines to use the new connection. I am hoping to update the connection programmatically with the StreamSets Python SDK, but publishing the pipeline fails with an error after I update the connection in the stop event. Has anyone else run into this, or found a solution?

Note: I have been able to update the connection in Hive Query stop events and publish those pipelines without error.
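One thing that may differ between the Hive and JDBC cases: code that reads or writes event['configuration'][0] assumes the connection setting is the first entry in the stop event's configuration list, and the first entry is not necessarily the same setting for a Hive Query stage and a JDBC Query stage. A minimal sketch of a name-based lookup follows, assuming each entry is a dict with "name" and "value" keys (as in the poster's code and exported pipeline JSON). The helper names and the configuration names "config.queries" and "conf.connection" are hypothetical; check the real names in your pipeline's JSON.

```python
from typing import Any, Dict, List, Optional


def find_config(configuration: List[Dict[str, Any]], name: str) -> Optional[Dict[str, Any]]:
    """Return the configuration entry whose 'name' matches, or None if absent."""
    for entry in configuration:
        if entry.get("name") == name:
            return entry
    return None


def set_config_value(configuration: List[Dict[str, Any]], name: str, value: Any) -> bool:
    """Set the value of the named entry; return True only if the value changed."""
    entry = find_config(configuration, name)
    if entry is None:
        # Fail loudly instead of silently editing whichever entry happens to be first
        raise KeyError(f"configuration {name!r} not found in stage definition")
    if entry.get("value") == value:
        return False
    entry["value"] = value
    return True


# Hypothetical stop-event configuration list; real names come from your pipeline JSON
stop_event_configuration = [
    {"name": "config.queries", "value": ["UPDATE t SET x = 1"]},
    {"name": "conf.connection", "value": "old-connection-id"},
]
changed = set_config_value(stop_event_configuration, "conf.connection", "new-connection-id")
print(changed, find_config(stop_event_configuration, "conf.connection")["value"])
# prints: True new-connection-id
```

Raising KeyError when the name is missing is a deliberate choice: an unexpected stage layout then stops the script before publish, rather than producing a pipeline with the wrong setting overwritten.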




Hello @emily.suchan - Thank you for reaching out to the StreamSets community. Would you mind sharing the python code that you are using in the stop event section?

Here is the function I wrote; configurations is a dictionary that holds the target connection information.

import logging

logger = logging.getLogger(__name__)

# check_if_parameter() and parameter_update() are my own helper functions (not shown here)
def start_stop_event_update(pipeline, event, stage, configurations, target_connection):
    """Update a start/stop event definition; always return (event, is_event_updated)."""
    is_event_updated = False
    # Only Data Collector pipelines are handled; return a tuple so the caller can still unpack it
    if pipeline._data_internal['executorType'] != 'COLLECTOR':
        return event, is_event_updated
    for config_key, target_config_value in configurations.items():
        # Update the stage library if it differs from the target
        if config_key == 'library' and target_config_value != event['library']:
            stage_call = 'startEventStage' if stage == 'start_event' else 'stopEventStage'
            # Keep everything from '::' onward and swap in the new library prefix
            current_value = pipeline.configuration[stage_call]
            pipeline.configuration[stage_call] = target_config_value + current_value[current_value.find('::'):]
            event['library'] = target_config_value
            is_event_updated = True
            logger.info("start/stop event library has been updated. Please verify connection information is pointing to PCA")
        elif config_key != 'library':
            # Skip script-based events (event['configuration'] is a list, so compare the entry's name)
            if event['configuration'][0]['name'] == 'config.script':
                continue
            current_config_value = event['configuration'][0]['value']
            # Check whether the current value is a pipeline parameter
            is_parameter, parameter_key, parameter_value = check_if_parameter(pipeline, current_config_value)
            if is_parameter and parameter_value != target_config_value:
                # Value is a parameter that differs from the target: update the parameter
                parameter_update(pipeline, parameter_key, target_config_value)
                is_event_updated = True
            elif not is_parameter and current_config_value != target_config_value:
                # Value is hard-coded and differs from the target: overwrite it
                event['configuration'][0]['value'] = target_connection
                is_event_updated = True
    # Return only after every configuration key has been processed
    return event, is_event_updated

   
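The library update in the function above swaps the text before the first '::' in the start/stop event stage value. If '::' is ever missing, str.find returns -1 and the slice keeps only the last character of the old value. A minimal sketch of the same swap using str.partition, which makes the missing-separator case explicit; swap_library_prefix is a hypothetical helper, and the example value is made up, assuming only the "<library>::<remainder>" shape the original code relies on.

```python
def swap_library_prefix(stage_value: str, new_library: str) -> str:
    """Replace the text before the first '::' with new_library, keeping the rest."""
    _, separator, remainder = stage_value.partition("::")
    if not separator:
        # With str.find('::') == -1, the original slice would keep only the last character
        raise ValueError(f"expected '<library>::<remainder>', got {stage_value!r}")
    return new_library + separator + remainder


# Made-up value in the '<library>::<remainder>' shape the original code assumes
old_value = "old-jdbc-lib::com_example_JdbcQueryExecutor::1"
print(swap_library_prefix(old_value, "new-jdbc-lib"))
# prints: new-jdbc-lib::com_example_JdbcQueryExecutor::1
```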

And this is the loop that calls it:

# pipeline, control_hub and stage_configuration_update_dict are defined earlier in my script
stop_event = pipeline._pipeline_definition['stopEventStages'][0]
pipeline_updated = False  # tracks updates across all stages, not just the last one
for stage, configurations in stage_configuration_update_dict.items():
    if stage != stop_event['instanceName']:
        continue
    print(stage)
    if stage == 'HiveQuery_StopEventStage':
        target_connection = ''
        pipeline._pipeline_definition['stopEventStages'][0], is_event_updated = start_stop_event_update(pipeline, stop_event, stage, configurations, target_connection)
        pipeline_updated = pipeline_updated or is_event_updated
    elif stage == 'JDBCQuery_StopEventStage':
        target_connection = ''
        pipeline._pipeline_definition['stopEventStages'][0], is_event_updated = start_stop_event_update(pipeline, stop_event, stage, configurations, target_connection)
        pipeline_updated = pipeline_updated or is_event_updated

# Publish once, after all stages have been processed
if pipeline_updated:
    control_hub.publish_pipeline(pipeline, commit_message='')
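Because the error only shows up at publish time, it may help to compare the stop-event definition before and after the update, to see exactly which configuration entries changed (for example, whether the intended connection entry was edited or a different one). A minimal sketch, assuming the definition is a plain dict with a "configuration" list of name/value entries, as the code above treats it; diff_configuration is a hypothetical helper, not an SDK function, and the event below is made-up data.

```python
import copy
from typing import Any, Dict, List, Tuple


def diff_configuration(before: Dict[str, Any], after: Dict[str, Any]) -> List[Tuple[str, Any, Any]]:
    """List (name, old_value, new_value) for every configuration entry that changed."""
    old = {entry["name"]: entry.get("value") for entry in before.get("configuration", [])}
    new = {entry["name"]: entry.get("value") for entry in after.get("configuration", [])}
    changes = []
    for name in sorted(old.keys() | new.keys()):
        if old.get(name) != new.get(name):
            changes.append((name, old.get(name), new.get(name)))
    return changes


# Made-up stop-event definition; deep-copy it before the update mutates it in place
example_event = {"instanceName": "JDBCQuery_StopEventStage",
                 "configuration": [{"name": "conf.a", "value": "x"},
                                   {"name": "conf.b", "value": "y"}]}
snapshot = copy.deepcopy(example_event)
example_event["configuration"][0]["value"] = "z"  # stands in for the real update call
print(diff_configuration(snapshot, example_event))
# prints: [('conf.a', 'x', 'z')]
```

In the loop, the snapshot would be taken with copy.deepcopy(stop_event) just before calling start_stop_event_update, and the diff printed before publishing.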
