forked from danielcamposramos/Knowledge3D
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_original_kernel.py
More file actions
157 lines (136 loc) · 5.12 KB
/
test_original_kernel.py
File metadata and controls
157 lines (136 loc) · 5.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python3
"""
Test original RPN kernel (modular_rpn_kernel.ptx) to establish baseline.
This should work if loader API is correct.
"""
def main() -> int:
    """Smoke-test the original RPN kernel (modular_rpn_kernel.ptx) as a baseline.

    Runs a one-opcode DOT-product program on the GPU — dot([1,2,3], [4,5,6]),
    expected 32.0 — then checks the kernel's error word and the stack-top
    result. If this passes, the loader API is sound and any failure lies in
    the EXTENDED kernel; if it fails, fix the loader first.

    Returns:
        0 when the kernel reports no error and the result is 32.0 (±0.01);
        1 on any failure (missing PTX, load/alloc/launch/readback error,
        or a wrong numeric result).
    """
    import ctypes
    from pathlib import Path

    import numpy as np

    from knowledge3d.cranium.sovereign import loader

    print("=" * 70)
    print("Testing Original RPN Kernel (Baseline)")
    print("=" * 70)

    # Load ORIGINAL kernel (not extended)
    ptx_path = Path("knowledge3d/cranium/ptx/modular_rpn_kernel.ptx")
    if not ptx_path.exists():
        print(f"❌ Kernel not found: {ptx_path}")
        return 1

    print(f"\n[1/6] Loading kernel: {ptx_path}")
    try:
        module = loader.load_module_from_file(str(ptx_path))
        kernel = loader.get_function(module, "modular_rpn_geometric_kernel")
        print("✓ Kernel loaded successfully")
    except Exception as e:
        print(f"❌ Failed to load kernel: {e}")
        return 1

    # Test simple DOT product program
    # Opcode: 0x30 (DOT)
    # Expected: dot([1,2,3], [4,5,6]) = 32.0
    print("\n[2/6] Preparing test program (DOT product)")
    op_codes = np.array([0x30], dtype=np.uint16)  # DOT opcode
    scalars = np.array([], dtype=np.float32)  # DOT consumes no scalar operands
    vectors = np.array(
        [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], dtype=np.float32
    )  # Two 3D vectors
    print(" Opcode: 0x30 (DOT)")
    print(" Vectors: [1,2,3] · [4,5,6]")
    print(" Expected result: 32.0")

    # Every gpu_malloc result is recorded here so the `finally` block below
    # frees ALL buffers on EVERY exit path. (Previously the launch/readback
    # failure paths returned early and leaked all four device buffers.)
    allocated = []

    try:
        # Allocate GPU memory
        print("\n[3/6] Allocating GPU memory")
        try:
            state_size = 15 * 1040  # 15 instances × 1040 bytes
            state_buffer = loader.gpu_malloc(state_size)
            allocated.append(state_buffer)
            # Zero-fill the state buffer so head/size/error start at 0.
            zeros = np.zeros(state_size, dtype=np.uint8)
            loader.memcpy_htod(state_buffer, zeros.ctypes.data, state_size)

            op_codes_gpu = loader.gpu_malloc(op_codes.nbytes)
            allocated.append(op_codes_gpu)
            scalars_gpu = loader.gpu_malloc(max(4, scalars.nbytes))  # At least 4 bytes
            allocated.append(scalars_gpu)
            vectors_gpu = loader.gpu_malloc(vectors.nbytes)
            allocated.append(vectors_gpu)

            # scalars is empty, so only opcodes and vectors need uploading.
            loader.memcpy_htod(op_codes_gpu, op_codes.ctypes.data, op_codes.nbytes)
            loader.memcpy_htod(vectors_gpu, vectors.ctypes.data, vectors.nbytes)
            print("✓ GPU memory allocated and data copied")
        except Exception as e:
            print(f"❌ GPU allocation failed: {e}")
            return 1

        # Launch kernel: a single thread is enough for the one-opcode program.
        print("\n[4/6] Launching kernel")
        try:
            loader.launch(
                kernel,
                grid=(1, 1, 1),
                block=(1, 1, 1),
                params=[
                    ctypes.c_uint32(0),  # instance_id
                    ctypes.c_uint64(op_codes_gpu.value),
                    ctypes.c_uint64(scalars_gpu.value),
                    ctypes.c_uint64(vectors_gpu.value),
                    ctypes.c_uint64(state_buffer.value),
                    ctypes.c_uint32(len(op_codes)),
                ],
            )
            loader.synchronize()
            print("✓ Kernel executed")
        except Exception as e:
            print(f"❌ Kernel launch failed: {e}")
            return 1

        # Read error code and result back from the state buffer
        print("\n[5/6] Reading results from GPU")
        try:
            # State layout: head(4) + size(4) + error(4) + reserved(4) + stack[...]
            error_host = ctypes.c_uint32()
            error_ptr = loader.CUdeviceptr(state_buffer.value + 8)  # Offset 8 = error field
            loader.memcpy_dtoh(
                ctypes.byref(error_host), error_ptr, ctypes.sizeof(error_host)
            )
            error_code = error_host.value

            # Result is the DOT output left at the top of the RPN stack.
            result_host = ctypes.c_float()
            result_ptr = loader.CUdeviceptr(state_buffer.value + 16)  # Offset 16 = stack[0]
            loader.memcpy_dtoh(
                ctypes.byref(result_host), result_ptr, ctypes.sizeof(result_host)
            )
            result = result_host.value
            print("✓ Results read from GPU")
        except Exception as e:
            print(f"❌ Failed to read results: {e}")
            return 1
    finally:
        # Runs on success AND on every early `return 1` above, so partially
        # completed allocations are never leaked.
        print("\n[6/6] Cleaning up GPU memory")
        try:
            for buf in allocated:
                loader.gpu_free(buf)
            print("✓ GPU memory freed")
        except Exception as e:
            print(f"⚠️ Cleanup warning: {e}")

    # Print results
    print("\n" + "=" * 70)
    print("RESULTS")
    print("=" * 70)
    print(f"Error code: {error_code}")
    if error_code == 0:
        print(" → kErrorNone (success)")
    elif error_code == 9001:
        print(" → kErrorUnknownOpcode")
    elif error_code == 9002:
        print(" → kErrorStackUnderflow")
    elif error_code == 9003:
        print(" → kErrorStackOverflow")
    else:
        print(" → Unknown error")
    print(f"\nResult: {result:.6f}")
    print("Expected: 32.000000")
    print(f"Difference: {abs(result - 32.0):.6f}")

    # Final verdict
    print("\n" + "=" * 70)
    if error_code == 0 and abs(result - 32.0) < 0.01:
        print("✅ PASS: Original kernel works correctly!")
        print(" → Problem is in EXTENDED kernel, not loader")
        return 0
    print("❌ FAIL: Original kernel broken")
    print(" → Problem is in loader API or kernel execution")
    print(" → Fix loader before testing extended kernel")
    return 1
# Script entry point: exit with main()'s status code (0 = pass, 1 = fail).
if __name__ == "__main__":
    raise SystemExit(main())