Question

build pipeline in transformer

  • 5 January 2022
  • 8 replies
  • 126 views

I am trying to create transformer pipeline using python sdk , unable to connect transformer engine.. i am getting two id and url for sch.tramsformers command . Please help me


8 replies

Userlevel 3
Badge

Can you share the code you’re trying to run and what error you’re getting? 

 pipeline_builder = sch.get_pipeline_builder(engine_type='transformer', engine_url='xxxxxxyyyyyyyyyyy')

 

error is :

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.6/site-packages/streamsets/sdk/sch.py", line 169, in get_pipeline_builder
    data_collector_instance = authoring_engine._instance
  File "/usr/local/lib/python3.6/site-packages/streamsets/sdk/sch_models.py", line 1600, in _instance
    transformer_id=self.id if self._control_hub.use_websocket_tunneling else None)
  File "/usr/local/lib/python3.6/site-packages/streamsets/sdk/st.py", line 87, in __init__
    **kwargs)
  File "/usr/local/lib/python3.6/site-packages/streamsets/sdk/st_api.py", line 94, in __init__
    result = self._fetch_tunneling_instance_id()
  File "/usr/local/lib/python3.6/site-packages/streamsets/sdk/st_api.py", line 184, in _fetch_tunneling_instance_id
    wait_for_condition(_get_tunneling_instance_id, [self], timeout=300)
  File "/usr/local/lib/python3.6/site-packages/streamsets/sdk/utils.py", line 353, in wait_for_condition
    outcome = condition(*condition_args or [], **condition_kwargs or {})
  File "/usr/local/lib/python3.6/site-packages/streamsets/sdk/st_api.py", line 177, in _get_tunneling_instance_id
    self._tunneling_instance_id = response.json()['instanceId']
  File "/usr/local/lib/python3.6/site-packages/requests/models.py", line 910, in json
    return complexjson.loads(self.text, **kwargs)
  File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python3.6/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python3.6/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
 

 

Userlevel 3
Badge

Hm, that’s strange. When you pass the engine URL, it simply does a look-up of the URLs of engines in the list within Control Hub. Assuming that URL is correct and well-formed, I don’t know why it would fail like that.

 

If you’re a customer, please open a Support ticket and we can have an engineer take a look at this more closely in your environment to figure out what’s going on.

also one more question 

 

 

lets say i have created transformer pipeline in UI and if i download that UI code as jason and export into Python sdk will this get converted to python sdk code?

@dima any one can answer this?

Userlevel 3
Badge

Hi @krishnankannan,

 

No, at the moment there is no functionality that takes a pipeline JSON and generates its equivalent SDK for Python code.

Would you mind sharing some more details about how such functionality would benefit you and your use case?

  1. Directly import a pipeline from JSON, creating a new pipeline instance in Control Hub:

with open('./exported_from_sch.json', 'r') as input_file:    pipeline_json = json.load(input_file)pipeline = sch.import_pipeline(pipeline=pipeline_json,                               commit_message='Imported pipeline from JSON',                               name='My new pipeline')

 

 

based on help document i think we can export /import pipeline in sdk using json 

Userlevel 3
Badge

Yes, you can import and export pipeline files like that. I thought you were asking whether doing so could create SDK code that creates a pipeline without needing the JSON. If all you’re looking for is dealing with the JSONs themselves, the docs should point you in the right direction.

I’d still love to hear more about your use case, by the way. If you’re on our community Slack, feel free to drop me a DM.

Reply