How to improve the performance of this data pipeline for my tensorflow model












8















I have a tensorflow model which I am training on google-colab. The actual model is more complex, but I condensed it into a reproducible example (removed saving/restoring, learning rate decay, asserts, tensorboard events, gradient clipping and so on). The model works reasonably (converges to acceptable loss) and I am looking for a way to speed up the training (iterations per second).



Currently on colab's GPU it takes 10 minutes to train for 1000 iteration. With my current batch size of 512 it means that the model processes ~850 examples per second (I would prefer to have a batch size of 512 unless other sizes provide reasonable speedup. By itself changing batch size does not change the speed).





So currently I have a data stored in tfrecord format: here is a 500Mb example file, the total data-size is ~0.5Tb. This data passes through a reasonably heavy preprocessing step (I can't do preprocessing beforehand as it will increase the size of my tfrecords way above what I can afford). Preprocessing is done via tf.data and the output tensors ((batch_size, 8, 8, 24) which is treated as NHWC, (batch_size, 10)) are passed into a model. The example colab does not contain a simplified model which serves just as an example.





I tried a few approaches to speedup the training:





  • manual device placement (data pre-processing on cpu, propagations on gpu), but all my attempts resulted in worse speed (from 10% to 50% increase).

  • improve data preprocessing. I reviewed tf.data video and data tutorials. I tried almost every technique from that tutorial got no improvement (decrease in speed from 0% to 15%). In particular I tried:


    • dataset.prefetch(...)

    • passing num_parallel_calls to map

    • combining map and batch in tf.contrib.data.map_and_batch

    • using parallel_interleave




The code related to data preprocessing is here (here is a full reproducible example with example data):



_keys_to_map = {
'd': tf.FixedLenFeature(, tf.string), # data
's': tf.FixedLenFeature(, tf.int64), # score
}


def _parser(record):][3]
parsed = tf.parse_single_example(record, _keys_to_map)
return parsed['d'], parsed['s']


def init_tfrecord_dataset():
files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
random.shuffle(files_train)

with tf.name_scope('tfr_iterator'):
ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
ds = ds.map(_parser) # map them based on tfrecord format
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.repeat() # iterate infinitely

return ds.make_initializable_iterator() # initialize the iterator


def iterator_to_data(iterator):
"""Creates a part of the graph which reads the raw data from an iterator and transforms it to a
data ready to be passed to model.

Args:
iterator - iterator. Created by `init_tfrecord_dataset`

Returns:
data_board - (BATCH_SIZE, 8, 8, 24) you can think about as NWHC for images.
data_flags - (BATCH_SIZE, 10)
combined_score - (BATCH_SIZE,)
"""

b = tf.constant((128, 64, 32, 16, 8, 4, 2, 1), dtype=tf.uint8, name='unpacked_const')

with tf.name_scope('tfr_parse'):
with tf.name_scope('packed_data'):
next_element = iterator.get_next()
data_packed, score_int = next_element
score = tf.cast(score_int, tf.float64, name='score_float')

# https://stackoverflow.com/q/45454470/1090562
with tf.name_scope('data_unpacked'):
data_unpacked = tf.reshape(tf.mod(tf.to_int32(tf.decode_raw(data_packed, tf.uint8)[:,:,None] // b), 2), [BATCH_SIZE, 1552], name='data_unpack')

with tf.name_scope('score'):
with tf.name_scope('is_mate'):
score_is_mate = tf.cast(tf.squeeze(tf.slice(data_unpacked, [0, 1546], [BATCH_SIZE, 1])), tf.float64, name='is_mate')
with tf.name_scope('combined'):
combined_score = (1 - score_is_mate) * VALUE_A * tf.tanh(score / VALUE_K) + score_is_mate * tf.sign(score) * (VALUE_A + (1 - VALUE_A) / (VALUE_B - 1) * tf.reduce_max(tf.stack([tf.zeros(BATCH_SIZE, dtype=tf.float64), VALUE_B - tf.abs(score)]), axis=0))


with tf.name_scope('board'):
with tf.name_scope('reshape_layers'):
data_board = tf.reshape(tf.slice(data_unpacked, [0, 0], [BATCH_SIZE, 8 * 8 * 24]), [BATCH_SIZE, 8, 8, 24], name='board_reshape')

with tf.name_scope('combine_layers'):
data_board = tf.cast(tf.stack([
data_board[:,:,:, 0],
data_board[:,:,:, 4],
data_board[:,:,:, 8],
data_board[:,:,:,12],
data_board[:,:,:,16],
data_board[:,:,:,20],
- data_board[:,:,:, 1],
- data_board[:,:,:, 5],
- data_board[:,:,:, 9],
- data_board[:,:,:,13],
- data_board[:,:,:,17],
- data_board[:,:,:,21],
data_board[:,:,:, 2],
data_board[:,:,:, 6],
data_board[:,:,:,10],
data_board[:,:,:,14],
data_board[:,:,:,18],
data_board[:,:,:,22],
- data_board[:,:,:, 3],
- data_board[:,:,:, 7],
- data_board[:,:,:,11],
- data_board[:,:,:,15],
- data_board[:,:,:,19],
- data_board[:,:,:,23],
], axis=3), tf.float64, name='board_compact')

with tf.name_scope('flags'):
data_flags = tf.cast(tf.slice(data_unpacked, [0, 1536], [BATCH_SIZE, 10]), tf.float64, name='flags')

return data_board, data_flags, combined_score




I am looking for practical solutions (I have tried significant amount of theoretical ideas) which can improve the the speed of training (in terms of examples/second). I am not looking for a way to improve the accuracy of the model (or modify the model) as this is just a test model.



I have spent significant amount of time trying to optimize this (and gave up). So I would be happy to award a bounty of 200 for a working solution with a nice explanation.










share|improve this question























  • Are you reading tfrecords from drive?

    – mlRocks
    Nov 30 '18 at 6:29











  • @mlRocks yes, I am reading it from gDrive. You can actually look at the full implementation in the full reproducible link from the question.

    – Salvador Dali
    Nov 30 '18 at 7:08











  • This may it be helpful :tensorflow.org/guide/performance/… github.com/tensorflow/tensorflow/issues/14857

    – i_th
    Dec 2 '18 at 19:49













  • @SalvadorDali it's the known problem. Because it's not a physical drive like attached to your computer, reading from it will be slow

    – mlRocks
    Dec 3 '18 at 6:15
















8















I have a tensorflow model which I am training on google-colab. The actual model is more complex, but I condensed it into a reproducible example (removed saving/restoring, learning rate decay, asserts, tensorboard events, gradient clipping and so on). The model works reasonably (converges to acceptable loss) and I am looking for a way to speed up the training (iterations per second).



Currently on colab's GPU it takes 10 minutes to train for 1000 iteration. With my current batch size of 512 it means that the model processes ~850 examples per second (I would prefer to have a batch size of 512 unless other sizes provide reasonable speedup. By itself changing batch size does not change the speed).





So currently I have a data stored in tfrecord format: here is a 500Mb example file, the total data-size is ~0.5Tb. This data passes through a reasonably heavy preprocessing step (I can't do preprocessing beforehand as it will increase the size of my tfrecords way above what I can afford). Preprocessing is done via tf.data and the output tensors ((batch_size, 8, 8, 24) which is treated as NHWC, (batch_size, 10)) are passed into a model. The example colab does not contain a simplified model which serves just as an example.





I tried a few approaches to speedup the training:





  • manual device placement (data pre-processing on cpu, propagations on gpu), but all my attempts resulted in worse speed (from 10% to 50% increase).

  • improve data preprocessing. I reviewed tf.data video and data tutorials. I tried almost every technique from that tutorial got no improvement (decrease in speed from 0% to 15%). In particular I tried:


    • dataset.prefetch(...)

    • passing num_parallel_calls to map

    • combining map and batch in tf.contrib.data.map_and_batch

    • using parallel_interleave




The code related to data preprocessing is here (here is a full reproducible example with example data):



_keys_to_map = {
'd': tf.FixedLenFeature(, tf.string), # data
's': tf.FixedLenFeature(, tf.int64), # score
}


def _parser(record):][3]
parsed = tf.parse_single_example(record, _keys_to_map)
return parsed['d'], parsed['s']


def init_tfrecord_dataset():
files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
random.shuffle(files_train)

with tf.name_scope('tfr_iterator'):
ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
ds = ds.map(_parser) # map them based on tfrecord format
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.repeat() # iterate infinitely

return ds.make_initializable_iterator() # initialize the iterator


def iterator_to_data(iterator):
"""Creates a part of the graph which reads the raw data from an iterator and transforms it to a
data ready to be passed to model.

Args:
iterator - iterator. Created by `init_tfrecord_dataset`

Returns:
data_board - (BATCH_SIZE, 8, 8, 24) you can think about as NWHC for images.
data_flags - (BATCH_SIZE, 10)
combined_score - (BATCH_SIZE,)
"""

b = tf.constant((128, 64, 32, 16, 8, 4, 2, 1), dtype=tf.uint8, name='unpacked_const')

with tf.name_scope('tfr_parse'):
with tf.name_scope('packed_data'):
next_element = iterator.get_next()
data_packed, score_int = next_element
score = tf.cast(score_int, tf.float64, name='score_float')

# https://stackoverflow.com/q/45454470/1090562
with tf.name_scope('data_unpacked'):
data_unpacked = tf.reshape(tf.mod(tf.to_int32(tf.decode_raw(data_packed, tf.uint8)[:,:,None] // b), 2), [BATCH_SIZE, 1552], name='data_unpack')

with tf.name_scope('score'):
with tf.name_scope('is_mate'):
score_is_mate = tf.cast(tf.squeeze(tf.slice(data_unpacked, [0, 1546], [BATCH_SIZE, 1])), tf.float64, name='is_mate')
with tf.name_scope('combined'):
combined_score = (1 - score_is_mate) * VALUE_A * tf.tanh(score / VALUE_K) + score_is_mate * tf.sign(score) * (VALUE_A + (1 - VALUE_A) / (VALUE_B - 1) * tf.reduce_max(tf.stack([tf.zeros(BATCH_SIZE, dtype=tf.float64), VALUE_B - tf.abs(score)]), axis=0))


with tf.name_scope('board'):
with tf.name_scope('reshape_layers'):
data_board = tf.reshape(tf.slice(data_unpacked, [0, 0], [BATCH_SIZE, 8 * 8 * 24]), [BATCH_SIZE, 8, 8, 24], name='board_reshape')

with tf.name_scope('combine_layers'):
data_board = tf.cast(tf.stack([
data_board[:,:,:, 0],
data_board[:,:,:, 4],
data_board[:,:,:, 8],
data_board[:,:,:,12],
data_board[:,:,:,16],
data_board[:,:,:,20],
- data_board[:,:,:, 1],
- data_board[:,:,:, 5],
- data_board[:,:,:, 9],
- data_board[:,:,:,13],
- data_board[:,:,:,17],
- data_board[:,:,:,21],
data_board[:,:,:, 2],
data_board[:,:,:, 6],
data_board[:,:,:,10],
data_board[:,:,:,14],
data_board[:,:,:,18],
data_board[:,:,:,22],
- data_board[:,:,:, 3],
- data_board[:,:,:, 7],
- data_board[:,:,:,11],
- data_board[:,:,:,15],
- data_board[:,:,:,19],
- data_board[:,:,:,23],
], axis=3), tf.float64, name='board_compact')

with tf.name_scope('flags'):
data_flags = tf.cast(tf.slice(data_unpacked, [0, 1536], [BATCH_SIZE, 10]), tf.float64, name='flags')

return data_board, data_flags, combined_score




I am looking for practical solutions (I have tried significant amount of theoretical ideas) which can improve the the speed of training (in terms of examples/second). I am not looking for a way to improve the accuracy of the model (or modify the model) as this is just a test model.



I have spent significant amount of time trying to optimize this (and gave up). So I would be happy to award a bounty of 200 for a working solution with a nice explanation.










share|improve this question























  • Are you reading tfrecords from drive?

    – mlRocks
    Nov 30 '18 at 6:29











  • @mlRocks yes, I am reading it from gDrive. You can actually look at the full implementation in the full reproducible link from the question.

    – Salvador Dali
    Nov 30 '18 at 7:08











  • This may it be helpful :tensorflow.org/guide/performance/… github.com/tensorflow/tensorflow/issues/14857

    – i_th
    Dec 2 '18 at 19:49













  • @SalvadorDali it's the known problem. Because it's not a physical drive like attached to your computer, reading from it will be slow

    – mlRocks
    Dec 3 '18 at 6:15














8












8








8


3






I have a tensorflow model which I am training on google-colab. The actual model is more complex, but I condensed it into a reproducible example (removed saving/restoring, learning rate decay, asserts, tensorboard events, gradient clipping and so on). The model works reasonably (converges to acceptable loss) and I am looking for a way to speed up the training (iterations per second).



Currently on colab's GPU it takes 10 minutes to train for 1000 iteration. With my current batch size of 512 it means that the model processes ~850 examples per second (I would prefer to have a batch size of 512 unless other sizes provide reasonable speedup. By itself changing batch size does not change the speed).





So currently I have a data stored in tfrecord format: here is a 500Mb example file, the total data-size is ~0.5Tb. This data passes through a reasonably heavy preprocessing step (I can't do preprocessing beforehand as it will increase the size of my tfrecords way above what I can afford). Preprocessing is done via tf.data and the output tensors ((batch_size, 8, 8, 24) which is treated as NHWC, (batch_size, 10)) are passed into a model. The example colab does not contain a simplified model which serves just as an example.





I tried a few approaches to speedup the training:





  • manual device placement (data pre-processing on cpu, propagations on gpu), but all my attempts resulted in worse speed (from 10% to 50% increase).

  • improve data preprocessing. I reviewed tf.data video and data tutorials. I tried almost every technique from that tutorial got no improvement (decrease in speed from 0% to 15%). In particular I tried:


    • dataset.prefetch(...)

    • passing num_parallel_calls to map

    • combining map and batch in tf.contrib.data.map_and_batch

    • using parallel_interleave




The code related to data preprocessing is here (here is a full reproducible example with example data):



_keys_to_map = {
'd': tf.FixedLenFeature(, tf.string), # data
's': tf.FixedLenFeature(, tf.int64), # score
}


def _parser(record):][3]
parsed = tf.parse_single_example(record, _keys_to_map)
return parsed['d'], parsed['s']


def init_tfrecord_dataset():
files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
random.shuffle(files_train)

with tf.name_scope('tfr_iterator'):
ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
ds = ds.map(_parser) # map them based on tfrecord format
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.repeat() # iterate infinitely

return ds.make_initializable_iterator() # initialize the iterator


def iterator_to_data(iterator):
"""Creates a part of the graph which reads the raw data from an iterator and transforms it to a
data ready to be passed to model.

Args:
iterator - iterator. Created by `init_tfrecord_dataset`

Returns:
data_board - (BATCH_SIZE, 8, 8, 24) you can think about as NWHC for images.
data_flags - (BATCH_SIZE, 10)
combined_score - (BATCH_SIZE,)
"""

b = tf.constant((128, 64, 32, 16, 8, 4, 2, 1), dtype=tf.uint8, name='unpacked_const')

with tf.name_scope('tfr_parse'):
with tf.name_scope('packed_data'):
next_element = iterator.get_next()
data_packed, score_int = next_element
score = tf.cast(score_int, tf.float64, name='score_float')

# https://stackoverflow.com/q/45454470/1090562
with tf.name_scope('data_unpacked'):
data_unpacked = tf.reshape(tf.mod(tf.to_int32(tf.decode_raw(data_packed, tf.uint8)[:,:,None] // b), 2), [BATCH_SIZE, 1552], name='data_unpack')

with tf.name_scope('score'):
with tf.name_scope('is_mate'):
score_is_mate = tf.cast(tf.squeeze(tf.slice(data_unpacked, [0, 1546], [BATCH_SIZE, 1])), tf.float64, name='is_mate')
with tf.name_scope('combined'):
combined_score = (1 - score_is_mate) * VALUE_A * tf.tanh(score / VALUE_K) + score_is_mate * tf.sign(score) * (VALUE_A + (1 - VALUE_A) / (VALUE_B - 1) * tf.reduce_max(tf.stack([tf.zeros(BATCH_SIZE, dtype=tf.float64), VALUE_B - tf.abs(score)]), axis=0))


with tf.name_scope('board'):
with tf.name_scope('reshape_layers'):
data_board = tf.reshape(tf.slice(data_unpacked, [0, 0], [BATCH_SIZE, 8 * 8 * 24]), [BATCH_SIZE, 8, 8, 24], name='board_reshape')

with tf.name_scope('combine_layers'):
data_board = tf.cast(tf.stack([
data_board[:,:,:, 0],
data_board[:,:,:, 4],
data_board[:,:,:, 8],
data_board[:,:,:,12],
data_board[:,:,:,16],
data_board[:,:,:,20],
- data_board[:,:,:, 1],
- data_board[:,:,:, 5],
- data_board[:,:,:, 9],
- data_board[:,:,:,13],
- data_board[:,:,:,17],
- data_board[:,:,:,21],
data_board[:,:,:, 2],
data_board[:,:,:, 6],
data_board[:,:,:,10],
data_board[:,:,:,14],
data_board[:,:,:,18],
data_board[:,:,:,22],
- data_board[:,:,:, 3],
- data_board[:,:,:, 7],
- data_board[:,:,:,11],
- data_board[:,:,:,15],
- data_board[:,:,:,19],
- data_board[:,:,:,23],
], axis=3), tf.float64, name='board_compact')

with tf.name_scope('flags'):
data_flags = tf.cast(tf.slice(data_unpacked, [0, 1536], [BATCH_SIZE, 10]), tf.float64, name='flags')

return data_board, data_flags, combined_score




I am looking for practical solutions (I have tried significant amount of theoretical ideas) which can improve the the speed of training (in terms of examples/second). I am not looking for a way to improve the accuracy of the model (or modify the model) as this is just a test model.



I have spent significant amount of time trying to optimize this (and gave up). So I would be happy to award a bounty of 200 for a working solution with a nice explanation.










share|improve this question














I have a tensorflow model which I am training on google-colab. The actual model is more complex, but I condensed it into a reproducible example (removed saving/restoring, learning rate decay, asserts, tensorboard events, gradient clipping and so on). The model works reasonably (converges to acceptable loss) and I am looking for a way to speed up the training (iterations per second).



Currently on colab's GPU it takes 10 minutes to train for 1000 iteration. With my current batch size of 512 it means that the model processes ~850 examples per second (I would prefer to have a batch size of 512 unless other sizes provide reasonable speedup. By itself changing batch size does not change the speed).





So currently I have a data stored in tfrecord format: here is a 500Mb example file, the total data-size is ~0.5Tb. This data passes through a reasonably heavy preprocessing step (I can't do preprocessing beforehand as it will increase the size of my tfrecords way above what I can afford). Preprocessing is done via tf.data and the output tensors ((batch_size, 8, 8, 24) which is treated as NHWC, (batch_size, 10)) are passed into a model. The example colab does not contain a simplified model which serves just as an example.





I tried a few approaches to speedup the training:





  • manual device placement (data pre-processing on cpu, propagations on gpu), but all my attempts resulted in worse speed (from 10% to 50% increase).

  • improve data preprocessing. I reviewed tf.data video and data tutorials. I tried almost every technique from that tutorial got no improvement (decrease in speed from 0% to 15%). In particular I tried:


    • dataset.prefetch(...)

    • passing num_parallel_calls to map

    • combining map and batch in tf.contrib.data.map_and_batch

    • using parallel_interleave




The code related to data preprocessing is here (here is a full reproducible example with example data):



_keys_to_map = {
'd': tf.FixedLenFeature(, tf.string), # data
's': tf.FixedLenFeature(, tf.int64), # score
}


def _parser(record):][3]
parsed = tf.parse_single_example(record, _keys_to_map)
return parsed['d'], parsed['s']


def init_tfrecord_dataset():
files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
random.shuffle(files_train)

with tf.name_scope('tfr_iterator'):
ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
ds = ds.map(_parser) # map them based on tfrecord format
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.repeat() # iterate infinitely

return ds.make_initializable_iterator() # initialize the iterator


def iterator_to_data(iterator):
"""Creates a part of the graph which reads the raw data from an iterator and transforms it to a
data ready to be passed to model.

Args:
iterator - iterator. Created by `init_tfrecord_dataset`

Returns:
data_board - (BATCH_SIZE, 8, 8, 24) you can think about as NWHC for images.
data_flags - (BATCH_SIZE, 10)
combined_score - (BATCH_SIZE,)
"""

b = tf.constant((128, 64, 32, 16, 8, 4, 2, 1), dtype=tf.uint8, name='unpacked_const')

with tf.name_scope('tfr_parse'):
with tf.name_scope('packed_data'):
next_element = iterator.get_next()
data_packed, score_int = next_element
score = tf.cast(score_int, tf.float64, name='score_float')

# https://stackoverflow.com/q/45454470/1090562
with tf.name_scope('data_unpacked'):
data_unpacked = tf.reshape(tf.mod(tf.to_int32(tf.decode_raw(data_packed, tf.uint8)[:,:,None] // b), 2), [BATCH_SIZE, 1552], name='data_unpack')

with tf.name_scope('score'):
with tf.name_scope('is_mate'):
score_is_mate = tf.cast(tf.squeeze(tf.slice(data_unpacked, [0, 1546], [BATCH_SIZE, 1])), tf.float64, name='is_mate')
with tf.name_scope('combined'):
combined_score = (1 - score_is_mate) * VALUE_A * tf.tanh(score / VALUE_K) + score_is_mate * tf.sign(score) * (VALUE_A + (1 - VALUE_A) / (VALUE_B - 1) * tf.reduce_max(tf.stack([tf.zeros(BATCH_SIZE, dtype=tf.float64), VALUE_B - tf.abs(score)]), axis=0))


with tf.name_scope('board'):
with tf.name_scope('reshape_layers'):
data_board = tf.reshape(tf.slice(data_unpacked, [0, 0], [BATCH_SIZE, 8 * 8 * 24]), [BATCH_SIZE, 8, 8, 24], name='board_reshape')

with tf.name_scope('combine_layers'):
data_board = tf.cast(tf.stack([
data_board[:,:,:, 0],
data_board[:,:,:, 4],
data_board[:,:,:, 8],
data_board[:,:,:,12],
data_board[:,:,:,16],
data_board[:,:,:,20],
- data_board[:,:,:, 1],
- data_board[:,:,:, 5],
- data_board[:,:,:, 9],
- data_board[:,:,:,13],
- data_board[:,:,:,17],
- data_board[:,:,:,21],
data_board[:,:,:, 2],
data_board[:,:,:, 6],
data_board[:,:,:,10],
data_board[:,:,:,14],
data_board[:,:,:,18],
data_board[:,:,:,22],
- data_board[:,:,:, 3],
- data_board[:,:,:, 7],
- data_board[:,:,:,11],
- data_board[:,:,:,15],
- data_board[:,:,:,19],
- data_board[:,:,:,23],
], axis=3), tf.float64, name='board_compact')

with tf.name_scope('flags'):
data_flags = tf.cast(tf.slice(data_unpacked, [0, 1536], [BATCH_SIZE, 10]), tf.float64, name='flags')

return data_board, data_flags, combined_score




I am looking for practical solutions (I have tried significant amount of theoretical ideas) which can improve the the speed of training (in terms of examples/second). I am not looking for a way to improve the accuracy of the model (or modify the model) as this is just a test model.



I have spent significant amount of time trying to optimize this (and gave up). So I would be happy to award a bounty of 200 for a working solution with a nice explanation.







python tensorflow tensorflow-datasets






share|improve this question













share|improve this question











share|improve this question




share|improve this question










asked Nov 22 '18 at 4:59









Salvador DaliSalvador Dali

112k81486579




112k81486579













  • Are you reading tfrecords from drive?

    – mlRocks
    Nov 30 '18 at 6:29











  • @mlRocks yes, I am reading it from gDrive. You can actually look at the full implementation in the full reproducible link from the question.

    – Salvador Dali
    Nov 30 '18 at 7:08











  • This may it be helpful :tensorflow.org/guide/performance/… github.com/tensorflow/tensorflow/issues/14857

    – i_th
    Dec 2 '18 at 19:49













  • @SalvadorDali it's the known problem. Because it's not a physical drive like attached to your computer, reading from it will be slow

    – mlRocks
    Dec 3 '18 at 6:15



















  • Are you reading tfrecords from drive?

    – mlRocks
    Nov 30 '18 at 6:29











  • @mlRocks yes, I am reading it from gDrive. You can actually look at the full implementation in the full reproducible link from the question.

    – Salvador Dali
    Nov 30 '18 at 7:08











  • This may it be helpful :tensorflow.org/guide/performance/… github.com/tensorflow/tensorflow/issues/14857

    – i_th
    Dec 2 '18 at 19:49













  • @SalvadorDali it's the known problem. Because it's not a physical drive like attached to your computer, reading from it will be slow

    – mlRocks
    Dec 3 '18 at 6:15

















Are you reading tfrecords from drive?

– mlRocks
Nov 30 '18 at 6:29





Are you reading tfrecords from drive?

– mlRocks
Nov 30 '18 at 6:29













@mlRocks yes, I am reading it from gDrive. You can actually look at the full implementation in the full reproducible link from the question.

– Salvador Dali
Nov 30 '18 at 7:08





@mlRocks yes, I am reading it from gDrive. You can actually look at the full implementation in the full reproducible link from the question.

– Salvador Dali
Nov 30 '18 at 7:08













This may it be helpful :tensorflow.org/guide/performance/… github.com/tensorflow/tensorflow/issues/14857

– i_th
Dec 2 '18 at 19:49







This may it be helpful :tensorflow.org/guide/performance/… github.com/tensorflow/tensorflow/issues/14857

– i_th
Dec 2 '18 at 19:49















@SalvadorDali it's the known problem. Because it's not a physical drive like attached to your computer, reading from it will be slow

– mlRocks
Dec 3 '18 at 6:15





@SalvadorDali it's the known problem. Because it's not a physical drive like attached to your computer, reading from it will be slow

– mlRocks
Dec 3 '18 at 6:15












2 Answers
2






active

oldest

votes


















4





+200









The suggestion from hampi to profile your training job is a good one, and may be necessary to understand the actual bottlenecks in your pipeline. The other suggestions in the Input Pipeline performance guide should be useful as well.



However, there is another possible "quick fix" that might be useful. In some cases, the amount of work in a Dataset.map() transformation can be very small, and dominated by the cost of invoking the function for each element. In those cases, we often try to vectorize the map function, and move it after the Dataset.batch() transformation, in order to invoke the function fewer times (1/512 as many times, in this case), and perform larger—and potentially easier-to-parallelize—operations on each batch. Fortunately, your pipeline can be vectorized as follows:



def _batch_parser(record_batch):
# NOTE: Use `tf.parse_example()` to operate on batches of records.
parsed = tf.parse_example(record_batch, _keys_to_map)
return parsed['d'], parsed['s']

def init_tfrecord_dataset():
files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
random.shuffle(files_train)

with tf.name_scope('tfr_iterator'):
ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
# NOTE: Change begins here.
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.map(_batch_parser) # map batches based on tfrecord format
# NOTE: Change ends here.
ds = ds.repeat() # iterate infinitely

return ds.make_initializable_iterator() # initialize the iterator


Currently, vectorization is a change that you have to make manually, but the tf.data team are working on an optimization pass that provides automatic vectorization.






share|improve this answer
























  • Thank you very much. Based on my tests (on a bigger dataset) this gives me ~3-5% speed-up (not sure whether this is statistically significant or a random fluctuation). Not as much as I was hoping for, but very good for a 3 lines change.

    – Salvador Dali
    Dec 4 '18 at 7:15



















4














I have a couple of suggestions:



1) After creating the batch, the entire batch is processed by the iterator_to_data() function. This isn't really distributing the task on multiple threads, atleast not at the api level. Instead, you could try something like this in the init_tfrecord_dataset() function:



ds = tf.data.TFRecordDataset(files_train)      # define data from randomly ordered files
ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
ds = ds.map(_parser)
ds = ds.map(map_func=iterator_to_data, num_parallel_calls=FLAGS.num_preprocessing_threads)
ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
ds = ds.repeat()


you might also want to change a few lines in the iterator_to_data() fucntion as the input argument is not a iterator with the above changes.



2) You might also want to get the profiling information using something like tf.train.ProfilerHook. This can tell you if the bottleneck is with the cpu or gpu. For example, if the bottleneck is with the CPU, you could see GPU ops waiting for memcpyHtoD op to complete.






share|improve this answer

























    Your Answer






    StackExchange.ifUsing("editor", function () {
    StackExchange.using("externalEditor", function () {
    StackExchange.using("snippets", function () {
    StackExchange.snippets.init();
    });
    });
    }, "code-snippets");

    StackExchange.ready(function() {
    var channelOptions = {
    tags: "".split(" "),
    id: "1"
    };
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function() {
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled) {
    StackExchange.using("snippets", function() {
    createEditor();
    });
    }
    else {
    createEditor();
    }
    });

    function createEditor() {
    StackExchange.prepareEditor({
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader: {
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    },
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    });


    }
    });














    draft saved

    draft discarded


















    StackExchange.ready(
    function () {
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53424152%2fhow-to-improve-the-performance-of-this-data-pipeline-for-my-tensorflow-model%23new-answer', 'question_page');
    }
    );

    Post as a guest















    Required, but never shown

























    2 Answers
    2






    active

    oldest

    votes








    2 Answers
    2






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    4





    +200









    The suggestion from hampi to profile your training job is a good one, and may be necessary to understand the actual bottlenecks in your pipeline. The other suggestions in the Input Pipeline performance guide should be useful as well.



    However, there is another possible "quick fix" that might be useful. In some cases, the amount of work in a Dataset.map() transformation can be very small, and dominated by the cost of invoking the function for each element. In those cases, we often try to vectorize the map function, and move it after the Dataset.batch() transformation, in order to invoke the function fewer times (1/512 as many times, in this case), and perform larger—and potentially easier-to-parallelize—operations on each batch. Fortunately, your pipeline can be vectorized as follows:



    def _batch_parser(record_batch):
    # NOTE: Use `tf.parse_example()` to operate on batches of records.
    parsed = tf.parse_example(record_batch, _keys_to_map)
    return parsed['d'], parsed['s']

    def init_tfrecord_dataset():
    files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
    random.shuffle(files_train)

    with tf.name_scope('tfr_iterator'):
    ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
    ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
    # NOTE: Change begins here.
    ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
    ds = ds.map(_batch_parser) # map batches based on tfrecord format
    # NOTE: Change ends here.
    ds = ds.repeat() # iterate infinitely

    return ds.make_initializable_iterator() # initialize the iterator


    Currently, vectorization is a change that you have to make manually, but the tf.data team are working on an optimization pass that provides automatic vectorization.






    share|improve this answer
























    • Thank you very much. Based on my tests (on a bigger dataset) this gives me ~3-5% speed-up (not sure whether this is statistically significant or a random fluctuation). Not as much as I was hoping for, but very good for a 3 lines change.

      – Salvador Dali
      Dec 4 '18 at 7:15
















    4





    +200









    The suggestion from hampi to profile your training job is a good one, and may be necessary to understand the actual bottlenecks in your pipeline. The other suggestions in the Input Pipeline performance guide should be useful as well.



    However, there is another possible "quick fix" that might be useful. In some cases, the amount of work in a Dataset.map() transformation can be very small, and dominated by the cost of invoking the function for each element. In those cases, we often try to vectorize the map function, and move it after the Dataset.batch() transformation, in order to invoke the function fewer times (1/512 as many times, in this case), and perform larger—and potentially easier-to-parallelize—operations on each batch. Fortunately, your pipeline can be vectorized as follows:



    def _batch_parser(record_batch):
    # NOTE: Use `tf.parse_example()` to operate on batches of records.
    parsed = tf.parse_example(record_batch, _keys_to_map)
    return parsed['d'], parsed['s']

    def init_tfrecord_dataset():
    files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
    random.shuffle(files_train)

    with tf.name_scope('tfr_iterator'):
    ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
    ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
    # NOTE: Change begins here.
    ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
    ds = ds.map(_batch_parser) # map batches based on tfrecord format
    # NOTE: Change ends here.
    ds = ds.repeat() # iterate infinitely

    return ds.make_initializable_iterator() # initialize the iterator


    Currently, vectorization is a change that you have to make manually, but the tf.data team are working on an optimization pass that provides automatic vectorization.






    share|improve this answer
























    • Thank you very much. Based on my tests (on a bigger dataset) this gives me ~3-5% speed-up (not sure whether this is statistically significant or a random fluctuation). Not as much as I was hoping for, but very good for a 3 lines change.

      – Salvador Dali
      Dec 4 '18 at 7:15














    4





    +200







    4





    +200



    4




    +200





    The suggestion from hampi to profile your training job is a good one, and may be necessary to understand the actual bottlenecks in your pipeline. The other suggestions in the Input Pipeline performance guide should be useful as well.



    However, there is another possible "quick fix" that might be useful. In some cases, the amount of work in a Dataset.map() transformation can be very small, and dominated by the cost of invoking the function for each element. In those cases, we often try to vectorize the map function, and move it after the Dataset.batch() transformation, in order to invoke the function fewer times (1/512 as many times, in this case), and perform larger—and potentially easier-to-parallelize—operations on each batch. Fortunately, your pipeline can be vectorized as follows:



    def _batch_parser(record_batch):
    # NOTE: Use `tf.parse_example()` to operate on batches of records.
    parsed = tf.parse_example(record_batch, _keys_to_map)
    return parsed['d'], parsed['s']

    def init_tfrecord_dataset():
    files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
    random.shuffle(files_train)

    with tf.name_scope('tfr_iterator'):
    ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
    ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
    # NOTE: Change begins here.
    ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
    ds = ds.map(_batch_parser) # map batches based on tfrecord format
    # NOTE: Change ends here.
    ds = ds.repeat() # iterate infinitely

    return ds.make_initializable_iterator() # initialize the iterator


    Currently, vectorization is a change that you have to make manually, but the tf.data team are working on an optimization pass that provides automatic vectorization.






    share|improve this answer













    The suggestion from hampi to profile your training job is a good one, and may be necessary to understand the actual bottlenecks in your pipeline. The other suggestions in the Input Pipeline performance guide should be useful as well.



    However, there is another possible "quick fix" that might be useful. In some cases, the amount of work in a Dataset.map() transformation can be very small, and dominated by the cost of invoking the function for each element. In those cases, we often try to vectorize the map function, and move it after the Dataset.batch() transformation, in order to invoke the function fewer times (1/512 as many times, in this case), and perform larger—and potentially easier-to-parallelize—operations on each batch. Fortunately, your pipeline can be vectorized as follows:



    def _batch_parser(record_batch):
    # NOTE: Use `tf.parse_example()` to operate on batches of records.
    parsed = tf.parse_example(record_batch, _keys_to_map)
    return parsed['d'], parsed['s']

    def init_tfrecord_dataset():
    files_train = glob.glob(DIR_TFRECORDS + '*.tfrecord')
    random.shuffle(files_train)

    with tf.name_scope('tfr_iterator'):
    ds = tf.data.TFRecordDataset(files_train) # define data from randomly ordered files
    ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
    # NOTE: Change begins here.
    ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
    ds = ds.map(_batch_parser) # map batches based on tfrecord format
    # NOTE: Change ends here.
    ds = ds.repeat() # iterate infinitely

    return ds.make_initializable_iterator() # initialize the iterator


    Currently, vectorization is a change that you have to make manually, but the tf.data team are working on an optimization pass that provides automatic vectorization.







    share|improve this answer












    share|improve this answer



    share|improve this answer










    answered Dec 2 '18 at 23:03









    mrrymrry

    96.2k12276334




    96.2k12276334













    • Thank you very much. Based on my tests (on a bigger dataset) this gives me ~3-5% speed-up (not sure whether this is statistically significant or a random fluctuation). Not as much as I was hoping for, but very good for a 3 lines change.

      – Salvador Dali
      Dec 4 '18 at 7:15



















    • Thank you very much. Based on my tests (on a bigger dataset) this gives me ~3-5% speed-up (not sure whether this is statistically significant or a random fluctuation). Not as much as I was hoping for, but very good for a 3 lines change.

      – Salvador Dali
      Dec 4 '18 at 7:15

















    Thank you very much. Based on my tests (on a bigger dataset) this gives me ~3-5% speed-up (not sure whether this is statistically significant or a random fluctuation). Not as much as I was hoping for, but very good for a 3 lines change.

    – Salvador Dali
    Dec 4 '18 at 7:15





    Thank you very much. Based on my tests (on a bigger dataset) this gives me ~3-5% speed-up (not sure whether this is statistically significant or a random fluctuation). Not as much as I was hoping for, but very good for a 3 lines change.

    – Salvador Dali
    Dec 4 '18 at 7:15













    4














    I have a couple of suggestions:



    1) After creating the batch, the entire batch is processed by the iterator_to_data() function. This isn't really distributing the task on multiple threads, atleast not at the api level. Instead, you could try something like this in the init_tfrecord_dataset() function:



    ds = tf.data.TFRecordDataset(files_train)      # define data from randomly ordered files
    ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
    ds = ds.map(_parser)
    ds = ds.map(map_func=iterator_to_data, num_parallel_calls=FLAGS.num_preprocessing_threads)
    ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
    ds = ds.repeat()


    you might also want to change a few lines in the iterator_to_data() fucntion as the input argument is not a iterator with the above changes.



    2) You might also want to get the profiling information using something like tf.train.ProfilerHook. This can tell you if the bottleneck is with the cpu or gpu. For example, if the bottleneck is with the CPU, you could see GPU ops waiting for memcpyHtoD op to complete.






    share|improve this answer






























      4














      I have a couple of suggestions:



      1) After creating the batch, the entire batch is processed by the iterator_to_data() function. This isn't really distributing the task on multiple threads, atleast not at the api level. Instead, you could try something like this in the init_tfrecord_dataset() function:



      ds = tf.data.TFRecordDataset(files_train)      # define data from randomly ordered files
      ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
      ds = ds.map(_parser)
      ds = ds.map(map_func=iterator_to_data, num_parallel_calls=FLAGS.num_preprocessing_threads)
      ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
      ds = ds.repeat()


      you might also want to change a few lines in the iterator_to_data() fucntion as the input argument is not a iterator with the above changes.



      2) You might also want to get the profiling information using something like tf.train.ProfilerHook. This can tell you if the bottleneck is with the cpu or gpu. For example, if the bottleneck is with the CPU, you could see GPU ops waiting for memcpyHtoD op to complete.






      share|improve this answer




























        4












        4








        4







        I have a couple of suggestions:



        1) After creating the batch, the entire batch is processed by the iterator_to_data() function. This isn't really distributing the task on multiple threads, atleast not at the api level. Instead, you could try something like this in the init_tfrecord_dataset() function:



        ds = tf.data.TFRecordDataset(files_train)      # define data from randomly ordered files
        ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
        ds = ds.map(_parser)
        ds = ds.map(map_func=iterator_to_data, num_parallel_calls=FLAGS.num_preprocessing_threads)
        ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
        ds = ds.repeat()


        you might also want to change a few lines in the iterator_to_data() fucntion as the input argument is not a iterator with the above changes.



        2) You might also want to get the profiling information using something like tf.train.ProfilerHook. This can tell you if the bottleneck is with the cpu or gpu. For example, if the bottleneck is with the CPU, you could see GPU ops waiting for memcpyHtoD op to complete.






        share|improve this answer















        I have a couple of suggestions:



        1) After creating the batch, the entire batch is processed by the iterator_to_data() function. This isn't really distributing the task on multiple threads, atleast not at the api level. Instead, you could try something like this in the init_tfrecord_dataset() function:



        ds = tf.data.TFRecordDataset(files_train)      # define data from randomly ordered files
        ds = ds.shuffle(buffer_size=10000) # select elements randomly from the buffer
        ds = ds.map(_parser)
        ds = ds.map(map_func=iterator_to_data, num_parallel_calls=FLAGS.num_preprocessing_threads)
        ds = ds.batch(BATCH_SIZE, drop_remainder=True) # group elements in batch (remove batch of less than BATCH_SIZE)
        ds = ds.repeat()


        you might also want to change a few lines in the iterator_to_data() fucntion as the input argument is not a iterator with the above changes.



        2) You might also want to get the profiling information using something like tf.train.ProfilerHook. This can tell you if the bottleneck is with the cpu or gpu. For example, if the bottleneck is with the CPU, you could see GPU ops waiting for memcpyHtoD op to complete.







        share|improve this answer














        share|improve this answer



        share|improve this answer








        edited Dec 4 '18 at 7:17









        Salvador Dali

        112k81486579




        112k81486579










        answered Nov 22 '18 at 15:26









        hampihampi

        1164




        1164






























            draft saved

            draft discarded




















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid



            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.


            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53424152%2fhow-to-improve-the-performance-of-this-data-pipeline-for-my-tensorflow-model%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            Popular posts from this blog

            404 Error Contact Form 7 ajax form submitting

            How to know if a Active Directory user can login interactively

            TypeError: fit_transform() missing 1 required positional argument: 'X'