Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class InputManager(
case (uri, partitioning) =>
new InputPortMaterializationReaderThread(
uri = uri,
portId = portId,
inputMessageQueue = this.inputMessageQueue,
workerActorId = this.actorId,
partitioning = partitioning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ class DefaultResourceAllocator(
// Ids of every worker configured for the operator that owns this input port.
val toWorkerActorIds =
  operatorConfigs(globalPortId.opId).workerConfigs.map(_.workerId)
// One virtual reader-thread actor id per destination worker. The destination port id is passed
// along so that the same materialization URI read into different ports yields different ids.
val fromVirtualThreadActorIds = toWorkerActorIds.map(toWorkerActorId =>
  getFromActorIdForInputPortStorage(
    inputMatUri.toString,
    globalPortId.portId,
    toWorkerActorId
  )
)
// Extract the input port partitionInfo defined in the physicalOp, defaulting to UnknownPartition.
val inputPortPartitionInfo = region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.texera.amber.core.virtualidentity.{
ChannelIdentity,
EmbeddedControlMessageIdentity
}
import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.toPartitioner
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{
NO_ALIGNMENT,
Expand All @@ -55,6 +56,7 @@ import scala.collection.mutable.ArrayBuffer

class InputPortMaterializationReaderThread(
uri: URI,
portId: PortIdentity,
inputMessageQueue: LinkedBlockingQueue[DPInputQueueElement],
workerActorId: ActorVirtualIdentity,
partitioning: Partitioning
Expand All @@ -65,7 +67,7 @@ class InputPortMaterializationReaderThread(
// Data channel from this reader thread (acting as a dummy upstream actor) to its worker.
private lazy val channelId = {
  // The sender id is derived from (uri, portId, worker): including the destination port keeps
  // the channel distinct when the same URI is also read into another port of the same worker.
  val fromActorId: ActorVirtualIdentity =
    getFromActorIdForInputPortStorage(uri.toString, portId, workerActorId)
  ChannelIdentity(fromActorId, workerActorId, isControl = false)
}
private val partitioner = toPartitioner(partitioning, workerActorId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ trait AssignPortHandler {
dp.inputManager.addPort(msg.portId, schema, inputPortURIs, partitionings)
inputPortURIStrs.foreach { uriStr =>
val toActorId = ctx.receiver
val fromActorId = getFromActorIdForInputPortStorage(uriStr, toActorId)
val fromActorId = getFromActorIdForInputPortStorage(uriStr, msg.portId, toActorId)
val channelId =
ChannelIdentity(fromWorkerId = fromActorId, toWorkerId = toActorId, isControl = false)
// Same as AddInputChannelHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
import org.apache.texera.amber.engine.e2e.TestUtils.buildWorkflow
import org.apache.texera.amber.operator.TestOperators
import org.apache.texera.amber.operator.difference.DifferenceOpDesc
import org.apache.texera.amber.operator.split.SplitOpDesc
import org.apache.texera.amber.operator.udf.python.{
DualInputPortsPythonUDFOpDescV2,
Expand Down Expand Up @@ -334,4 +335,42 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
}
}

"RegionPlanGenerator" should "generate a runnable schedule for csv->difference with both ports from same csv" in {
val csv = TestOperators.headerlessSmallCsvScanOpDesc()
val diff = new DifferenceOpDesc()
val workflow = buildWorkflow(
List(csv, diff),
List(
LogicalLink(
csv.operatorIdentifier,
PortIdentity(),
diff.operatorIdentifier,
PortIdentity()
),
LogicalLink(
csv.operatorIdentifier,
PortIdentity(),
diff.operatorIdentifier,
PortIdentity(1)
)
),
new WorkflowContext()
)

val (schedule, _) = new ExpansionGreedyScheduleGenerator(
workflow.context,
workflow.physicalPlan
).generate()

val levels = schedule.levelSets
// Self-link to both ports must be broken into multiple region levels via materialization,
// otherwise the upstream csv blocks waiting on the dependent right port and execution deadlocks.
assert(
levels.size > 1,
s"expected multiple region levels (csv + materialized read), got ${levels.size} levels"
)
val regionList = levels.values.flatten.toList
assert(regionList.nonEmpty, "scheduler must produce at least one region")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.texera.amber.core.virtualidentity.{
PhysicalOpIdentity,
WorkflowIdentity
}
import org.apache.texera.amber.core.workflow.PortIdentity

import scala.util.matching.Regex

Expand Down Expand Up @@ -95,14 +96,26 @@ object VirtualIdentityUtils {
/**
  * An input port materialization reader thread mimics the behavior of an upstream worker.
  * Each thread has a virtual actor id. This method creates such a virtual actor id.
  *
  * The destination port id is part of the identity so that when one upstream URI feeds
  * multiple input ports of the same downstream worker (e.g. a single source connected to
  * both inputs of Difference), each (uri, port, worker) triple still produces a distinct
  * channel, preventing FIFO sequence collisions and end-of-channel markers from being
  * routed to the wrong port.
  *
  * @param storageURIStr The materialization location to read from.
  * @param toPortId The downstream input port the reader feeds.
  * @param toWorkerActorId The worker actor that the thread belongs to.
  * @return A virtual actor id whose name is the reader prefix, the storage URI,
  *         `_port<id>` (followed by `i` when the port is internal), `_`, and the worker's name.
  */
def getFromActorIdForInputPortStorage(
    storageURIStr: String,
    toPortId: PortIdentity,
    toWorkerActorId: ActorVirtualIdentity
): ActorVirtualIdentity = {
  // Port discriminator; the trailing "i" marks an internal port.
  val portTag = s"_port${toPortId.id}${if (toPortId.internal) "i" else ""}_"
  ActorVirtualIdentity(
    MATERIALIZATION_READER_ACTOR_PREFIX + storageURIStr + portTag + toWorkerActorId.name
  )
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.texera.amber.core.virtualidentity.{
PhysicalOpIdentity,
WorkflowIdentity
}
import org.apache.texera.amber.core.workflow.PortIdentity
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

Expand Down Expand Up @@ -152,12 +153,29 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers {

// ----- getFromActorIdForInputPortStorage -----

"getFromActorIdForInputPortStorage" should "prefix MATERIALIZATION_READER_ and include the destination port id" in {
  val toWorker = ActorVirtualIdentity("Worker:WF1-myOp-main-0")
  val virtualReader = VirtualIdentityUtils.getFromActorIdForInputPortStorage(
    "iceberg:/warehouse/x",
    PortIdentity(0),
    toWorker
  )
  // Expected layout: reader prefix, storage URI, "_port<id>_", then the worker's name.
  virtualReader.name shouldBe
    "MATERIALIZATION_READER_iceberg:/warehouse/x_port0_Worker:WF1-myOp-main-0"
}

it should "produce distinct ids for the same uri+worker but different port ids" in {
val toWorker = ActorVirtualIdentity("Worker:WF1-myOp-main-0")
val left = VirtualIdentityUtils.getFromActorIdForInputPortStorage(
"iceberg:/warehouse/x",
PortIdentity(0),
toWorker
)
val right = VirtualIdentityUtils.getFromActorIdForInputPortStorage(
"iceberg:/warehouse/x",
PortIdentity(1),
toWorker
)
left should not be right
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class DifferenceOpDesc extends LogicalOp {
OperatorGroupConstants.SET_GROUP,
inputPorts = List(
InputPort(PortIdentity(), displayName = "left"),
InputPort(PortIdentity(1), displayName = "right")
InputPort(PortIdentity(1), displayName = "right", dependencies = List(PortIdentity()))
),
outputPorts = List(OutputPort(blocking = true))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,19 @@ class DifferenceOpExecSpec extends AnyFlatSpec with BeforeAndAfter {
opExec.close()
}

it should "return empty set when same tuples are used as both inputs (self-difference)" in {
opExec.open()
counter = 0
val tuples = (1 to 5).map(_ => tuple()).toList

tuples.foreach(t => opExec.processTuple(t, input1))
assert(opExec.onFinish(input1).isEmpty)

tuples.foreach(t => opExec.processTuple(t, input2))
val outputTuples = opExec.onFinish(input2).toSet
assert(outputTuples.isEmpty)

opExec.close()
}

}