[go: nahoru, domu]

Skip to content

Commit

Permalink
initial impl of task finished
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxwell You committed Feb 13, 2018
1 parent 2dc1c9d commit 2b3a375
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 34 deletions.
26 changes: 15 additions & 11 deletions src/cs455/overlay/node/MessengerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.util.*;

public class MessengerNode implements Protocol, Node {
private boolean DEBUG = false;
private boolean DEBUG = true;

private int ID;
private String IP;
Expand Down Expand Up @@ -97,10 +97,10 @@ public void onEvent(Event event, TCPConnection connection) throws IOException {
processDeregistrationStatusResponse((RegistryReportsDeregistrationStatus)event, connection);
break;
case (REGISTRY_SENDS_NODE_MANIFEST):
processNodeManifest((RegistrySendsNodeManifest)event);
processNodeManifest((RegistrySendsNodeManifest)event, connection);
break;
case (REGISTRY_REQUESTS_TASK_INITIATE):
processTaskInitiate((RegistryRequestsTaskInitiate)event);
processTaskInitiate((RegistryRequestsTaskInitiate)event, connection);
break;
case (OVERLAY_NODE_SENDS_DATA):
processNodeSendsData((OverlayNodeSendsData)event);
Expand Down Expand Up @@ -141,7 +141,8 @@ private void processDeregistrationStatusResponse(RegistryReportsDeregistrationSt

// TODO: Could pass in a TCPConnection and use that as the registry connection instead of retrieving the registry connection below
// this would work because only the receiver thread on the connection between this node and the registry would rcv this msg
private void processNodeManifest(RegistrySendsNodeManifest event) throws IOException {
// connection is the connection to the registry, use that to send msg back to registry
private void processNodeManifest(RegistrySendsNodeManifest event, TCPConnection connection) throws IOException {
routingTable = event.getRoutingTable();

if (DEBUG) {
Expand Down Expand Up @@ -170,12 +171,12 @@ private void processNodeManifest(RegistrySendsNodeManifest event) throws IOExcep
String[] IPportNumArr = entry.getValue().split(":");
Socket socket = new Socket(IPportNumArr[0], Integer.parseInt(IPportNumArr[1]));
// connection to a node in the routing table
TCPConnection connection = new TCPConnection(socket, this);
connectionsCache.addConnection(entry.getValue(), connection);
TCPConnection routingConnection = new TCPConnection(socket, this);
connectionsCache.addConnection(entry.getValue(), routingConnection);
/* The rcvr and sndr thread for this node's side of the connection (pipe).
When the other node receives the connection request from this node,
they will start its their own sndr and rcvr threads for their end of the pipe */
connection.startSenderAndReceiverThreads();
routingConnection.startSenderAndReceiverThreads();
} catch(IOException ioe) {
connectionsEstablished = false;
break;
Expand All @@ -194,11 +195,10 @@ private void processNodeManifest(RegistrySendsNodeManifest event) throws IOExcep
infoStr = String.format("Messaging Node (%d) failed to establish connections to messaging nodes in its routing table", this.ID);
}
/* registryConnection's sender and receiver threads should have been started when initiating connections
with the registryAfter confirming registration, this node should have cached the connection
with the registry. After confirming registration, this node should have cached the connection
and started the sender and receiver threads in the processNodeRegistrationStatusResponse() above */
TCPConnection registryConnection = connectionsCache.getConnection(registryIPportNumStr);
NodeReportsOverlaySetupStatus overlaySetupStatus = new NodeReportsOverlaySetupStatus(status, infoStr);
registryConnection.getSenderThread().addMessage(overlaySetupStatus.getBytes());
connection.getSenderThread().addMessage(overlaySetupStatus.getBytes());
}

private int selectRandomDstID() {
Expand Down Expand Up @@ -253,7 +253,7 @@ private TCPConnection findClosestNode(int dstID) {
return routingConnection;
}

private void processTaskInitiate(RegistryRequestsTaskInitiate event) throws IOException {
private void processTaskInitiate(RegistryRequestsTaskInitiate event, TCPConnection connection) throws IOException {
System.out.printf("Task initiate received. Starting to send %d packets\n", event.getNumPacketsToSend());

// Begin sending msgs
Expand Down Expand Up @@ -291,6 +291,10 @@ private void processTaskInitiate(RegistryRequestsTaskInitiate event) throws IOEx
OverlayNodeSendsData nodeSendsData = new OverlayNodeSendsData(dstID, this.ID, payload, new ArrayList<>());
routingConnection.getSenderThread().addMessage(nodeSendsData.getBytes());
}

// Done sending messages, so send task finished message to registry
OverlayNodeReportsTaskFinished taskFinished = new OverlayNodeReportsTaskFinished(this.IP, this.portNum, this.ID);
connection.getSenderThread().addMessage(taskFinished.getBytes());
}

private void processNodeSendsData(OverlayNodeSendsData event) throws IOException {
Expand Down
24 changes: 22 additions & 2 deletions src/cs455/overlay/node/Registry.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ public class Registry implements Protocol, Node {
// store sockets used to communicate with other nodes so we dont have to create a new socket for each communication (snd/rcv)
private TCPConnectionsCache connectionsCache;
private ServerSocket serverSocket;
private int numNodesRegistered; // set when setup-overlay is initiated
private int numNodesEstablishedConnections = 0;

// keep track of nodes that have finished sending packets
private int numNodesFinishedSending = 0;

public Registry(int portNum, ServerSocket serverSocket) {
this.portNum = portNum;
this.serverSocket = serverSocket;
Expand All @@ -39,6 +43,10 @@ public ServerSocket getServerSocket() {
return serverSocket;
}

public void setNumNodesRegistered(int numNodesRegistered) {
this.numNodesRegistered = numNodesRegistered;
}

public int getNumNodesEstablishedConnections() {
return numNodesEstablishedConnections;
}
Expand All @@ -54,6 +62,10 @@ public void onEvent(Event event, TCPConnection connection) throws IOException {
break;
case (NODE_REPORTS_OVERLAY_SETUP_STATUS):
processOverlaySetupStatusResponse((NodeReportsOverlaySetupStatus)event);
break;
case (OVERLAY_NODE_REPORTS_TASK_FINISHED):
processTaskFinished((OverlayNodeReportsTaskFinished)event);
break;
}
}

Expand All @@ -72,7 +84,7 @@ private int assignID() {
private boolean validRegistration(String IP, int portNum, TCPConnection connection) {
String IPportNumStr = IP + ':' + portNum;
String connectionIP = connection.getSocket().getInetAddress().getHostAddress();
return registeredNodes.containsValue(IPportNumStr) && connectionIP.equals(IP);
return !registeredNodes.containsValue(IPportNumStr) && connectionIP.equals(IP);
}

private synchronized void registerNode(OverlayNodeSendsRegistration event, TCPConnection connection) throws IOException {
Expand All @@ -97,6 +109,7 @@ private synchronized void registerNode(OverlayNodeSendsRegistration event, TCPCo
System.out.printf("Registered node from %s, ID is %d\n", registeredNodes.get(ID), ID);
} else { // invalid registration request
// TODO: TEST IF IT WILL FAIL IF WE TRY TO REGISTER A NODE MORE THAN ONCE. JUST CODE SEND A REG REQ TWICE IN THE MSG NODE
ID = -1; // failure ID
infoStr = "Registration request failed. There is either (1) no more room in the Registry, " +
"(2) this node has already been registered, or (3) the IP address in the request did not" +
"match the IP address of the origin";
Expand All @@ -113,7 +126,7 @@ private boolean validDeregistration(String IP, int IDtoRemove, TCPConnection con
}

private synchronized void deregisterNode(OverlayNodeSendsDeregistration event, TCPConnection connection) throws IOException {
int idToRemove = event.getAssignedID();
int idToRemove = event.getNodeID();
String infoStr;
Socket connectionSocket = connection.getSocket();

Expand Down Expand Up @@ -151,6 +164,13 @@ private synchronized void processOverlaySetupStatusResponse(NodeReportsOverlaySe
System.out.println("Not all nodes in the overlay were successful in establishing connections to nodes that comprised their routing table");
}

// Syncd b/c registry could get many reports of finished sending at once
private synchronized void processTaskFinished(OverlayNodeReportsTaskFinished event) {
++numNodesFinishedSending;
if (DEBUG && numNodesFinishedSending == numNodesRegistered)
System.out.println("ALL MESSAGING NODES HAVE FINISHED SENDING MESSAGES");
}

private void processCommand(String[] command, InteractiveCommandParser commandParser) {
// carry out different tasks based on the command specified
switch (command[0]) {
Expand Down
2 changes: 1 addition & 1 deletion src/cs455/overlay/transport/TCPReceiverThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class TCPReceiverThread implements Runnable {
private DataInputStream dIn;
private Node node;

private boolean DEBUG = false;
private boolean DEBUG = true;

public TCPReceiverThread(TCPConnection connection, Node node) throws IOException {
this.connection = connection;
Expand Down
4 changes: 4 additions & 0 deletions src/cs455/overlay/util/InteractiveCommandParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public void setupOverlay(int routingTableSize) {

Registry registry = (Registry) node;

/* Routing of msgs will deal only with the nodes that are registered at the
time of setup-overlay being called */
registry.setNumNodesRegistered(registry.getRegisteredNodes().size());

// Transfer the entries from the HashMap into an ArrayList for faster iteration
ArrayList<Map.Entry<Integer, String>> registeredNodesList = new ArrayList<>(registry.getRegisteredNodes().entrySet());

Expand Down
19 changes: 19 additions & 0 deletions src/cs455/overlay/wireformats/EventFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,23 @@ private Event getNodeSendsData(DataInputStream dIn) throws IOException {
return new OverlayNodeSendsData(dstID, srcID, payload, routingTrace);
}

// registry will use this
private Event getTaskFinished(DataInputStream dIn) throws IOException {
// IP addr
int IPlength = dIn.readInt();
byte[] IPbytes = new byte[IPlength];
dIn.readFully(IPbytes);
String IP = new String(IPbytes);

// portNum
int portNum = dIn.readInt();

// nodeID
int nodeID = dIn.readInt();

return new OverlayNodeReportsTaskFinished(IP, portNum, nodeID);
}

// registry will use this
public Event processMsg(byte[] msg) throws IOException {
ByteArrayInputStream baInStream = new ByteArrayInputStream(msg);
Expand All @@ -173,6 +190,8 @@ public Event processMsg(byte[] msg) throws IOException {
return getTaskInitiate(dIn);
case(OVERLAY_NODE_SENDS_DATA):
return getNodeSendsData(dIn);
case(OVERLAY_NODE_REPORTS_TASK_FINISHED):
return getTaskFinished(dIn);
}

// close the streams
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,68 @@
package cs455.overlay.wireformats;

public class OverlayNodeReportsTaskFinished {
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class OverlayNodeReportsTaskFinished implements Protocol, Event {
private int type = OVERLAY_NODE_REPORTS_TASK_FINISHED;
private String IP;
private int portNum;
private int nodeID;

public OverlayNodeReportsTaskFinished(String IP, int portNum, int nodeID) {
this.IP = IP;
this.portNum = portNum;
this.nodeID = nodeID;
}

@Override
public int getType() {
return type;
}

public String getIP() {
return IP;
}

public int getPortNum() {
return portNum;
}

public int getNodeID() {
return nodeID;
}

@Override
public byte[] getBytes() throws IOException {
/* Msg outline:
int: OVERLAY_NODE_REPORTS_TASK_FINISHED
int: length of following IP field
byte[]: IP addr
int: portNum
int: nodeID */
byte[] marshalledBytes = null;

ByteArrayOutputStream baOutStream = new ByteArrayOutputStream();
DataOutputStream dOut = new DataOutputStream(baOutStream);

dOut.writeInt(type);

byte[] IPbytes = IP.getBytes();
int IPlength = IPbytes.length;
dOut.writeInt(IPlength);
dOut.write(IPbytes);

dOut.writeInt(portNum);

dOut.writeInt(nodeID);

dOut.flush();
marshalledBytes = baOutStream.toByteArray();

baOutStream.close();
dOut.close();

return marshalledBytes;
}
}
31 changes: 12 additions & 19 deletions src/cs455/overlay/wireformats/OverlayNodeSendsDeregistration.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ public class OverlayNodeSendsDeregistration implements Protocol, Event {
private int type = OVERLAY_NODE_SENDS_DEREGISTRATION;
private String IP;
private int portNum;
private int assignedID;
private int nodeID;

public OverlayNodeSendsDeregistration(String IP, int portNum, int assignedID) {
public OverlayNodeSendsDeregistration(String IP, int portNum, int nodeID) {
this.IP = IP;
this.portNum = portNum;
this.assignedID = assignedID;
this.nodeID = nodeID;
}

public String getIP() {
Expand All @@ -24,8 +24,8 @@ public int getPortNum() {
return portNum;
}

public int getAssignedID() {
return assignedID;
public int getNodeID() {
return nodeID;
}

@Override
Expand All @@ -36,34 +36,27 @@ public int getType() {
// marshall this msg into a byte arr
@Override
public byte[] getBytes() throws IOException {
/*
Msg outline:
byte: msg type (OVERLAY_NODE_SENDS_DEREGISTRATION)
byte: length of following IP field
byte: IP addr
int: portNum
int: assigned node ID
*/
/* Msg outline:
int: msg type (OVERLAY_NODE_SENDS_DEREGISTRATION)
int: length of following IP field
byte[]: IP addr
int: portNum
int: nodeID */
byte[] marshalledBytes = null;

ByteArrayOutputStream baOutStream = new ByteArrayOutputStream();
DataOutputStream dOut = new DataOutputStream(baOutStream);

// msg type
dOut.writeInt(type);

byte[] IPbytes = IP.getBytes();
int IPlength = IPbytes.length;
// len of IP addr
dOut.writeInt(IPlength);
// IP addr in bytes
dOut.write(IPbytes);

// portNum
dOut.writeInt(portNum);

// assigned ID
dOut.writeInt(assignedID);
dOut.writeInt(nodeID);

// write buffer to the stream
dOut.flush();
Expand Down
6 changes: 6 additions & 0 deletions src/setupP2P
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
test_home=<~/455/cs455-hw1/src>
for i in 'cat machine_list'
do
echo 'logging into ' ${i}
gnome-terminal -x bash -c "ssh -t ${i} 'cd ${text_home}; java cs455.overlay.node.MessagingNode <hanoi> <51000>;bash;'" &\
done

0 comments on commit 2b3a375

Please sign in to comment.