
The LocalFS, HDFS and ADLS destinations are essentially the same connector: they use different authentication methods and different underlying file systems, but all of them share the same code underneath. Hence the configurations and behaviour discussed here apply to the LocalFS, HDFS and ADLS destinations alike.

Directory Template: Files are stored according to the Directory Template configuration, which defines the granularity of the directories that files are written to. As the tooltip puts it: "Template for the creation of output directories. Valid variables are ${YYYY()}, ${MM()}, ${DD()}, ${hh()}, ${mm()}, ${ss()} and {record:value("/field")} for values in a field. Directories are created based on the smallest time unit variable used."
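To make the template behaviour concrete, here is a minimal Python sketch of how such a template could resolve to a directory path. It is only an illustration, not the destination's actual code: expand_template and TIME_VARS are hypothetical names, and the ${record:value("/field")} handling assumes a flat record with top-level fields.

```python
import re
from datetime import datetime
from typing import Optional

# Hypothetical mapping from the template's time functions to strftime codes.
TIME_VARS = {"YYYY": "%Y", "MM": "%m", "DD": "%d", "hh": "%H", "mm": "%M", "ss": "%S"}

def expand_template(template: str, when: datetime, record: Optional[dict] = None) -> str:
    """Resolve ${YYYY()}-style variables and ${record:value("/field")} in a template."""
    fields = record or {}

    def time_var(match):
        # Format the requested time unit from the supplied timestamp.
        return when.strftime(TIME_VARS[match.group(1)])

    def field_var(match):
        # Assumption: only top-level fields such as "/region" are supported here.
        return str(fields[match.group(1)])

    out = re.sub(r"\$\{(YYYY|MM|DD|hh|mm|ss)\(\)\}", time_var, template)
    return re.sub(r'\$\{record:value\("/([^"]+)"\)\}', field_var, out)

print(expand_template("/tmp/out/${YYYY()}-${MM()}-${DD()}-${hh()}", datetime(2022, 3, 12, 11, 5)))
# /tmp/out/2022-03-12-11
```

Because ${hh()} is the smallest time unit in this template, a new directory is needed once per hour.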

Time Basis: To write records to files based on time, configure which time to use in Time Basis. You can select the local (processing) timestamp, or pick a record field or header that holds a timestamp value (e.g. ${record:value("/Timestamp")}).

Late Record Time Limit (secs): A late record is a record that belongs to a given file (for example, when a timestamp in the record is used to assign it to a file) but arrives after that file's time window has ended. The record is still written to that file as long as the configured Late Record Time Limit (secs) has not yet elapsed.
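The late record check used in the walkthroughs of this post can be sketched as a simple comparison. This is a sketch under assumptions: is_late is a hypothetical helper, and treating a record as late only when the elapsed time is strictly greater than the limit is a guess about the boundary case.

```python
from datetime import datetime, timedelta

def is_late(time_basis: datetime, now: datetime, late_limit_secs: int) -> bool:
    """True when the record's time basis is older than the late record limit."""
    # Assumption: exactly hitting the limit does not yet count as late.
    return (now - time_basis) > timedelta(seconds=late_limit_secs)

now = datetime(2022, 3, 12, 11, 5)

# Time basis is the processing time, so the difference is always zero.
print(is_late(now, now, 3600))  # False

# Time basis comes from the record and is more than an hour old.
print(is_late(datetime(2022, 3, 12, 9, 0), now, 3600))  # True
```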

Apart from the above, you can configure multiple triggers to close a file: Max Records in File, Max File Size (MB) and Idle Timeout. For this discussion, let us assume these are left at their default values.
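For readers who want a mental model of these three triggers, a rough Python sketch follows. Everything in it is hypothetical: CloseTriggers and should_close are illustrative names, and using None to mean "trigger disabled" is a convention of this sketch, not the destination's documented defaults.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class CloseTriggers:
    # None means "trigger disabled" in this sketch only.
    max_records: Optional[int] = None
    max_size_mb: Optional[float] = None
    idle_timeout_secs: Optional[float] = None

def should_close(triggers: CloseTriggers, records_written: int,
                 size_mb: float, idle_secs: float) -> bool:
    """Return True as soon as any enabled trigger has been reached."""
    checks = [
        triggers.max_records is not None and records_written >= triggers.max_records,
        triggers.max_size_mb is not None and size_mb >= triggers.max_size_mb,
        triggers.idle_timeout_secs is not None and idle_secs >= triggers.idle_timeout_secs,
    ]
    return any(checks)

triggers = CloseTriggers(max_records=1000, idle_timeout_secs=3600)
print(should_close(triggers, records_written=250, size_mb=12.0, idle_secs=30))   # False
print(should_close(triggers, records_written=1000, size_mb=12.0, idle_secs=30))  # True
```

With all triggers disabled, as assumed in the examples here, only the time-based close described in the examples applies.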

 

Example-1:

Time Basis is set to time:now(), which is the record processing time
Late Record Time Limit: 3600 secs (60 mins)
Directory Template: /tmp/out/${YYYY()}-${MM()}-${DD()}-${hh()}

Assume the current date is 2022-03-12.

First record arrives at 11:05,
    Create directory /tmp/out/2022-03-12-11
    create file : _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_0
    file close time : end of the hour evaluated in the directory template (11:59) + 60 mins (late record time limit) = 12:59
    Is record late: current_time - processing_time
                        = time:now() - time:now()
                        = 0 secs, which is less than 3600 secs => FALSE

    record is written to _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_0

New record arrives at 11:35,
    Directory hour path 11 already exists

    Is record late: current_time - processing_time
                        = time:now() - time:now()
                        = 0 secs, which is less than 3600 secs => FALSE

    write to _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_0

New record arrives at 12:01,
    Directory hour path 12 doesn't exist
        create directory /tmp/out/2022-03-12-12
        create file _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_
    file close time : end of the hour evaluated in the directory template (12:59) + 60 mins (late record time limit) = 13:59
    Is record late: time:now() - time:now()
                        = 0 secs, which is less than 3600 secs => FALSE

    record is written to _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_

New record arrives at 12:55,
    Directory hour path 12 exists
    is record late: time:now() - time:now() = 0 < 3600secs ==> FALSE
    write to _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_

At 12:59, the file created at 11:05, _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_0, is closed and renamed to sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_3aae26cc-4ab2-44cb-9512-62b73e430ad7
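The close times in Example-1 can be reproduced with a few lines of Python. This is a sketch, assuming the walkthrough quotes times at minute precision: the helper name hourly_close_time is hypothetical, and it reports the last minute before the boundary "next hour + late record time limit".

```python
from datetime import datetime, timedelta

def hourly_close_time(arrival: datetime, late_limit_secs: int) -> datetime:
    """Last minute in which the file for the arrival's hour stays open."""
    hour_start = arrival.replace(minute=0, second=0, microsecond=0)
    # Boundary = start of the next hour plus the late record time limit.
    boundary = hour_start + timedelta(hours=1) + timedelta(seconds=late_limit_secs)
    # Assumption: the post rounds to the last whole minute before the boundary.
    return boundary - timedelta(minutes=1)

for hhmm in ("11:05", "11:35", "12:01", "12:55"):
    arrival = datetime.strptime(f"2022-03-12 {hhmm}", "%Y-%m-%d %H:%M")
    print(hhmm, "->", hourly_close_time(arrival, 3600).strftime("%H:%M"))
# 11:05 -> 12:59
# 11:35 -> 12:59
# 12:01 -> 13:59
# 12:55 -> 13:59
```

Records arriving in the same hour share a close time, which is why the 11:05 and 11:35 records land in the same file.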

 

Example-2:

Time Basis is set to time:now() which is the record processing time
Late Record Time Limit: 3900 secs (65 mins)
Directory Template: /tmp/out/single_output_file

First Record arrives at 2:15,

    Create directory /tmp/out if it doesn’t exist
    File close time: 2:15 (time the file is created) + 65 mins (late record time limit) = 3:20

    Create file _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_0

    Is record late: current_time - processing_time
                        = time:now() - time:now()
                        = 0 secs, which is less than 3900 secs => FALSE

    write record to _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_0
Next Record arrives at 2:20, 

    /tmp/out directory exists already. 

    Is record late: current_time - processing_time
                        = time:now() - time:now()
                        = 0 secs, which is less than 3900 secs => FALSE

    Write record to _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_0

Next Record arrives at 3:14,

    Is record late: current_time - processing_time
                        = time:now() - time:now()
                        = 0 secs, which is less than 3900 secs => FALSE

    Write record to _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_0


At 3:20, the file _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_0 is closed and renamed to /tmp/out/single_output_file


Next Record arrives at 3:21:
    /tmp/out directory exists

    Create file _tmp_sdc-7ed1a69c-064d-11ec-a1f3-098290104c3b_0
    File close time: 3:21 (time the file is created) + 65 mins (late record time limit) = 4:26
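The numbers in Example-2 work out to "file creation time + late record time limit" (2:15 + 65 mins = 3:20, 3:21 + 65 mins = 4:26). The Python sketch below replays the four arrivals under that reading. It is an illustration only: simulate is a hypothetical helper, the date is arbitrary, and the rule is inferred from the times quoted in this example rather than taken from the destination's code.

```python
from datetime import datetime, timedelta

def simulate(arrivals, late_limit_secs):
    """Group arrivals into files; each file closes at creation time + late limit."""
    files = []  # each entry: [created, close_time, record_count]
    for t in arrivals:
        # Start a new file if none is open or the current one has reached its close time.
        if not files or t >= files[-1][1]:
            files.append([t, t + timedelta(seconds=late_limit_secs), 0])
        files[-1][2] += 1
    return files

day = datetime(2022, 3, 12)  # arbitrary date, not stated in the example
arrivals = [day.replace(hour=h, minute=m) for h, m in [(2, 15), (2, 20), (3, 14), (3, 21)]]

for created, close_time, count in simulate(arrivals, 3900):
    print(created.strftime("%H:%M"), "closes", close_time.strftime("%H:%M"), "records:", count)
# 02:15 closes 03:20 records: 3
# 03:21 closes 04:26 records: 1
```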

 

From the above we can see that when Time Basis is set to time:now(), which is the processing time, no record is ever late. If we set a record time as the Time Basis, we can see late record processing in action; we will try to add another post covering that.
