Confusing error during Apache Beam's `process_outputs` on Google Dataflow worker












0














I'm running the following successful Apache Beam test pipeline on Google Dataflow. It uses Datastore as a source and sink. Many entities in our database are assigned to namespaces. This pipeline is meant to perform _do_work() on all entities of a certain kind in the given namespaces. Note, a similar test pipeline that does the same thing to non-namespaced entities also runs successfully:



import apache_beam as beam
from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
from apache_beam.io.gcp.datastore.v1 import datastoreio
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper

from .pipelines.dataflow_settings import (
PROJECT, NAMESPACES_PER_PIPELINE
)

class NamespacedDatastoreMigration(_DatastoreMigrationBase):
"""
Map a do-function over a query multiplexed across several namespaces.

The inheritor must implement the following:
- a PROJECT class attribute
- a do-function (_do_work())
- a method to get the namespaces across which to shard the query (
get_namespaces())
"""
_NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE # 25

def __init__(self, argv, migration_history_obj, model):
super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
self._namespaces = self.get_namespaces()
self._pipelines = self._create_pipelines(argv)

def get_namespaces(self):
query_pb = query_pb2.Query()
helper.set_kind(query_pb, "__namespace__")
client = apache_helper.get_datastore(PROJECT)
namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)

namespaces =
for n in namespace_entities:
# Get namespace name or id
key_path = n.key.path[-1]
if key_path.HasField('id'):
name_or_id = key_path.id
else:
name_or_id = key_path.name

# Avoid duplicates and test namespaces
if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
namespaces.append(name_or_id)

return namespaces

def run(self):
for pipeline in self._pipelines:
pipeline.run()

def _create_pipelines(self, argv):
pipelines =
for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
p = beam.Pipeline(argv=argv)
(
(
p | 'ReadNamespace_{}'.format(
ns
) >> datastoreio.ReadFromDatastore(
project=self.PROJECT,
query=self.query(),
namespace=ns
)
for ns in namespaces
)
| 'JoinNamespaceEntities' >> beam.Flatten()
| self.__class__.__name__ >> beam.FlatMap(self._do_work)
| self._get_sink()
)
pipelines.append(p)

return pipelines

model = "App"
NamespacedDatastoreMigration(
argv,
kwargs.get('migration_history_obj'), # Irrelevant here
model # Entity kind
).run()


where argv is:



argv = [
'--project={0}'.format(PROJECT),
'--job_name=' + name, # A human readable descriptor that's been cleaned
'--staging_location=gs://{0}/migrations/'.format(BUCKET),
'--temp_location=gs://{0}/migrations/'.format(BUCKET),
'--setup_file=./setup.py',
'--runner=DataflowRunner'
]


This is based on the subclass:



class _DatastoreMigrationBase(object):
PROJECT = PROJECT

def __init__(self, argv, migration_history_obj, model):
self.migration_history_obj = migration_history_obj

if not model:
raise Exception('This operation requires a model class name.')
self.model = model

def query(self):
# Instantiate a filter protobuf
filter_pb = query_pb2.Filter()

# Get all non-deleted model instances
helper.set_property_filter(
filter_pb,
'deleted',
query_pb2.PropertyFilter.EQUAL,
False
)

# Instantiate a query protobuf
query_pb = query_pb2.Query(
filter=filter_pb
)
helper.set_kind(query_pb, self.model)

return query_pb

def _get_source(self):
return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
self.PROJECT,
self.query(),
)

@staticmethod
def _do_work(entity):
return entity

def _get_sink(self):
return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
self.PROJECT
)


However, when I subclass NamespacedDatastoreMigration like so:



from ..helpers import create_argv
from ..mappers import NamespacedDatastoreMigration


class CampaignActionField(NamespacedDatastoreMigration):
@staticmethod
def _do_work(entity):
target_url = entity.properties.get('target_url').string_value
message = entity.properties.get('message').string_value
path = entity.properties.get('path').string_value
if target_url and not message and not path:
entity.properties.get('action').string_value = 'webhook'

return entity


model = "Campaign" # Entity kind
CampaignActionField(
create_argv(kwargs.get('name')), # "ED-2275 Campaign action field"
kwargs.get('migration_history_obj'), # Irrelevant here
model
).run()


and this new pipeline runs on Dataflow, it fails. At first, it starts out okay. By that I mean I see the following INFO logs:



2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.

2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.


# SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:

2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create

2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...

2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close


And then I get this traceback:



Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
def start(self):
File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.shuffle_source.reader() as reader:
File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
self.output(wvalue.with_value((k, wvalue.value)))
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
TypeError: 'Entity' object is not iterable [while running 's152-c260']


I assume this has something to do with the difference between the two _do_work() functions in NamespacedDatastoreMigration and CampaignActionField, since the former is successful while the latter fails, and _do_work() is the only difference between them (besides the entity kind being transformed). But I can't think of what exactly is going wrong and how to get around it. Does anyone have any thoughts?










share|improve this question





























    0














    I'm running the following successful Apache Beam test pipeline on Google Dataflow. It uses Datastore as a source and sink. Many entities in our database are assigned to namespaces. This pipeline is meant to perform _do_work() on all entities of a certain kind in the given namespaces. Note, a similar test pipeline that does the same thing to non-namespaced entities also runs successfully:



    import apache_beam as beam
    from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
    from apache_beam.io.gcp.datastore.v1 import datastoreio
    from google.cloud.proto.datastore.v1 import query_pb2
    from googledatastore import helper

    from .pipelines.dataflow_settings import (
    PROJECT, NAMESPACES_PER_PIPELINE
    )

    class NamespacedDatastoreMigration(_DatastoreMigrationBase):
    """
    Map a do-function over a query multiplexed across several namespaces.

    The inheritor must implement the following:
    - a PROJECT class attribute
    - a do-function (_do_work())
    - a method to get the namespaces across which to shard the query (
    get_namespaces())
    """
    _NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE # 25

    def __init__(self, argv, migration_history_obj, model):
    super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
    self._namespaces = self.get_namespaces()
    self._pipelines = self._create_pipelines(argv)

    def get_namespaces(self):
    query_pb = query_pb2.Query()
    helper.set_kind(query_pb, "__namespace__")
    client = apache_helper.get_datastore(PROJECT)
    namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)

    namespaces =
    for n in namespace_entities:
    # Get namespace name or id
    key_path = n.key.path[-1]
    if key_path.HasField('id'):
    name_or_id = key_path.id
    else:
    name_or_id = key_path.name

    # Avoid duplicates and test namespaces
    if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
    namespaces.append(name_or_id)

    return namespaces

    def run(self):
    for pipeline in self._pipelines:
    pipeline.run()

    def _create_pipelines(self, argv):
    pipelines =
    for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
    p = beam.Pipeline(argv=argv)
    (
    (
    p | 'ReadNamespace_{}'.format(
    ns
    ) >> datastoreio.ReadFromDatastore(
    project=self.PROJECT,
    query=self.query(),
    namespace=ns
    )
    for ns in namespaces
    )
    | 'JoinNamespaceEntities' >> beam.Flatten()
    | self.__class__.__name__ >> beam.FlatMap(self._do_work)
    | self._get_sink()
    )
    pipelines.append(p)

    return pipelines

    model = "App"
    NamespacedDatastoreMigration(
    argv,
    kwargs.get('migration_history_obj'), # Irrelevant here
    model # Entity kind
    ).run()


    where argv is:



    argv = [
    '--project={0}'.format(PROJECT),
    '--job_name=' + name, # A human readable descriptor that's been cleaned
    '--staging_location=gs://{0}/migrations/'.format(BUCKET),
    '--temp_location=gs://{0}/migrations/'.format(BUCKET),
    '--setup_file=./setup.py',
    '--runner=DataflowRunner'
    ]


    This is based on the subclass:



    class _DatastoreMigrationBase(object):
    PROJECT = PROJECT

    def __init__(self, argv, migration_history_obj, model):
    self.migration_history_obj = migration_history_obj

    if not model:
    raise Exception('This operation requires a model class name.')
    self.model = model

    def query(self):
    # Instantiate a filter protobuf
    filter_pb = query_pb2.Filter()

    # Get all non-deleted model instances
    helper.set_property_filter(
    filter_pb,
    'deleted',
    query_pb2.PropertyFilter.EQUAL,
    False
    )

    # Instantiate a query protobuf
    query_pb = query_pb2.Query(
    filter=filter_pb
    )
    helper.set_kind(query_pb, self.model)

    return query_pb

    def _get_source(self):
    return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
    self.PROJECT,
    self.query(),
    )

    @staticmethod
    def _do_work(entity):
    return entity

    def _get_sink(self):
    return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
    self.PROJECT
    )


    However, when I subclass NamespacedDatastoreMigration like so:



    from ..helpers import create_argv
    from ..mappers import NamespacedDatastoreMigration


    class CampaignActionField(NamespacedDatastoreMigration):
    @staticmethod
    def _do_work(entity):
    target_url = entity.properties.get('target_url').string_value
    message = entity.properties.get('message').string_value
    path = entity.properties.get('path').string_value
    if target_url and not message and not path:
    entity.properties.get('action').string_value = 'webhook'

    return entity


    model = "Campaign" # Entity kind
    CampaignActionField(
    create_argv(kwargs.get('name')), # "ED-2275 Campaign action field"
    kwargs.get('migration_history_obj'), # Irrelevant here
    model
    ).run()


    and this new pipeline runs on Dataflow, it fails. At first, it starts out okay. By that I mean I see the following INFO logs:



    2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.

    2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.


    # SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:

    2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create

    2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...

    2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close


    And then I get this traceback:



    Traceback (most recent call last):
    File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
    work_executor.execute()
    File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
    op.start()
    File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    def start(self):
    File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_start_state:
    File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_process_state:
    File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.shuffle_source.reader() as reader:
    File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    self.output(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
    File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    with self.scoped_process_state:
    File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    self.output(wvalue.with_value((k, wvalue.value)))
    File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
    File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
    File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
    File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
    File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
    File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
    File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
    File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
    File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
    File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
    File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
    File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
    File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
    File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
    File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
    File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
    File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
    File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
    File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
    File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
    File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
    File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
    File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
    File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
    File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
    File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
    File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
    File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
    File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
    for result in results:
    TypeError: 'Entity' object is not iterable [while running 's152-c260']


    I assume this has something to do with the difference between the two _do_work() functions in NamespacedDatastoreMigration and CampaignActionField, since the former is successful while the latter fails, and _do_work() is the only difference between them (besides the entity kind being transformed). But I can't think of what exactly is going wrong and how to get around it. Does anyone have any thoughts?










    share|improve this question



























      0












      0








      0







      I'm running the following successful Apache Beam test pipeline on Google Dataflow. It uses Datastore as a source and sink. Many entities in our database are assigned to namespaces. This pipeline is meant to perform _do_work() on all entities of a certain kind in the given namespaces. Note, a similar test pipeline that does the same thing to non-namespaced entities also runs successfully:



      import apache_beam as beam
      from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
      from apache_beam.io.gcp.datastore.v1 import datastoreio
      from google.cloud.proto.datastore.v1 import query_pb2
      from googledatastore import helper

      from .pipelines.dataflow_settings import (
      PROJECT, NAMESPACES_PER_PIPELINE
      )

      class NamespacedDatastoreMigration(_DatastoreMigrationBase):
      """
      Map a do-function over a query multiplexed across several namespaces.

      The inheritor must implement the following:
      - a PROJECT class attribute
      - a do-function (_do_work())
      - a method to get the namespaces across which to shard the query (
      get_namespaces())
      """
      _NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE # 25

      def __init__(self, argv, migration_history_obj, model):
      super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
      self._namespaces = self.get_namespaces()
      self._pipelines = self._create_pipelines(argv)

      def get_namespaces(self):
      query_pb = query_pb2.Query()
      helper.set_kind(query_pb, "__namespace__")
      client = apache_helper.get_datastore(PROJECT)
      namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)

      namespaces =
      for n in namespace_entities:
      # Get namespace name or id
      key_path = n.key.path[-1]
      if key_path.HasField('id'):
      name_or_id = key_path.id
      else:
      name_or_id = key_path.name

      # Avoid duplicates and test namespaces
      if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
      namespaces.append(name_or_id)

      return namespaces

      def run(self):
      for pipeline in self._pipelines:
      pipeline.run()

      def _create_pipelines(self, argv):
      pipelines =
      for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
      p = beam.Pipeline(argv=argv)
      (
      (
      p | 'ReadNamespace_{}'.format(
      ns
      ) >> datastoreio.ReadFromDatastore(
      project=self.PROJECT,
      query=self.query(),
      namespace=ns
      )
      for ns in namespaces
      )
      | 'JoinNamespaceEntities' >> beam.Flatten()
      | self.__class__.__name__ >> beam.FlatMap(self._do_work)
      | self._get_sink()
      )
      pipelines.append(p)

      return pipelines

      model = "App"
      NamespacedDatastoreMigration(
      argv,
      kwargs.get('migration_history_obj'), # Irrelevant here
      model # Entity kind
      ).run()


      where argv is:



      argv = [
      '--project={0}'.format(PROJECT),
      '--job_name=' + name, # A human readable descriptor that's been cleaned
      '--staging_location=gs://{0}/migrations/'.format(BUCKET),
      '--temp_location=gs://{0}/migrations/'.format(BUCKET),
      '--setup_file=./setup.py',
      '--runner=DataflowRunner'
      ]


      This is based on the subclass:



      class _DatastoreMigrationBase(object):
      PROJECT = PROJECT

      def __init__(self, argv, migration_history_obj, model):
      self.migration_history_obj = migration_history_obj

      if not model:
      raise Exception('This operation requires a model class name.')
      self.model = model

      def query(self):
      # Instantiate a filter protobuf
      filter_pb = query_pb2.Filter()

      # Get all non-deleted model instances
      helper.set_property_filter(
      filter_pb,
      'deleted',
      query_pb2.PropertyFilter.EQUAL,
      False
      )

      # Instantiate a query protobuf
      query_pb = query_pb2.Query(
      filter=filter_pb
      )
      helper.set_kind(query_pb, self.model)

      return query_pb

      def _get_source(self):
      return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
      self.PROJECT,
      self.query(),
      )

      @staticmethod
      def _do_work(entity):
      return entity

      def _get_sink(self):
      return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
      self.PROJECT
      )


      However, when I subclass NamespacedDatastoreMigration like so:



      from ..helpers import create_argv
      from ..mappers import NamespacedDatastoreMigration


      class CampaignActionField(NamespacedDatastoreMigration):
      @staticmethod
      def _do_work(entity):
      target_url = entity.properties.get('target_url').string_value
      message = entity.properties.get('message').string_value
      path = entity.properties.get('path').string_value
      if target_url and not message and not path:
      entity.properties.get('action').string_value = 'webhook'

      return entity


      model = "Campaign" # Entity kind
      CampaignActionField(
      create_argv(kwargs.get('name')), # "ED-2275 Campaign action field"
      kwargs.get('migration_history_obj'), # Irrelevant here
      model
      ).run()


      and this new pipeline runs on Dataflow, it fails. At first, it starts out okay. By that I mean I see the following INFO logs:



      2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.

      2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.


      # SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:

      2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create

      2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...

      2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close


      And then I get this traceback:



      Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
      work_executor.execute()
      File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
      op.start()
      File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      def start(self):
      File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      with self.scoped_start_state:
      File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      with self.scoped_process_state:
      File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      with self.shuffle_source.reader() as reader:
      File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      self.output(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
      cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
      with self.scoped_process_state:
      File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
      self.output(wvalue.with_value((k, wvalue.value)))
      File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
      cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise
      File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
      self.main_receivers.receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise
      File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
      self.main_receivers.receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise
      File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
      self.main_receivers.receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise_with_traceback(new_exn)
      File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
      for result in results:
      TypeError: 'Entity' object is not iterable [while running 's152-c260']


      I assume this has something to do with the difference between the two _do_work() functions in NamespacedDatastoreMigration and CampaignActionField, since the former is successful while the latter fails, and _do_work() is the only difference between them (besides the entity kind being transformed). But I can't think of what exactly is going wrong and how to get around it. Does anyone have any thoughts?










      share|improve this question















      I'm running the following successful Apache Beam test pipeline on Google Dataflow. It uses Datastore as a source and sink. Many entities in our database are assigned to namespaces. This pipeline is meant to perform _do_work() on all entities of a certain kind in the given namespaces. Note, a similar test pipeline that does the same thing to non-namespaced entities also runs successfully:



      import apache_beam as beam
      from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
      from apache_beam.io.gcp.datastore.v1 import datastoreio
      from google.cloud.proto.datastore.v1 import query_pb2
      from googledatastore import helper

      from .pipelines.dataflow_settings import (
      PROJECT, NAMESPACES_PER_PIPELINE
      )

      class NamespacedDatastoreMigration(_DatastoreMigrationBase):
      """
      Map a do-function over a query multiplexed across several namespaces.

      The inheritor must implement the following:
      - a PROJECT class attribute
      - a do-function (_do_work())
      - a method to get the namespaces across which to shard the query (
      get_namespaces())
      """
      _NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE # 25

      def __init__(self, argv, migration_history_obj, model):
      super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
      self._namespaces = self.get_namespaces()
      self._pipelines = self._create_pipelines(argv)

      def get_namespaces(self):
      query_pb = query_pb2.Query()
      helper.set_kind(query_pb, "__namespace__")
      client = apache_helper.get_datastore(PROJECT)
      namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)

      namespaces =
      for n in namespace_entities:
      # Get namespace name or id
      key_path = n.key.path[-1]
      if key_path.HasField('id'):
      name_or_id = key_path.id
      else:
      name_or_id = key_path.name

      # Avoid duplicates and test namespaces
      if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
      namespaces.append(name_or_id)

      return namespaces

      def run(self):
      for pipeline in self._pipelines:
      pipeline.run()

      def _create_pipelines(self, argv):
      pipelines =
      for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
      p = beam.Pipeline(argv=argv)
      (
      (
      p | 'ReadNamespace_{}'.format(
      ns
      ) >> datastoreio.ReadFromDatastore(
      project=self.PROJECT,
      query=self.query(),
      namespace=ns
      )
      for ns in namespaces
      )
      | 'JoinNamespaceEntities' >> beam.Flatten()
      | self.__class__.__name__ >> beam.FlatMap(self._do_work)
      | self._get_sink()
      )
      pipelines.append(p)

      return pipelines

      model = "App"
      NamespacedDatastoreMigration(
      argv,
      kwargs.get('migration_history_obj'), # Irrelevant here
      model # Entity kind
      ).run()


      where argv is:



      argv = [
      '--project={0}'.format(PROJECT),
      '--job_name=' + name, # A human readable descriptor that's been cleaned
      '--staging_location=gs://{0}/migrations/'.format(BUCKET),
      '--temp_location=gs://{0}/migrations/'.format(BUCKET),
      '--setup_file=./setup.py',
      '--runner=DataflowRunner'
      ]


      This is based on the subclass:



      class _DatastoreMigrationBase(object):
      PROJECT = PROJECT

      def __init__(self, argv, migration_history_obj, model):
      self.migration_history_obj = migration_history_obj

      if not model:
      raise Exception('This operation requires a model class name.')
      self.model = model

      def query(self):
      # Instantiate a filter protobuf
      filter_pb = query_pb2.Filter()

      # Get all non-deleted model instances
      helper.set_property_filter(
      filter_pb,
      'deleted',
      query_pb2.PropertyFilter.EQUAL,
      False
      )

      # Instantiate a query protobuf
      query_pb = query_pb2.Query(
      filter=filter_pb
      )
      helper.set_kind(query_pb, self.model)

      return query_pb

      def _get_source(self):
      return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
      self.PROJECT,
      self.query(),
      )

      @staticmethod
      def _do_work(entity):
      return entity

      def _get_sink(self):
      return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
      self.PROJECT
      )


      However, when I subclass NamespacedDatastoreMigration like so:



      from ..helpers import create_argv
      from ..mappers import NamespacedDatastoreMigration


      class CampaignActionField(NamespacedDatastoreMigration):
      @staticmethod
      def _do_work(entity):
      target_url = entity.properties.get('target_url').string_value
      message = entity.properties.get('message').string_value
      path = entity.properties.get('path').string_value
      if target_url and not message and not path:
      entity.properties.get('action').string_value = 'webhook'

      return entity


      model = "Campaign" # Entity kind
      CampaignActionField(
      create_argv(kwargs.get('name')), # "ED-2275 Campaign action field"
      kwargs.get('migration_history_obj'), # Irrelevant here
      model
      ).run()


      and this new pipeline runs on Dataflow, it fails. At first, it starts out okay. By that I mean I see the following INFO logs:



      2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.

      2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.


      # SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:

      2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create

      2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...

      2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close


      And then I get this traceback:



      Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
      work_executor.execute()
      File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
      op.start()
      File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      def start(self):
      File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      with self.scoped_start_state:
      File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      with self.scoped_process_state:
      File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      with self.shuffle_source.reader() as reader:
      File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      self.output(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
      cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
      with self.scoped_process_state:
      File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
      self.output(wvalue.with_value((k, wvalue.value)))
      File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
      cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise
      File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
      self.main_receivers.receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise
      File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
      self.main_receivers.receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise
      File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
      self.main_receivers.receive(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
      cython.cast(Operation, consumer).process(windowed_value)
      File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
      with self.scoped_process_state:
      File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
      self.dofn_receiver.receive(o)
      File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
      self.process(windowed_value)
      File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
      self._reraise_augmented(exn)
      File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      raise_with_traceback(new_exn)
      File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
      self.do_fn_invoker.invoke_process(windowed_value)
      File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
      output_processor.process_outputs(
      File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
      for result in results:
      TypeError: 'Entity' object is not iterable [while running 's152-c260']


      I assume this has something to do with the difference between the two _do_work() functions in NamespacedDatastoreMigration and CampaignActionField, since the former is successful while the latter fails, and _do_work() is the only difference between them (besides the entity kind being transformed). But I can't think of what exactly is going wrong and how to get around it. Does anyone have any thoughts?







      python-2.7 google-cloud-platform google-cloud-dataflow apache-beam






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 23 at 10:30









      Maxim

      1,333110




      1,333110










      asked Nov 20 at 21:38









      Ryan Schuster

      377312




      377312
























          1 Answer
          1






          active

          oldest

          votes


















          1














          Turns out that changing FlatMap to Map in NamespacedDatastoreMigration's _create_pipelines method fixed this for me. I also was stupidly calling NamespacedDatastoreMigration with a non-namespaced model, which is why it was succeeding while CamapaignActionField (which uses a namespaced model) was not.






          share|improve this answer





















            Your Answer






            StackExchange.ifUsing("editor", function () {
            StackExchange.using("externalEditor", function () {
            StackExchange.using("snippets", function () {
            StackExchange.snippets.init();
            });
            });
            }, "code-snippets");

            StackExchange.ready(function() {
            var channelOptions = {
            tags: "".split(" "),
            id: "1"
            };
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function() {
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled) {
            StackExchange.using("snippets", function() {
            createEditor();
            });
            }
            else {
            createEditor();
            }
            });

            function createEditor() {
            StackExchange.prepareEditor({
            heartbeatType: 'answer',
            autoActivateHeartbeat: false,
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader: {
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            },
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            });


            }
            });














            draft saved

            draft discarded


















            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53401965%2fconfusing-error-during-apache-beams-process-outputs-on-google-dataflow-worker%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown

























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes









            1














            Turns out that changing FlatMap to Map in NamespacedDatastoreMigration's _create_pipelines method fixed this for me. I also was stupidly calling NamespacedDatastoreMigration with a non-namespaced model, which is why it was succeeding while CamapaignActionField (which uses a namespaced model) was not.






            share|improve this answer


























              1














              Turns out that changing FlatMap to Map in NamespacedDatastoreMigration's _create_pipelines method fixed this for me. I also was stupidly calling NamespacedDatastoreMigration with a non-namespaced model, which is why it was succeeding while CamapaignActionField (which uses a namespaced model) was not.






              share|improve this answer
























                1












                1








                1






                Turns out that changing FlatMap to Map in NamespacedDatastoreMigration's _create_pipelines method fixed this for me. I also was stupidly calling NamespacedDatastoreMigration with a non-namespaced model, which is why it was succeeding while CamapaignActionField (which uses a namespaced model) was not.






                share|improve this answer












                Turns out that changing FlatMap to Map in NamespacedDatastoreMigration's _create_pipelines method fixed this for me. I also was stupidly calling NamespacedDatastoreMigration with a non-namespaced model, which is why it was succeeding while CamapaignActionField (which uses a namespaced model) was not.







                share|improve this answer












                share|improve this answer



                share|improve this answer










                answered Nov 20 at 22:59









                Ryan Schuster

                377312




                377312






























                    draft saved

                    draft discarded




















































                    Thanks for contributing an answer to Stack Overflow!


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.





                    Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


                    Please pay close attention to the following guidance:


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.




                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function () {
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53401965%2fconfusing-error-during-apache-beams-process-outputs-on-google-dataflow-worker%23new-answer', 'question_page');
                    }
                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    Popular posts from this blog

                    404 Error Contact Form 7 ajax form submitting

                    How to know if a Active Directory user can login interactively

                    Refactoring coordinates for Minecraft Pi buildings written in Python