Skip to content

Commit 565c862

Browse files
david-rowleytuhaihe
authored andcommitted
Add support for Postgres-XL
This adds support for XL's concept of aggregate collect functions which allows the aggregate to be partially calculated on each node, and the agg state to be passed to another node to allow the final aggregation to take place. Also added sanity checks for bogus values of FIXEDDECIMAL_SCALE Also removed accidental commit of regression test actual result files.
1 parent 3ef9f06 commit 565c862

12 files changed

Lines changed: 294 additions & 696 deletions

contrib/fixeddecimal/Makefile

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@ OBJS = fixeddecimal.o
55

66
EXTENSION = fixeddecimal
77
DATA = $(shell $(PG_CONFIG) --version | grep -qE " 9\.[5-9]| 10\.0" && \
8-
cat fixeddecimal--1.0.0_base.sql fixeddecimal--brin.sql > fixeddecimal--1.0.0.sql | echo fixeddecimal--1.0.0.sql || \
9-
cat fixeddecimal--1.0.0_base.sql > fixeddecimal--1.0.0.sql | echo fixeddecimal--1.0.0.sql)
8+
cat fixeddecimal--1.0.0_base.sql fixeddecimal--brin.sql > fixeddecimal--1.0.0.tmp | echo fixeddecimal--1.0.0.tmp || \
9+
cat fixeddecimal--1.0.0_base.sql > fixeddecimal--1.0.0.tmp | echo fixeddecimal--1.0.0.tmp) \
10+
$(shell $(PG_CONFIG) --version | grep -qE "XL" && \
11+
cat fixeddecimalaggstate.sql fixeddecimal--1.0.0.tmp fixeddecimal--xlaggs.sql > fixeddecimal--1.0.0.sql | echo fixeddecimal--1.0.0.sql || \
12+
cat fixeddecimal--1.0.0.tmp > fixeddecimal--1.0.0.sql fixeddecimal--aggs.sql | echo fixeddecimal--1.0.0.sql)
13+
1014
MODULES = fixeddecimal
1115

12-
CFLAGS=`pg_config --includedir-server`
16+
CFLAGS=`pg_config --includedir-server -UXCP`
1317

1418
TESTS = $(wildcard test/sql/*.sql)
1519
REGRESS = $(shell $(PG_CONFIG) --version | grep -qE " 9\.[5-9]| 10\.0" && \

contrib/fixeddecimal/fixeddecimal--1.0.0_base.sql

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -526,46 +526,3 @@ CREATE CAST (FIXEDDECIMAL AS NUMERIC)
526526

527527
CREATE CAST (NUMERIC AS FIXEDDECIMAL)
528528
WITH FUNCTION numeric_fixeddecimal (NUMERIC) AS ASSIGNMENT;
529-
530-
-- Aggregate Support
531-
532-
533-
CREATE FUNCTION fixeddecimal_avg_accum(INTERNAL, FIXEDDECIMAL)
534-
RETURNS INTERNAL
535-
AS 'fixeddecimal', 'fixeddecimal_avg_accum'
536-
LANGUAGE C IMMUTABLE;
537-
538-
CREATE FUNCTION fixeddecimal_sum(INTERNAL)
539-
RETURNS FIXEDDECIMAL
540-
AS 'fixeddecimal', 'fixeddecimal_sum'
541-
LANGUAGE C IMMUTABLE STRICT;
542-
543-
CREATE FUNCTION fixeddecimal_avg(INTERNAL)
544-
RETURNS FIXEDDECIMAL
545-
AS 'fixeddecimal', 'fixeddecimal_avg'
546-
LANGUAGE C IMMUTABLE STRICT;
547-
548-
CREATE AGGREGATE min(FIXEDDECIMAL) (
549-
SFUNC = fixeddecimalsmaller,
550-
STYPE = FIXEDDECIMAL,
551-
SORTOP = <
552-
);
553-
554-
CREATE AGGREGATE max(FIXEDDECIMAL) (
555-
SFUNC = fixeddecimallarger,
556-
STYPE = FIXEDDECIMAL,
557-
SORTOP = >
558-
);
559-
560-
CREATE AGGREGATE sum(FIXEDDECIMAL) (
561-
SFUNC = fixeddecimal_avg_accum,
562-
FINALFUNC = fixeddecimal_sum,
563-
STYPE = INTERNAL
564-
);
565-
566-
CREATE AGGREGATE avg(FIXEDDECIMAL) (
567-
SFUNC = fixeddecimal_avg_accum,
568-
FINALFUNC = fixeddecimal_avg,
569-
STYPE = INTERNAL
570-
);
571-
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
2+
-- Aggregate Support
3+
4+
5+
CREATE FUNCTION fixeddecimal_avg_accum(INTERNAL, FIXEDDECIMAL)
6+
RETURNS INTERNAL
7+
AS 'fixeddecimal', 'fixeddecimal_avg_accum'
8+
LANGUAGE C IMMUTABLE;
9+
10+
CREATE FUNCTION fixeddecimal_sum(INTERNAL)
11+
RETURNS FIXEDDECIMAL
12+
AS 'fixeddecimal', 'fixeddecimal_sum'
13+
LANGUAGE C IMMUTABLE STRICT;
14+
15+
CREATE FUNCTION fixeddecimal_avg(INTERNAL)
16+
RETURNS FIXEDDECIMAL
17+
AS 'fixeddecimal', 'fixeddecimal_avg'
18+
LANGUAGE C IMMUTABLE STRICT;
19+
20+
CREATE AGGREGATE min(FIXEDDECIMAL) (
21+
SFUNC = fixeddecimalsmaller,
22+
STYPE = FIXEDDECIMAL,
23+
SORTOP = <
24+
);
25+
26+
CREATE AGGREGATE max(FIXEDDECIMAL) (
27+
SFUNC = fixeddecimallarger,
28+
STYPE = FIXEDDECIMAL,
29+
SORTOP = >
30+
);
31+
32+
CREATE AGGREGATE sum(FIXEDDECIMAL) (
33+
SFUNC = fixeddecimal_avg_accum,
34+
FINALFUNC = fixeddecimal_sum,
35+
STYPE = INTERNAL
36+
);
37+
38+
CREATE AGGREGATE avg(FIXEDDECIMAL) (
39+
SFUNC = fixeddecimal_avg_accum,
40+
FINALFUNC = fixeddecimal_avg,
41+
STYPE = INTERNAL
42+
);
43+
44+
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
2+
-- Aggregate Support
3+
4+
5+
CREATE FUNCTION fixeddecimalaggstatecombine(FIXEDDECIMALAGGSTATE, INTERNAL)
6+
RETURNS FIXEDDECIMALAGGSTATE
7+
AS 'fixeddecimal', 'fixeddecimalaggstatecombine'
8+
LANGUAGE C IMMUTABLE;
9+
10+
CREATE FUNCTION fixeddecimal_avg_accum(INTERNAL, FIXEDDECIMAL)
11+
RETURNS INTERNAL
12+
AS 'fixeddecimal', 'fixeddecimal_avg_accum'
13+
LANGUAGE C IMMUTABLE;
14+
15+
CREATE FUNCTION fixeddecimal_sum(INTERNAL)
16+
RETURNS FIXEDDECIMAL
17+
AS 'fixeddecimal', 'fixeddecimal_sum'
18+
LANGUAGE C IMMUTABLE STRICT;
19+
20+
CREATE FUNCTION fixeddecimal_avg(INTERNAL)
21+
RETURNS FIXEDDECIMAL
22+
AS 'fixeddecimal', 'fixeddecimal_avg'
23+
LANGUAGE C IMMUTABLE STRICT;
24+
25+
CREATE AGGREGATE min(FIXEDDECIMAL) (
26+
SFUNC = fixeddecimalsmaller,
27+
CFUNC = fixeddecimalsmaller,
28+
CTYPE = FIXEDDECIMAL,
29+
STYPE = FIXEDDECIMAL,
30+
SORTOP = <
31+
);
32+
33+
CREATE AGGREGATE max(FIXEDDECIMAL) (
34+
SFUNC = fixeddecimallarger,
35+
CFUNC = fixeddecimallarger,
36+
CTYPE = FIXEDDECIMAL,
37+
STYPE = FIXEDDECIMAL,
38+
SORTOP = >
39+
);
40+
41+
CREATE AGGREGATE sum(FIXEDDECIMAL) (
42+
SFUNC = fixeddecimal_avg_accum,
43+
CFUNC = fixeddecimalaggstatecombine,
44+
CTYPE = FIXEDDECIMALAGGSTATE,
45+
FINALFUNC = fixeddecimal_sum,
46+
STYPE = INTERNAL
47+
);
48+
49+
CREATE AGGREGATE avg(FIXEDDECIMAL) (
50+
SFUNC = fixeddecimal_avg_accum,
51+
CFUNC = fixeddecimalaggstatecombine,
52+
CTYPE = FIXEDDECIMALAGGSTATE,
53+
FINALFUNC = fixeddecimal_avg,
54+
STYPE = INTERNAL
55+
);
56+
57+

contrib/fixeddecimal/fixeddecimal.c

Lines changed: 146 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@
2424
#include "access/hash.h"
2525
#include "utils/array.h"
2626
#include "utils/builtins.h"
27+
28+
#ifdef PGXC
29+
#include "utils/int8.h"
30+
#endif /* PGXC */
31+
2732
#include "utils/numeric.h"
2833

2934
#define MAXINT8LEN 25
@@ -42,6 +47,15 @@
4247
*/
4348
#define FIXEDDECIMAL_SCALE 2
4449

50+
/* Sanity checks */
51+
#if FIXEDDECIMAL_SCALE == 0
52+
#error "FIXEDDECIMAL_SCALE cannot be zero. Just use a BIGINT if that's what you really want"
53+
#endif
54+
55+
#if FIXEDDECIMAL_SCALE > 19
56+
#error "FIXEDDECIMAL_SCALE cannot be greater than 19"
57+
#endif
58+
4559
/*
4660
* This is bounded by the maximum and minimum values of int64.
4761
* 9223372036854775807 is 19 decimal digits long, but we we can only represent
@@ -118,6 +132,14 @@ PG_FUNCTION_INFO_V1(fixeddecimal_avg_accum);
118132
PG_FUNCTION_INFO_V1(fixeddecimal_avg);
119133
PG_FUNCTION_INFO_V1(fixeddecimal_sum);
120134

135+
#ifdef PGXC
136+
PG_FUNCTION_INFO_V1(fixeddecimalaggstatein);
137+
PG_FUNCTION_INFO_V1(fixeddecimalaggstateout);
138+
PG_FUNCTION_INFO_V1(fixeddecimalaggstatesend);
139+
PG_FUNCTION_INFO_V1(fixeddecimalaggstaterecv);
140+
PG_FUNCTION_INFO_V1(fixeddecimalaggstatecombine);
141+
#endif /* PGXC */
142+
121143
/* Aggregate Internal State */
122144
typedef struct FixedDecimalAggState
123145
{
@@ -286,6 +308,36 @@ pg_int64tostr_zeropad(char *str, int64 value, int64 padding)
286308
return end;
287309
}
288310

311+
/*
312+
* fixeddecimal2str
313+
* Prints the fixeddecimal 'val' to buffer as a string.
314+
* Returns a pointer to the end of the written string.
315+
*/
316+
static char *
317+
fixeddecimal2str(int64 val, char *buffer)
318+
{
319+
char *ptr = buffer;
320+
int64 integralpart = val / FIXEDDECIMAL_MULTIPLIER;
321+
int64 fractionalpart = val % FIXEDDECIMAL_MULTIPLIER;
322+
323+
if (val < 0)
324+
{
325+
fractionalpart = -fractionalpart;
326+
327+
/*
328+
* Handle special case for negative numbers where the intergral part
329+
* is zero. pg_int64tostr() won't prefix with "-0" in this case, so
330+
* we'll do it manually
331+
*/
332+
if (integralpart == 0)
333+
*ptr++ = '-';
334+
}
335+
ptr = pg_int64tostr(ptr, integralpart);
336+
*ptr++ = '.';
337+
ptr = pg_int64tostr_zeropad(ptr, fractionalpart, FIXEDDECIMAL_SCALE);
338+
return ptr;
339+
}
340+
289341
/*
290342
* scanfixeddecimal --- try to parse a string into a fixeddecimal.
291343
*/
@@ -583,28 +635,8 @@ fixeddecimalout(PG_FUNCTION_ARGS)
583635
{
584636
int64 val = PG_GETARG_INT64(0);
585637
char buf[MAXINT8LEN + 1];
586-
char *ptr = &buf[0];
587-
int64 integralpart = val / FIXEDDECIMAL_MULTIPLIER;
588-
int64 fractionalpart = val % FIXEDDECIMAL_MULTIPLIER;
589-
590-
591-
if (val < 0)
592-
{
593-
fractionalpart = -fractionalpart;
594-
595-
/*
596-
* Handle special case for negative numbers where the intergral part
597-
* is zero. pg_int64tostr() won't prefix with "-0" in this case, so
598-
* we'll do it manually
599-
*/
600-
if (integralpart == 0)
601-
*ptr++ = '-';
602-
}
603-
ptr = pg_int64tostr(ptr, integralpart);
604-
*ptr++ = '.';
605-
ptr = pg_int64tostr_zeropad(ptr, fractionalpart, FIXEDDECIMAL_SCALE);
606-
607-
PG_RETURN_CSTRING(pnstrdup(buf, ptr - &buf[0]));
638+
char *end = fixeddecimal2str(val, buf);
639+
PG_RETURN_CSTRING(pnstrdup(buf, end - buf));
608640
}
609641

610642
/*
@@ -1684,6 +1716,7 @@ fixeddecimal_avg(PG_FUNCTION_ARGS)
16841716
PG_RETURN_INT64(state->sumX / state->N);
16851717
}
16861718

1719+
16871720
Datum
16881721
fixeddecimal_sum(PG_FUNCTION_ARGS)
16891722
{
@@ -1697,3 +1730,94 @@ fixeddecimal_sum(PG_FUNCTION_ARGS)
16971730

16981731
PG_RETURN_INT64(state->sumX);
16991732
}
1733+
1734+
1735+
#ifdef PGXC
1736+
1737+
/*
1738+
* Input / Output / Send / Receive functions for aggrgate states
1739+
* Currently for XL only
1740+
*/
1741+
1742+
Datum
1743+
fixeddecimalaggstatein(PG_FUNCTION_ARGS)
1744+
{
1745+
char *str = pstrdup(PG_GETARG_CSTRING(0));
1746+
FixedDecimalAggState *state;
1747+
char *token;
1748+
1749+
state = (FixedDecimalAggState *) palloc0(sizeof(FixedDecimalAggState));
1750+
1751+
token = strtok(str, ":");
1752+
state->sumX = DatumGetInt64(DirectFunctionCall3(fixeddecimalin, CStringGetDatum(token), 0, -1));
1753+
token = strtok(NULL, ":");
1754+
state->N = DatumGetInt64(DirectFunctionCall1(int8in, CStringGetDatum(token)));
1755+
pfree(str);
1756+
1757+
PG_RETURN_POINTER(state);
1758+
}
1759+
1760+
1761+
/*
1762+
* fixeddecimalaggstateout()
1763+
*/
1764+
Datum
1765+
fixeddecimalaggstateout(PG_FUNCTION_ARGS)
1766+
{
1767+
FixedDecimalAggState *state = (FixedDecimalAggState *) PG_GETARG_POINTER(0);
1768+
char buf[MAXINT8LEN + 1 + MAXINT8LEN + 1];
1769+
char *p;
1770+
1771+
p = fixeddecimal2str(state->sumX, buf);
1772+
*p++ = ':';
1773+
p = pg_int64tostr(p, state->N);
1774+
PG_RETURN_CSTRING(pnstrdup(buf, p - buf));
1775+
}
1776+
1777+
/*
1778+
* fixeddecimalaggstaterecv
1779+
*/
1780+
Datum
1781+
fixeddecimalaggstaterecv(PG_FUNCTION_ARGS)
1782+
{
1783+
StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
1784+
FixedDecimalAggState *state;
1785+
state = (FixedDecimalAggState *) palloc(sizeof(FixedDecimalAggState));
1786+
1787+
state->sumX = pq_getmsgint(buf, sizeof(int64));
1788+
state->N = pq_getmsgint(buf, sizeof(int64));
1789+
1790+
PG_RETURN_POINTER(state);
1791+
}
1792+
1793+
/*
1794+
* fixeddecimalaggstatesend
1795+
*/
1796+
Datum
1797+
fixeddecimalaggstatesend(PG_FUNCTION_ARGS)
1798+
{
1799+
FixedDecimalAggState *state = (FixedDecimalAggState *) PG_GETARG_POINTER(0);
1800+
StringInfoData buf;
1801+
1802+
pq_begintypsend(&buf);
1803+
1804+
pq_sendint(&buf, state->sumX, sizeof (int64));
1805+
pq_sendint(&buf, state->N, sizeof (int64));
1806+
1807+
PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
1808+
}
1809+
1810+
Datum
1811+
fixeddecimalaggstatecombine(PG_FUNCTION_ARGS)
1812+
{
1813+
FixedDecimalAggState *state1 = (FixedDecimalAggState *) PG_GETARG_POINTER(0);
1814+
FixedDecimalAggState *state2 = (FixedDecimalAggState *) PG_GETARG_POINTER(1);
1815+
FixedDecimalAggState *newstate = palloc(sizeof(FixedDecimalAggState));
1816+
1817+
newstate->sumX = DatumGetInt64(DirectFunctionCall2(fixeddecimalpl, Int64GetDatum(state1->sumX), Int64GetDatum(state2->sumX)));
1818+
newstate->N = DatumGetInt64(DirectFunctionCall2(int8pl, Int64GetDatum(state1->N), Int64GetDatum(state2->N)));
1819+
1820+
PG_RETURN_POINTER(newstate);
1821+
}
1822+
1823+
#endif /* PGXC */

0 commit comments

Comments
 (0)