Tutorial

Zero to StreamSets

  • 18 October 2023

An in-depth guide to getting started with StreamSets, deploying engines, and running pipelines.

 

Overall architecture recap

The control plane of StreamSets Platform is Control Hub, which is hosted in the cloud. The data plane comprises StreamSets Data Collectors (SDCs), which are installed in the customer’s network, be it in the cloud or on-premises.

[Image: StreamSets Platform architecture – Control Hub (control plane) in the cloud, Data Collector engines (data plane) in the customer’s network]

 

 

Consider these options for SDC installation (not all of them are recommended for production scenarios):

 

  • Windows PC: Tarball (binary download) – not supported; Docker container – OK (the fastest to implement).
  • Linux/Mac: Tarball (binary download) – OK with configured prerequisites; Docker container – OK (the fastest to implement).

Depending on your choice, the instructions below cover the major steps for both the tarball and Docker options.

 

Provide and prepare your machine

Use your own laptop or create a VM in the cloud of your choice. The VM size should meet the minimum requirements for SDC installation: 2 vCores, 1 GB RAM, 6 GB disk space.

If on Linux, run a system update command, such as yum update or apt-get update.

If your machine requires an HTTP proxy to access the Internet, make a note of the proxy details; you will need them later.

 

Docker option

Install Docker Desktop on your Mac or Windows machine – https://docs.docker.com/desktop/

Or install Docker Engine on a Linux VM – https://docs.docker.com/engine/install/

Tarball option

  1. If not already installed, install Java 8, 11 or 17. Installation details differ between Linux distributions, not to mention macOS, so they are not repeated here.
  2. Configure the Open File Limit.

On Linux, this means adding a line like the following to the file /etc/security/limits.conf (you need local admin rights for that):

*                -    nofile          32768

(you will need to log off and log back on for the change to take effect; a quick way to check and apply it is sketched after the macOS note below)

On macOS, the process has a few more steps and is best covered by this link: https://docs.streamsets.com/portal/platform-datacollector/latest/datacollector/UserGuide/Installation/Requirements.html#concept_al3_qz5_jz
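For reference, here is a minimal sketch of checking and applying the limit on Linux (exact steps may differ between distributions):

# show the current open-file limit for your session
ulimit -n

# append the new limit (requires root); log off and back on afterwards
echo '*                -    nofile          32768' | sudo tee -a /etc/security/limits.conf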

 

Configure a StreamSets Platform Tenant

Sign up for the StreamSets Free Trial

 

On any computer (it doesn’t have to be the machine for SDC installation), go to https://cloud.login.streamsets.com/signup and complete the sign-up.

 

Confirm your registration via the link sent to you by email, then log on to the Platform. On your first logon you will need to specify:

  • the region of the Platform instance: there are instances in North America, Europe and Asia Pacific, but only the first two are available for the Free Trial;
  • your company name (you can change it later if needed);
  • your agreement to the Terms and Conditions.

Create a new Deployment

 

Open the menu Set Up -> Deployments and click the “Create a deployment” link.

 

Enter the new Deployment’s name.

[Screenshot: New Deployment wizard – deployment name]

 

Leave everything else at its default (for example, the engine version may be higher than in the screenshot). Click Save & Next.

 

The Configure Deployment screen requires two actions: adding the required libraries and configuring proxy settings (the latter can be skipped if no proxy is used).

[Screenshot: Configure Deployment screen]

 

To add libraries, click the link next to Stage Libraries. Then search for JDBC (it will be required when building a pipeline) and click the big “+” on any stage in the resulting list – all of them use the same library:

[Screenshot: Stage Libraries selection with JDBC search results]

Click OK to add this library, then click OK again to return to the deployment wizard. If your SDC is not behind a proxy, leave everything else at its default and click Save & Next again.

 

If your SDC is behind a proxy, click the link next to Advanced Configuration and go to the Proxy tab, as shown in the screenshots below.

[Screenshot: Advanced Configuration – Proxy tab]

Fill in the required details for accessing your proxy.

 

On the next screen, select the Install Type for your Data Collector engine. Click Start & Next, then click Start & Generate Install Script on the final screen. Copy the install script to your clipboard.

 

Install Data Collector

Docker option

The generated script is just a command to run a Docker container:

docker run -d  -e http_proxy= -e https_proxy= -e STREAMSETS_DEPLOYMENT_SCH_URL=https://eu01.hub.streamsets.com -e STREAMSETS_DEPLOYMENT_ID=<some random string> -e STREAMSETS_DEPLOYMENT_TOKEN=<really long random string> -e ENGINE_SHUTDOWN_TIMEOUT=10 streamsets/datacollector:5.6.1

 

You may want to make some changes to this command, for example (a combined example follows this list):

  • add “sudo” at the beginning, unless the Docker engine is configured to work with non-root accounts;
  • add a container name, e.g. “--name sdc561”;
  • add volume mappings, if the local file system is going to be used.
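Putting these together, a modified command might look like the sketch below. This is only an illustration: the deployment URL, ID and token must come from your own generated script, and the container name sdc561 and host path /data/raw_weather are arbitrary examples.

sudo docker run -d --name sdc561 -v /data/raw_weather:/tmp/raw_weather -e http_proxy= -e https_proxy= -e STREAMSETS_DEPLOYMENT_SCH_URL=https://eu01.hub.streamsets.com -e STREAMSETS_DEPLOYMENT_ID=<some random string> -e STREAMSETS_DEPLOYMENT_TOKEN=<really long random string> -e ENGINE_SHUTDOWN_TIMEOUT=10 streamsets/datacollector:5.6.1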

 

Run the (possibly adjusted) command in a terminal/command window on your VM or laptop.

Tarball option

The generated script will look like this (if you configured a proxy, the script will include those details as well):

 http_proxy= https_proxy= bash -c 'set -eo pipefail; curl -fsS https://eu01.hub.streamsets.com/streamsets-engine-install.sh | bash -s -- --deployment-id="<some random string>" --deployment-token="<really long random string>" --sch-url="https://eu01.hub.streamsets.com" --engine-shutdown-timeout="10" --foreground '

 

Run this command in a terminal window on your VM or Mac. The easiest and fastest (and not production-recommended) option is to run the script as your own regular user and accept all the defaults it suggests.

 

Verify Data Collector availability

At the end of either the Docker or the tarball install, your SDC application should be running. One to two minutes after the SDC starts up, you should also be able to see it in the list of Engines in the Platform UI:

[Screenshot: the new engine listed in the Platform UI]
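If the engine does not show up, a couple of quick local checks can help (the container name is whatever you used earlier; the Data Collector UI listens on port 18630 by default):

# Docker option: is the container up, and what does it log?
docker ps
docker logs <container name>

# Tarball option: is the process running and answering locally?
ps -ef | grep streamsets
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:18630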

 

Develop your first pipelines

The use case covered below is scraping semi-structured data from the Web into relational storage.

 

Following the “data lake zones” approach, we will scrape the semi-structured data into the raw zone, and then process it to extract selected information.

 

In the Platform UI, open the menu Build -> Pipelines.

Click the Create Pipeline link.

[Screenshot: Create Pipeline link]

Enter the pipeline’s name, then click Next.

 

[Screenshot: New Pipeline dialog – name and description]

 

Select the SDC you’ve just installed as your Authoring Engine, and click Save & Open in Canvas.

[Screenshot: Authoring Engine selection]

In the canvas, click Add Stage and find the HTTP Client origin (you can use search). The origin’s configuration will open automatically.

 

[Screenshot: HTTP Client origin configuration]

Configure the origin as follows:

Set the expected incoming Data Format to JSON, and Max Object Length (the maximum size of a single record) to 10000 characters:

[Screenshot: Data Format tab – JSON, Max Object Length 10000]
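The screenshots do not show the source URL in text form; judging by the field names used later in this tutorial (latitude, hourly.time, hourly.temperature_2m, current_weather), the source is a weather API along the lines of the Open-Meteo forecast endpoint. If you follow along with that assumption, you can preview the JSON the origin will receive with a quick curl (the coordinates are arbitrary):

curl "https://api.open-meteo.com/v1/forecast?latitude=52.52&longitude=13.41&hourly=temperature_2m,relativehumidity_2m,windspeed_10m&current_weather=true"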

 

Click Add Stage and select the Local FS destination. Set the target location to /tmp/raw_weather:

[Screenshot: Local FS destination – output directory /tmp/raw_weather]

… and specify the output Data Format (choose JSON).

 

Execute the pipeline; it should finish within a minute, processing 1 record and generating a file in the /tmp/raw_weather directory (if your SDC runs in a Docker container, the file is created inside that container rather than on the host, unless you mapped a volume).
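If you went with the Docker option and did not map a volume, you can still peek at the generated file from the host (assuming the container name sdc561 from the earlier example):

docker exec sdc561 ls -l /tmp/raw_weather
docker exec sdc561 sh -c 'head -c 500 /tmp/raw_weather/*'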

 

Create a second pipeline and use the Directory origin: set Files Directory to /tmp/raw_weather, the file name pattern to *, and Read Order to Last Modified Timestamp.

[Screenshot: Directory origin configuration]

 

Set the Data Format to JSON again, and set the maximum record size to 10000.

 

Let’s have a look at the data. Click the Preview button in the toolbar and select the Run Preview action:

[Screenshot: Preview button and Run Preview action]

After a few seconds, you will see data preview in the lower half of the screen. The data is nested:

[Screenshot: data preview showing the nested record]

Expand the hourly node. It contains several lists – columns of values, where the first values from each list would form the first record, the second values the second record, and so on. If only there were a way to zip those values together…
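To make this concrete, the relevant part of a record looks roughly like this (trimmed to the two fields used below; values are illustrative and the real payload carries more attributes):

{
  "latitude": 52.52,
  "longitude": 13.41,
  "hourly": {
    "time": ["2023-10-18T00:00", "2023-10-18T01:00", ...],
    "temperature_2m": [9.8, 9.4, ...]
  }
}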

 

Add a new Field Zip stage: specify /hourly/time as the first field to zip and /hourly/temperature_2m as the second field, and set the zipped field to /zipped:

 

[Screenshot: Field Zip configuration]

You may want to run Preview again to check how the resulting data looks. To zip in more fields, you would need to chain more Field Zip processors, but for now let’s proceed with just these two.

 

As we don’t need the /hourly field anymore, let’s drop it using the Field Remover stage. Getting rid of unneeded fields early helps keep the memory footprint of the pipeline low.

[Screenshot: Field Remover configuration]

The zipped list of values still resides in a single record. In a relational database model, each measurement should be a separate record – this is achieved with the Field Pivoter processor. The field to pivot is /zipped, and let’s put the pivoted result into /hourly_measures.

[Screenshot: Field Pivoter configuration]
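Conceptually, each element of /zipped then becomes a record of its own, carrying that element under /hourly_measures alongside the record’s other top-level fields (illustrative values):

record 1: /hourly_measures = { "time": "2023-10-18T00:00", "temperature_2m": 9.8 }
record 2: /hourly_measures = { "time": "2023-10-18T01:00", "temperature_2m": 9.4 }
...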

 

The last thing to do is to convert the nested fields into flat records suitable for loading into a relational database. Add the Field Flattener processor and configure it to flatten the entire record. To simplify further processing, use an underscore as the name separator instead of the default dot.

[Screenshot: Field Flattener configuration]

 

At this stage, the data is already suitable for a relational database load. One extra step that might be required is renaming fields with a Field Renamer to match the target schema.

 

Finish your pipeline with a destination stage of your choice, e.g. Local FS to output the flattened data to a directory on the SDC machine – by now, you should be able to configure it without much guidance. Alternatively, to output to a SQL database, use the JDBC Producer – the example below demonstrates how to configure it for Azure SQL Database.

[Screenshot: JDBC Producer configured for Azure SQL Database]
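For Azure SQL Database, the key setting is the JDBC connection string, which typically follows the pattern below (server and database names are placeholders; put the username and password into the stage’s credentials settings rather than into the URL). The table name matches the DDL further down, dbo.weather.

jdbc:sqlserver://<your server>.database.windows.net:1433;database=<your database>;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;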

That’s it, your pipeline to process the scraped nested data is ready. Before running it, make sure to create the target table in the database:

create table dbo.weather (
    latitude real not null,
    longitude real not null,
    generationtime_ms real null,
    utc_offset_seconds int null,
    timezone nvarchar(3),
    timezone_abbreviation nvarchar(3),
    elevation real null,
    current_weather_temperature real not null,
    current_weather_windspeed real not null,
    current_weather_winddirection real not null,
    current_weather_weathercode int null,
    current_weather_is_day int null,
    current_weather_time nvarchar(16) null,
    hourly_units_time nvarchar(10) null,
    hourly_units_temperature_2m nvarchar(3) null,
    hourly_units_relativehumidity_2m nvarchar(3) null,
    hourly_units_windspeed_10m nvarchar(6) null,
    hourly_measures_time nvarchar(16) not null,
    hourly_measures_temperature_2m real not null
);

 

Start the pipeline and wait until it reports 168 processed records and appears to pause. It is not actually paused: the pipeline is listening for new files to arrive. You can test this by running the first (data scraping) pipeline in a new browser window and then returning to this one – the count will change from 168 to 336 processed records. You can stop the pipeline manually by clicking the Stop button.
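A quick sanity check on the database side (the first run loads 168 rows; after re-running the scraping pipeline you should see 336):

select count(*) from dbo.weather;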

 

Conclusion And Next Steps

Congratulations, you have created a new StreamSets Platform tenant, installed a Data Collector engine and used it to extract, transform and load weather data! If nothing unexpected happened, you should have done all this in less than an hour.

There are a couple of things here to reflect on.

This ETL process can be executed as a batch. For this, you need to create a Job for each pipeline, and then create an orchestration pipeline that runs the transformation/load after the extraction. To make the second pipeline stop once all available data is processed, a Pipeline Finisher would be required. This is a good option if the source is updated infrequently, or if subsequent analytics does not require the most recent data.

Alternatively, this process could support more modern, continuous data delivery with a decoupled architecture. In the first pipeline, use the Polling mode of the HTTP Client origin to scrape data at regular intervals. Create and start Jobs for both pipelines – they are not tightly coupled, but the second pipeline will process a new file every time the first one creates it. Meanwhile, the storage in the middle doesn’t have to be file-based: these pipelines can easily be changed to use a queueing application like Kafka or Amazon Kinesis.

Here are some more ideas to explore, in no particular order:

  • Platform management features, like version control or the Python SDK;
  • other data sources and destinations, like databases (batch and change data capture) or cloud storage/services;
  • if Snowflake is your central data storage of choice, build an enterprise data warehouse using pipelines created in Transformer for Snowflake.



 

 

 

