[go: nahoru, domu]

Skip to content

Commit

Permalink
new thread-per-connection approach implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
myou11 committed Feb 7, 2018
1 parent 3eae9ef commit 0e45a8b
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 113 deletions.
83 changes: 47 additions & 36 deletions src/cs455/overlay/node/MessengerNode.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package cs455.overlay.node;

import cs455.overlay.routing.RoutingTable;
import cs455.overlay.transport.TCPConnectionsCache;
import cs455.overlay.transport.TCPReceiverThread;
import cs455.overlay.transport.TCPSenderThread;
import cs455.overlay.transport.TCPServerThread;
import cs455.overlay.transport.*;
import cs455.overlay.util.InteractiveCommandParser;
import cs455.overlay.wireformats.*;

Expand Down Expand Up @@ -88,15 +85,6 @@ public long getRcvSummation() {
return rcvSummation;
}

/**
 * Sends a marshalled message over the given socket on a freshly started thread.
 *
 * A new {@code TCPSenderThread} is built for the socket/message pair, wrapped in a
 * {@link Thread}, and started immediately; this method does not wait for it to finish.
 *
 * @param socket the socket the message is written to
 * @param msg    the marshalled bytes of the message to send
 */
private void createSenderThread(Socket socket, byte[] msg) {
    try {
        // constructing the sender can fail with an IOException, so it happens inside the try
        TCPSenderThread senderTask = new TCPSenderThread(socket, msg);
        Thread senderThread = new Thread(senderTask);
        senderThread.start();
    } catch (IOException ioe) {
        // the failure is only reported; nothing is propagated to the caller
        ioe.printStackTrace();
    }
}

@Override
public void onEvent(Event event, Socket socket) throws IOException {
switch(event.getType()) {
Expand All @@ -118,14 +106,19 @@ public void onEvent(Event event, Socket socket) throws IOException {
}
}

private void processRegistrationStatusResponse(RegistryReportsRegistrationStatus event, Socket socket) {
private void processRegistrationStatusResponse(RegistryReportsRegistrationStatus event, Socket socket) throws IOException {
if (event.getID() > -1) {
this.ID = event.getID();

// Cache the socket connected to the registry
// Cache this only if registration succeeded; otherwise there is no need to cache the connection to the registry
this.getConnectionsCache().addConnection(registryIPportNumStr, socket);

// TODO: UPDATE THIS COMMENT TO REFLECT THE NEW TCP CONNECTION CHANGES
// TCPConnection connection = connectionsCache.getConnection(registryIPportNumStr);
// TODO: DIDN'T START THE SENDER AND RECEIVER THREADS HERE BECAUSE I DID IT IN THE TCPCONNECTION CTOR; IF IT DOESN'T WORK, START IT HERE
// connectionsCache.addConnection(registryIPportNumStr, connection);
// TODO: didnt cache connection here bc it is already done when this msging node initiates a connec w registry.
// TODO: also dont need to start sndr and rcvr threads for this connec since this msging node does that when it
// TODO: first creates this connection to register itself to the registry (REFINE THESE COMMENTS)
System.out.printf("Cached connection with %s using socket: %s\n", registryIPportNumStr, socket);
System.out.println(event.getInfoStr());
System.out.println("My assigned ID is: " + this.ID);
Expand Down Expand Up @@ -169,8 +162,15 @@ private void processNodeManifest(RegistrySendsNodeManifest event) throws IOExcep
try {
String[] IPportNumArr = entry.getValue().split(":");
Socket socket = new Socket(IPportNumArr[0], Integer.parseInt(IPportNumArr[1]));
connectionsCache.addConnection(entry.getValue(), socket);
(new Thread(new TCPReceiverThread(socket, this))).start();
// TODO: COMMENT ABOUT THE NEW TCPCONNECTION HERE; UNCOMMENT THE startSenderAndReceiverThreads() BELOW IF CTOR CALL DOESN'T WORK
// connection to a node in the routing table
TCPConnection connection = new TCPConnection(socket, this);
connectionsCache.addConnection(entry.getValue(), connection);
// TODO: refine comments here
// the rcvr and sndr thread for this node's side of the connection (pipe)
// when the other node receives the connection request from the socket above,
// it will start its own sndr and rcvr threads for its end of the pipe
connection.startSenderAndReceiverThreads();
} catch(IOException ioe) {
connectionsEstablished = false;
break;
Expand All @@ -188,15 +188,19 @@ private void processNodeManifest(RegistrySendsNodeManifest event) throws IOExcep
status = -1;
infoStr = String.format("Messaging Node (%d) failed to establish connections to messaging nodes in its routing table", this.ID);
}
Socket registryConnection = connectionsCache.getConnection(registryIPportNumStr);
// TODO: MAKE SURE THIS IS COMMENTED THOROUGHLY WITH NEW TCP CONNECTION CHANGES; UPDATE COMMENTS WITH NEW CTOR START THREADS
// registryConnection's sender and receiver threads should have been started when the registry confirmed
// this node's registration. after confirming registration, this node should have cached the connection
// and started the sender and receiver threads in the processNodeRegistrationStatusResponse() above
TCPConnection registryConnection = connectionsCache.getConnection(registryIPportNumStr);
NodeReportsOverlaySetupStatus overlaySetupStatus = new NodeReportsOverlaySetupStatus(status, infoStr);
createSenderThread(registryConnection, overlaySetupStatus.getBytes());
registryConnection.sendMsg(overlaySetupStatus.getBytes());
}

// finds the closest node to the given dst ID and returns the connection to it
private Socket findClosestNode(int dstID) {
private TCPConnection findClosestNode(int dstID) {
// store connection to closest node
Socket routingSocket = null;
TCPConnection routingConnection = null;

int numHopsToDst = 0;
// to find hops to dst, have to start at src ID (this node's) and count clockwise
Expand All @@ -215,14 +219,14 @@ private Socket findClosestNode(int dstID) {
if(Math.pow(2, entry) < numHopsToDst) {
closestID = routingTableList.get(entry).getKey();
// get the connection to the current closest node
routingSocket = connectionsCache.getConnection(routingTableList.get(entry).getValue());
routingConnection = connectionsCache.getConnection(routingTableList.get(entry).getValue());
}
}
// after this loop, we should have the connection to the closest node w/o overshooting

System.out.printf("Node (%d) is not in my routing table. Routing data to closest node: node (%d)\n", dstID, closestID);
//System.out.printf("Node (%d) is not in my routing table. Routing data to closest node: node (%d)\n", dstID, closestID);

return routingSocket;
return routingConnection;
}

private void processTaskInitiate(RegistryRequestsTaskInitiate event) throws IOException {
Expand All @@ -245,15 +249,15 @@ private void processTaskInitiate(RegistryRequestsTaskInitiate event) throws IOEx
dstID = registeredNodeIDs.get(randIndex);
}

Socket routingSocket;
TCPConnection routingConnection;
if (routingTable.contains(dstID)) {
String IPportNumStr = routingTable.getEntry(dstID);
System.out.printf("Node (%d) is in my routing table. Sending data to %s\n", dstID, IPportNumStr);
//System.out.printf("Node (%d) is in my routing table. Sending data to %s\n", dstID, IPportNumStr);
// send OverlayNodeSendsData msg
routingSocket = connectionsCache.getConnection(IPportNumStr);
routingConnection = connectionsCache.getConnection(IPportNumStr);
} else { // ID to send packet to is not in routing table
// find closest node to route data to
routingSocket = findClosestNode(dstID);
routingConnection = findClosestNode(dstID);
}

// Send packet to the next node with a random int (-2mil to 2mil) as the payload
Expand All @@ -266,7 +270,7 @@ private void processTaskInitiate(RegistryRequestsTaskInitiate event) throws IOEx
}

OverlayNodeSendsData nodeSendsData = new OverlayNodeSendsData(dstID, this.ID, payload, new ArrayList<>());
createSenderThread(routingSocket, nodeSendsData.getBytes());
routingConnection.sendMsg(nodeSendsData.getBytes());
}
}

Expand All @@ -277,7 +281,7 @@ private void processNodeSendsData(OverlayNodeSendsData event) throws IOException
ArrayList<Integer> routingTrace = event.getRoutingTrace();

// will hold connection that we should route packet to
Socket routingSocket;
TCPConnection routingConnection;

if (dstID == this.ID) {
// this is the dst
Expand All @@ -288,17 +292,17 @@ private void processNodeSendsData(OverlayNodeSendsData event) throws IOException
}
} else if (routingTable.contains(dstID)){
// dst is in routing table
routingSocket = connectionsCache.getConnection(routingTable.getEntry(dstID));
routingConnection = connectionsCache.getConnection(routingTable.getEntry(dstID));

synchronized(trackersLock) {
++this.relayTracker;
}

OverlayNodeSendsData nodeSendsData = new OverlayNodeSendsData(dstID, srcID, payload, routingTrace);
createSenderThread(routingSocket, nodeSendsData.getBytes());
routingConnection.sendMsg(nodeSendsData.getBytes());
} else {
// dst is not in routing table; choose closest node
routingSocket = findClosestNode(dstID);
routingConnection = findClosestNode(dstID);

// no need to check whether this is the src node, because once a msg is sent it should never arrive back at the src;
// and since this node is also not the sink for the msg, it is relaying the msg, so add this ID to the routing trace
Expand All @@ -310,7 +314,7 @@ private void processNodeSendsData(OverlayNodeSendsData event) throws IOException
}

OverlayNodeSendsData nodeSendsData = new OverlayNodeSendsData(dstID, srcID, payload, routingTrace);
createSenderThread(routingSocket, nodeSendsData.getBytes());
routingConnection.sendMsg(nodeSendsData.getBytes());
}
}

Expand Down Expand Up @@ -365,8 +369,15 @@ public static void main(String[] args) {
// start thread to send the msg, so the node can still receive
// any incoming requests while it sends the msg
// pass the marshalled nodeRegistration byte[] in as the msg to be sent
(new Thread(new TCPSenderThread(commSocket, nodeRegistration.getBytes()))).start();
/*(new Thread(new TCPSenderThread(commSocket, nodeRegistration.getBytes()))).start();
(new Thread(new TCPReceiverThread(commSocket, msgNode))).start();
*/
// TODO: UPDATE COMMENTS FOR NEW TCPCONNECTION WAY
TCPConnection registryConnection = new TCPConnection(commSocket, msgNode);
msgNode.getConnectionsCache().addConnection(registryIPportNumStr, registryConnection);
// TODO: START SNDR AND RCVR THREADS HERE IF TCPCONNECTIONS CTOR DOESN'T WORK
registryConnection.startSenderAndReceiverThreads();
registryConnection.sendMsg(nodeRegistration.getBytes());
} catch (IOException ioe) {
ioe.printStackTrace();
}
Expand Down
Loading

0 comments on commit 0e45a8b

Please sign in to comment.