Show us your Pipelines
Share an example of your Pipeline pattern. Upload a Screenshot and detailed description. No wrong answers. Do NOT share personal or credential information.
- 27 Topics
- 22 Replies
StreamSetters! Two community members will each earn a $250 Amazon gift card by submitting the most impressive or creative pipelines. Please share your pipeline within this category (Show us your Pipeline). It's easy to submit. Make sure to: Create a new topic in the Show us your Pipeline category (each submission needs its own topic thread). Add an image of your sample pipeline (MAKE SURE NO PERSONAL, CREDENTIAL, OR COMPANY INFO IS IN THE SCREENSHOT OR SHARED). Add a 2-3 sentence explanation of your pipeline's use case and benefits. Lastly, make it fun! Create something useful, fun, creative, or wacky. The closing date of our first “Share your Pipelines” contest is March 31, 2022. Example entry: by @Mike Carley. Here are other sample pipelines: https://github.com/streamsets/pipeline-library How we will choose the winners: Winners will be announced on Mar 31, 2022. The two winners will be graded on overall impression, creativity, originality, and explanation. Winners will be
Hi, The above pipeline reads CSV files of new contacts uploaded to an S3 bucket and converts them to JSON. The critical part is that it merges the batches (each batch holds 1000 records) into a single JSON document of multiple records and then makes a single HTTP POST call to one of our internal systems to carry out a CONTACT update.
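The merge step described above can be sketched in plain Python. This is only an illustration of the idea, not the pipeline's actual stage configuration; the function name `merge_batches` and the sample contact fields are hypothetical, and the HTTP POST itself is left out.

```python
import json


def merge_batches(batches):
    """Flatten a sequence of record batches into one JSON array string."""
    merged = [record for batch in batches for record in batch]
    return json.dumps(merged)


# Hypothetical sample data: two small batches instead of batches of 1000.
batches = [
    [{"email": "a@example.com"}, {"email": "b@example.com"}],
    [{"email": "c@example.com"}],
]

# One payload, suitable for a single POST instead of one call per batch.
payload = merge_batches(batches)
```

Sending one merged payload trades memory (all records are held at once) for fewer calls to the downstream system.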
This pipeline is designed to orchestrate initial bulk load to change data capture workload from on-prem Oracle database to Snowflake Data Cloud. The pipeline takes into consideration the completion status of the initial bulk load job before proceeding to the next (CDC) step and also sends success/failure notifications. The pipeline also takes advantage of the platform’s event framework to automatically stop the pipeline when there’s no more data to be ingested.
Trying to build a data pipeline with Azure SQL Server DB (CDC) as the source and Azure Databricks (Delta tables) as the destination. I have referred to the sample data pipeline from https://github.com/streamsets/pipeline-library/tree/master/datacollector/sample-pipelines/pipelines/SQLServer%20CDC%20to%20Delta%20Lake Getting the error below for a few records in Schema preview as well: DELTA_LAKE_34 - Databricks Delta Lake load request failed: 'DELTA_LAKE_32 - Could not copy staged file 'sdc-4a076fce-7a73-45ba-8dd7-29e58848cf23.csv': java.sql.SQLException: [Simba][SparkJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: org.apache.hive.service.cli.HiveSQLException: Error running query: org.apache.spark.sql.AnalysisException: Unable to infer schema for CSV. It must be specified manually. Note: On Preview/Draft Run → the pipeline is able to capture changes from the source DB, successfully creates files in the stage (ADLS container), and creates Delta tables at the destination, but it fails to ingest re
I need to extract data from an API using an OAuth2 connection. As per the data, they provide /cursor at the end of each page, and that cursor can be used to get the records from the next page. In the Pagination tab, I used Link with Response field and tried to add a Stop Condition with /cursor, but I am not able to handle this scenario. Can someone please help? Thanks in advance!
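For reference, the general logic of cursor-based pagination looks like the sketch below. It is not an HTTP Client stage configuration; it only shows the loop the stage needs to reproduce: request a page, read its cursor, and stop when no cursor comes back. The response field names `records` and `cursor` and the in-memory `fake_fetch_page` function are assumptions made for illustration.

```python
def fetch_all(fetch_page):
    """Collect records from every page, following the cursor until it is empty."""
    records = []
    cursor = None
    while True:
        page = fetch_page(cursor)
        records.extend(page["records"])
        cursor = page.get("cursor")
        if not cursor:  # a missing or empty cursor marks the last page
            break
    return records


# Hypothetical stand-in for the real API: three pages keyed by cursor.
FAKE_PAGES = {
    None: {"records": [1, 2], "cursor": "abc"},
    "abc": {"records": [3, 4], "cursor": "def"},
    "def": {"records": [5], "cursor": None},
}


def fake_fetch_page(cursor):
    return FAKE_PAGES[cursor]


all_records = fetch_all(fake_fetch_page)
```

The stop condition is the key detail: it has to evaluate to true when the cursor field is absent or empty on the last page.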
Thank you to the 24 community members who submitted a pipeline! Our two winners for this quarter's Share your Pipeline Contest are @swayam and @Bikram! (Stay on the lookout for a DM from me.) Check out Swayam's example pipeline, Carrying out image comparison using SDC pipeline and Amazon Image Rekognition, where he compares the images received from Facebook and Twitter to determine whether two different profiles belong to the same customer. Bikram's example pipeline is Read multiple csv from S3, Handle corrupt data, Mask the sensitive data and send Data to on premise and Denodo DB. His pipeline reads multiple .csv files from an AWS S3 bucket and handles corrupt data, if any, in the file. The pipeline handles this while masking the data based on the DB environment and performs full data loads to the destination DBs. Stay on the lookout for our next Share your Pipeline Contest! We are looking to use Miroverse for ease of trying new example pipelines via one click.
Problem Statement: Very often we find duplicate records being sent by customers when they export data from their DWH and send it to us. To ensure data quality, we need to filter out the duplicate records before they enter our platform for processing, computation and analysis. Solution: We have addressed this using 2 separate pipelines. Pipeline-1: Files arriving in S3 get picked up by this pipeline, where for every record we calculate the MD5 hash and make an entry in Redis with the KEY as “The MD5 hash value of the record” and the VALUE as “Filename-RowNumber” of the current record being processed. Pipeline-2: The same file processed by Pipeline-1 gets picked up again by Pipeline-2, which calculates the MD5 hash for every record and checks its value in Redis to determine which RowNumber we need to process and which one we should drop as a duplicate record. Pipeline-1: Calculating Hash and Inserting to Redis. As you may see above: CSV file picked from S3 origin, gets converted to JSO
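A minimal sketch of the two-pass idea, using a plain Python dict as a stand-in for Redis. Two things here are assumptions rather than details from the post: the first occurrence of a record is the one that is kept, and the record is hashed as its comma-joined fields. The function names and sample rows are hypothetical.

```python
import hashlib


def record_hash(record):
    """MD5 hash of a record, hashed here as its comma-joined fields."""
    return hashlib.md5(",".join(record).encode("utf-8")).hexdigest()


def index_records(filename, rows, store):
    """Pass 1: map each record hash to "Filename-RowNumber" (first row wins)."""
    for row_number, row in enumerate(rows, start=1):
        store.setdefault(record_hash(row), f"{filename}-{row_number}")


def unique_records(filename, rows, store):
    """Pass 2: keep only the row whose number matches the stored entry."""
    kept = []
    for row_number, row in enumerate(rows, start=1):
        if store.get(record_hash(row)) == f"{filename}-{row_number}":
            kept.append(row)
    return kept


rows = [["1", "Ann"], ["2", "Bob"], ["1", "Ann"]]  # third row is a duplicate
store = {}  # stand-in for the Redis key-value store
index_records("contacts.csv", rows, store)
deduped = unique_records("contacts.csv", rows, store)
```

With a real Redis instance, a plain SET would overwrite earlier entries, so the last occurrence would win unless a set-if-absent write is used.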
Streaming data pipeline that potentially has updates to transactions. It looks up matching records in the database (Postgres) and filters transactions using a condition to determine whether they are updates or new. For updates, it masks sensitive data and writes them to a Postgres DB table. New transactions are written to an AWS bucket, and unprocessed transactions (pipeline errors) are written to a Kafka destination for further analysis.
We built this pipeline to compare the images received from Facebook and Twitter to determine whether two different profiles belong to the same customer or to different ones. We have a business case where we pull the social media comments from different users received from Facebook and Twitter. Apart from carrying out the sentiment analysis, we also need to know if the same customer is being vocal on multiple platforms. Since customers normally don’t share their PII data, when we pull comments along with profile images from Facebook and Twitter, we store the images and consider image matching as one of the criteria to determine the probability score of whether two comments received from different platforms belong to the same person. Before running the above pipeline, we have the pre-processed data with some probability score, where the name, city, age, and sentiments are already matched to come up with the best set of results to compare, available in a database. In the above pipeline, the origin is a MySQ
Problem Statement: You’ve been asking everybody you meet the age-old question: What is your favorite dinosaur? You’ve been saving their responses in a series of delimited files that have this format. Tragically, the data you have meticulously gathered is dirty. Before you can execute your master plan to do whatever it is that you were planning to do with this information, you must clean your dataset and resave it to a single clean file. The Pipeline: You read in the Dino data with the Directory origin. You remove duplicates on the Dinosaur column using the Record Deduplicator. You filter Dinos that have murdered people in the Jurassic Park film series, for reasons that are your own, using the Stream Selector. Finally, you save your Dino data to one single file on your local system. Your data is clean, filtered and ready for analysis. If you’re reading this, please comment with your favorite dinosaur. It’s uh...for science.
This pipeline is populating and updating a Slowly Changing Dimension for the NYC Citibike stations using Transformer for Snowflake. The pipeline is using StreamSets’ Slowly Changing Dimension stage that handles all of the logic for Type 1 and Type 2 dimensions. The SCD stage then produces the correct output along with the metadata that the Snowflake Destination uses to perform the merge.
Transformer for Snowflake: Pipeline to Create Fact Table. This pipeline uses the NYC Citibike data to populate a fact table enriched with station and weather data. Since this is using Transformer for Snowflake, none of the data leaves the Snowflake Data Cloud since the raw trips data is already loaded in Snowflake! The pipeline generates the SQL that is then executed on Snowflake, inserting the query results into the target tables. The pipeline is using weather data from Weather Source’s free Global Weather & Climate Data for BI data set on the Snowflake Data Marketplace so you can see the weather conditions for each ride.
Read 911 Event Data and Load to Snowflake. This pipeline uses the HTTP client to read real-time 911 event data from the Seattle Fire Department. The pipeline then flattens a nested JSON object. It also uses the latitude and longitude of the incident location to look up the Census block information from the Census Bureau’s geocoding API. Fun fact: While building this pipeline, an event came through for a dumpster fire currently happening on my block!
Scenario: Processing data in JSON format using the Encrypt/Decrypt processor and loading it to a destination. Steps to implement the pipeline without receiving the Invalid ciphertext type error: 1. The first pipeline reads data in JSON format, sends it to the Encrypt/Decrypt processor with the action set to Encrypt, and loads it to a Kafka Producer with the data format set to JSON. A few columns are encrypted using Amazon KMS and fed to the Kafka Producer. 2. The second pipeline reads data from a Kafka Consumer with the data format set to JSON. A Field Type Converter is used to convert columns to Byte Array before feeding data into the Encrypt/Decrypt processor, because the Encrypt/Decrypt processor only accepts columns with the Byte Array data type. Columns that are converted to Byte Array are sent one by one into a Base64 Field Decoder. Please note that we can pass only one field per Base64 Field Decoder. From the Base64 Field Decoders, the columns are sent to the Encrypt/Decrypt processor with the action set to Decrypt, and finally the data is written to a file
Use Case: Receive device data from various machines around the world (using Python scripts, Go, MQTT, Kafka, or files). All the data lands on MQTT topics. The StreamSets pipeline reads the data from the MQTT topics. Once device data is received from the MQTT topics, the pipeline gets more machine details (such as country, city, longitude, and latitude) using Geo IP. Raw data is stored in S3. Processed data is stored in Databricks Delta Lake and Elasticsearch for analysis. Sample analysis: Count all devices for a particular country and map them.
Sample Data:
battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
1,234,US,USA,United States,1,meter-gauge-abcdef,10,220.127.116.11,38,green,-97,Celsius,14,1458444054093
Complete Pipeline:
Read multiple csv from S3, Handle corrupt data, Mask the sensitive data and send Data to on premise and Denodo DB
This is a generic pipeline created to read multiple .csv files from an AWS S3 bucket, handle corrupt data if any is in the file, mask the data based on the DB environment, and perform a full data load to the destination DBs. Let's deep dive into each processor used in the pipeline. Origin: AWS S3: Reads multiple .csv files from the bucket. Processors: Expression Evaluator: Sets the pipeline execution time and user, to identify who executes the pipeline and when. Field Type Converter: Converts the time field to the correct date-time format (YYYY-MM-DD HH:MM:SS). Jython Evaluator: Handles corrupt data, if any, in the file (e.g. “‘ street address ’”). Stream Selector: Checks for which environment the data needs to be masked before loading it to the destination DBs. In this case, for the Dev and Test environments the data will be masked, because under the GDPR rule we couldn’t load sensitive data into them, but for Prod environment
Here's a Kafka Stream Processor pipeline that reads events from a "raw" topic, performs streaming transforms, and publishes the transformed events to a "refined" topic that multiple downstream clients subscribe to: Kafka Stream Processor Pipeline. Here is the Stream Processor Pipeline’s placement within a StreamSets Topology that allows visualization and monitoring of the end-to-end data flow, with last-mile pipelines moving the refined events into Delta Lake, Snowflake, Elasticsearch, S3 and ADLS: Kafka Stream Processor with Multiple Consumers. Users can extend the Topology using Transformer for Snowflake to perform push-down ETL on Snowflake using Snowpark, and Transformer on Databricks Spark to perform ETL on Delta Lake: Push-down ETL on Snowflake and Databricks Delta Lake.
I wanted to have a way to send chess game information from lichess.org (a popular chess server) to Elasticsearch and Snowflake to let me visualize statistics (e.g., how often does World Champion GM Magnus Carlsen lose to GM Daniel Naroditsky?) as well as to generate reports in Snowflake (e.g., when Magnus does lose, which openings does he tend to play?). I ended up accomplishing this with two pipelines: This pipeline is a batch pipeline that ingests data using the lichess REST API and pushes it to a Kafka topic. It uses an endpoint that allows for pulling down all games by username, so I’ve parameterized the username portion and then can use Control Hub jobs to pick which specific users I want to study. This pipeline consumes game data from my Kafka topic, does some basic cleanup of the data (adds a field for the name of the winner rather than the color of the winning pieces and converts timestamps from long to datetime) as well as some basic enrichment (it adds a field that calculates
In this pipeline we solve a fun data problem with the help of StreamSets Transformer. The Transformer execution engine runs data pipelines on Apache Spark. We can run this pipeline on any Spark cluster type. Problem Statement: Find the average number of friends for each age and sort them in ascending order. We are given a fake friends dataset from a social networking platform, in CSV file format, stored in Google Cloud Storage:
id, Name, Age, Number of Friends
0,Will,33,385
1,Jean-Luc,26,2
2,Hugh,55,221
3,Deanna,40,465
4,Quark,68,21
5,Weyoun,59,318
6,Gowron,37,220
7,Will,54,307
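The aggregation in the problem statement can be sketched in plain Python (not Spark, and not the Transformer pipeline itself). The sketch assumes the sample rows split into four columns (id, name, age, number of friends) and that "ascending order" means sorting by age; the function name is hypothetical.

```python
import csv
import io
from collections import defaultdict

# Sample rows from the problem statement, assuming a 4-column split.
SAMPLE = """\
0,Will,33,385
1,Jean-Luc,26,2
2,Hugh,55,221
3,Deanna,40,465
4,Quark,68,21
5,Weyoun,59,318
6,Gowron,37,220
7,Will,54,307
"""


def average_friends_by_age(csv_text):
    """Return (age, average friend count) pairs sorted by age ascending."""
    totals = defaultdict(lambda: [0, 0])  # age -> [friend total, row count]
    for _id, _name, age, friends in csv.reader(io.StringIO(csv_text)):
        entry = totals[int(age)]
        entry[0] += int(friends)
        entry[1] += 1
    return sorted((age, total / count) for age, (total, count) in totals.items())


averages = average_friends_by_age(SAMPLE)
```

In Spark terms this corresponds to a group-by on age, an average over the friends column, and a sort on the grouping key.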
MySQL binary logs provide a very efficient way to track data changes. They contain events that describe the modifications to data. In short, binary logs contain all the information that is required to trace the global status of the server right from the time it was started. This pipeline is designed to propagate INSERT, UPDATE, and DELETE operations from the MySQL BinLog to Snowflake.
Ordnance Survey (OS) produces a suite of products for the UK market. One of these is Address Base Premium (ABP), a set of details that describe buildings, businesses, residential properties and items you’d find on a map in detail, such as lat/long data, how the Royal Mail refers to them, and how the local authorities/councils describe and classify those items. Sounds good, right? Well, it's not such a nice set of data to deal with. It is shipped in 5 km batches of data in a single CSV file, with 10 different schema patterns within it. Nightmare? Nope! You know the secret of StreamSets! StreamSets doesn't care about schema on read. That is the key to unlocking this… In the ABP files, the first column has a record identifier. This tells us which schema and rules to process. This pipeline makes an issue that is normally so difficult to deal with clear and transparent. You can watch a record come in from the raw file, read it with no header, look at the first column, and process that to a given lane. That l
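The routing idea (read with no header, look at the first column, send the row to a lane) can be sketched as follows. The record identifiers "A" and "B" and the function name are hypothetical placeholders, not actual ABP identifiers.

```python
def route_records(rows):
    """Group headerless rows into lanes keyed by the first-column identifier."""
    lanes = {}
    for row in rows:
        identifier, fields = row[0], row[1:]
        lanes.setdefault(identifier, []).append(fields)
    return lanes


# Hypothetical rows with two different schemas mixed in one file.
rows = [["A", "x", "y"], ["B", "1"], ["A", "z", "w"]]
lanes = route_records(rows)
```

Each lane can then apply the schema and rules for its own record type, which is what lets one file carry many schema patterns.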
Data comes from a monitor device, with test results for different elements. Each element has 3 values: its ID, its value, and any error messages. The customer wants to see a flattened list of just the monitor device and timestamp, with the result of each element on separate lines. Using the above, we can achieve that. We import the data, ignoring the header line (hence the fields are labeled with numbers). First, we label the monitor “Parent” fields using a field renamer. We build an empty map, so that we can parse the records correctly, using an expression evaluator. We use a field mapping processor to map those groups of 3 fields (checking that we only remap columns that are numeric). Now that we have groups, we split the groups into records using a field pivot processor. This leaves a single group per record, but still as a group, so we tidy that up with a field flattener so all the records are at the same level. Finally, we use a field renamer to label the 3 fields we have produced. We ship that off to our secure storage faci
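The steps above amount to pivoting a wide row into one record per element. A minimal Python sketch follows, assuming the first two columns are the monitor device and timestamp and the rest are groups of three (ID, value, error); the column layout, output field names, and function name are illustrative assumptions.

```python
def pivot_elements(row, group_size=3):
    """Turn one wide row into one flat record per (id, value, error) group."""
    monitor, timestamp = row[0], row[1]
    elements = row[2:]
    records = []
    for start in range(0, len(elements), group_size):
        group = elements[start:start + group_size]
        if len(group) < group_size:
            continue  # skip an incomplete trailing group
        element_id, value, error = group
        records.append({
            "monitor": monitor,
            "timestamp": timestamp,
            "element_id": element_id,
            "value": value,
            "error": error,
        })
    return records


# Hypothetical wide row: device, timestamp, then two element groups.
wide_row = ["dev-1", "2022-01-01T00:00:00", "e1", "7.2", "", "e2", "0.4", "LOW"]
flat = pivot_elements(wide_row)
```

Each output record repeats the parent fields, which matches the flattened list the customer asked for.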
This pipeline is designed to capture inserts and updates (SCD Type II) being uploaded to a bucket on Amazon S3 for a slowly changing dimension table -- Customers. The pipeline creates new records with version set to 1 for new customers and with version set to (current version + 1) for existing customers. The customer records are then stored in Snowflake Data Cloud.
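The versioning rule described above (version 1 for new customers, current version + 1 for existing ones) can be sketched as below. The function name, customer IDs, and the dict used for current versions are hypothetical; in the pipeline the current version would come from the dimension table.

```python
def next_versions(current_versions, incoming_ids):
    """Assign version 1 to new customers and current + 1 to existing ones."""
    versions = dict(current_versions)  # copy so the input is not mutated
    assigned = []
    for customer_id in incoming_ids:
        versions[customer_id] = versions.get(customer_id, 0) + 1
        assigned.append((customer_id, versions[customer_id]))
    return assigned


# Hypothetical state: customer "c1" is already at version 2, "c2" is new.
assigned = next_versions({"c1": 2}, ["c1", "c2"])
```

Because each change becomes a new row with a higher version, earlier versions stay available as history, which is the point of a Type II dimension.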