What is Distributed Computing System ?

Check out more papers on Computer Engineering Computer Computer Science

1.) Introduction

A distributed computing system can be defined as a collection of processors interconnected by a communication network such that each processor has its own local memory. The communication between any two or more processors of the system takes place by passing information over the communication network. It has its application in various fields like Hadoop and Map Reduce which we will be discussing further in details.

Hadoop is becoming the technology of choice for enterprises that need to effectively collect, store and process large amounts of structured and complex data.

The purpose of the thesis is to research about the possibility of using a MapReduce framework to implement Hadoop.

Now all this is possible by the file system that is used by Hadoop and it is HDFS or Hadoop Distributed File System.

HDFS is a distributed file system and capable to run on hardware. It is similar with existing distributed file systems and its main advantage over the other distributed File system is, it is designed to be deployed on low-cost hardware and highly fault-tolerant. HDFS provides extreme throughput access to applications having large data sets.

Originally it was built as infrastructure support for the Apache Nutch web search engine. Applications that run using HDFS have extremely large data sets like few gigabytes to even terabytes in size. Thus, HDFS is designed to support very large sized files. It provides high data communication and can connect hundreds of nodes in a single cluster and supports tens of millions of files in a system at a time.

Now we take all the above things mentioned above in details. We will be discussing various fields where Hadoop is being implemented like in storage facility of Facebook and twitter, HIVE, PIG etc.

2.) Serial vs. Parallel Programming

In the early decades of computing, programs were serial or sequential, that is, a program consisted of a categorization of instructions, where each instruction executed sequential as name suggests. It ran from start to finish on a single processor.

Parallel programming (grid computing) developed as a means of improving performance and efficiency. In a parallel program, the process is broken up into several parts, each of which will be executed concurrently. The instructions from each part run simultaneously on different CPUs. These CPUs can exist on a single machine, or they can be CPUs in a set of computers connected via a network.

Not only are parallel programs faster, they can also be used to solve problems on large datasets using non-local resources. When you have a set of computers connected on a network, you have a vast pool of CPUs, and you often have the ability to read and write very large files (assuming a distributed file system is also in place).

Parallelism is nothing but a strategy for performing complex and large tasks faster than traditional serial way. A large task can either be performed serially, one step following another, or can be decomposed into smaller tasks to be performed simultaneously using concurrent mechanism in parallel systems.

Parallelism is done by:

  • Breaking up the process into smaller processes
  • Assigning the smaller processes to multiple processors to work on simultaneously
  • Coordinating the processors

Parallel problem solving can be seen in real life application too.

Examples: automobile manufacturing plant; operating a large organization; building construction;

3.) History of clusters:

 

1940's - early days

ENIAC, developed during World War II, had 25 independent computing units

 

1960's - still early days

ILLIAC - SIMD machine with 32 processors and a distributed memory architecture, based on research undertaken by Von Neumann at the Institute for Advanced Study (IAS)

Developments were also being made in shared-memory multiprocessor machines and pipelined vector supercomputers

 

1970's & 1980's - used for scientific research

Parallel computers failed to make it in mainstream computing, but were extensively used for computationally intense scientific problems such as CFD. Mainly SIMD machines like the AMT-DAP, the Maspar MP-1 and the Thinking Machines CM-1, as well as some more novel developments like the Inmos Transputer (pictured)

 

1990's - mainstream MIMD processing

SIMD computers become obsolete. Mainstream manufacturers produce multiprocessor versions like the SGI Origin 2000 pictured. Two and four processor Intel boxes come on to the market. Clusters of Intel machines running Linux attract interest from the scientific community and industry. Becoming very popular in the 2000's.

Clustering is the use of cluster of computers, typically PCs or some workstations, storage devices, and interconnections, appears to outsider (user) as a single highly super system. Cluster computing can be used for high availability and load balancing. It can be used as a relatively low-cost form of parallel processing system for scientific and other related applications.

Computer clustering technology put cluster of few systems together to provide better system reliability. Cluster server systems can connect a group of systems together in order to provide combined processing service for the clients in the cluster.

Cluster operating systems distribute the tasks amongst the available systems. Clusters of systems or workstations can connect a group of systems together to share critically demanding and tough tasks. Theoretically, a cluster operating system can provide seamless optimization in every case.

At the present time, cluster server and workstation systems are mostly used in High Availability applications and in scientific applications such as numerical computations.

A cluster is a type of parallel or distributed system that:

  • consists of a collection of interconnected whole computers
  • and is used as single, unified computing resource.

The ``whole computer'' in above definition can have one or more processors built into a single operating system image.

Why a Cluster

  • Lower cost: In all-purpose small sized systems profit from using proper technology. Both hardware and software costs tend to be expressively minor for minor systems. However one must study the entire cost of proprietorship of your computing environment while making a buying conclusion. Next subdivision facts to some issues which may counterbalance some of the gains of primary cost of acquirement of a cluster. .
  • Vendor independence: Though it is usually suitable to use similar components through a number of servers in a cluster, it is worthy to retain a certain degree of vendor independence, especially if the cluster is being organized for long term usage. A Linux cluster created on mostly service hardware permits for much better vendor liberation than a large multi-processor scheme using a proprietary operating system.
  • Scalability: In several environments the problem load is too large that it just cannot be processed on a specific system within the time limits of the organization. Clusters similarly provide a hassle-free path for increasing the computational means as the load rises over time. Most large systems scale to a assured number of processors and require a costly upgrade
  • Reliability, Availability and Serviceability (RAS): A larger system is typically more vulnerable to failure than a smaller system. A major hardware or software component failure fetches the whole system down. Hence if a large single system is positioned as the computational resource, a module failure will bring down substantial computing power. In case of a cluster, a single module failure only affects a small part of the overall computational resources. A system in the cluster can be repaired without bringing rest of the cluster down. Also, additional computational resources can be added to a cluster while it is running the user assignment. Hence a cluster maintains steadiness of user operations in both of these cases. In similar type of situations a SMP system will require a complete shutdown and a restart.
  • Adaptability: It is much easier to adapt the topology. The patterns of linking the compute nodes together, of a cluster to best suit the application requirements of a computer center. Vendors typically support much classified topologies of MPPs because of design, or sometimes testing, issues.
  • Faster technology innovation: Clusters benefit from thousands of researchers all around the world, who typically work on smaller systems rather than luxurious high end systems.

Limitations of Clusters

  • It is noteworthy to reference certain shortcomings of using clusters as opposite to a single large system. These should be closely cautious while defining the best computational resource for the organization. System managers and programmers of the organization should intensely take part in estimating the following trade-offs.
  • A cluster increases the number of individual components in a computer center. Every server in a cluster has its own sovereign network ports, power supplies, etc. The increased number of components and cables going across servers in a cluster partially counterbalances some of the RAS advantages stated above. It is easier to achieve a single system as opposed to numerous servers in a cluster. There are a lot more system services obtainable to manage computing means within a single system than those which can assistance manage a cluster. As clusters progressively find their way into profitable organizations, more cluster savvy tools will become accessible over time, which will bridge some of this gap.
  • In order for a cluster to scale to make actual use of numerous CPUs, the workload needs to be properly well-adjusted on the cluster. Workload inequity is easier to handle in a shared memory environment, because switching tasks across processors doesn't involve too much data movement. On the other hand, on a cluster it tends to be very tough to move a by this time running task from one node to another. If the environment is such that workload balance cannot be controlled, a cluster may not provide good parallel proficiency.
  • Programming patterns used on a cluster are typically diverse from those used on shared-memory systems. It is relatively easier to use parallelism in a shared-memory system, since the shared data is gladly available. On a cluster, as in an MPP system, either the programmer or the compiler has to explicitly transport data from one node to another. Before deploying a cluster as a key resource in your environment, you should make sure that your system administrators and programmers are comfortable in working in a cluster environment.

Getting Started With Linux Cluster:

Although clustering can be performed on various operating systems like Windows, Macintosh, Solaris etc. , Linux has its own advantages which are as follows:-

  • Linux runs on a wide range of hardware
  • Linux is exceptionally stable
  • Linux source code is freely distributed.
  • Linux is relatively virus free.
  • Having a wide variety of tools and applications for free.
  • Good environment for developing cluster infrastructure.

Cluster Overview and Terminology

A compute cluster comprises of a lot of different hardware and software modules with complex interfaces between various modules. In fig 1.3 we show a simplified concept of the key layers that form a cluster. Following sections give a brief overview of these layers.

4.) Parallel computing and Distributed Computing system

Parallel computing

It is the concurrent execution of some permutation of multiple instances of programmed instructions and data on multiple processors in order to achieve results faster.

A parallel computing system is a system in which computer with more than one processor for parallel processing. In the past, each processor of a multiprocessing system every time came in its own processor packaging, but in recent times-introduced multicore processors contain multiple logical processors in a single package. There are many diverse kinds of parallel computers. They are well-known by the kind of interconnection among the processors (“processing elements" or PEs) and memory.

Distributed Computing System:

There are two types of distributed Computing systems:

  1. Tightly coupled system: In these systems, there is a single system wide primary memory (address space) that is shared by all the processors. In these systems any communication between the processors usually takes place through the shared memory. In tightly coupled systems, the number of processors that can be usefully deployed is usually small and limited by the bandwidth of the shared memory. Tightly coupled systems are referred to as parallel processing systems
  2. Loosely coupled systems: In these systems, the processors do not share memory, and each processor has its own local memory. In these systems, all physical communication between the processors is done by passing messages across the network that interconnects the processors. In this type of System Processors are expandable and can have unlimited number of processor. Loosely coupled systems, are referred to as distributed computing systems.

4.1) Minicomputer Model

It is a simple extension of the centralized time-sharing system. A distributed computing system based on this classical consists of a few minicomputers or large supercomputers unified by a communication network. Each minicomputer usually has many user simultaneously logged on to it through several terminals linked to it with every user logged on to one exact minicomputer, with remote access to other minicomputers, The network permits a user to access remote resources that are available on same machine other than the one on to which the user is currently logged.

The minicomputer model is used when resource sharing with remote users is anticipated.

The initial ARPAnet is an example of a distributed computing system based on the minicomputer model.

4.2) Workstation Model

Workstation model consists of several workstations unified by a communication network. The best example of a Workstation Model can be a company’s office or a university department which may have quite a few workstation scattered throughout a building or campus, with each workstation equipped with its individual disk and serving time which is specifically during the night, Notion of using workstation Model is that when certain workstations are idle (not being used), resulting in the waste of great amounts of CPU time the model connects all these workstations by a high-speed LAN so that futile workstations may be used to process jobs of users who are logged onto to other workstations and do not have adequate processing power at their own workstations to get their jobs handled efficiently.

A user logs onto one of the workstations which is his “home” workstation and submits jobs for execution if the system does not have sufficient processing power for executing the processes of the submitted jobs resourcefully, it transfers one or more of the processes from the user’s workstation to some other workstation that is currently ideal and gets the process executed there, and finally the outcome of execution is given back to the user’s workstation deprived of the user being aware of it.

The main Issue increases if a user logs onto a workstation that was idle until now and was being used to perform a process of another workstation .How the remote process is to be controlled at this time .To handle this type of problem we have three solutions: The first method is to allow the remote process share the resources of the workstation along with its own logged-on user’s processes. This method is easy to apply, but it setbacks the main idea of workstations helping as personal computers, because if remote processes are permitted to execute concurrently with the logged-on user’s own processes, the logged-on user does not get his or her fail-safe response.

The second method is to kill the remote process. The main disadvantage of this technique is that all the processing done for the remote process gets lost and the file system may be left in an erratic state, making this method repellent.

The third method is to migrating the remote process back to its home workstation, so that its execution can be continued there. This method is tough to implement because it involves the system to support preemptive process migration facility that is stopping the current process when a higher priority process comes into the execution.

Thus we can say that the workstation model is a network of individual workstations, each with its own disk and a local file system.

The Sprite system and experimental system developed at Zerox PARC are two examples of distributed computing systems, based on the workstation model.

4.3) Workstation-Server Model

Workstation Server Model consists of a limited minicomputers and numerous workstations (both diskful and diskless workstations) but most of them are diskless connected by a high speed communication Network. A workstation with its own local disk is generally called a diskful workstation and a workstation without a local disk is named as diskless workstation.

The file systems used by these workstations is either applied either by a diskful workstation or by a minicomputer armed with a disk for file storage. One or more of the minicomputers are used for applying the file system. Other minicomputer may be used for providing other types of service area, such as database service and print service. Thus, every minicomputer is used as a server machine to provide one or more types of services. Therefore in the workstation-server model, in addition to the workstations, there are dedicated machines (may be specialized workstations) for running server processes (called servers) for handling and providing access to shared resources.

A user logs onto a workstation called his home workstation, Normal computation activities required by the user’s processes are performed at the user’s home workstation, but requirements for services provided by special servers such as a file server or a database server are sent to a server providing that type of service that performs the user’s requested activity and returns the result of request processing to the user’s workstation. Therefore, in this model, the user’s processes need not be migrated to the server machines for getting the work done by those machines.

For better complete system performance, the local disk of diskful workstation is normally used for such purposes as storage of temporary file, storage of unshared files, storage of shared files that are rarely changed, paging activity in virtual-memory management, and caching of remotely accessed data.

Workstation Server Model is better than Workstation Model in the following ways:

It is much cheaper to use a few minicomputers equipped with large, fast disks than a large number of diskful workstations, with each workstation having a small, slow disk.

Diskless workstations are also preferred to diskful workstations from a system maintenance point of view. Backup and hardware maintenance are easier to perform with a few large disks than with many small disks scattered all Furthermore, installing new releases of software (such as a file server with new functionalities) is easier when the software is to be installed on a few file server machines than on every workstations.

In the workstation-server model, since all files are managed by the file servers, users have the flexibility to use any workstation and access the files in the same manner irrespective of which workstation the user is currently logged on .Whereas this is not true with the workstation model, in which each workstation has its local file system, because different mechanisms are needed to access local and remote files. Unlike the workstation model, this model does not need a process migration facility, which is difficult to implement.

In this model, a client process or workstation sends a request to a server process or a mini computer for getting some service such as reading a block of a file. The server executes the request and sends back a reply to the client that contains the result of request processing.

A user has guarantied response time because workstations are not used for executing remote process. However, the model does not utilize the processing capability of idle workstation.

The V-System (Cheriton 1988) is an example of a distributed computing system that is based on the workstation-server model.

4.4) Processor-Pool Model

In the process of pool model the processors are pooled together-to be shared by the users needed. The pool -or processors consist of a large number of micro-computers and minicomputers attached to the network. Each processor in the pool has its own memory to load and run a system program or an application program of the distributed-computing system. The processor-pool model is used for the purpose that most of the time a user does not need any computing power but once in a while he may need a very large amount of computing power for short time (e.g., when recompiling a program consisting of a large number of files after changing a basic shared declaration).

In processor-pool model, the processors in the pool have no terminal attached directly to them, and users access the system from terminals that are attached to the network via special devices. These terminals are either small diskless workstations or graphic terminals. A special server called a run server manages and allocates the processors in the pool to different users on a demand basis. When a user submits a job for computation an appropriate number of Processors are temporarily assigned to his or her job by the run server. In this type of model we do not have a concept of home machine, in this when a user logs on he is logged on to the whole system by default.

The processor-pool model allows better utilization of the available processing power of a distributed computing system as in this model the entire processing power of the system is available for use by the current logged-on users, whereas this is not true for the workstation-server model in which several workstations may be idle at a particular time but they cannot be used for processing the jobs of other users.

Furthermore, the processor-pool model provides greater flexibility than the workstation-server model as the system’s services can be easily expanded without the need to install any more computers. The processors in the pool can be allocated to act as extra servers to carry any additional load arising from an increased user population or to provide new services.

However, the processor-pool model is usually considered to be unsuitable for high-performance interactive application, program of a user is being executed and the terminal via which the user is interacting with the system. The workstation-server model is generally considered to be more suitable for such applications.

Amoeba [Mullender et al. 1990]. Plan 9 [Pike et al. 1990], and the Cambridge Distributed Computing System [Needham and Herbert 1982] are examples of distributed computing systems based on the processor-pool model.

5) Issues in designing a distributed operating system:

To design a distributed operating system is a more difficult task than designing a centralized operating system for several reasons. In the design of a centralized operating system, it is assumed that the operating system has access to complete and accurate information about the environment is which it is functioning. In a distributed system, the resources are physically separated, their is no common clock among the multiple processors as the delivery of messages is delayed, and not have up-to-date, consistent knowledge about the state of the various components of the underlying distributed system .And lack of up-to-date and consistent information makes many thing (such as management of resources and synchronization of cooperating activities) much harder in the design of a distributed operating system,. For example, it is hard to schedule the processors optimally if the operating system is not sure how many of them are up at the moment.

Therefore a distributed operating system must be designed to provide all the advantages of a distributed system to its users. That is, the users should be able to view a distributed system as a virtual centralized system that is flexible, efficient, reliable, secure, and easy to use. To meet this challenge, designers of a distributed operating system must deal with several design issues. Some of the key design issues are:

5.1) Transparency

The main goal of a distributed operating system is to make the existence of multiple computers invisible (transparent) and that is to provide each user the feeling that he is the only user working on the system. That is, distributed operating system must be designed in such a way that a collection of distinct machines connected by a communication subsystem appears to its users as a virtual unprocessed.

Accesses Transparency:

Access transparency typically refers to the situation where users should not need or be able to recognize whether a resource (hardware or software) is remote or local. This implies that the distributed operating system should allow users to access remote resource in the same ways as local resources. That is, the user should not be able to distinguish between local and remote resources, and it should be the responsibility of the distributed operating system to locate the resources and to arrange for servicing user requests in a user-transparent manner.

Location Transparency:

Location Transparency is achieved if the name of a resource is kept hidden and user mobility is there, that is:

Name transparency:

This refers to the fact that the name of a resource (hardware or software) should not reveal any hint as to the physical location of the resource. Furthermore, such resources, which are capable of being moved from one node to another in a distributed system (such as a file), must be allowed to move without having their names changed. Therefore, resource names must be unique system wide.

User Mobility: this refers to the fact that no matter which machine a user is logged onto, he should be able to access a resource with the same name he should not require two different names to access the same resource from two different nodes of the system. In a distributed system that supports user mobility, users can freely log on to any machine in the system and access any resource without making any extra effort.

Replication Transparency

Replicas or copies of files and other resources are created by the system for the better performance and reliability of the data in case of any loss. These replicas are placed on the different nodes of the distributed System. Both, the existence of multiple copies of a replicated resource and the replication activity should be transparent to the users. Two important issues related to replication transparency are naming of replicas and replication control. It is the responsibility of the system to name the various copies of a resource and to map a user-supplied name of the resource to an appropriate replica of the resource. Furthermore, replication control decisions such as how many copies of resource should be created, where should each copy be placed, and when should a copy be created/deleted should be made entirely automatically by the system in a user -transparent manner.

Failure Transparency

Failure transparency deals with masking from the users partial failures in the system, such as a communication link failure, a machine failure, or a storage device crash. A distributed operating system having failure transparency property will continue to function, perhaps in a degraded form, in the face of partial failures. For example suppose the file service of a distributed operating system is to be made failure transparent. This can be done by implementing it as a group of file servers that closely cooperate with each other to manage the files of the system and that function in such a manner that the users can utilize the file service even if only one of the file servers is up and working. In this case, the users cannot notice the failure of one or more file servers, except for slower performance of file access operations. Be implemented in this way for failure transparency. An attempt to design a completely failure-transparent distributed system will result in a very slow and highly expensive system due to the large amount of redundancy required for tolerating all types of failures.

Migration Transparency

An object is migrated from one node to another for a better performance, reliability and great security. The aim of migration transparency is to ensure that the movement of the object is handled automatically by the system in a user-transparent manner. Three important issues in achieving this goal are as follows:

  • Migration decisions such as which object is to be moved from where to where should be made automatically by the system.
  • Migration of an object from one node to another should not require any change in its name.
  • When the migrating object is a process, the interposes communication mechanism should ensure that a massage sent to the migrating process reaches it without the need for the sender process to resend it if the receiver process moves to another node before the massage is received.

Concurrency Transparency

In a distributed system multiple users uses the system concurrently. In such a situation, it is economical to share the system resource (hardware or software) among the concurrently executing user processes. However since the number of available resources in a computing system is restricted one user processes, must necessarily influence the action of other concurrently executing processes. For example, concurrent update to the file by two different processes should be prevented. Concurrency transparency means that each user has a feeling that he is the sole user of the system and other users do not exist in the system. For providing concurrency transparency, the recourse sharing mechanisms of the distributed operating system must have the following properties:

An event-ordering property ensures that all access requests to various system resources are properly ordered to provide a consistent view to all users of the system.

A mutual-exclusion property ensures that at any time at most one process accesses a shared resource, which must not be used simultaneously by multiple processes if program operation is to be correct.

A no-starvation property ensures that if every process that is granted a resources which must not be used simultaneously by multiple processes, eventually releases it, every request for that restore is eventually granted.

A no-deadlock property ensures that a situation will never occur in which competing process prevent their mutual progress ever though no single one requests more resources than available in the system.

Performance Transparency

The aim of performance transparency is never get into a situation in which one processors of the system is overloaded with jobs while another processor is idle. Performance Transparency allows the system to be atomically reconfirmed to improve performance, as loads vary dynamically in the system. That is, the processing capability of the system should be uniformly distributed among the currently available jobs in the system. This requirement calls for the support of intelligent resource allocation and process migration facilities in distributed operating systems.

Scaling Transparency:

The aim of scaling transparency is to allow the system to expand in scale without disrupting the activities of the users.

5.2) Reliability

Distributed systems are expected to be more reliable than centralized systems die to the existence of multiple instances of resources. However the existence of multiple instances of the resource alone cannot increase the system’s reliability. Rather, the distributed operating system, which manages these resources, must be designed properly to increase the system's reliability by taking full advantage of this characteristic feature of a distributed system.

A fault is a mechanical or algorithmic defect that may generate an error. A fault in a system causes system failure. Depending on the manner in which a failed system behaves, system failures are of two types – fail-stop and Byzantine. In the case of fail-stop failure, the system stops functioning after the fault is being detected. On the other hand, in the case of Byzantine failure, the system continues to function but produces wrong results. Undetected software bugs often cause Byzantine failure of a system. Byzantine failures are much more difficult to deal with than fail-stop failure.

For higher reliability, the fault-handling mechanisms of a distributed operating system must be designed properly to avoid faults, to tolerate faults, and to detected and recover from faults.

Fault Avoidance

Fault avoidance deals with designing the components of the system in such a way that the occurrence of faults is minimized. By using high reliable components we can increase the reliability.

Fault Tolerance

The first approach we would like to follow is to avoid the errors or faults in the system. If not avoided we would like to tolerate that to the maximum extent.

Fault tolerance is the ability of a system to continue functioning in the event of partial system failure. The performance of the system might be degraded due to partial failure. But otherwise the system functions properly. Some of the important concepts that may be used to improve the fault tolerance ability of a distributed operating system are as follows:

Redundancy techniques: The basic idea behind redundancy techniques is to avoid single points of failure by replicating critical hardware and software components, so that if one of them fails, the other can be used to continue which is done by keeping replicas or copies of the resources. For example, a critical process can be simultaneously executed on two nodes so that if one of the two nodes fails, the execution of the process can be completed at the other node. Similarly, a critical file may be replicated on two or more storage devices for better reliability. Larger is the number of copies kept, the better is the reliability but the larger is the system overhead involved. Copies kept, the better is the reliability but he larger is the system overhead involved. Therefore, a distributed operating system must be designed to maintaining a proper balance between the degree of reliability and the incurred overhead.

Now to decide that how many copies are required we take a system is said to be k-fault tolerant if it can continue to function ever in the event of the failure of k components. Therefore, if the system is to be designed to tolerate k-fail-stop failures, k+1 replicas are needed. If k replicas are lost due to failures, the remaining one replica can be used for continued functioning of the system. On the other hand, if the system is to be designed to tolerate k Byzantine failures, a minimum of 2k+1 replicas are needed. This is because a voting mechanism can be used to believe the majority k+1 of the replicas when k replicas behave abnormally

Fault Detection and Recovery

If the fault was neither avoided nor tolerated then we need to detect it and recover it.

Some of the commonly used techniques for implementing this method in a distributed operating system are as follows:

Atomic transactions:

An atomic transaction is a computation in which either all of the operations are performed successfully or none of their effects prevails, and other processes executing concurrently cannot modify or observe intermediate states of the computations. They make cash recovery which is much easier and faster, because a transaction can only end in two states: Either all the operations of the transaction are performed or none of the operations of the transaction is performed.

In a system with transaction facility, if a process halts unexpectedly due to a hardware fault or a software error before a transaction is completed, the system subsequently restores any data objects that were undergoing modification to their original states.

Stateless servers:

We can have two types of models working– state full or stateless. The two are distinguished by one aspect of the client-server relationship, whether or not the history of the service requests between a client and a server affect the execution of the next service request. The stateful approach does depend on the history of the serviced requests, but the stateless approach does not depend on it. The stateless service makes cash recovery very easy because no client state information is maintained by the server. On the other hand, the stateful service requires complex crash recovery procedures. Both the client and server need to reliably detect cashes. The server needs to detect client cashes so that it can discard any stage it is holding for the client, and the client must detect server crashes so that it can perform necessary error-handling activities. Although stateful service becomes necessary in some cases, to simplify the failure detection and recovery actions, the stateless service paradigm must be used wherever possible.

Acknowledgments and timeout-based retransmissions of messages:

In a distributed system, actions such as a node crash or a communication link failure may interrupt a communication that was in progress between two processes, resulting in the loss of a message. Therefore, a reliable inter process communication mechanism must have ways to detect lost messages so that they can be retransmitted. Handling of loss messages usually involves return of acknowledgement messages and retransmissions on the basis of involves return of acknowledgement messages and retransmissions on the basis of timeouts. That is, the receiver must return acknowledgment messages for every message receive, and if the sender does not receive any acknowledge for a message within a problem associated with this approach is that of duplicate messages. Duplicate messages may be sent in the event of failures or because of timeouts. Therefore, a reliable inter process communication mechanism should also be capable of detecting and handling duplicate messages. Handling of duplicate messages usually involves a mechanism for automatically generating and assigning appropriate sequence number to messages.

5.3) Flexibility

Another important issue in the design of distributed operating systems is flexibility. Flexibility is the most important feature for open distributed systems. The design of a distributed operating system should be flexible due to the following reasons:

Ease of modification:

From the experience of system designers, it has been found that some parts of the design often need to be replaced/modified either because some bug is detected in the design or because the design is no longer suitable for the changed system environment or new-user requirements. Therefore it should be easy to incorporate changes in the system in a user-transparent manner or with minimum interruption caused to the users.

Ease of enhancement:

It should be easy to add new services to the system. In every system, new functionalities have to be added for in time to make it more powerful and easy to use., if a group of users do not like the style in which a particular service is provided by the operating system, they should have the flexibility to add and use their own service that works in the style with which the users of that group are more familiar and feel more comfortable.

The most important design factor that influences the flexibility of a distributed operating system is the model used for designing its kernel. The kernel of an operating system is its central controlling part that provides basic system facilities. It operates in a separate address space that is inaccessible to user processes. It is the only part of an operating system that a user cannot replace or modify.

The two commonly used models for kernel design in distributed operating systems are the monolithic kernel and the micro kernel .In the monolithic kernel model must operating system service such as process management, memory management, device management, file management, name management, and inter process communication are provided by the kernel. As a result the kernel has a large, monolithic structure.

In the microkernel model, the main goal is to keep the kernel as small as possible. Therefore, in this model, the kernel is a very small nucleus of software that provides only the minimal facilities necessary for implementing additional operating system services. The only services provided by the kernel in this model are inter-process communication, low-level device management, a limited amount of low-level Process’s management, and some memory management. All other operating system services, such as file management, name management, additional process, and memory management activities, and much system call handling are implemented as user-level server processes. Each server process has its own address space and can be programmed separately.

As compared to the monolithic kernel model, the microkernel model has several advantages. In the monolithic kernel model, the large size of the kernel reduces the overall flexibility and configurability of the resulting operating system. On the other hand, the resulting operating system of the microkernel model is highly modular in nature. Due to this characteristic feature, the operating system of the microkernel model is easy to design, implement, and install. Moreover, since most of the services are implemented as user-level server processes, it is also easy to modify the design or add new services. Furthermore, for adding or changing a service, there is no need to stop the system and boot a new kernel, as in the case of a monolithic kernel. Therefore, changes in the system can be incorporated without interrupting the users.

In the microkernel model each server is an independent process having its own address space. Therefore, the server has to use some form of message-based inter-process communication mechanic to communicate with each other while performing some job. Furthermore, message passing between server processes and the microkernel requires context switches, resulting in additional performance overhead. In the monolithic kernel model, however, since services are provided by the kernel, the same address space is shared by all of the servers. Therefore, no message passing and no context switching are required while the kernel is performing the job. Hence a request may be serviced faster in the monolithic kernel than in the microkernel model.

In spite of its potential performance cost microkernel model is being preferred for the design of modern distributed operating systems. The two main reasons for this are as follows:

  1. The advantages of the microkernel model more than compensate for the performance cost. In spite of the better performance of programs written in assembly languages, most programs are written in high-level languages due to the advantages of ease of design, maintenance, and portability. Similarly, the flexibility advantages of the microkernel model previously described more than outweigh its small performance penalty
  2. Some experimental results have shown that although in theory the microkernel model seems to have poorer performance than the monolithic kernel model, this is not true in practice. It is because other factors tend to dominate, and the small overhead involved in exchanging messages is usually negligible.

5.4) Performance

If a distributed system is to be used, its performance must be at least as good as a centralized system. That is, when a particular application is run on a distributed system, its overall performance should be better than or at least equal to that of running the same application on a single-processor system. However, to achieve this goal, it is important that the various components of the operating system of a distributed system be designed properly; otherwise, the overall performance of the distributed system may turn out to be worse than a centralized system. Some design principles considered useful for better performance are as follows:

Batch if possible:

Batching often helps in improving performance greatly. For example: transfer of data across the network in large chunks rather than as individual pages is much more efficient. Similarly, piggybacking of acknowledgment of previous messages with the next message during a series of messages exchanged between two communicating entities also improves performance.

Cache whenever possible:

Caching of data at clients' sites frequently improves overall system performance because it makes data available wherever it is being currently used, thus saving a large amount of computing time and network bandwidth. In addition, caching reduces contention on centralized resources.

Minimize copying of data:

Data copying overhead (e.g., moving data in and out of buffers) involves a substantial CPU cost of many operations.

6) MapReduce

MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to produce a set of intermediate key/value pairs, and a reduce function that combines all intermediate values related with the same intermediate key.

6.1) HowMap Reducecame?

Back in 2000 Google had a problem that they were adjusting the internet database after every couple. They needed a process to build the index over the entire database, and they had no way for doing this with using any commercial tool. So they designed a structure called Map reduce which had a very basic idea that we buy a bunch of command services. In layman language, we buy many services, with each service has got a local disk attached to it and every one has a CPU attached to it. And we spread the data among all these and we have even replicas stored on them, to have a backup in case of any server stops working. So now we have stored data pretty cheaply and reliably. And the best thing is that we have got attached CPU to all the disks, so if we want to perform some internal function, instead of using Servers we can use local CPU’s with the local copy of data and this gives us massive data parallelism, we don’t have to read all the data and perform the action. This process kills the property of Centralization. But this is far better than the centralization process in the way, that today we have got large images, videos, and large chunks of data. It is difficult to maintain this type of complex and large data by the process of centralization as it makes the system slower, and to avoid this we need large amount of data store that I centralized that is not feasible.

6.2) How is it better than Relational Database like MySQL?

RDBMS is really good database for predictable queries to run over pr it is good for structural or tabular data. But what is happening now is the data what people want is getting more complex and complicated like twitter feed, free text constructural data, complex data and this kind of data doesn’t fit nicely into the tables, even if they do then of we want to fire a query like-Give me a analysis of natural languages captured in all the tweets,SQL doesn’t have words for that, so we need a different kind of database platform, which can look for a very large database platform, which can look for a very large mass of information and to do new kind of processing. We need to actually throw data away, if we have limited amount of data storage capacity, so we have the main problem of complex data with large volume, which is handled by map reduce.

Map reduce is doing large scale processing not the transactional processing but data processing on a scalable platform.

6.3) Map/Reduce functions:

It is organized as a “map” function which transforms a piece of data into some number of key/value pairs. Each of these elements will then be sorted by their key and reach to the same node, where a “reduce” function is use to merge the values (of the same key) into a single result. Here key is the line number and the value is the content.

map(input_record) {

emit(k1,v1)

emit(k2, v2)

}

reduce(key,values){

aggregate=initialize()

while(values.has_next){

aggregate=merge(values.next)

}

collect(key,aggregate)

}

A parallel algorithm is usually structure as multiple rounds of Map/Reduce:

6.4) Distributed File System

The distributed file system is used to handle large files with sequential read/write operation. Each file is broken into chunks, and stored across multiple data nodes as local OS files.

There is a master called NameNode which is used to keep track of overall file directory structure and decides the placement of chunks. This NameNode is the central control point and may re-distributed replicas as needed.

To read a file, the client API will calculate the chunk index and make a request to the NameNode. The NameNode will reply which DataNodes has a copy of that chunk. From these points, the client contacts the Data Node directly without going through the NameNode.

To write a file, client API will first contact the NameNode. The response of the NameNode contains who is the primary and who are the secondary replicas. Then the client push its changes to all DataNodes in any order, but this change is stored in a buffer of each DataNodes. After changes are buffered at all DataNodes, the client send a commit request to the primary replica, which determines an order to update and then push this order to all other secondary replicas. After all secondary replica complete the commit, the primary replica will response to the client about the success.

All changes of chunk distribution and metadata changes will be written to an operation log file at the NameNode. This log file maintains an order list of operation which is important for the NameNode to recover its view after a crash. The NameNode also maintain its persistent state by regularly check-pointing to a file.

In case of the Name Node crash, all leases granting operation will fail and so any write operation is effectively fail also. Read operation should continuously to work as long as the client program has a handle to the Data Node. To recover from NameNode crash, a new NameNode can take over after restoring the state from the last checkpoint file and replay the operation log i.e. If this Name node crashes it stores the current status in File system and we need to start the whole process of map reduce from the point where the failure has occurred.

When a Data Node crashes, it will be detected by the NameNode. The NameNode removes the crashed Data Node from the cluster and spread its chunks to other surviving DataNodes. This way, the replication factor of each chunk will be maintained across the cluster.

Later when the Data Nodes recover and rejoin the cluster, it reports all its chunks to the NameNode at boot time. Each chunk has a version number which will advance at each update. Therefore, the NameNode can easily figure out if any of the chunks of a Data Node becomes stale. Those stale chunks will be garbage collected at a later time.

How the job gets executed?

Map Reduce is based on a “poll” model where multiple “Task Tracker s” poll the “Job Tracker” for tasks (either map task or reduce task).

The job execution starts when the client program uploads three files: “job.xml” (the job configuration including map, combine, reduce function and input/output data path, etc.), “job. Split” (specifies how many splits and range based on dividing files into ~16 – 64 MB size); “job.jar” (the actual Mapper and Reducer implementation classes) to the HDFS location .Then the client program notifies the Job Tracker about the Job submission. The Job Tracker returns a Job id to the client program and starts allocating map tasks to the idle Task Tracker s when they poll for tasks.

Each Task Tracker has a defined number of "task slots" based on the capacity of the machine. There are heartbeat protocol which allows the Job Tracker to know how many free slots from each Task Tracker . The Job Tracker will determine appropriate jobs for the Task Tracker s based on how busy they are; their network proximity to the data sources .The assigned Task Tracker s will fork a Map Task (separate JVM process) to execute the map phase processing. The Map Task extracts the input data from the splits by using the “Record Reader” and “Input Format” and it invokes the user provided “map” function which emits a number of key/value pair in the memory buffer.

When the buffer is full, the output collector will spill the memory buffer into disk. For optimizing the network bandwidth, an optional “combine” function can be invoked to partially reduce values of each key. Afterwards, the “partition” function is invoked on each key to calculate its reducer node index. The memory buffer is eventually flushed into 2 files; the first index file contains an offset pointer of each partition. The second data file contains all records sorted by partition and then by key.

When the map task has finished executing all input records, it start the commit process, it first flush the in-memory buffer (even it is not full) to the index + data file pair. Then a merge sort for all index + data file pairs will be performed to create a single index + data file pair.

The index + data file pair will then be splitted into are R local directories, one for each partition. After all the Map Task completes (all splits are done), the Task Tracker will notify the Job Tracker which keeps track of the overall progress of job. Job Tracker also provides a web interface for viewing the job status.

When the Job Tracker notices that some map tasks are completed, it will start allocating reduce tasks to subsequent polling Task Tracker s (there are R Task Tracker s will be allocated for reduce task). These allocated Task Tracker s remotely download the region files (according to the assigned reducer index) from the completed map phase nodes and concatenate (merge sort) them into a single file. Whenever more map tasks are completed afterwards, Job Tracker will notify these allocated Task Tracker s to download more region files (merge with previous file). In this manner, downloading region files are interleaved with the map task progress. The reduce phase is not started at this moment yet.

Eventually all the map tasks are completed. The Job Tracker then notifies all the allocated Task Tracker s to proceed to the reduce phase. Each allocated Task Tracker will fork a Reduce Task (separate JVM) to read the downloaded file (which is already sorted by key) and invoke the “reduce” function, which collects the key/aggregated Value into the final output file (one per reducer node). Note that each reduce task (and map task as well) is single-threaded. And this thread will invoke reduce (key, values) function in ascending (or descending) order of the keys assigned to this reduce task. This provides an interesting property that allentries written by reduce () function is sorted in increasing order. The output of each reducer is written to a temp output file in HDFS. When the reducer finishes processing all keys, the temp output file will be renamed atomically to its final output filename.

The Map/Reduce framework is resilient to crashes of any components. Task Tracker nodes periodically report their status to the Job Tracker which keeps track of the overall job progress. If the Job Tracker hasn’t heard from any Task Tracker nodes for a long time, it assumes the Task Tracker node has been crashed and will reassign its tasks appropriately to other Task Tracker nodes. Since the map phase result is stored in the local disk, which will not be available when the Task Tracker node crashes. In case a map-phase Task Tracker node crashes, the crashed Map Tasks (regardless of whether it is complete or not) will be reallocated to a different Task Tracker node, which will rerun all the assigned splits. However, the reduce phase result is stored in HDFS, which is available even the Task Tracker node crashes. Therefore, in case a reduce-phase Task Tracker node crashes, only the incomplete Reduce Tasks need to be reassigned to a different Task Tracker node, where the uncompleted reduce tasks will be re-run.

The job submission process is asynchronous. Client program can poll for the job status at any stage by providing the job id.

6.5) A basic Example:

To count the appearances of each different word in a set of documents:

void map(String name, String document):

// name: document name

// document: document contents

for each word w in document:

EmitIntermediate(w, "1");

void reduce(String word, Iterator partialCounts):

// word: a word

// partialCounts: a list of aggregated partial counts

int result = 0;

for each pc in partialCounts:

result += ParseInt(pc);

Emit(AsString(result));

Here, every document is fragmented into words, and each word is count up initially with a "1" value by the Mapfunction, using the word as the result key. The framework places together all the pairs with the same key and feeds them to the same call to reduce, thus this function just requests to sum all of its input values to find the over-all appearances of that word.

Example 2:

Page A A map output Reduced Output

This-A This -A

Page –A

Contains-A Page –A,B

So –A Contains-A,B

Much-A so-A

Text-A Much-A

Text-A,B

My-B

Page B B map output Too-B

My-B

Page-B

Contains-B

Text -B

Too-B

So we can say that

Map:-Goes over data, filters the data and collect the useful data.

Reducer:-have got their own data and it does the aggregation or summary of the data.

Map Reducer Library gathers together all the pairs with the same key(Shuffle/sort)

Sort is the heart of map.

Reduce function combines the value for a key

Map Reduce is easy to understand and easy to optimize.

Map Reduce is used as the backend of Google.

6.6) Disadvantage of Map Reduce is:

It does not perform the join function of MySQL,it works with the single flow of data.If we want to perform the join function we need to perform it manually like If I visit all the webpages and find the most predominant cities in the world,and which page relates to city.How to perform this join between all the webpages in the world and list of cities.We join by copying the list of cities and sending it to each mapper,because we have large dataset of webpages and small listt of cities.

6.7) Scalabality:

Runs on 1000 nodes.

5TB sorts on 500 nodes takes 2.5 hours.

Distributed File System-

150TB,3M Files

7) HADOOP:

Hadoop is a Java based framework for supporting data intensive applications, and is primarily used as a text-processing engine for large scale applications.

Hadoop can be broken down in to two prime components, the file system component and the map reduce engine.

File-system:

Hadoop implements its own file-system termed as HDFS, which is a rack-aware distributed file-system, primarily intended to run map-reduce jobs. It is not necessary to use HDFS as the file-system for running map-reduce jobs, and specialized setups can have their own file-system implementations (for instance, Amazon EC2 offerings of Hadoop also supports their own file-system Amazon S3, which is another distributed file-system developed by Amazon). Still, to the date, HDFS remains the most commonly used file system for map-reduce jobs with Hadoop.

Map-reduce:

The map-reduce engine in Hadoop runs on top of a distributed file-system. In a typical setup, there is a job-tracker node, and there are multiple task-tracker nodes. The entire map-reduce jobs are submitted to the job-tracker, which in-turn pushes the jobs to the available task-trackers. The task-trackers first do the map jobs, and then the reduce jobs. All the data communication between the mappers and reducers is handled by the map-reduce engine. The initial input and the final output, both lie on the under-lying file-system.

Yahoo, Facebook, twitter run this kind of production every single minute. Hadoop Framework is written in Java. The File System that Hadoop practices is HDFS.

7.1) Architecture and Workingof HDFS:

HDFS has master slave architecture. An HDFS configuration consists of a single Name Node, a master server that brings about the file system namespace and let the clients access the files.

There are a number of Data Nodes, usually one per node, which manage storage attached to the nodes. HDFS has a file system namespace and lets user data to be kept in files. Internally, a file is divided into one or more blocks and these blocks are stored in a set of Data Nodes. The Name Node executes file system namespace processes like opening, closing, and renaming files and directories. It also regulates the mapping of blocks to Data Nodes. The Data Nodes are liable for serving read and write requirements from the file system’s clients. The Data Nodes also achieve block creation, deletion, and replication upon instruction from the Name Node. The Name Node and Data Node are fragments of software intended to run on machines. These machines normally run a Linux operating system (OS). HDFS is made using the Java language; any machine that supports Java can run the Name Node or the Data Node software that is HDFS can be installed on that machine.

The existence of a single Name Node in creates the architecture of the system simple. The NameNode is the repository for all HDFS metadata. The system is designed in such a way that user data never movements through the Name Node.

The NameNode usages a transaction log called the Edit Log to record every single change that happens to file system metadata.

For illustration, creating a new file in HDFS causes the NameNode to insert a record into the Edit Log. Similarly, changing the replication factor of a file causes a new record to be inserted into the Edit Log. The Name Node uses a file in its local host OS file system to store the Edit Log. The entire file system namespace, including the mapping of blocks to files and file system properties, is stored in a file called the Fs Image. The Fs Image is stored as a file in the Name Node’s local file system.

7.2) Real Time Example:

Requirment:

Operating System:Linux

Applications Installed:Hadoop

Problem Statement: - creating and running map/reduce jobs with any executable or script as the mapper and/or the reducer which is also called Hadoop Streaming.

Run:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar

-input myInputDirs

-output myOutputDir

-mapper myPythonScript.py

-reducer /bin/wc

Explanation:

myInputDirs is the path of the input directories.

myOuntputDir is the path for the output directories.

Both the mapper and the reducer are executables that read the input from stdin line by line and emit the output to stdout. The usefulness will create a map/reduce job, submit the job to an appropriate cluster, and monitor the development of the job until it finishes.

When an executable is specified for mappers, each mapper task will launch the executable as a separate process when the mapper is set. As the mapper task runs, it converts its inputs into lines and feed the lines to the stdin of the process. In the meantime, the mapper collects the line oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the mapper. Theprefix of a line up to the first tab characteris thekeyand the the remaining of the line which rejects the tab character will be thevalue.

When an executable is identified for reducers, each reducer task will launch the executable as a separate process then the reducer is initialized. As the reducer task runs, it changes its input key/values pairs into lines and gives the lines to the stdin of the process. In the time being, the reducer collects the line oriented outputs from the stdout of the process, converts each line into a key/value pair, which is collected as the output of the reducer. By default, the prefix of a line up to the first tab character is the key and the remaining of the line which ignores the tab character is the value.

Example 2:

Requirmets:-

JDK tool Kit,LINUX,Hadoop

Code on java window:

1. package org.myorg;

2.

3. import java.io.Exception;

4. import java.util.*;

5.

6. import org.apache.hadoop.fs.Path;

7. import org.apache.hadoop.conf.*;

8. import org.apache.hadoop.io.*;

9. import org.apache.hadoop.mapred.*;

10. import org.apache.hadoop.util.*;

11.

12. public class WordCount {

13.

14. public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

15. private final static IntWritable one = new IntWritable(1);

16. private Text word = new Text();

17.

18. public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

19. String line = value.toString();

20. StringTokenizer tokenizer = new StringTokenizer(line);

21. while (tokenizer.hasMoreTokens()) {

22. word.set(tokenizer.nextToken());

23. output.collect(word, one);

24. }

25. }

26. }

27.

28. public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

29. public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

30. int sum = 0;

31. while (values.hasNext()) {

32. sum += values.next().get();

33. }

34. output.collect(key, new IntWritable(sum));

35. }

36. }

37.

38. public static void main(String[] args) throws Exception {

39. JobConf conf = new JobConf(WordCount.class);

40. conf.setJobName("wordcount");

41.

42. conf.setOutputKeyClass(Text.class);

43. conf.setOutputValueClass(IntWritable.class);

44.

45. conf.setMapperClass(MapClass.class);

46. conf.setCombinerClass(Reduce.class);

47. conf.setReducerClass(Reduce.class);

48.

49. conf.setInputFormat(TextInputFormat.class);

50. conf.setOutputFormat(TextOutputFormat.class);

51.

52. conf.setInputPath(new Path(args[1]));

53. conf.setOutputPath(new Path(args[2]));

54.

55. JobClient.runJob(conf);

57. }

58. }

Creating a jar assuming that HADOOP_HOMEis the root of the installation andHADOOP_VERSIONis the Hadoop version installed and compile a file named WordCount.java

$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar WordCount.java

$ jar -cvf /usr/joe/wordcount.jar WordCount.class

where-

/usr/joe/wordcount/input- input directory in HDFS

/usr/joe/wordcount/output- output directory in HDFS

Sample text-files as input:

$ bin/hadoop dfs -ls /usr/joe/wordcount/input/

/usr/joe/wordcount/input/file01

/usr/joe/wordcount/input/file02

//where file01 is one file and file02 is another file.//

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01

abc def ghi *//Content of the File01//*

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02

def ghi jkl *//Content of the File02//*

Run the application:

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

This gives us an Output:

Output:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000

ghi 2

abc 1

def 2

jkl 1

Explanation:

We are Using interface that implements mapper and Reduce which counts the words ghi,abc,def,jkl and gives us the total word count of both the files.

TheMapperimplementation (lines 14-26), via themapmethod (lines 18-25), processes one line at a time, as provided by the specifiedTextInputFormat(line 49). It then splits the line into tokens separated by whitespaces, via theStringTokenizer, and emits a key-value pair of< <word>, 1>.

For the given sample input the first map emits:

< abc, 1>

< def, 1>

< ghi, 1>

The second map emits:

<def, 1>

< ghi, 1>

< jkl, 1>

WordCountalso specifies acombiner(line 46). Hence, the output of each map is passed through the local combiner (which is same as theReduceras per the job configuration) for local aggregation, after being sorted on thekeys.

The output of the first map:

< abc, 1>

< def, 1>

< ghi, 1>

The output of the second map:

<def, 1>

< ghi, 1>

< jkl, 1>

TheReducerimplementation (lines 28-36), via thereducemethod (lines 29-35) just sums up the values, which are the occurence counts for each word(key).

Thus the output of the job is:

<ghi 2>

<abc 1>

<def 2>

<jkl 1>

Therunmethod specifies various facets of the job, such as the input/output paths (passed via the command line), key/value types, input/output formats etc., in theJobConf. It then calls theJobClient.runJob(line 55) to submit the and monitor its progress.

This is the Output on the Console:

7.3) Hadoop with Facebook

Facebook has many Hadoop clusters deployed now - with the biggest having about 2500 cpu cores and 1 PetaByte of disk space. We are loading over 400 gigabytes of compressed data (over 2 terabytes uncompressed) into the Hadoop file system each day and have hundreds of jobs running each day against these data sets.

Implementation of Hadoop on Facebook is for couple of factors:

  • First, developers are free to write map-reduce programs in the language of their selection.
  • Second, we have embraced SQL as a familiar paradigm to address and work on large data sets.
  • Most data stored in Hadoop file system is available as Tables. Developers can search the schemas and data of these tables much like they do in other database which makes it Developer Friendly. When they want to function on these data sets, they can use a minor subset of SQL to specify the required dataset.
  • Actions on datasets can be written as map and reduce scripts or using standard query operators (like joins and group-bys) or as a mixture of the two. It also has standard data warehouse structures like partitioning, sampling and indexing to this situation. This in-house data warehousing layer over Hadoop is entitled Hive.
  • At Facebook, it is extremely significant that we use the information produced by and from our users to make decisions about developments to the product. Hadoop has permitted us to make better use of the data at our clearance.

8.) Conclusion:

So we can say that Map Reduce has proven to be a useful abstraction and greatly Simplifies Distributed Computing. Its Implementation is Hadoop which deals with the storage of large dataset. Technology like this is used by all the upcoming Companies like Yahoo, Facebook, Twitter, Amazon and is helping a lot to the developers. The property of Distributed Computing made it better than other Technologies.

But this is just the beginning. We need highly diverse, talented and motivated group of people to support and bring enhancements in this genuinely innovative platform so that the comprehension of deluge data can be made easier and can consolidate, combine and understand the data better. We need cutting edge innovations for the faster and reliable analysis of data as today’s enterprises are generating data at the light year speed and Hadoop is one such step towards it.

We can implement this distributed model any where say financial services, Life and Health services, web and digital media services, retail etc. It is a very vast area and be researched more. We can implement various parallel programming model on Hadoop say MIPS, PVM.

Did you like this example?

Cite this page

What is distributed computing system ?. (2017, Jun 26). Retrieved January 23, 2025 , from
https://studydriver.com/what-is-distributed-computing-system/

Save time with Studydriver!

Get in touch with our top writers for a non-plagiarized essays written to satisfy your needs

Get custom essay

Stuck on ideas? Struggling with a concept?

A professional writer will make a clear, mistake-free paper for you!

Get help with your assignment
Leave your email and we will send a sample to you.
Stop wasting your time searching for samples!
You can find a skilled professional who can write any paper for you.
Get unique paper

Hi!
I'm Amy :)

I can help you save hours on your homework. Let's start by finding a writer.

Find Writer