Attempting to call pipeline_builder.build() on an exported pipeline JSON fails in the Python SDK.



Issue:

Using streamsets.sdk.sch_models.PipelineBuilder's build() with an exported copy of a pipeline's JSON fails with the following error:

2020-02-07T18:45:15.3379205Z Traceback (most recent call last):
2020-02-07T18:45:15.3379645Z File "/azp/agent/_work/_temp/66b4ba43-e92a-4988-81c9-10e37c05a857.py", line 44, in <module>
2020-02-07T18:45:15.3379764Z sch.publish_pipeline(pipeline, commit_message=commit_msg)
2020-02-07T18:45:15.3380141Z File "/usr/local/lib/python3.6/site-packages/streamsets/sdk/sch.py", line 276, in publish_pipeline
2020-02-07T18:45:15.3380264Z response_envelope = (executors.get(id=pipeline.sdc_id)
2020-02-07T18:45:15.3380645Z File "/usr/local/lib/python3.6/site-packages/streamsets/sdk/utils.py", line 355, in get
2020-02-07T18:45:15.3380790Z for k, v in kwargs.items())))
2020-02-07T18:45:15.3381094Z ValueError: Instance (id=1595495e-b2c4-11e9-b194-077782d90fb2) is not in list

 

Symptom:

A pipeline's JSON export contains an 'sdcId' field that stores the ID of the SDC instance from which the pipeline was exported.
streamsets.sdk.sch_models.PipelineBuilder's build() assumes that the 'sdcId' stored in the pipeline JSON belongs to an SDC instance that exists and is registered with the Control Hub instance the build() call is made against.

In this particular scenario, the build() call was made against a Control Hub instance using a pipeline that had been exported from an SDC instance that was not registered with that Control Hub. As a result, the 'sdcId' field references an SDC that Control Hub does not know about, and the lookup of that ID fails with the "is not in list" error shown above.
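
As a quick way to confirm this diagnosis before building, the check can be sketched in plain Python. This is a minimal sketch, not part of the SDK: the helper names exported_sdc_id and is_sdc_registered are hypothetical, the JSON path pipelineConfig -> info -> sdcId is the one used in the solution code in this post, and the list of registered IDs is made-up sample data.

```python
def exported_sdc_id(pipeline_json):
    """Return the 'sdcId' stored in an exported pipeline, or None if absent."""
    return pipeline_json.get('pipelineConfig', {}).get('info', {}).get('sdcId')


def is_sdc_registered(pipeline_json, registered_ids):
    """Return True if the export's sdcId is among the registered SDC IDs."""
    return exported_sdc_id(pipeline_json) in set(registered_ids)


# Sample data (assumption): a trimmed-down export and two registered SDC IDs.
sample_export = {
    'pipelineConfig': {'info': {'sdcId': '1595495e-b2c4-11e9-b194-077782d90fb2'}}
}
registered = ['aaaa-1111', 'bbbb-2222']

if not is_sdc_registered(sample_export, registered):
    print('sdcId', exported_sdc_id(sample_export), 'is not registered')
```

With a live connection, the registered IDs would presumably be collected from the Control Hub object, for example by reading the id of each entry in sch.data_collectors; that usage is an assumption and should be checked against the SDK version in use.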

 

Solution:

For the build to succeed, the imported pipeline JSON must reference an SDC instance that exists and is registered with Control Hub. The following code modifies the JSON in place to make that adjustment (this example first looks up the desired SDC instance by label):

from streamsets.sdk.sch import ControlHub
import json

# Establish a connection to SCH and generate the SCH object we need
sch = ControlHub('https://cloud.streamsets.com', username="user", password="password")

# Set the SDC labels for the instance we're interested in looking up
sdc_labels = ['labelOne', 'labelTwo']
dc = sch.data_collectors.get(labels=sdc_labels)

# Load the pipeline JSON that was exported previously
with open("file.json", 'r', encoding='utf-8') as input_file:
    pipeline_json = json.load(input_file)

# Modify the pipeline's JSON in place, and replace the 'sdcId' field with the 'id' of the SDC instance we looked up
pipeline_json['pipelineConfig']['info']['sdcId'] = dc.id

# Build the rest of the pipeline as normal
sch_pb = sch.get_pipeline_builder(data_collector=dc)
sch_pb.import_pipeline(pipeline=pipeline_json)
pipeline = sch_pb.build(title="sdk test")
# Add labels to the pipeline metadata before publishing
pipeline._pipeline_definition['metadata'].update({'labels': ['newSDK', 'pipelineLabelSDK']})
sch.publish_pipeline(pipeline)
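
If the same export needs to be reused against several Control Hub instances, the 'sdcId' replacement step can be wrapped in a small helper that leaves the loaded JSON untouched. This is a minimal sketch under stated assumptions: with_sdc_id is a hypothetical name, not an SDK function, and the sample IDs are invented for illustration.

```python
import copy


def with_sdc_id(pipeline_json, new_sdc_id):
    """Return a deep copy of the export with 'sdcId' replaced.

    The input dictionary is not modified.
    """
    if not new_sdc_id:
        raise ValueError('new_sdc_id must be a non-empty string')
    patched = copy.deepcopy(pipeline_json)
    try:
        patched['pipelineConfig']['info']['sdcId'] = new_sdc_id
    except KeyError as exc:
        # The export does not have the expected pipelineConfig/info structure.
        raise ValueError('export is missing key: {}'.format(exc)) from exc
    return patched


# Sample data (assumption): a trimmed-down export with an unregistered sdcId.
original = {'pipelineConfig': {'info': {'sdcId': 'old-sdc-id', 'title': 'demo'}}}
patched = with_sdc_id(original, 'new-sdc-id')
print(patched['pipelineConfig']['info']['sdcId'])
```

In the script above, the result of with_sdc_id(pipeline_json, dc.id) would be passed to import_pipeline in place of the dictionary that was modified in place.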

 

