Skip to content

Commit d6172eb

Browse files
committed
Use registered type for Row
* Introduce register_row to register with both coder and schema registry
* Save schema registry id->type mapping
1 parent f300e59 commit d6172eb

9 files changed

Lines changed: 75 additions & 31 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 1
3+
"modification": 16
44
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 1
3+
"modification": 17
44
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 1
3+
"modification": 3
44
}

sdks/python/apache_beam/coders/typecoders.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ def _normalize_typehint_type(typehint_type):
124124
def register_coder(
125125
self, typehint_type: Any,
126126
typehint_coder_class: Type[coders.Coder]) -> None:
127+
"Register a user type with a coder"
127128
if not isinstance(typehint_coder_class, type):
128129
raise TypeError(
129130
'Coder registration requires a coder class object. '
@@ -133,6 +134,21 @@ def register_coder(
133134
self._register_coder_internal(
134135
self._normalize_typehint_type(typehint_type), typehint_coder_class)
135136

137+
def register_row(self, typehint_type: Any) -> None:
138+
"""
139+
Register a user type with a Beam Row.
140+
141+
This registers the type with a RowCoder and registers its schema.
142+
"""
143+
from apache_beam.coders import RowCoder
144+
from apache_beam.typehints.schemas import typing_to_runner_api
145+
146+
# Register with row coder
147+
self.register_coder(typehint_type, RowCoder)
148+
# This call generates a schema id for the type and registers it with
149+
# the schema registry
150+
typing_to_runner_api(typehint_type)
151+
136152
def get_coder(self, typehint: Any) -> coders.Coder:
137153
if typehint and typehint.__module__ == '__main__':
138154
# See https://github.com/apache/beam/issues/21541

sdks/python/apache_beam/internal/cloudpickle_pickler.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,20 +256,27 @@ def dump_session(file_path):
256256
# dump supported Beam Registries (currently only logical type registry)
257257
from apache_beam.coders import typecoders
258258
from apache_beam.typehints import schemas
259+
from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY
259260

260261
with _pickle_lock, open(file_path, 'wb') as file:
261262
coder_reg = typecoders.registry.get_custom_type_coder_tuples()
262263
logical_type_reg = schemas.LogicalType._known_logical_types.copy_custom()
264+
schema_reg = SCHEMA_REGISTRY.get_registered_typings()
263265

264266
pickler = cloudpickle.CloudPickler(file)
265267
# TODO(https://github.com/apache/beam/issues/18500) add file system registry
266268
# once implemented
267-
pickler.dump({"coder": coder_reg, "logical_type": logical_type_reg})
269+
pickler.dump({
270+
"coder": coder_reg,
271+
"logical_type": logical_type_reg,
272+
"schema": schema_reg
273+
})
268274

269275

270276
def load_session(file_path):
271277
from apache_beam.coders import typecoders
272278
from apache_beam.typehints import schemas
279+
from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY
273280

274281
with _pickle_lock, open(file_path, 'rb') as file:
275282
registries = cloudpickle.load(file)
@@ -284,3 +291,7 @@ def load_session(file_path):
284291
schemas.LogicalType._known_logical_types.load(registries["logical_type"])
285292
else:
286293
_LOGGER.warning('No logical type registry found in saved session')
294+
if "schema" in registries:
295+
SCHEMA_REGISTRY.load_registered_typings(registries["schema"])
296+
else:
297+
_LOGGER.warning('No schema registry found in saved session')

sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import apache_beam as beam
3232
from apache_beam import coders
33+
from apache_beam.io import jdbc
3334
from apache_beam.io.jdbc import ReadFromJdbc
3435
from apache_beam.io.jdbc import WriteToJdbc
3536
from apache_beam.options.pipeline_options import StandardOptions
@@ -64,7 +65,7 @@
6465
("f_timestamp", Timestamp), ("f_decimal", Decimal),
6566
("f_date", datetime.date), ("f_time", datetime.time)],
6667
)
67-
coders.registry.register_coder(JdbcTestRow, coders.RowCoder)
68+
coders.registry.register_row(JdbcTestRow)
6869

6970
CustomSchemaRow = typing.NamedTuple(
7071
"CustomSchemaRow",
@@ -82,11 +83,17 @@
8283
("renamed_time", datetime.time),
8384
],
8485
)
85-
coders.registry.register_coder(CustomSchemaRow, coders.RowCoder)
86+
87+
# Need to put inside enforce_millis_instant_for_timestamp context to align
88+
# with the same setup in ReadFromJdbc.__init__. Remove once Beam has moved to
89+
# micros instant for timestamp
90+
# Alternatively, use coders.registry.register_coder(CustomSchemaRow, RowCoder)
91+
with jdbc.enforce_millis_instant_for_timestamp():
92+
coders.registry.register_row(CustomSchemaRow)
8693

8794
SimpleRow = typing.NamedTuple(
8895
"SimpleRow", [("id", int), ("name", str), ("value", float)])
89-
coders.registry.register_coder(SimpleRow, coders.RowCoder)
96+
coders.registry.register_row(SimpleRow)
9097

9198

9299
@pytest.mark.uses_gcp_java_expansion_service

sdks/python/apache_beam/typehints/row_type_test.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,18 @@ class RowTypeTest(unittest.TestCase):
3333
@staticmethod
3434
def _check_key_type_and_count(x) -> int:
3535
key_type = type(x[0])
36-
if not row_type._user_type_is_generated(key_type):
37-
raise RuntimeError("Expect type after GBK to be generated user type")
36+
if row_type._user_type_is_generated(key_type):
37+
raise RuntimeError("Type after GBK not preserved, get generated type")
38+
if not hasattr(key_type, row_type._BEAM_SCHEMA_ID):
39+
raise RuntimeError("Type after GBK missing Beam schema ID")
3840

3941
return len(x[1])
4042

4143
def test_group_by_key_namedtuple(self):
4244
MyNamedTuple = typing.NamedTuple(
4345
"MyNamedTuple", [("id", int), ("name", str)])
4446

45-
beam.coders.typecoders.registry.register_coder(
46-
MyNamedTuple, beam.coders.RowCoder)
47+
beam.coders.typecoders.registry.register_row(MyNamedTuple)
4748

4849
def generate(num: int):
4950
for i in range(100):
@@ -67,8 +68,7 @@ class MyDataClass:
6768
id: int
6869
name: str
6970

70-
beam.coders.typecoders.registry.register_coder(
71-
MyDataClass, beam.coders.RowCoder)
71+
beam.coders.typecoders.registry.register_row(MyDataClass)
7272

7373
def generate(num: int):
7474
for i in range(100):
@@ -120,10 +120,8 @@ class DataClassInt:
120120
class DataClassStr(DataClassInt):
121121
name: str
122122

123-
beam.coders.typecoders.registry.register_coder(
124-
DataClassInt, beam.coders.RowCoder)
125-
beam.coders.typecoders.registry.register_coder(
126-
DataClassStr, beam.coders.RowCoder)
123+
beam.coders.typecoders.registry.register_row(DataClassInt)
124+
beam.coders.typecoders.registry.register_row(DataClassStr)
127125

128126
def generate(num: int):
129127
for i in range(10):

sdks/python/apache_beam/typehints/schema_registry.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
class SchemaTypeRegistry(object):
2727
def __init__(self):
2828
self.by_id = {}
29-
self.by_typing = {}
29+
self.by_typing = {} # currently not used
3030

3131
def generate_new_id(self):
3232
for _ in range(100):
@@ -43,6 +43,15 @@ def add(self, typing, schema):
4343
if schema.id:
4444
self.by_id[schema.id] = (typing, schema)
4545

46+
def load_registered_typings(self, by_id):
47+
for id, typing in by_id.items():
48+
if id not in self.by_id:
49+
self.by_id[id] = (typing, None)
50+
51+
def get_registered_typings(self):
52+
# Used by save_main_session, as pb2.schema isn't picklable
53+
return {k: v[0] for k, v in self.by_id.items()}
54+
4655
def get_typing_by_id(self, unique_id):
4756
if not unique_id:
4857
return None

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -588,19 +588,22 @@ def named_tuple_from_schema(self, schema: schema_pb2.Schema) -> type:
588588
descriptions[field.name] = field.description
589589
subfields.append((field.name, field_py_type))
590590

591-
user_type = NamedTuple(type_name, subfields)
592-
593-
# Define a reduce function, otherwise these types can't be pickled
594-
# (See BEAM-9574)
595-
setattr(
596-
user_type,
597-
'__reduce__',
598-
_named_tuple_reduce_method(schema.SerializeToString()))
599-
setattr(user_type, "_field_descriptions", descriptions)
600-
setattr(user_type, row_type._BEAM_SCHEMA_ID, schema.id)
601-
602-
self.schema_registry.add(user_type, schema)
603-
coders.registry.register_coder(user_type, coders.RowCoder)
591+
if schema.id in self.schema_registry.by_id:
592+
user_type = self.schema_registry.by_id[schema.id][0]
593+
else:
594+
user_type = NamedTuple(type_name, subfields)
595+
596+
# Define a reduce function, otherwise these types can't be pickled
597+
# (See BEAM-9574)
598+
setattr(
599+
user_type,
600+
'__reduce__',
601+
_named_tuple_reduce_method(schema.SerializeToString()))
602+
setattr(user_type, "_field_descriptions", descriptions)
603+
setattr(user_type, row_type._BEAM_SCHEMA_ID, schema.id)
604+
605+
self.schema_registry.add(user_type, schema)
606+
coders.registry.register_coder(user_type, coders.RowCoder)
604607

605608
return user_type
606609

0 commit comments

Comments
 (0)