Dataflow/Apache Beam - how to access the current filename when passing in a file pattern?

I have seen this question answered before on Stack Overflow (https://stackoverflow.com/questions/29983621/how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow), but not since Apache Beam added Splittable DoFn functionality for Python. How would I access the filename of the current file being processed when passing in a file pattern to a GCS bucket?



I want to pass the filename into my transform function:



with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText('gs://url to file')

    data = (
        lines
        | 'Jsonify' >> beam.Map(jsonify)
        | 'Unnest' >> beam.FlatMap(unnest)
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
            'project_id:dataset_id.table_name', schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    )


Ultimately, what I want to do is pass the filename into my transform function when I transform each row of the JSON (see this), and then use the filename to do a lookup in a different BQ table to get a value. I think once I know how to get the filename, I will be able to figure out the side input part in order to do the lookup in the BQ table and get the unique value.

Tags: python, google-cloud-platform, google-bigquery, google-cloud-dataflow, apache-beam

asked Nov 21 at 2:42 by calisurfer, edited Nov 24 at 3:42

  • You can take a look at this (mind the solution in the question too)
    – Guillem Xercavins
    Nov 23 at 19:29

  • @GuillemXercavins your example is reading an entire file as a string, right? If I am reading newline-delimited JSON like in this post and want to use the filename and look up something in another BQ table, do you have any suggestions as to how to do that?
    – calisurfer
    Nov 24 at 0:46

  • I don't see how exactly it is BQ related - but if you insist :o) ...
    – Mikhail Berlyant
    Nov 24 at 3:45

  • @MikhailBerlyant I want to take in side inputs from BQ once I can get the file name :/
    – calisurfer
    Nov 24 at 4:33

  • Obviously BQ is part of your overall project, but I still don't see how BQ is related to your particular issue!
    – Mikhail Berlyant
    Nov 24 at 17:24

1 Answer

I tried to implement a solution for the previously cited case. There, as well as in other approaches such as this one, they also get a list of file names but load the whole file into a single element, which might not scale well with large files. Therefore, I looked into adding the filename to each record instead.



As input I used two csv files:





$ gsutil cat gs://$BUCKET/countries1.csv
id,country
1,sweden
2,spain

$ gsutil cat gs://$BUCKET/countries2.csv
id,country
3,italy
4,france


Using GCSFileSystem.match we can access metadata_list to retrieve FileMetadata containing the file path and size in bytes. In my example:





[FileMetadata(gs://BUCKET_NAME/countries1.csv, 29),
FileMetadata(gs://BUCKET_NAME/countries2.csv, 29)]


The code is:





result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]


We will read each of the matching files into a different PCollection. As we don't know the number of files a priori, we need to programmatically create a list of names for each PCollection (p0, p1, ..., pN-1) and ensure that we have unique labels for each step ('Read file 0', 'Read file 1', etc.):





variables = ['p{}'.format(i) for i in range(len(result))]
read_labels = ['Read file {}'.format(i) for i in range(len(result))]
add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]


Then we read each file into its corresponding PCollection with ReadFromText and call the AddFilenamesFn ParDo to associate each record with its filename.





for i in range(len(result)):
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)


where AddFilenamesFn is:





class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        yield {'filename': file_name, 'row': element}


My first approach was to use a Map function directly, which results in simpler code. However, result[i].path was resolved at the end of the loop and each record was incorrectly mapped to the last file in the list:





globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
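
If the simpler Map version is preferred, a common workaround for this late-binding behaviour (not part of the original answer, just a sketch) is to bind the path as a default argument of the lambda, so the current value is captured on each loop iteration:

# the default argument freezes result[i].path when the lambda is defined
globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem, path=result[i].path: (path, elem))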


Finally, we flatten all the PCollections into one:





merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten()


and we check the results by logging the elements:





INFO:root:{'filename': u'countries2.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries2.csv', 'row': u'3,italy'}
INFO:root:{'filename': u'countries2.csv', 'row': u'4,france'}
INFO:root:{'filename': u'countries1.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries1.csv', 'row': u'1,sweden'}
INFO:root:{'filename': u'countries1.csv', 'row': u'2,spain'}


I tested this with both DirectRunner and DataflowRunner for Python SDK 2.8.0.



I hope this addresses the main issue here so that you can continue by integrating BigQuery into your full use case. You might need to use the Python Client Library for that; I wrote a similar Java example.
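
As a rough sketch of the side input part mentioned in the question (not from the original answer; the lookup table, its fields and the query are made up for illustration), the filename in each record could be used to look up a value from a small BigQuery table passed as a dict side input:

# hypothetical lookup table with columns 'filename' and 'value'
lookup = (p
    | 'Read lookup' >> beam.io.Read(beam.io.BigQuerySource(
        query='SELECT filename, value FROM dataset_id.lookup_table'))
    | 'To KV' >> beam.Map(lambda row: (row['filename'], row['value'])))

def add_lookup_value(element, lookup_dict):
    # element is the {'filename': ..., 'row': ...} dict produced by AddFilenamesFn
    element['value'] = lookup_dict.get(element['filename'])
    return element

enriched = merged | 'Join lookup' >> beam.Map(add_lookup_value, lookup_dict=beam.pvalue.AsDict(lookup))

The enriched records would then carry both the original row and the looked-up value before being written to BigQuery.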



Full code:





import argparse, logging
from operator import add

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

class GCSFileReader:
    """Helper class to read gcs files"""
    def __init__(self, gcs):
        self.gcs = gcs

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        # yield (file_name, element) # use this to return a tuple instead
        yield {'filename': file_name, 'row': element}

# just logging output to visualize results
def write_res(element):
    logging.info(element)
    return element

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    gcs = GCSFileSystem(PipelineOptions(pipeline_args))
    gcs_reader = GCSFileReader(gcs)

    # in my case I am looking for files that start with 'countries'
    BUCKET = 'BUCKET_NAME'
    result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
    result = reduce(add, result)

    # create each input PCollection name and unique step labels
    variables = ['p{}'.format(i) for i in range(len(result))]
    read_labels = ['Read file {}'.format(i) for i in range(len(result))]
    add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

    # load each input file into a separate PCollection and add filename to each row
    for i in range(len(result)):
        # globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
        globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

    # flatten all PCollections into a single one
    merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res)

    p.run()

if __name__ == '__main__':
    run()





answered Nov 24 at 15:33 by Guillem Xercavins

  • Thanks so much, this worked! It works if I merge the PCollections afterwards, and it also works if I do for i in range(len(result)): p | ... and then write to BigQuery inside the for loop. Is there a benefit to merging and then writing to BigQuery all together?
    – calisurfer
    Nov 25 at 19:33

  • Glad to be of help. I flattened the PCollections together in case we want to apply some operations across data from different files (such as a GroupByKey, etc.) while still keeping track of their origin; see the sketch after this comment. In terms of performance, I think it could also help to evenly distribute the load among workers if the GCS files have very disparate sizes, but that would depend on the exact data and full use case. Feel free to omit the flatten step if there is no actual need for it (process each file separately) and the pipeline works well without it.
    – Guillem Xercavins
    Nov 26 at 16:50
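
For illustration only (again, not part of the original answer), a minimal sketch of such a cross-file operation, assuming the merged PCollection of {'filename': ..., 'row': ...} dicts from the answer, could group all rows by their source file:

# key each record by its source filename and group the rows per file
grouped = (merged
    | 'Key by filename' >> beam.Map(lambda d: (d['filename'], d['row']))
    | 'Group by file' >> beam.GroupByKey())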