Welcome to the StreamSets Community
Here's a Kafka Stream Processor pipeline that reads events from a "raw" topic, performs streaming transforms, and publishes the transformed events to a "refined" topic that multiple downstream clients subscribe to:

Kafka Stream Processor Pipeline

Here is the Stream Processor pipeline's placement within a StreamSets Topology, which allows visualization and monitoring of the end-to-end data flow, with last-mile pipelines moving the refined events into Delta Lake, Snowflake, Elasticsearch, S3, and ADLS:

Kafka Stream Processor with Multiple Consumers

Users can extend the Topology using Transformer for Snowflake to perform push-down ETL on Snowflake using Snowpark, and Transformer on Databricks Spark to perform ETL on Delta Lake:

Push-down ETL on Snowflake and Databricks Delta Lake
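For readers who want to see the raw-to-refined pattern outside of StreamSets, here is a minimal Python sketch using confluent-kafka. It assumes a local broker, JSON events, and a placeholder transform; the topic names match the post, everything else is illustrative.

```python
# Conceptual sketch (not the StreamSets pipeline itself): consume from the
# "raw" topic, apply a transform, and publish to the "refined" topic.
# Assumes a local broker and JSON payloads; adjust to your environment.
import json
from confluent_kafka import Consumer, Producer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "refiner",
    "auto.offset.reset": "earliest",
})
producer = Producer({"bootstrap.servers": "localhost:9092"})
consumer.subscribe(["raw"])

def refine(event: dict) -> dict:
    # Placeholder transform: normalize field names and drop empty values.
    return {k.lower(): v for k, v in event.items() if v not in (None, "")}

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    refined = refine(json.loads(msg.value()))
    producer.produce("refined", json.dumps(refined).encode("utf-8"))
    producer.flush()
```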
Read multiple CSVs from S3, handle corrupt data, mask sensitive data, and send data to on-premises and Denodo DBs
This is a generic pipeline created to read multiple .csv files from an AWS S3 bucket, handle any corrupt data in the files, mask the data based on the DB environment, and perform full and delta data loads to the destination DBs. Let's deep dive into each processor used in the pipeline.

Origin:
AWS S3: Reads multiple .csv files from the bucket.

Processors:
Expression Evaluator: Sets the pipeline execution time and records which user executed the pipeline and when.
Field Type Converter: Converts the time field to the correct date-time format (YYYY-MM-DD HH:MM:SS).
Jython Evaluator: Handles any corrupt data in the file (e.g. "' street address '"); a minimal sketch of this step is shown below.
Stream Selector: Checks for which environment the data needs to be masked before loading it to the destination DBs. In this case, data is masked for the Dev and Test environments because GDPR rules prevent loading sensitive data into them; for the Prod environment the data is loaded without masking.
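A minimal sketch of the Jython Evaluator script body, assuming the corrupt value is stray quoting around an address field; the field name 'street_address' is an assumption for illustration.

```python
# Hedged sketch of a Jython Evaluator script: trim stray quotes and
# whitespace from a field, and route unparseable records to errors.
for record in records:
    try:
        value = record.value.get('street_address')
        if value is not None:
            # Clean values like "' street address '" into "street address"
            record.value['street_address'] = value.strip().strip("'\"").strip()
        output.write(record)
    except Exception as e:
        # Send records that still cannot be handled to the error stream
        error.write(record, str(e))
```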
Hi everyone,

As of December 8th, 2021 we have sunset ask.streamsets.com. You may have some questions, and this post is here to help answer them.

Why did we close down ask.streamsets.com?
As our community continues to grow, the ask.streamsets.com website no longer met the level of support we wanted to provide to the community.

What content was migrated over?
The top 30 most relevant topics were migrated. Most questions that were once answered on ask.streamsets.com can now be found in our up-to-date docs: https://streamsets.com/support/documentation-overview/ Also, at the same time we have un-gated and made free over 400 knowledge base articles from our support portal.

Do I keep the status and points I had on ask.streamsets.com?
Sorry, no. We wish the systems allowed this. Going forward, please check out our leaderboard: https://community.streamsets.com/leaderboard?period=thisWeek

Where do I post my questions going forward?
Post your questions and start conversations here.
Use Case: Receive device data from various machines around the world via python scripts, Go, MQTT, Kafka, or files. All of the data lands on MQTT topics, and the StreamSets pipeline reads the data from those MQTT topics. Once device data is received from the MQTT topics, the pipeline looks up additional machine details (Country, City, Longitude, Latitude) using Geo IP. Raw data is stored in S3. Processed data is stored in Databricks Delta Lake and Elasticsearch for analysis.

Sample analysis: Count all devices for a particular country and map them (see the sketch below).

Sample data:
battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
1,234,US,USA,United States,1,meter-gauge-abcdef,10,18.104.22.168,38,green,-97,Celsius,14,1458444054093

Complete Pipeline:
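A minimal PySpark sketch of that sample analysis, assuming the processed records were written to a Delta table with the columns shown above; the table path is hypothetical.

```python
# Count distinct devices per country from the processed Delta table,
# e.g. as input for a per-country device map.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("device-counts").getOrCreate()

# Hypothetical path to the processed device records
devices = spark.read.format("delta").load("/delta/processed_devices")

counts = (devices
          .groupBy("cn")                                         # country name column
          .agg(F.countDistinct("device_id").alias("device_count"))
          .orderBy(F.desc("device_count")))
counts.show()
```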
Scenario: Processing data in JSON format using the Encrypt/Decrypt processor and loading it to a destination. Steps to implement the pipeline without hitting the "Invalid ciphertext type" error:

1. The first pipeline reads data in JSON format, sends it to the Encrypt/Decrypt processor with the action set to Encrypt, and writes it to a Kafka Producer with the data format set to JSON. A few columns are encrypted using Amazon KMS and fed to the Kafka Producer.

2. The second pipeline reads data from a Kafka Consumer with the data format set to JSON. A Field Type Converter converts the columns to Byte Array before feeding the data into the Encrypt/Decrypt processor, because the Encrypt/Decrypt processor only accepts columns of type Byte Array. The columns converted to Byte Array are sent one by one into a Base64 Field Decoder; note that only one field can be passed per Base64 Field Decoder. From the Base64 Field Decoders the columns are sent to the Encrypt/Decrypt processor with the action set to Decrypt, and finally the data is written to a file.
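To illustrate why the Byte Array conversion and Base64 decode matter, here is a conceptual Python sketch (outside StreamSets) that decodes a Base64 ciphertext string to raw bytes before asking AWS KMS to decrypt it. The region, key handling, and field names are illustrative, not the pipeline's exact configuration.

```python
# Ciphertext serialized into JSON travels as a Base64 string; decryption
# needs the raw bytes, which is why the field must become a byte array first.
import base64
import boto3

kms = boto3.client("kms", region_name="us-east-1")  # region is an assumption

def decrypt_field(b64_ciphertext: str) -> str:
    ciphertext_bytes = base64.b64decode(b64_ciphertext)   # byte array, as the processor expects
    response = kms.decrypt(CiphertextBlob=ciphertext_bytes)
    return response["Plaintext"].decode("utf-8")
```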
Currently the platform does not allow members to change this on their own. The team is looking into adding this functionality; I know this is frustrating. To work around it, please email Drew.Kreiger@streamsets.com to change your email address or other profile details. -Drew
Read 911 Event Data and Load to Snowflake

This pipeline uses the HTTP Client to read real-time 911 event data from the Seattle Fire Department. The pipeline then flattens a nested JSON object. It also uses the latitude and longitude of the incident location to look up the Census block information from the Census Bureau's geocoding API (a rough sketch of that lookup is below).

Fun fact: while building this pipeline, an event came through for a dumpster fire currently happening on my block!
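A rough sketch of that geocoding lookup in Python; the endpoint and parameters shown are assumptions, so check the Census Bureau geocoding API documentation before relying on them.

```python
# Look up Census geography (including the Census block) for a lat/long pair.
import requests

def census_block(latitude: float, longitude: float) -> dict:
    resp = requests.get(
        "https://geocoding.geo.census.gov/geocoder/geographies/coordinates",
        params={
            "x": longitude,          # the API takes x = longitude, y = latitude
            "y": latitude,
            "benchmark": "Public_AR_Current",
            "vintage": "Current_Current",
            "format": "json",
        },
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()
```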
Transformer for Snowflake: Pipeline to Create a Fact Table

This pipeline uses the NYC Citibike data to populate a fact table enriched with station and weather data. Because it uses Transformer for Snowflake and the raw trips data is already loaded in Snowflake, none of the data leaves the Snowflake Data Cloud. The pipeline generates SQL that is then executed on Snowflake, inserting the query results into the target tables. The weather data comes from Weather Source's free Global Weather & Climate Data for BI data set on the Snowflake Data Marketplace, so you can see the weather conditions for each ride.
This pipeline populates and updates a Slowly Changing Dimension for the NYC Citibike stations using Transformer for Snowflake. It uses StreamSets' Slowly Changing Dimension stage, which handles all of the logic for Type 1 and Type 2 dimensions. The SCD stage then produces the correct output along with the metadata that the Snowflake destination uses to perform the merge.
This pipeline is designed to handle (embrace!) data drift while ingesting web logs from AWS S3, then transforming and enriching them before storing the curated data in the Snowflake Data Cloud. A data drift alert is triggered if/when the data being ingested is missing the key IP Address field, which is crucial for downstream analytics.
This pipeline is designed to orchestrate an initial bulk load followed by a change data capture workload from an on-prem Oracle database to the Snowflake Data Cloud. The pipeline checks the completion status of the initial bulk load job before proceeding to the next (CDC) step and sends success/failure notifications. It also takes advantage of the platform's event framework to automatically stop the pipeline when there is no more data to be ingested.
This pipeline is designed to ingest data from Amazon S3 and prepare it for training an ML model using a PySpark custom processor. Once the Gradient Boosted model is trained, the model artifacts, features, accuracy, and other metrics are registered as an experiment in MLflow. (The pipeline runs on a Databricks cluster, which comes bundled with an MLflow server.)
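A minimal sketch of what the training step might look like inside the PySpark custom processor, assuming a prepared DataFrame named prepared_df with "features" and "label" columns; the parameters and metric names are illustrative, not the pipeline's actual settings.

```python
# Train a Gradient Boosted Trees classifier and log the run to MLflow.
import mlflow
import mlflow.spark
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

train_df, test_df = prepared_df.randomSplit([0.8, 0.2], seed=42)

with mlflow.start_run(run_name="gbt-training"):
    model = GBTClassifier(labelCol="label", featuresCol="features", maxIter=50).fit(train_df)

    # Default metric for BinaryClassificationEvaluator is area under ROC
    auc = BinaryClassificationEvaluator(labelCol="label").evaluate(model.transform(test_df))

    mlflow.log_param("maxIter", 50)
    mlflow.log_metric("auc", auc)
    mlflow.spark.log_model(model, "gbt-model")   # register the model artifact with the run
```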
This pipeline is designed to capture inserts and updates (SCD Type II) uploaded to an Amazon S3 bucket for a slowly changing dimension table -- Customers. The pipeline creates new records with the version set to 1 for new customers and to (current version + 1) for existing customers, as sketched below. The customer records are then stored in the Snowflake Data Cloud.
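A tiny Python sketch of that versioning rule; the field names are assumptions for illustration.

```python
# New customers get version 1; existing customers get current version + 1.
def assign_version(incoming: dict, current_versions: dict) -> dict:
    customer_id = incoming["customer_id"]
    incoming["version"] = current_versions.get(customer_id, 0) + 1
    return incoming

current_versions = {"C001": 2}   # latest known version per existing customer
print(assign_version({"customer_id": "C001", "city": "Austin"}, current_versions))  # -> version 3
print(assign_version({"customer_id": "C002", "city": "Boston"}, current_versions))  # -> version 1
```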
Happy New Year, everyone! I want to say thank you to each of you for all your hard work and contributions to our StreamSets Community. We are close to welcoming our 300th member! Looking back at last year, we launched our new community platform with the addition of knowledge base articles, a monthly community-led newsletter, a bi-annual member feedback survey, a new and improved StreamSets Academy, and much more. Looking ahead to 2022, we are excited to launch virtual meetups, pipeline and pattern examples, and much more based on the feedback we received from the member feedback survey.
Hi everyone,

Thank you to those who completed the survey. It has been really helpful for understanding where we are doing an awesome job and where we can improve. We wanted to share the responses and the pulse of the community today, which you'll find in the representative responses below. We will be conducting these surveys on a quarterly basis throughout the year. You can find them on the right side of each community page, labeled "Feedback". (REPORT)

Short Answer Question #1: Thank you for your honest feedback. How can we make the community more helpful for you?

Responses:
"With more documentation of how we implement for streaming and transformer" -- Here are some helpful docs and Academy courses.
"Not able to open ask.streamsets.com" -- As of Dec 8, 2021, ask.streamsets.com has been sunset. Going forward, community.streamsets.com is the go-to community forum and knowledge base managed and hosted by StreamSets. We have migrated the top ask.streamsets.com topics to our new platform and ungated over 400 knowledge base articles from our support portal.
Preface

Fetching online and offline data across different channels to understand your end users' behavior is a primary topic for any enterprise-level brand. A Customer 360 view of an end customer gives enormous insight to marketing leaders, who can then prepare their strategy to engage and entice these users. A full-stack CDP (Customer Data Platform) helps meet the needs of marketing leaders and analytics folks.

A stable, reliable, and scalable data processing stack is the de facto need for any enterprise CDP solution, and to set that up, the data ingestion and extraction architecture needs to be solid. StreamSets Data Collector (SDC) has become the preferred tool for many data engineers and data stewards to meet the ETL and ELT needs in this area.

In the traditional approach, ETL engineers normally have definitive requirements in place where they know the origin, transformation rules, and destination. Based on those requirements, engineers develop their pipelines and deploy them to production.
StreamSetters! We did not have anyone share Industry News / Helpful Reads for our 3rd edition. It's as easy as adding links below. Anything is game; no wrong answers! Here are a few ideas to get you going. Please share your links within this thread:
- Industry blogs and articles
- Events others in the community should join
- Exciting personal milestones
- StreamSets use cases and how others could benefit from your findings
- Other forms of media you think the community will benefit from :)
Let's make this a collaborative newsletter, everyone! -Drew Kreiger
Data comes from a monitor device, with test results for different elements. Each element has 3 values: its ID, its value, and any error message. The customer wants to see a flattened list of just the monitor device and timestamp, with the result of each element on a separate line. Using the steps below we can achieve that. We import the data, ignoring the header line (hence labeling the columns with numbers):
1. First we label the monitor "parent" fields using a Field Renamer.
2. We build an empty map, using an Expression Evaluator, so we can correctly parse the records.
3. We use a Field Mapper processor to map those groups of 3 fields (checking that we only remap columns that are numeric).
4. Now that we have groups, we split the groups into records using a Field Pivoter.
5. This leaves a single group per record, but still as a group, so we tidy that up with a Field Flattener so all the records are at the same level.
6. Finally we use a Field Renamer to label the 3 fields we have produced.
We then ship that off to our secure storage facility. A small Python sketch of the same reshaping is shown below.
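Here is that reshaping sketched in plain Python, assuming a wide row layout of device, timestamp, then repeating triples of element ID/value/error; the column layout and sample values are assumptions for illustration.

```python
# Turn one wide monitor-device row into one flat record per element.
def flatten_row(row: list) -> list:
    device, timestamp, rest = row[0], row[1], row[2:]
    records = []
    for i in range(0, len(rest), 3):                 # each element is a triple
        element_id, value, error = rest[i:i + 3]
        records.append({
            "device": device,
            "timestamp": timestamp,
            "element_id": element_id,
            "value": value,
            "error": error,
        })
    return records

wide = ["monitor-01", "2022-01-05T10:00:00Z", "pH", "7.2", "", "O2", "5.1", "sensor drift"]
for rec in flatten_row(wide):
    print(rec)
```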
Ordnance Survey (OS) produces a suite of products for the UK market. One of these is Address Base Premium (ABP), a set of details that describe buildings, businesses, residential properties, and the items you'd find on a map in detail, such as lat/long data, how the Royal Mail refers to them, and how the local authorities/councils describe and classify them.

Sounds good, right? Well, it's not such a nice set of data to deal with: it is shipped in 5 km batches of data in a single CSV file, with 10 different schema patterns within it. Nightmare? Nope! You know the secret of StreamSets!

StreamSets doesn't care about schema on read; that is the key to unlocking this. In the ABP files, the first column holds a record identifier, which tells us which schema and rules to use when processing the record. This pipeline makes an issue that is normally difficult to deal with clear and transparent. You can watch a record come in from the raw file, read with no header; look at the first column; and process it to a given lane, where the appropriate schema and rules are applied (sketched below).
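A minimal Python sketch of that routing idea, assuming the record identifier is the first CSV column and each identifier maps to a schema; the identifiers and column lists below are placeholders, not the real ABP specification.

```python
# Route each raw CSV row to the schema implied by its record identifier.
import csv

SCHEMAS = {
    "21": ["record_id", "uprn", "latitude", "longitude"],      # placeholder schema
    "28": ["record_id", "uprn", "building_name", "postcode"],  # placeholder schema
}

def route(path: str):
    with open(path, newline="") as f:
        for row in csv.reader(f):
            header = SCHEMAS.get(row[0])
            if header is None:
                continue                      # unknown record type: skip, or send to an error lane
            yield dict(zip(header, row))      # apply that lane's schema to the raw row
```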
In this pipeline we solve a fun data problem with the help of StreamSets Transformer. The Transformer execution engine runs data pipelines on Apache Spark, so we can run this pipeline on any Spark cluster type.

Problem Statement: Find the average number of friends for each age and sort the results in ascending order. We are given a fake friends dataset from a social networking platform, in CSV format, stored in Google Cloud Storage:

id, Name, Age, Number of Friends
0,Will,33,385
1,Jean-Luc,26,2
2,Hugh,55,221
3,Deanna,40,465
4,Quark,68,21
5,Weyoun,59,318
6,Gowron,37,220
7,Will,54,307
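Here is a PySpark sketch of the logic Transformer runs for this problem, assuming the CSV above lives at a hypothetical GCS path.

```python
# Average number of friends per age, sorted ascending.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("avg-friends-by-age").getOrCreate()

friends = (spark.read
           .option("header", True)
           .option("inferSchema", True)
           .csv("gs://my-bucket/fakefriends.csv")     # hypothetical path
           .toDF("id", "name", "age", "num_friends")) # normalize column names

result = (friends
          .groupBy("age")
          .agg(F.round(F.avg("num_friends"), 2).alias("avg_friends"))
          .orderBy("avg_friends"))                    # ascending by average
result.show()
```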
In this issue we have many new and exciting community updates, engaging programs, and a new contest where you can win a $250 gift card! Check out other members' submissions.

Before we get into it, I would like to introduce everyone to the new Developer Advocate on the StreamSets team and in our community, Brenna. She will be creating engaging content around the StreamSets product line through blog posts, tutorials, and sample code, and presenting at meetups! Now let's get into it, Data Rock Stars!

What's New
- Hot new ebook: Data Engineers' Handbook for Snowflake
- Win $250 by participating in the "Share Your Pipelines" contest!
- New Slack channel: #community-newsletter-share-your-findings
- Over 400 knowledge base articles, previously not visible to our external community, have been added to the community platform
- DataOps Summit CFP opens 2/22. Stay tuned!

Industry News / Helpful Reads:
- Stories about Data Engineering on Medium
- The vast majority of data engineers are burnt out. Those working in healthcare are no exception.
I wanted a way to send chess game information from lichess.org (a popular chess server) to Elasticsearch and Snowflake, so I could visualize statistics (e.g., how often does World Champion GM Magnus Carlsen lose to GM Daniel Naroditsky?) and generate reports in Snowflake (e.g., when Magnus does lose, which openings does he tend to play?). I ended up accomplishing this with two pipelines.

The first pipeline is a batch pipeline that ingests data using the lichess REST API and pushes it to a Kafka topic. It uses an endpoint that pulls down all games by username, so I've parameterized the username portion and can use Control Hub jobs to pick which specific users I want to study (a rough sketch of the API call is below).

The second pipeline consumes game data from my Kafka topic, does some basic cleanup of the data (adds a field for the name of the winner rather than the color of the winning pieces, and converts timestamps from long to datetime) as well as some basic enrichment (it adds a calculated field).
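A rough sketch of that API call, assuming the lichess endpoint that streams a user's games as NDJSON; check the lichess API docs for the exact path and parameters. The username here is a placeholder; in the real job it is a pipeline parameter.

```python
# Stream a user's games from the lichess REST API as newline-delimited JSON.
import json
import requests

def fetch_games(username: str, max_games: int = 100):
    resp = requests.get(
        f"https://lichess.org/api/games/user/{username}",
        params={"max": max_games},
        headers={"Accept": "application/x-ndjson"},   # ask for NDJSON instead of PGN
        stream=True,
        timeout=30,
    )
    resp.raise_for_status()
    for line in resp.iter_lines():
        if line:
            yield json.loads(line)

for game in fetch_games("some_username", max_games=5):   # placeholder username
    print(game.get("winner"), game.get("status"))
```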
Hey, everyone. We are very excited to share new automation support for self-managed engine installation using Ansible with our DataOps Platform!

Why does this project exist?
We know there are folks who love Ansible or other configuration management tools for automating the installation of StreamSets engines, allowing seamless integration with our products. The DataOps Platform helps you manage all your engines using Environments and Deployments, and we want to make it easier for folks to leverage these concepts and automate their workloads.

What are the benefits?
The DataOps Platform's easy-to-use REST API has been updated and now makes it effortless to integrate with your favorite automation tools. This feature is live on cloud.login.streamsets.com. We've made it possible to create Deployments and start Data Collector and Transformer engines from those Deployments, all using an Ansible playbook.

How can I get it?
To give it a try, clone our sample Ansible playbook on GitHub, then follow the README.