Confusing error during Apache Beam's `process_outputs` on Google Dataflow worker
I'm running the following successful Apache Beam test pipeline on Google Dataflow. It uses Datastore as a source and sink. Many entities in our database are assigned to namespaces. This pipeline is meant to perform _do_work()
on all entities of a certain kind in the given namespaces. Note, a similar test pipeline that does the same thing to non-namespaced entities also runs successfully:
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
from apache_beam.io.gcp.datastore.v1 import datastoreio
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper
from .pipelines.dataflow_settings import (
PROJECT, NAMESPACES_PER_PIPELINE
)
class NamespacedDatastoreMigration(_DatastoreMigrationBase):
"""
Map a do-function over a query multiplexed across several namespaces.
The inheritor must implement the following:
- a PROJECT class attribute
- a do-function (_do_work())
- a method to get the namespaces across which to shard the query (
get_namespaces())
"""
_NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE # 25
def __init__(self, argv, migration_history_obj, model):
super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
self._namespaces = self.get_namespaces()
self._pipelines = self._create_pipelines(argv)
def get_namespaces(self):
query_pb = query_pb2.Query()
helper.set_kind(query_pb, "__namespace__")
client = apache_helper.get_datastore(PROJECT)
namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)
namespaces =
for n in namespace_entities:
# Get namespace name or id
key_path = n.key.path[-1]
if key_path.HasField('id'):
name_or_id = key_path.id
else:
name_or_id = key_path.name
# Avoid duplicates and test namespaces
if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
namespaces.append(name_or_id)
return namespaces
def run(self):
for pipeline in self._pipelines:
pipeline.run()
def _create_pipelines(self, argv):
pipelines =
for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
p = beam.Pipeline(argv=argv)
(
(
p | 'ReadNamespace_{}'.format(
ns
) >> datastoreio.ReadFromDatastore(
project=self.PROJECT,
query=self.query(),
namespace=ns
)
for ns in namespaces
)
| 'JoinNamespaceEntities' >> beam.Flatten()
| self.__class__.__name__ >> beam.FlatMap(self._do_work)
| self._get_sink()
)
pipelines.append(p)
return pipelines
model = "App"
NamespacedDatastoreMigration(
argv,
kwargs.get('migration_history_obj'), # Irrelevant here
model # Entity kind
).run()
where argv is:
argv = [
'--project={0}'.format(PROJECT),
'--job_name=' + name, # A human readable descriptor that's been cleaned
'--staging_location=gs://{0}/migrations/'.format(BUCKET),
'--temp_location=gs://{0}/migrations/'.format(BUCKET),
'--setup_file=./setup.py',
'--runner=DataflowRunner'
]
This is based on the subclass:
class _DatastoreMigrationBase(object):
PROJECT = PROJECT
def __init__(self, argv, migration_history_obj, model):
self.migration_history_obj = migration_history_obj
if not model:
raise Exception('This operation requires a model class name.')
self.model = model
def query(self):
# Instantiate a filter protobuf
filter_pb = query_pb2.Filter()
# Get all non-deleted model instances
helper.set_property_filter(
filter_pb,
'deleted',
query_pb2.PropertyFilter.EQUAL,
False
)
# Instantiate a query protobuf
query_pb = query_pb2.Query(
filter=filter_pb
)
helper.set_kind(query_pb, self.model)
return query_pb
def _get_source(self):
return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
self.PROJECT,
self.query(),
)
@staticmethod
def _do_work(entity):
return entity
def _get_sink(self):
return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
self.PROJECT
)
However, when I subclass NamespacedDatastoreMigration
like so:
from ..helpers import create_argv
from ..mappers import NamespacedDatastoreMigration
class CampaignActionField(NamespacedDatastoreMigration):
@staticmethod
def _do_work(entity):
target_url = entity.properties.get('target_url').string_value
message = entity.properties.get('message').string_value
path = entity.properties.get('path').string_value
if target_url and not message and not path:
entity.properties.get('action').string_value = 'webhook'
return entity
model = "Campaign" # Entity kind
CampaignActionField(
create_argv(kwargs.get('name')), # "ED-2275 Campaign action field"
kwargs.get('migration_history_obj'), # Irrelevant here
model
).run()
and this new pipeline runs on Dataflow, it fails. At first, it starts out okay. By that I mean I see the following INFO logs:
2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.
2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.
# SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:
2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create
2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...
2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close
And then I get this traceback:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
def start(self):
File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.shuffle_source.reader() as reader:
File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
self.output(wvalue.with_value((k, wvalue.value)))
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
TypeError: 'Entity' object is not iterable [while running 's152-c260']
I assume this has something to do with the difference between the two _do_work()
functions in NamespacedDatastoreMigration
and CampaignActionField
, since the former is successful while the latter fails, and _do_work()
is the only difference between them (besides the entity kind being transformed). But I can't think of what exactly is going wrong and how to get around it. Does anyone have any thoughts?
python-2.7 google-cloud-platform google-cloud-dataflow apache-beam
add a comment |
I'm running the following successful Apache Beam test pipeline on Google Dataflow. It uses Datastore as a source and sink. Many entities in our database are assigned to namespaces. This pipeline is meant to perform _do_work()
on all entities of a certain kind in the given namespaces. Note, a similar test pipeline that does the same thing to non-namespaced entities also runs successfully:
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
from apache_beam.io.gcp.datastore.v1 import datastoreio
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper
from .pipelines.dataflow_settings import (
PROJECT, NAMESPACES_PER_PIPELINE
)
class NamespacedDatastoreMigration(_DatastoreMigrationBase):
"""
Map a do-function over a query multiplexed across several namespaces.
The inheritor must implement the following:
- a PROJECT class attribute
- a do-function (_do_work())
- a method to get the namespaces across which to shard the query (
get_namespaces())
"""
_NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE # 25
def __init__(self, argv, migration_history_obj, model):
super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
self._namespaces = self.get_namespaces()
self._pipelines = self._create_pipelines(argv)
def get_namespaces(self):
query_pb = query_pb2.Query()
helper.set_kind(query_pb, "__namespace__")
client = apache_helper.get_datastore(PROJECT)
namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)
namespaces =
for n in namespace_entities:
# Get namespace name or id
key_path = n.key.path[-1]
if key_path.HasField('id'):
name_or_id = key_path.id
else:
name_or_id = key_path.name
# Avoid duplicates and test namespaces
if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
namespaces.append(name_or_id)
return namespaces
def run(self):
for pipeline in self._pipelines:
pipeline.run()
def _create_pipelines(self, argv):
pipelines =
for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
p = beam.Pipeline(argv=argv)
(
(
p | 'ReadNamespace_{}'.format(
ns
) >> datastoreio.ReadFromDatastore(
project=self.PROJECT,
query=self.query(),
namespace=ns
)
for ns in namespaces
)
| 'JoinNamespaceEntities' >> beam.Flatten()
| self.__class__.__name__ >> beam.FlatMap(self._do_work)
| self._get_sink()
)
pipelines.append(p)
return pipelines
model = "App"
NamespacedDatastoreMigration(
argv,
kwargs.get('migration_history_obj'), # Irrelevant here
model # Entity kind
).run()
where argv is:
argv = [
'--project={0}'.format(PROJECT),
'--job_name=' + name, # A human readable descriptor that's been cleaned
'--staging_location=gs://{0}/migrations/'.format(BUCKET),
'--temp_location=gs://{0}/migrations/'.format(BUCKET),
'--setup_file=./setup.py',
'--runner=DataflowRunner'
]
This is based on the subclass:
class _DatastoreMigrationBase(object):
PROJECT = PROJECT
def __init__(self, argv, migration_history_obj, model):
self.migration_history_obj = migration_history_obj
if not model:
raise Exception('This operation requires a model class name.')
self.model = model
def query(self):
# Instantiate a filter protobuf
filter_pb = query_pb2.Filter()
# Get all non-deleted model instances
helper.set_property_filter(
filter_pb,
'deleted',
query_pb2.PropertyFilter.EQUAL,
False
)
# Instantiate a query protobuf
query_pb = query_pb2.Query(
filter=filter_pb
)
helper.set_kind(query_pb, self.model)
return query_pb
def _get_source(self):
return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
self.PROJECT,
self.query(),
)
@staticmethod
def _do_work(entity):
return entity
def _get_sink(self):
return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
self.PROJECT
)
However, when I subclass NamespacedDatastoreMigration
like so:
from ..helpers import create_argv
from ..mappers import NamespacedDatastoreMigration
class CampaignActionField(NamespacedDatastoreMigration):
@staticmethod
def _do_work(entity):
target_url = entity.properties.get('target_url').string_value
message = entity.properties.get('message').string_value
path = entity.properties.get('path').string_value
if target_url and not message and not path:
entity.properties.get('action').string_value = 'webhook'
return entity
model = "Campaign" # Entity kind
CampaignActionField(
create_argv(kwargs.get('name')), # "ED-2275 Campaign action field"
kwargs.get('migration_history_obj'), # Irrelevant here
model
).run()
and this new pipeline runs on Dataflow, it fails. At first, it starts out okay. By that I mean I see the following INFO logs:
2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.
2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.
# SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:
2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create
2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...
2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close
And then I get this traceback:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
def start(self):
File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.shuffle_source.reader() as reader:
File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
self.output(wvalue.with_value((k, wvalue.value)))
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
TypeError: 'Entity' object is not iterable [while running 's152-c260']
I assume this has something to do with the difference between the two _do_work()
functions in NamespacedDatastoreMigration
and CampaignActionField
, since the former is successful while the latter fails, and _do_work()
is the only difference between them (besides the entity kind being transformed). But I can't think of what exactly is going wrong and how to get around it. Does anyone have any thoughts?
python-2.7 google-cloud-platform google-cloud-dataflow apache-beam
add a comment |
I'm running the following successful Apache Beam test pipeline on Google Dataflow. It uses Datastore as a source and sink. Many entities in our database are assigned to namespaces. This pipeline is meant to perform _do_work()
on all entities of a certain kind in the given namespaces. Note, a similar test pipeline that does the same thing to non-namespaced entities also runs successfully:
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
from apache_beam.io.gcp.datastore.v1 import datastoreio
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper
from .pipelines.dataflow_settings import (
PROJECT, NAMESPACES_PER_PIPELINE
)
class NamespacedDatastoreMigration(_DatastoreMigrationBase):
"""
Map a do-function over a query multiplexed across several namespaces.
The inheritor must implement the following:
- a PROJECT class attribute
- a do-function (_do_work())
- a method to get the namespaces across which to shard the query (
get_namespaces())
"""
_NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE # 25
def __init__(self, argv, migration_history_obj, model):
super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
self._namespaces = self.get_namespaces()
self._pipelines = self._create_pipelines(argv)
def get_namespaces(self):
query_pb = query_pb2.Query()
helper.set_kind(query_pb, "__namespace__")
client = apache_helper.get_datastore(PROJECT)
namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)
namespaces =
for n in namespace_entities:
# Get namespace name or id
key_path = n.key.path[-1]
if key_path.HasField('id'):
name_or_id = key_path.id
else:
name_or_id = key_path.name
# Avoid duplicates and test namespaces
if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
namespaces.append(name_or_id)
return namespaces
def run(self):
for pipeline in self._pipelines:
pipeline.run()
def _create_pipelines(self, argv):
pipelines =
for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
p = beam.Pipeline(argv=argv)
(
(
p | 'ReadNamespace_{}'.format(
ns
) >> datastoreio.ReadFromDatastore(
project=self.PROJECT,
query=self.query(),
namespace=ns
)
for ns in namespaces
)
| 'JoinNamespaceEntities' >> beam.Flatten()
| self.__class__.__name__ >> beam.FlatMap(self._do_work)
| self._get_sink()
)
pipelines.append(p)
return pipelines
model = "App"
NamespacedDatastoreMigration(
argv,
kwargs.get('migration_history_obj'), # Irrelevant here
model # Entity kind
).run()
where argv is:
argv = [
'--project={0}'.format(PROJECT),
'--job_name=' + name, # A human readable descriptor that's been cleaned
'--staging_location=gs://{0}/migrations/'.format(BUCKET),
'--temp_location=gs://{0}/migrations/'.format(BUCKET),
'--setup_file=./setup.py',
'--runner=DataflowRunner'
]
This is based on the subclass:
class _DatastoreMigrationBase(object):
PROJECT = PROJECT
def __init__(self, argv, migration_history_obj, model):
self.migration_history_obj = migration_history_obj
if not model:
raise Exception('This operation requires a model class name.')
self.model = model
def query(self):
# Instantiate a filter protobuf
filter_pb = query_pb2.Filter()
# Get all non-deleted model instances
helper.set_property_filter(
filter_pb,
'deleted',
query_pb2.PropertyFilter.EQUAL,
False
)
# Instantiate a query protobuf
query_pb = query_pb2.Query(
filter=filter_pb
)
helper.set_kind(query_pb, self.model)
return query_pb
def _get_source(self):
return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
self.PROJECT,
self.query(),
)
@staticmethod
def _do_work(entity):
return entity
def _get_sink(self):
return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
self.PROJECT
)
However, when I subclass NamespacedDatastoreMigration
like so:
from ..helpers import create_argv
from ..mappers import NamespacedDatastoreMigration
class CampaignActionField(NamespacedDatastoreMigration):
@staticmethod
def _do_work(entity):
target_url = entity.properties.get('target_url').string_value
message = entity.properties.get('message').string_value
path = entity.properties.get('path').string_value
if target_url and not message and not path:
entity.properties.get('action').string_value = 'webhook'
return entity
model = "Campaign" # Entity kind
CampaignActionField(
create_argv(kwargs.get('name')), # "ED-2275 Campaign action field"
kwargs.get('migration_history_obj'), # Irrelevant here
model
).run()
and this new pipeline runs on Dataflow, it fails. At first, it starts out okay. By that I mean I see the following INFO logs:
2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.
2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.
# SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:
2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create
2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...
2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close
And then I get this traceback:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
def start(self):
File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.shuffle_source.reader() as reader:
File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
self.output(wvalue.with_value((k, wvalue.value)))
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
TypeError: 'Entity' object is not iterable [while running 's152-c260']
I assume this has something to do with the difference between the two _do_work()
functions in NamespacedDatastoreMigration
and CampaignActionField
, since the former is successful while the latter fails, and _do_work()
is the only difference between them (besides the entity kind being transformed). But I can't think of what exactly is going wrong and how to get around it. Does anyone have any thoughts?
python-2.7 google-cloud-platform google-cloud-dataflow apache-beam
I'm running the following successful Apache Beam test pipeline on Google Dataflow. It uses Datastore as a source and sink. Many entities in our database are assigned to namespaces. This pipeline is meant to perform _do_work()
on all entities of a certain kind in the given namespaces. Note, a similar test pipeline that does the same thing to non-namespaced entities also runs successfully:
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
from apache_beam.io.gcp.datastore.v1 import datastoreio
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper
from .pipelines.dataflow_settings import (
PROJECT, NAMESPACES_PER_PIPELINE
)
class NamespacedDatastoreMigration(_DatastoreMigrationBase):
"""
Map a do-function over a query multiplexed across several namespaces.
The inheritor must implement the following:
- a PROJECT class attribute
- a do-function (_do_work())
- a method to get the namespaces across which to shard the query (
get_namespaces())
"""
_NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE # 25
def __init__(self, argv, migration_history_obj, model):
super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
self._namespaces = self.get_namespaces()
self._pipelines = self._create_pipelines(argv)
def get_namespaces(self):
query_pb = query_pb2.Query()
helper.set_kind(query_pb, "__namespace__")
client = apache_helper.get_datastore(PROJECT)
namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)
namespaces =
for n in namespace_entities:
# Get namespace name or id
key_path = n.key.path[-1]
if key_path.HasField('id'):
name_or_id = key_path.id
else:
name_or_id = key_path.name
# Avoid duplicates and test namespaces
if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
namespaces.append(name_or_id)
return namespaces
def run(self):
for pipeline in self._pipelines:
pipeline.run()
def _create_pipelines(self, argv):
pipelines =
for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
p = beam.Pipeline(argv=argv)
(
(
p | 'ReadNamespace_{}'.format(
ns
) >> datastoreio.ReadFromDatastore(
project=self.PROJECT,
query=self.query(),
namespace=ns
)
for ns in namespaces
)
| 'JoinNamespaceEntities' >> beam.Flatten()
| self.__class__.__name__ >> beam.FlatMap(self._do_work)
| self._get_sink()
)
pipelines.append(p)
return pipelines
model = "App"
NamespacedDatastoreMigration(
argv,
kwargs.get('migration_history_obj'), # Irrelevant here
model # Entity kind
).run()
where argv is:
argv = [
'--project={0}'.format(PROJECT),
'--job_name=' + name, # A human readable descriptor that's been cleaned
'--staging_location=gs://{0}/migrations/'.format(BUCKET),
'--temp_location=gs://{0}/migrations/'.format(BUCKET),
'--setup_file=./setup.py',
'--runner=DataflowRunner'
]
This is based on the subclass:
class _DatastoreMigrationBase(object):
PROJECT = PROJECT
def __init__(self, argv, migration_history_obj, model):
self.migration_history_obj = migration_history_obj
if not model:
raise Exception('This operation requires a model class name.')
self.model = model
def query(self):
# Instantiate a filter protobuf
filter_pb = query_pb2.Filter()
# Get all non-deleted model instances
helper.set_property_filter(
filter_pb,
'deleted',
query_pb2.PropertyFilter.EQUAL,
False
)
# Instantiate a query protobuf
query_pb = query_pb2.Query(
filter=filter_pb
)
helper.set_kind(query_pb, self.model)
return query_pb
def _get_source(self):
return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
self.PROJECT,
self.query(),
)
@staticmethod
def _do_work(entity):
return entity
def _get_sink(self):
return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
self.PROJECT
)
However, when I subclass NamespacedDatastoreMigration
like so:
from ..helpers import create_argv
from ..mappers import NamespacedDatastoreMigration
class CampaignActionField(NamespacedDatastoreMigration):
@staticmethod
def _do_work(entity):
target_url = entity.properties.get('target_url').string_value
message = entity.properties.get('message').string_value
path = entity.properties.get('path').string_value
if target_url and not message and not path:
entity.properties.get('action').string_value = 'webhook'
return entity
model = "Campaign" # Entity kind
CampaignActionField(
create_argv(kwargs.get('name')), # "ED-2275 Campaign action field"
kwargs.get('migration_history_obj'), # Irrelevant here
model
).run()
and this new pipeline runs on Dataflow, it fails. At first, it starts out okay. By that I mean I see the following INFO logs:
2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.
2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.
# SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:
2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create
2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...
2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close
And then I get this traceback:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
work_executor.execute()
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
def start(self):
File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_start_state:
File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
with self.shuffle_source.reader() as reader:
File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
self.output(windowed_value)
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
with self.scoped_process_state:
File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
self.output(wvalue.with_value((k, wvalue.value)))
File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
cython.cast(Operation, consumer).process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
for result in results:
TypeError: 'Entity' object is not iterable [while running 's152-c260']
I assume this has something to do with the difference between the two _do_work()
functions in NamespacedDatastoreMigration
and CampaignActionField
, since the former is successful while the latter fails, and _do_work()
is the only difference between them (besides the entity kind being transformed). But I can't think of what exactly is going wrong and how to get around it. Does anyone have any thoughts?
python-2.7 google-cloud-platform google-cloud-dataflow apache-beam
python-2.7 google-cloud-platform google-cloud-dataflow apache-beam
edited Nov 23 at 10:30
Maxim
1,333110
1,333110
asked Nov 20 at 21:38
Ryan Schuster
377312
377312
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
Turns out that changing FlatMap
to Map
in NamespacedDatastoreMigration
's _create_pipelines
method fixed this for me. I also was stupidly calling NamespacedDatastoreMigration
with a non-namespaced model, which is why it was succeeding while CamapaignActionField
(which uses a namespaced model) was not.
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53401965%2fconfusing-error-during-apache-beams-process-outputs-on-google-dataflow-worker%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
Turns out that changing FlatMap
to Map
in NamespacedDatastoreMigration
's _create_pipelines
method fixed this for me. I also was stupidly calling NamespacedDatastoreMigration
with a non-namespaced model, which is why it was succeeding while CamapaignActionField
(which uses a namespaced model) was not.
add a comment |
Turns out that changing FlatMap
to Map
in NamespacedDatastoreMigration
's _create_pipelines
method fixed this for me. I also was stupidly calling NamespacedDatastoreMigration
with a non-namespaced model, which is why it was succeeding while CamapaignActionField
(which uses a namespaced model) was not.
add a comment |
Turns out that changing FlatMap
to Map
in NamespacedDatastoreMigration
's _create_pipelines
method fixed this for me. I also was stupidly calling NamespacedDatastoreMigration
with a non-namespaced model, which is why it was succeeding while CamapaignActionField
(which uses a namespaced model) was not.
Turns out that changing FlatMap
to Map
in NamespacedDatastoreMigration
's _create_pipelines
method fixed this for me. I also was stupidly calling NamespacedDatastoreMigration
with a non-namespaced model, which is why it was succeeding while CamapaignActionField
(which uses a namespaced model) was not.
answered Nov 20 at 22:59
Ryan Schuster
377312
377312
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53401965%2fconfusing-error-during-apache-beams-process-outputs-on-google-dataflow-worker%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown