Skip to content

Commit bd5976d

Browse files
committed
Added alpc + rpc tests files
1 parent 7947ace commit bd5976d

2 files changed

Lines changed: 186 additions & 0 deletions

File tree

tests/test_alpc.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import pytest
2+
import threading
3+
import time
4+
5+
import windows.alpc
6+
import windows.generated_def as gdef
7+
8+
from pfwtest import *
9+
10+
11+
def generate_client_server_test(client_function, server_function):
12+
def generated_test():
13+
th = threading.Thread(target=server_function, args=())
14+
th.start()
15+
time.sleep(0.5)
16+
client_function()
17+
th.join()
18+
return True
19+
return generated_test
20+
21+
PORT_NAME = r"\RPC Control\PythonForWindowsTestPort"
22+
CLIENT_MESSAGE = "Message 1\x00\xffABCD"
23+
SERVER_MESSAGE = "Message 2-" + "".join(chr(i) for i in range(256))
24+
25+
def alpc_simple_test_server():
26+
server = windows.alpc.AlpcServer(PORT_NAME)
27+
msg = server.recv()
28+
assert msg.type & 0xfff == gdef.LPC_CONNECTION_REQUEST
29+
server.accept_connection(msg)
30+
msg = server.recv()
31+
assert msg.type & 0xfff == gdef.LPC_REQUEST
32+
assert msg.data == CLIENT_MESSAGE
33+
msg.data = SERVER_MESSAGE
34+
server.send(msg)
35+
36+
def alpc_simple_test_client():
37+
client = windows.alpc.AlpcClient(PORT_NAME)
38+
response = client.send_receive(CLIENT_MESSAGE)
39+
assert response.data == SERVER_MESSAGE
40+
41+
test_simple_alpc = generate_client_server_test(alpc_simple_test_client, alpc_simple_test_server)
42+
43+
44+
def send_message_with_view(client, message_data, view_data):
45+
# Create View
46+
section = client.create_port_section(0, 0, 0x4000)
47+
view = client.map_section(section[0], 0x4000)
48+
49+
# New message with a View
50+
msg = windows.alpc.AlpcMessage(0x2000)
51+
msg.attributes.ValidAttributes |= gdef.ALPC_MESSAGE_VIEW_ATTRIBUTE
52+
msg.view_attribute.Flags = 0
53+
msg.view_attribute.ViewBase = view.ViewBase
54+
msg.view_attribute.SectionHandle = view.SectionHandle
55+
msg.view_attribute.ViewSize = 0x4000
56+
msg.data = message_data
57+
windows.current_process.write_memory(view.ViewBase, view_data)
58+
return client.send_receive(msg)
59+
60+
61+
CLIENT_VIEW_MESSAGE = "Message 1\x00\xffABCD"
62+
CLIENT_VIEW_DATA = "Message Data-view" + "".join(chr(i) for i in range(256))
63+
64+
65+
def alpc_view_test_server():
66+
server = windows.alpc.AlpcServer(PORT_NAME)
67+
msg = server.recv()
68+
assert msg.type & 0xfff == gdef.LPC_CONNECTION_REQUEST
69+
server.accept_connection(msg)
70+
msg = server.recv()
71+
assert msg.type & 0xfff == gdef.LPC_REQUEST
72+
assert msg.view_is_valid
73+
view_data = windows.current_process.read_memory(msg.view_attribute.ViewBase, len(CLIENT_VIEW_DATA))
74+
assert view_data == CLIENT_VIEW_DATA
75+
assert msg.data == CLIENT_VIEW_MESSAGE
76+
msg.data = SERVER_MESSAGE
77+
server.send(msg)
78+
79+
def alpc_view_test_client():
80+
client = windows.alpc.AlpcClient(PORT_NAME)
81+
response = send_message_with_view(client, CLIENT_VIEW_MESSAGE, CLIENT_VIEW_DATA)
82+
assert response.data == SERVER_MESSAGE
83+
84+
test_view_alpc = generate_client_server_test(alpc_view_test_client, alpc_view_test_server)

tests/test_rpc.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import pytest
2+
import os.path
3+
4+
import windows.rpc as rpc
5+
from windows.rpc import ndr
6+
import windows.generated_def as gdef
7+
8+
from pfwtest import *
9+
10+
11+
12+
UAC_UIID = "201ef99a-7fa0-444c-9399-19ba84f12a1a"
13+
14+
15+
16+
def test_rpc_epmapper():
17+
endpoints = windows.rpc.find_alpc_endpoints(UAC_UIID)
18+
assert endpoints
19+
endpoint = endpoints[0]
20+
assert endpoint.protseq == "ncalrpc"
21+
assert endpoint.endpoint
22+
assert endpoint.object.Uuid.to_string().lower() == UAC_UIID
23+
24+
25+
# NDR Descriptions
26+
class NDRPoint(ndr.NdrStructure):
27+
MEMBERS = [ndr.NdrLong, ndr.NdrLong]
28+
29+
class NdrUACStartupInfo(ndr.NdrStructure):
30+
MEMBERS = [ndr.NdrUniquePTR(ndr.NdrWString),
31+
ndr.NdrLong,
32+
ndr.NdrLong,
33+
ndr.NdrLong,
34+
ndr.NdrLong,
35+
ndr.NdrLong,
36+
ndr.NdrLong,
37+
ndr.NdrLong,
38+
ndr.NdrLong,
39+
ndr.NdrLong,
40+
NDRPoint]
41+
42+
class UACParameters(ndr.NdrParameters):
43+
MEMBERS = [ndr.NdrUniquePTR(ndr.NdrWString),
44+
ndr.NdrUniquePTR(ndr.NdrWString),
45+
ndr.NdrLong,
46+
ndr.NdrLong,
47+
ndr.NdrWString,
48+
ndr.NdrWString,
49+
NdrUACStartupInfo,
50+
ndr.NdrLong,
51+
ndr.NdrLong]
52+
53+
class NdrProcessInformation(ndr.NdrParameters):
54+
MEMBERS = [ndr.NdrLong] * 4
55+
56+
57+
def test_rpc_uac_call():
58+
client = windows.rpc.find_alpc_endpoint_and_connect(UAC_UIID)
59+
iid = client.bind(UAC_UIID)
60+
61+
python_path = 'C:\\Python27\\python.exe'
62+
python_name = os.path.basename(python_path)
63+
64+
# Marshalling parameters.
65+
parameters = UACParameters.pack([
66+
python_path + "\x00", # Application Path
67+
python_path + "\x00", # Commandline
68+
0, # UAC-Request Flag
69+
gdef.CREATE_UNICODE_ENVIRONMENT, # dwCreationFlags
70+
"\x00", # StartDirectory
71+
"WinSta0\\Default\x00", # Station
72+
# Startup Info
73+
(None, # Title
74+
0, # dwX
75+
0, # dwY
76+
0, # dwXSize
77+
0, # dwYSize
78+
0, # dwXCountChars
79+
0, # dwYCountChars
80+
0, # dwFillAttribute
81+
0, # dwFlags
82+
5, # wShowWindow
83+
# Point structure: Use MonitorFromPoint to setup StartupInfo.hStdOutput
84+
(0, 0)),
85+
0, # Window-Handle to know if UAC can steal focus
86+
0xffffffff]) # UAC Timeout
87+
88+
89+
result = client.call(iid, 0, parameters)
90+
stream = ndr.NdrStream(result)
91+
92+
ph, th, pid, tid = NdrProcessInformation.unpack(stream)
93+
return_value = ndr.NdrLong.unpack(stream)
94+
assert ph
95+
assert pid
96+
assert th
97+
assert tid
98+
windows.winproxy.CloseHandle(th) # NoLeak
99+
proc = windows.WinProcess(handle=ph)
100+
assert proc.name == python_name
101+
proc.exit(0)
102+

0 commit comments

Comments
 (0)