Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions consistency.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,36 @@ Only export the functions which should be called externally.
## Namespaces

Avoid `\d` namespace switches.

## Injectable Dependencies

Pass dependencies as a single dict to `init` — this keeps the signature stable as more injectables are added. All dependencies are required; `init` must error immediately with a clear message if any are absent. Store each injected function in `.z.m` and always call it through `.z.m` at call sites.

```q
/ mymodule.q
init:{[deps]
/ deps - `log!(logdict) or `log`timer!(logdict;timerdict)
/ `log: `info`warn`error!({[c;m]};{[c;m]};{[c;m]}) - required
/ examples:
/ mymodule.init[enlist[`log]!enlist logdep]
logdict:$[99h=type deps;$[(`log in key deps) and not (::)~deps`log;deps`log;()!()];()!()];
if[not count logdict;
'"di.mymodule: log dependency is required; pass `info`warn`error functions - see di.log or refer to confluence documentation";
];
.z.m.loginfo:logdict`info;
.z.m.logwarn:logdict`warn;
.z.m.logerr:logdict`error;
};

myfunc:{[x]
.z.m.loginfo[`mymodule;"processing ",string x];
};
```

Callers provide a dict of functions. Use `di.log` if no custom logger is needed:

```q
log:use`di.log
logdep:`info`warn`error!(log.info;log.warn;log.error)
mymodule.init[enlist[`log]!enlist logdep]
```
16 changes: 13 additions & 3 deletions di/compression/compression.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

---

## memo: Dependencies
## Dependencies

- KX log module
Passed as a dictionary to `init`. The `log` dependency is required — `init` throws if absent.

| Key | Required | Type | Description |
|---|---|---|---|
| `` `log `` | yes | dict | Logger with keys `` `info`warn`error ``, each `{[c;m]}`. See `di.log` for a default implementation. |

---

Expand Down Expand Up @@ -58,7 +62,8 @@ depth, 10,default, 1, 17, 8
## :wrench: Functions

| Function | Description |
|--------------------|----------------------------------------------------------------|
|--------------------|----------------------------------------------------------------|
|`init` | Initialise module with injected log dependency |
|`showcomp` | Load specified compression config and show compression details for files to be compressed |
|`getcompressioncsv` | get function to return loaded compressioncsv config |
|`compressmaxage` | Compress files according to config up to the specified max age |
Expand All @@ -72,6 +77,11 @@ depth, 10,default, 1, 17, 8
```q
// Include compression module in a process
cmp:use`di.compression
logdep:`info`warn`error!(
{[c;m] -1 "INFO [",string[c],"] ",m;};
{[c;m] -1 "WARN [",string[c],"] ",m;};
{[c;m] -2 "ERROR [",string[c],"] ",m;});
cmp.init[enlist[`log]!enlist logdep]

// View dictionary of functions
cmp
Expand Down
55 changes: 34 additions & 21 deletions di/compression/compression.q
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ checkcsv:{[csvtab;path]
/ include snappy (3) for version 3.4 or after
allowedalgos:0 1 2,$[.z.K>=3.4;3;()];
if[0b~all colscheck:`table`minage`column`calgo`cblocksize`clevel in (cols csvtab);
log.error[err:path,": Compression config has incorrect column layout at column(s): ", (" " sv string where not colscheck), ". Should be `table`minage`column`calgo`cblocksize`clevel."];'err];
.z.m.logerr[`compression;err:path,": Compression config has incorrect column layout at column(s): ", (" " sv string where not colscheck), ". Should be `table`minage`column`calgo`cblocksize`clevel."];'err];
if[count checkalgo:exec i from csvtab where not calgo in allowedalgos;
log.error[err:path,": Compression config has incorrect compression algo in row(s): ",(", " sv string -1_allowedalgos)," or ",(string last allowedalgos),"."];'err];
.z.m.logerr[`compression;err:path,": Compression config has incorrect compression algo in row(s): ",(", " sv string -1_allowedalgos)," or ",(string last allowedalgos),"."];'err];
if[count checkblock:exec i from csvtab where calgo in 1 2, not cblocksize in 12 + til 9;
log.error[err:path,": Compression config has incorrect compression blocksize at row(s): ", (" " sv string checkblock), ". Should be between 12 and 19."];'err];
.z.m.logerr[`compression;err:path,": Compression config has incorrect compression blocksize at row(s): ", (" " sv string checkblock), ". Should be between 12 and 19."];'err];
if[count checklevel: exec i from csvtab where calgo in 2, not clevel in til 10;
log.error[err:path,": Compression config has incorrect compression level at row(s): ", (" " sv string checklevel), ". Should be between 0 and 9."];'err];
.z.m.logerr[`compression;err:path,": Compression config has incorrect compression level at row(s): ", (" " sv string checklevel), ". Should be between 0 and 9."];'err];
if[.z.o like "w*"; if[count rowwin:where ((csvtab[`cblocksize] < 16) & csvtab[`calgo] > 0);
log.error[err:path,": Compression config has incorrect compression blocksize for windows at row: ", (" " sv string rowwin), ". Must be more than or equal to 16."];'err]];
.z.m.logerr[`compression;err:path,": Compression config has incorrect compression blocksize for windows at row: ", (" " sv string rowwin), ". Must be more than or equal to 16."];'err]];
if[(any nulls: any null (csvtab[`column];csvtab[`table];csvtab[`minage];csvtab[`clevel]))>0;
log.error[err:path,": Compression config has empty cells in column(s): ", (" " sv string `column`table`minage`clevel where nulls)];'err];
.z.m.logerr[`compression;err:path,": Compression config has empty cells in column(s): ", (" " sv string `column`table`minage`clevel where nulls)];'err];
};

/ Empty compressioncsv table defined for edge case where a bad config is loaded first attempt
Expand All @@ -22,8 +22,8 @@ compressioncsv:([] table:`$();minage:`int$();column:`$();calgo:`int$();cblocksiz
loadcsv:{[inputcsv]
/ accepts hsym path as argument
/ loads and checks compression config
loadedcsv:@[{log.info["Opening ", x];("SISIII"; enlist ",") 0:"S"$x}; (string inputcsv); {log.error["failed to open ", (x)," : ",y];'y}[string inputcsv]];
res:.[checkcsv;(loadedcsv;string inputcsv);{log.error["failed to load csv due to error: ",x];:0b}];
loadedcsv:@[{.z.m.loginfo[`compression;"Opening ", x];("SISIII"; enlist ",") 0:"S"$x}; (string inputcsv); {[p;e].z.m.logerr[`compression;"failed to open ", p," : ",e];'e}[string inputcsv]];
res:.[checkcsv;(loadedcsv;string inputcsv);{.z.m.logerr[`compression;"failed to load csv due to error: ",x];:0b}];
if[res~0b;:(::)];
compressioncsv::loadedcsv;
};
Expand Down Expand Up @@ -57,7 +57,7 @@ showcomp:{[hdbpath;csvpath;maxage]
/ load config from csvpath and display summary of files to be compressed and how
/ load csv
loadcsv[$[10h = type csvpath;hsym `$csvpath;hsym csvpath]];
log.info["compression: scanning hdb directory structure"];
.z.m.loginfo[`compression;"compression: scanning hdb directory structure"];
/ build paths table and fill age
$[count key (` sv hdbpath,`$"par.txt");
pathstab:update 0W^age from (,/) hdbstructure'[hsym each `$(read0 ` sv hdbpath,`$"par.txt")];
Expand All @@ -80,7 +80,7 @@ showcomp:{[hdbpath;csvpath;maxage]
t: t lj a;
/ in case of no default specified, delete from the table where no data is joined on
t: delete from t where calgo=0Nj,cblocksize=0Nj,clevel=0Nj;
log.info["compression: getting current size of each file up to a maximum age of ",string maxage];
.z.m.loginfo[`compression;"compression: getting current size of each file up to a maximum age of ",string maxage];
update currentsize:hcount each fullpath from select from t where age within (compressage;maxage)
};

Expand All @@ -106,13 +106,13 @@ statstabupdate:{[file;algo;sizeuncomp;compressionvaluepre]
};

singlethreadcompress:{[table]
log.info["compression: Single threaded process, compress applied sequentially"];
.z.m.loginfo[`compression;"compression: Single threaded process, compress applied sequentially"];
{compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize];
cleancompressed[x `fullpath;x `calgo]} each table;
};

multithreadcompress:{[table]
log.info["compression: Multithreaded process, compress applied in parallel "];
.z.m.loginfo[`compression;"compression: Multithreaded process, compress applied in parallel"];
{compress[x `fullpath;x `calgo;x `cblocksize;x `clevel; x `currentsize]} peach table;
{cleancompressed[x `fullpath;x `calgo]} each table;
};
Expand All @@ -139,22 +139,22 @@ summarystats:{
totalcompratio: (sum compressedfiles`uncompressedLength) % sum compressedfiles`compressedLength;
memoryusage:((sum decompressedfiles`uncompressedLength) - sum decompressedfiles`compressedLength) % 2 xexp 20;
totaldecompratio: neg (sum decompressedfiles`compressedLength) % sum decompressedfiles`uncompressedLength;
log.info["compression: Memory savings from compression: ", .Q.f[2;memorysavings], "MB. Total compression ratio: ", .Q.f[2;totalcompratio],"."];
log.info["compression: Additional memory used from de-compression: ",.Q.f[2;memoryusage], "MB. Total de-compression ratio: ", .Q.f[2;totaldecompratio],"."];
log.info["compression: Check getstatstab[] for info on each file."];
.z.m.loginfo[`compression;"compression: Memory savings from compression: ", .Q.f[2;memorysavings], "MB. Total compression ratio: ", .Q.f[2;totalcompratio],"."];
.z.m.loginfo[`compression;"compression: Additional memory used from de-compression: ",.Q.f[2;memoryusage], "MB. Total de-compression ratio: ", .Q.f[2;totaldecompratio],"."];
.z.m.loginfo[`compression;"compression: Check getstatstab[] for info on each file."];
};

compress:{[filetoCompress;algo;blocksize;level;sizeuncomp]
compressedFile: hsym `$(string filetoCompress),"_kdbtempzip";
/ compress or decompress as appropriate:
cmp:$[algo=0;"de";""];
$[((0 = count -21!filetoCompress) & not 0 = algo)|((not 0 = count -21!filetoCompress) & 0 = algo);
[log.info["compression: ",cmp,"compressing ","file ", (string filetoCompress), " with algo: ", (string algo), ", blocksize: ", (string blocksize), ", and level: ", (string level), "."];
[.z.m.loginfo[`compression;"compression: ",cmp,"compressing ","file ", (string filetoCompress), " with algo: ", (string algo), ", blocksize: ", (string blocksize), ", and level: ", (string level), "."];
/ perform the compression/decompression
-19!(filetoCompress;compressedFile;blocksize;algo;level);
];
/ if already compressed/decompressed, then log that and skip.
log.info["compression: file ", (string filetoCompress), " is already ",cmp,"compressed",". Skipping this file"]
.z.m.loginfo[`compression;"compression: file ", (string filetoCompress), " is already ",cmp,"compressed",". Skipping this file"]
]
};

Expand All @@ -163,17 +163,17 @@ cleancompressed:{[filetoCompress;algo]
cmp:$[algo=0;"de";""];
/ Verify compressed file exists
if[()~ key compressedFile;
log.info["compression: No compressed file present for the following file - ",string[filetoCompress]];
.z.m.loginfo[`compression;"compression: No compressed file present for the following file - ",string[filetoCompress]];
:();
];
/ Verify compressed file's contents match original
if[not ((get compressedFile)~sf:get filetoCompress) & (count -21!compressedFile) or algo=0;
log.info["compression: ",cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];
.z.m.loginfo[`compression;"compression: ",cmp,"compressed ","file ",string[compressedFile]," doesn't match original. Deleting new file"];
hdel compressedFile;
:();
];
/ Given above two checks satisfied run the delete of old and rename compressed to original name
log.info["compression: File ",cmp,"compressed ",string[filetoCompress]," successfully; matches orginal. Deleting original."];
.z.m.loginfo[`compression;"compression: File ",cmp,"compressed ",string[filetoCompress]," successfully; matches original. Deleting original."];
system "r ", (last ":" vs string compressedFile)," ", last ":" vs string filetoCompress;
/ move the hash files too.
hashfilecheck[compressedFile;filetoCompress;sf];
Expand All @@ -185,8 +185,21 @@ hashfilecheck:{[compressedFile;filetoCompress;sf]
/ check for double hash file if nested data contains symbol vector/atom
$[3.6<=.z.K;
if[77 = type sf; system "r ", (last ":" vs string compressedFile),"# ", (last ":" vs string filetoCompress),"#";
.[{system "r ", (last ":" vs string x),"## ", (last ":" vs string y),"##"};(compressedFile;filetoCompress);log.info["compression: File does not have enumeration domain"]]];
.[{system "r ", (last ":" vs string x),"## ", (last ":" vs string y),"##"};(compressedFile;filetoCompress);{.z.m.loginfo[`compression;"compression: File does not have enumeration domain"]}]];
/ if running below 3.6, nested list types will be 77h+t and will not have double hash file
if[78 <= type sf; system "r ", (last ":" vs string compressedFile),"# ", (last ":" vs string filetoCompress),"#"]
]
};

init:{[deps]
 / Inject runtime dependencies into the module namespace (.z.m).
 / deps - dict; required key `log -> `info`warn`error!({[c;m]};{[c;m]};{[c;m]})
 / usage: cmp.init[enlist[`log]!enlist logdep]
 logdict:()!();
 if[99h=type deps;if[(`log in key deps)and not(::)~deps`log;logdict:deps`log]];
 if[0=count logdict;
 '"di.compression: log dependency is required; pass `info`warn`error functions - see di.log or refer to confluence documentation";
 ];
 / bind each injected logger into .z.m so call sites resolve through the module
 .z.m.loginfo:logdict`info;
 .z.m.logwarn:logdict`warn;
 .z.m.logerr:logdict`error;
 };
7 changes: 1 addition & 6 deletions di/compression/init.q
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
/ Load core functionality into root module namespace
\l ::compression.q

/ Load KX log module - needed for log.info and log.error
logger:use`kx.log
log:logger.createLog[]

export:([showcomp;getcompressioncsv;compressmaxage;docompression;getstatstab])
export:([init;showcomp;getcompressioncsv;compressmaxage;docompression;getstatstab])
5 changes: 4 additions & 1 deletion di/compression/test.csv
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
action,ms,bytes,lang,code,repeat,minver,comment

before,0,0,q,cmp:use`di.compression,1,,Initialize module
before,0,0,q,cmp:use`di.compression,1,,load module
before,0,0,q,mocklog:`info`warn`error!({[c;m]};{[c;m]};{[c;m]}),1,,define no-op mock logger
fail,0,0,q,cmp.init[(::)],1,,init without log dep throws
run,0,0,q,cmp.init[enlist[`log]!enlist mocklog],1,,init with injected logger

before,0,0,q,trade:([]date:asc 100#.z.d - til 10;time:100#asc 09:30+10?00:10:00;sym:100?`AAPL`GOOGL`MSFT`TSLA`NVDA`AMD;price:(5*100?10000)%100;size:5*100?100;side:100?`B`S;src:100?`NYSE`NASDAQ),1,,Create mock trade table
before,0,0,q,quote:([]date:asc 100#.z.d - til 10;time:100#asc 09:30+10?00:10:00;sym:100?`AAPL`GOOGL`MSFT`TSLA`NVDA`AMD;bid:(5*100?10000)%100;ask:(5*100?10000)%100;bsize:5*100?100;asize:5*100?100;src:100?`NYSE`NASDAQ),1,,Create mock quote table
Expand Down